get-sources/generator.py
Pierre-Olivier Mercier 26383b76cb
All checks were successful
continuous-integration/drone/push Build is passing
Filter out index.html files from directory listings
Exclude generated index.html files when building the directory tree to prevent them from appearing in the listings. Since the generator deploys index.html files to the same bucket, they would otherwise show up as regular files in subsequent runs.
2026-01-06 19:01:58 +07:00

442 lines
14 KiB
Python

#!/usr/bin/env python3
"""
S3 Static Page Generator for happyDomain
Generates static HTML index pages for browsing an S3 bucket.
"""
import os
import sys
import logging
import shutil
from datetime import datetime
from typing import List, Dict, Tuple, Optional
from pathlib import Path
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from jinja2 import Environment, FileSystemLoader, TemplateNotFound
from dateutil import parser as date_parser
# Configure logging.
# LOG_LEVEL is trimmed and upper-cased before use because logging.basicConfig
# rejects level names it does not know (e.g. 'debug' or ' INFO ') with a
# ValueError; an unset or blank value falls back to INFO.
logging.basicConfig(
    level=(os.getenv('LOG_LEVEL', '').strip() or 'INFO').upper(),
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger('s3-generator')
class S3Client:
    """Client for interacting with S3-compatible storage.

    Wraps a ``boto3`` S3 client created against an explicit endpoint URL and
    exposes the two operations the generator needs: a connectivity check and
    a full (paginated) object listing.
    """

    def __init__(self, endpoint_url: str, region_name: str,
                 aws_access_key_id: str, aws_secret_access_key: str):
        """Initialize S3 client with custom endpoint.

        Args:
            endpoint_url: Base URL of the S3-compatible service.
            region_name: Region name passed through to boto3.
            aws_access_key_id: Access key used for authentication.
            aws_secret_access_key: Secret key used for authentication.

        Raises:
            Exception: Whatever ``boto3.client`` raises; the error is logged
                and re-raised unchanged.
        """
        self.endpoint_url = endpoint_url
        self.region_name = region_name
        try:
            self.client = boto3.client(
                's3',
                endpoint_url=endpoint_url,
                region_name=region_name,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key
            )
            logger.debug(f"S3 client initialized: {endpoint_url}")
        except Exception as e:
            logger.error(f"Failed to initialize S3 client: {e}")
            raise

    def validate_connection(self, bucket: str) -> bool:
        """Test S3 connectivity by attempting to access the bucket.

        Args:
            bucket: Name of the bucket to probe with ``head_bucket``.

        Returns:
            True when the call succeeds; False on any error (the error is
            logged, never raised).
        """
        try:
            self.client.head_bucket(Bucket=bucket)
            logger.info(f"Successfully connected to bucket: {bucket}")
            return True
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Failed to access bucket {bucket}: {error_code}")
            return False
        except NoCredentialsError:
            logger.error("No AWS credentials found")
            return False
        except Exception as e:
            logger.error(f"Unexpected error validating connection: {e}")
            return False

    def list_all_objects(self, bucket: str) -> List[Dict]:
        """Fetch all objects from bucket with pagination support.

        Calls ``list_objects_v2`` repeatedly, following the continuation
        token, and concatenates each response's ``Contents`` entries in page
        order.

        Args:
            bucket: Name of the bucket to list.

        Returns:
            The object records from every page (empty list for an empty
            bucket).

        Raises:
            ClientError: Propagated from the S3 call after being logged.
            RuntimeError: If a response is flagged as truncated but carries
                no continuation token.
            Exception: Any other failure, logged and re-raised.
        """
        objects: List[Dict] = []
        continuation_token = None
        try:
            while True:
                params = {'Bucket': bucket}
                if continuation_token:
                    params['ContinuationToken'] = continuation_token
                logger.debug(f"Fetching objects (token: {continuation_token})")
                response = self.client.list_objects_v2(**params)
                if 'Contents' in response:
                    objects.extend(response['Contents'])
                    logger.debug(f"Fetched {len(response['Contents'])} objects")
                if not response.get('IsTruncated', False):
                    break
                continuation_token = response.get('NextContinuationToken')
                if not continuation_token:
                    # Without a token the next request would fetch the first
                    # page again, looping forever; fail loudly instead of
                    # returning a silently incomplete listing.
                    raise RuntimeError(
                        "Truncated listing returned without NextContinuationToken"
                    )
            logger.info(f"Total objects fetched: {len(objects)}")
            return objects
        except ClientError as e:
            logger.error(f"Error listing objects: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error listing objects: {e}")
            raise
class DirectoryTree:
    """Build and manage directory tree structure from S3 objects.

    Attributes are plain containers filled by :meth:`build_tree`:
    ``tree`` is a nested dict keyed by directory name (a directory without
    sub-directories maps to an empty dict) and ``files`` is a flat list of
    file records with the keys ``path``, ``name``, ``size`` and
    ``last_modified``.
    """

    def __init__(self):
        # Nested dicts: directory name -> dict of its sub-directories.
        self.tree = {}
        # One record per file; 'path' is the parent directory ('/' for the
        # root, otherwise '/dir/sub' without a trailing slash).
        self.files = []

    def build_tree(self, s3_objects: List[Dict]) -> None:
        """Parse S3 object keys into directory tree structure.

        Keys ending in '/' (directory markers) and keys whose last segment is
        ``index.html`` (pages produced by this generator) are ignored.

        Args:
            s3_objects: Object records; each must have a ``'Key'`` entry and
                may have ``'Size'`` and ``'LastModified'`` (defaulting to 0
                and the current time respectively).
        """
        logger.info("Building directory tree...")
        for obj in s3_objects:
            key = obj['Key']
            # Skip keys ending with / (directory markers)
            if key.endswith('/'):
                logger.debug(f"Skipping directory marker: {key}")
                continue
            *dir_parts, filename = key.split('/')
            # Skip generated index.html files
            if filename == 'index.html':
                logger.debug(f"Skipping generated index file: {key}")
                continue
            # Register every ancestor directory; for a root-level file
            # dir_parts is empty and the tree is left untouched.
            current = self.tree
            for part in dir_parts:
                current = current.setdefault(part, {})
            # With no directory parts this yields '/', i.e. the bucket root.
            self.files.append({
                'path': '/' + '/'.join(dir_parts),
                'name': filename,
                'size': obj.get('Size', 0),
                'last_modified': obj.get('LastModified', datetime.now()),
            })
        logger.info(f"Tree built with {len(self.files)} files")

    def get_all_paths(self) -> List[str]:
        """Return list of all unique directory paths.

        Returns:
            ``'/'`` followed by every directory in the tree in depth-first
            order, each written with a leading and trailing slash
            (e.g. ``'/a/'``, ``'/a/b/'``).
        """
        paths = ['/']

        def traverse(node: Dict, current_path: str) -> None:
            for dirname, children in node.items():
                # current_path always ends with '/', so plain concatenation
                # is enough; inserting another '/' would produce '/a//b/'.
                new_path = f"{current_path}{dirname}/"
                paths.append(new_path)
                traverse(children, new_path)

        traverse(self.tree, '/')
        return paths

    def get_directory_listing(self, path: str) -> Tuple[List[Dict], List[Dict]]:
        """Get subdirectories and files for a given path.

        Args:
            path: Directory path such as ``'/'``, ``'/a'`` or ``'/a/b/'``.

        Returns:
            A ``(directories, files)`` tuple, both sorted case-insensitively
            by name. Each directory entry has ``name`` and ``last_modified``
            (the newest file anywhere below it, or the current time if it
            holds no files); each file entry has ``name``, ``size``,
            ``last_modified`` and ``url``. An unknown path yields two empty
            lists.
        """
        # Normalize path: no trailing slash, except for the root itself.
        path = path.rstrip('/') or '/'

        # Find subdirectories by walking the tree down to the requested node.
        current = self.tree
        if path != '/':
            for part in path.strip('/').split('/'):
                if part not in current:
                    current = {}
                    break
                current = current[part]
        subdirs = list(current.keys())

        base = '' if path == '/' else path

        # Calculate last modified for directories (most recent file)
        dir_metadata = []
        for dirname in sorted(subdirs, key=str.lower):
            dir_path = f"{base}/{dirname}"
            # Match the directory itself or anything beneath it. A bare
            # startswith(dir_path) would also match siblings that merely
            # share the prefix ('/foo' vs '/foobar').
            nested_prefix = dir_path + '/'
            timestamps = [
                f['last_modified'] for f in self.files
                if f['path'] == dir_path or f['path'].startswith(nested_prefix)
            ]
            dir_metadata.append({
                'name': dirname,
                'last_modified': max(timestamps) if timestamps else datetime.now(),
            })

        # Format file metadata for the files directly inside this directory.
        dir_files = [f for f in self.files if f['path'] == path]
        file_metadata = [
            {
                'name': f['name'],
                'size': f['size'],
                'last_modified': f['last_modified'],
                'url': f"{base}/{f['name']}",
            }
            for f in sorted(dir_files, key=lambda x: x['name'].lower())
        ]
        return dir_metadata, file_metadata
class HTMLGenerator:
    """Generate HTML pages from directory listings.

    Loads a single Jinja2 template at construction time and renders it once
    per directory via :meth:`generate_page`.
    """

    def __init__(self, template_path: str):
        """Initialize with Jinja2 template.

        Args:
            template_path: Path to the template file; its directory becomes
                the Jinja2 search path.

        Raises:
            TemplateNotFound: If the template file does not exist.
            Exception: Any other loading error, logged and re-raised.
        """
        try:
            template_dir = os.path.dirname(template_path)
            template_name = os.path.basename(template_path)
            # Autoescape is enabled because the values rendered into the page
            # (file and directory names, URLs) are supplied by the caller and
            # may contain HTML-special characters.
            env = Environment(
                loader=FileSystemLoader(template_dir),
                autoescape=True,
            )
            self.template = env.get_template(template_name)
            logger.debug(f"Template loaded: {template_path}")
        except TemplateNotFound:
            logger.error(f"Template not found: {template_path}")
            raise
        except Exception as e:
            logger.error(f"Error loading template: {e}")
            raise

    @staticmethod
    def format_size(size_bytes: int) -> str:
        """Convert bytes to human-readable format.

        Values below 1024 are returned as a plain integer string; larger
        values are divided by 1024 until they fit and rendered with one
        decimal and a unit suffix (K, M, G, T, P), e.g. ``1536 -> '1.5K'``.
        """
        if size_bytes == 0:
            return "0"
        units = ['', 'K', 'M', 'G', 'T', 'P']
        size = float(size_bytes)
        unit_index = 0
        # Stop at the last unit so huge values stay expressed in 'P'.
        while size >= 1024 and unit_index < len(units) - 1:
            size /= 1024
            unit_index += 1
        if unit_index == 0:
            return str(int(size))
        return f"{size:.1f}{units[unit_index]}"

    @staticmethod
    def format_date(dt: datetime) -> str:
        """Format datetime as 'Jan 6, 2026 16:34'.

        The day of month is taken from ``dt.day`` rather than the ``%-d``
        strftime directive, which is a platform extension and not available
        everywhere.
        """
        return f"{dt.strftime('%b')} {dt.day}, {dt.strftime('%Y %H:%M')}"

    def generate_page(self, current_path: str, directories: List[Dict],
                      files: List[Dict]) -> str:
        """Render HTML page for a directory.

        Args:
            current_path: Directory path shown on the page; ``'/'`` is the
                root and suppresses the parent link.
            directories: Entries with ``name`` and ``last_modified``
                (datetime).
            files: Entries with ``name``, ``url``, ``size`` (bytes) and
                ``last_modified`` (datetime).

        Returns:
            The rendered template output.

        Raises:
            Exception: Any rendering error, logged and re-raised.
        """
        # The root has no parent to link back to.
        show_parent = current_path != '/'

        # Pre-format dates and sizes so the template only prints strings.
        formatted_dirs = [
            {
                'name': d['name'],
                'last_modified': self.format_date(d['last_modified']),
            }
            for d in directories
        ]
        formatted_files = [
            {
                'name': f['name'],
                'url': f['url'],
                'last_modified': self.format_date(f['last_modified']),
                'size': self.format_size(f['size']),
            }
            for f in files
        ]

        # Render template
        try:
            return self.template.render(
                current_path=current_path,
                parent_link=show_parent,
                directories=formatted_dirs,
                files=formatted_files
            )
        except Exception as e:
            logger.error(f"Error rendering template for {current_path}: {e}")
            raise
def load_config() -> Dict[str, str]:
    """Load configuration from environment variables.

    Reads the endpoint, bucket, region (default ``us-east-1``) and
    credentials. Each credential is taken from the ``AWS_*`` variable first
    and falls back to the matching ``S3_*`` variable.

    Returns:
        Dict with the keys ``endpoint``, ``bucket``, ``region``,
        ``access_key`` and ``secret_key``.

    Raises:
        SystemExit: With exit code 1 when a required value is missing or
            empty; the missing variables are logged first.
    """
    config = {
        'endpoint': os.getenv('S3_ENDPOINT_URL'),
        'bucket': os.getenv('S3_BUCKET'),
        'region': os.getenv('S3_REGION', 'us-east-1'),
        'access_key': os.getenv('AWS_ACCESS_KEY_ID') or os.getenv('S3_ACCESS_KEY'),
        'secret_key': os.getenv('AWS_SECRET_ACCESS_KEY') or os.getenv('S3_SECRET_KEY'),
    }
    # Required config keys mapped to the environment variable(s) that supply
    # them, so the error names what the operator actually has to set rather
    # than the internal dictionary keys.
    required_env = {
        'endpoint': 'S3_ENDPOINT_URL',
        'bucket': 'S3_BUCKET',
        'access_key': 'AWS_ACCESS_KEY_ID (or S3_ACCESS_KEY)',
        'secret_key': 'AWS_SECRET_ACCESS_KEY (or S3_SECRET_KEY)',
    }
    missing = [env_name for key, env_name in required_env.items()
               if not config.get(key)]
    if missing:
        logger.error(f"Missing required environment variables: {', '.join(missing)}")
        logger.error("Required: S3_ENDPOINT_URL, S3_BUCKET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
        sys.exit(1)
    return config
def cleanup_output_dir(output_dir: str) -> None:
    """Clean output directory before generation.

    Removes whatever currently exists at ``output_dir`` and recreates it as
    an empty directory (including missing parents).

    Args:
        output_dir: Path of the directory to reset.

    Raises:
        OSError: If the existing entry cannot be removed or the directory
            cannot be created.
    """
    # lexists (not exists) so a dangling symlink is also detected; otherwise
    # makedirs below would fail on it.
    if os.path.lexists(output_dir):
        logger.info(f"Cleaning output directory: {output_dir}")
        if os.path.isdir(output_dir) and not os.path.islink(output_dir):
            shutil.rmtree(output_dir)
        else:
            # rmtree refuses regular files and symlinks; unlink those.
            os.remove(output_dir)
    os.makedirs(output_dir, exist_ok=True)
    logger.debug(f"Output directory created: {output_dir}")
def main(output_dir: str = 'output',
         template_path: str = 'templates/index_template.html') -> None:
    """Main entry point for the S3 static page generator.

    Loads the configuration, lists the bucket, builds the directory tree and
    writes one ``index.html`` per directory below ``output_dir`` (which is
    wiped first).

    Args:
        output_dir: Directory that receives the generated pages.
        template_path: Path of the Jinja2 template used for every page.

    Raises:
        SystemExit: With exit code 1 on any configuration, connection,
            listing, template or page-generation failure.
    """
    logger.info("=" * 60)
    logger.info("S3 Static Page Generator for happyDomain")
    logger.info("=" * 60)

    # 1. Load configuration
    config = load_config()
    logger.info("Configuration loaded:")
    logger.info(f" Endpoint: {config['endpoint']}")
    logger.info(f" Bucket: {config['bucket']}")
    logger.info(f" Region: {config['region']}")

    # 2. Initialize S3 client
    try:
        s3_client = S3Client(
            endpoint_url=config['endpoint'],
            region_name=config['region'],
            aws_access_key_id=config['access_key'],
            aws_secret_access_key=config['secret_key']
        )
    except Exception as e:
        logger.error(f"Failed to initialize S3 client: {e}")
        sys.exit(1)

    # 3. Validate connection
    if not s3_client.validate_connection(config['bucket']):
        logger.error("Failed to connect to S3 bucket")
        sys.exit(1)

    # 4. List all objects
    logger.info("Fetching object list from S3...")
    try:
        objects = s3_client.list_all_objects(config['bucket'])
    except Exception as e:
        logger.error(f"Failed to list objects: {e}")
        sys.exit(1)
    logger.info(f"Found {len(objects)} objects in bucket")

    # 5. Build directory tree
    tree = DirectoryTree()
    tree.build_tree(objects)
    all_paths = tree.get_all_paths()
    logger.info(f"Identified {len(all_paths)} unique directories")

    # 6. Initialize HTML generator
    try:
        html_gen = HTMLGenerator(template_path)
    except Exception as e:
        logger.error(f"Failed to initialize HTML generator: {e}")
        sys.exit(1)

    # 7. Clean output directory
    cleanup_output_dir(output_dir)

    # 8. Generate index.html for each directory
    logger.info("Generating HTML pages...")
    for i, path in enumerate(sorted(all_paths), 1):
        # Directory paths are derived from object keys. A '.' or '..'
        # segment would make the page land on top of another page or outside
        # the output directory, so such paths are not written.
        if any(segment in ('.', '..') for segment in path.split('/')):
            logger.warning(f"[{i}/{len(all_paths)}] Skipping unsafe path: {path}")
            continue
        logger.info(f"[{i}/{len(all_paths)}] Generating {path}index.html")
        try:
            dirs, files = tree.get_directory_listing(path)
            html_content = html_gen.generate_page(
                current_path=path,
                directories=dirs,
                files=files
            )
            # Write to output directory. For the root, path.strip('/') is
            # empty and the page lands directly in output_dir.
            output_path = os.path.join(output_dir, path.strip('/'), 'index.html')
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
            logger.debug(f"Written: {output_path}")
        except Exception as e:
            logger.error(f"Error generating page for {path}: {e}")
            sys.exit(1)

    logger.info("=" * 60)
    logger.info("Generation complete!")
    logger.info(f"Output directory: {os.path.abspath(output_dir)}")
    logger.info("=" * 60)
if __name__ == '__main__':
    # Script entry point: run the generator and turn a user interrupt or any
    # uncaught error into exit status 1. A SystemExit raised inside main()
    # is not intercepted here and propagates as-is.
    exit_status = 0
    try:
        main()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        exit_status = 1
    except Exception as err:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"Unexpected error: {err}")
        exit_status = 1
    if exit_status:
        sys.exit(exit_status)