All checks were successful
continuous-integration/drone/push Build is passing
Implement Python-based generator that creates static HTML index pages for browsing S3 bucket contents. The generator produces nginx-style directory listings with hierarchical navigation.
437 lines · 14 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
S3 Static Page Generator for happyDomain
|
|
|
|
Generates static HTML index pages for browsing an S3 bucket.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import shutil
|
|
from datetime import datetime
|
|
from typing import List, Dict, Tuple, Optional
|
|
from pathlib import Path
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError, NoCredentialsError
|
|
from jinja2 import Environment, FileSystemLoader, TemplateNotFound
|
|
from dateutil import parser as date_parser
|
|
|
|
|
|
# Configure logging
# Verbosity is controlled by the LOG_LEVEL environment variable
# (e.g. DEBUG, INFO, WARNING); defaults to INFO when unset.
logging.basicConfig(
    level=os.getenv('LOG_LEVEL', 'INFO'),
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
# Shared module-level logger used by every class and function below.
logger = logging.getLogger('s3-generator')
|
|
|
|
|
|
class S3Client:
    """Client for interacting with S3-compatible storage."""

    def __init__(self, endpoint_url: str, region_name: str,
                 aws_access_key_id: str, aws_secret_access_key: str):
        """Initialize S3 client with custom endpoint.

        Args:
            endpoint_url: Base URL of the S3-compatible service.
            region_name: Region passed through to boto3.
            aws_access_key_id: Access key for the service.
            aws_secret_access_key: Secret key for the service.

        Raises:
            Exception: Re-raises anything boto3 raises during client setup.
        """
        self.endpoint_url = endpoint_url
        self.region_name = region_name

        try:
            self.client = boto3.client(
                's3',
                endpoint_url=endpoint_url,
                region_name=region_name,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key
            )
            logger.debug(f"S3 client initialized: {endpoint_url}")
        except Exception as e:
            logger.error(f"Failed to initialize S3 client: {e}")
            raise

    def validate_connection(self, bucket: str) -> bool:
        """Test S3 connectivity by attempting to access the bucket.

        Returns True on success; logs and returns False on any failure.
        """
        try:
            self.client.head_bucket(Bucket=bucket)
        except ClientError as err:
            # Pull the service-reported error code for a useful message.
            code = err.response.get('Error', {}).get('Code', 'Unknown')
            logger.error(f"Failed to access bucket {bucket}: {code}")
            return False
        except NoCredentialsError:
            logger.error("No AWS credentials found")
            return False
        except Exception as err:
            logger.error(f"Unexpected error validating connection: {err}")
            return False
        else:
            logger.info(f"Successfully connected to bucket: {bucket}")
            return True

    def list_all_objects(self, bucket: str) -> List[Dict]:
        """Fetch all objects from bucket with pagination support.

        Follows ListObjectsV2 continuation tokens until the listing is
        no longer truncated.

        Returns:
            List of S3 object dicts (each with Key, Size, LastModified, ...).

        Raises:
            ClientError: On S3 API failures.
        """
        collected: List[Dict] = []
        token: Optional[str] = None

        try:
            while True:
                request = {'Bucket': bucket}
                if token:
                    request['ContinuationToken'] = token

                logger.debug(f"Fetching objects (token: {token})")
                page = self.client.list_objects_v2(**request)

                # 'Contents' is absent for an empty page/bucket.
                if 'Contents' in page:
                    batch = page['Contents']
                    collected.extend(batch)
                    logger.debug(f"Fetched {len(batch)} objects")

                if not page.get('IsTruncated', False):
                    break
                token = page.get('NextContinuationToken')

            logger.info(f"Total objects fetched: {len(collected)}")
            return collected

        except ClientError as err:
            logger.error(f"Error listing objects: {err}")
            raise
        except Exception as err:
            logger.error(f"Unexpected error listing objects: {err}")
            raise
|
|
|
|
|
|
class DirectoryTree:
    """Build and manage directory tree structure from S3 objects."""

    def __init__(self):
        # Nested dict-of-dicts: self.tree['a']['b'] == {} means /a/b/ exists.
        self.tree = {}
        # Flat file records: {'path', 'name', 'size', 'last_modified'};
        # 'path' carries no trailing slash ('/' for root, '/a/b' for nested).
        self.files = []

    def build_tree(self, s3_objects: List[Dict]) -> None:
        """Parse S3 object keys into directory tree structure.

        Keys ending in '/' are treated as directory markers and skipped;
        directories are inferred from the file keys themselves.

        Args:
            s3_objects: S3 object dicts with at least a 'Key'; 'Size' and
                'LastModified' are read with defaults when missing.
        """
        logger.info("Building directory tree...")

        for obj in s3_objects:
            key = obj['Key']
            parts = key.split('/')

            # Skip keys ending with / (directory markers)
            if key.endswith('/'):
                logger.debug(f"Skipping directory marker: {key}")
                continue

            # Handle files at root
            if len(parts) == 1:
                self.files.append({
                    'path': '/',
                    'name': parts[0],
                    'size': obj.get('Size', 0),
                    # NOTE(review): datetime.now() is naive while boto3's
                    # LastModified is tz-aware; mixing them in max() would
                    # raise. Only hit when S3 omits LastModified.
                    'last_modified': obj.get('LastModified', datetime.now()),
                })
                continue

            # Build nested directory structure
            current = self.tree
            path_parts = []
            for part in parts[:-1]:
                path_parts.append(part)
                current = current.setdefault(part, {})

            # Add file to its directory
            self.files.append({
                'path': '/' + '/'.join(path_parts),
                'name': parts[-1],
                'size': obj.get('Size', 0),
                'last_modified': obj.get('LastModified', datetime.now()),
            })

        logger.info(f"Tree built with {len(self.files)} files")

    def get_all_paths(self) -> List[str]:
        """Return list of all unique directory paths (each with a trailing slash)."""
        paths = ['/']

        def traverse(node: Dict, current_path: str):
            for dirname in node:
                # BUGFIX: current_path always ends with '/', so appending
                # "/{dirname}/" produced double slashes ('/foo//bar/') for
                # any directory deeper than one level, breaking lookups.
                new_path = f"{current_path}{dirname}/"
                paths.append(new_path)
                traverse(node[dirname], new_path)

        traverse(self.tree, '/')
        return paths

    def get_directory_listing(self, path: str) -> Tuple[List[Dict], List[Dict]]:
        """Get subdirectories and files for a given path.

        Args:
            path: Directory path, with or without trailing slash.

        Returns:
            (dir_metadata, file_metadata): directories as
            {'name', 'last_modified'}, files as
            {'name', 'size', 'last_modified', 'url'}, both sorted
            case-insensitively by name.
        """
        # Normalize path: internal file paths carry no trailing slash.
        path = path.rstrip('/')
        if not path:
            path = '/'

        # Walk the tree to the node for this path; unknown paths yield {}.
        if path == '/':
            node = self.tree
        else:
            node = self.tree
            for part in path.strip('/').split('/'):
                if part in node:
                    node = node[part]
                else:
                    node = {}
                    break
        subdirs = list(node.keys())

        # Files directly in this directory
        dir_files = [f for f in self.files if f['path'] == path]

        # Calculate last modified for directories (most recent file anywhere below)
        dir_metadata = []
        for dirname in sorted(subdirs, key=str.lower):
            dir_path = f"{path}/{dirname}" if path != '/' else f"/{dirname}"
            # BUGFIX: a plain startswith(dir_path) also matched sibling
            # directories sharing the prefix (e.g. '/foo' matched '/foobar'),
            # polluting the directory's last-modified timestamp.
            files_in_dir = [
                f for f in self.files
                if f['path'] == dir_path or f['path'].startswith(dir_path + '/')
            ]

            if files_in_dir:
                last_mod = max(f['last_modified'] for f in files_in_dir)
            else:
                # Defensive fallback; every tree directory stems from a file,
                # so this should be unreachable.
                last_mod = datetime.now()

            dir_metadata.append({
                'name': dirname,
                'last_modified': last_mod
            })

        # Format file metadata
        file_metadata = []
        for file in sorted(dir_files, key=lambda x: x['name'].lower()):
            file_url = f"{path}/{file['name']}" if path != '/' else f"/{file['name']}"
            file_metadata.append({
                'name': file['name'],
                'size': file['size'],
                'last_modified': file['last_modified'],
                'url': file_url
            })

        return dir_metadata, file_metadata
|
|
|
|
|
|
class HTMLGenerator:
    """Generate HTML pages from directory listings."""

    def __init__(self, template_path: str):
        """Initialize with Jinja2 template.

        Args:
            template_path: Path to the template file; its directory becomes
                the Jinja2 search root.

        Raises:
            TemplateNotFound: If the template file does not exist.
        """
        try:
            template_dir = os.path.dirname(template_path)
            template_name = os.path.basename(template_path)

            # SECURITY: autoescape guards against HTML injection — the
            # rendered names come from untrusted bucket object keys.
            env = Environment(
                loader=FileSystemLoader(template_dir),
                autoescape=True
            )
            self.template = env.get_template(template_name)
            logger.debug(f"Template loaded: {template_path}")
        except TemplateNotFound:
            logger.error(f"Template not found: {template_path}")
            raise
        except Exception as e:
            logger.error(f"Error loading template: {e}")
            raise

    @staticmethod
    def format_size(size_bytes: int) -> str:
        """Convert bytes to human-readable format (e.g. '512', '1.5K', '2.0M')."""
        if size_bytes == 0:
            return "0"

        units = ['', 'K', 'M', 'G', 'T', 'P']
        size = float(size_bytes)
        unit_index = 0

        while size >= 1024 and unit_index < len(units) - 1:
            size /= 1024
            unit_index += 1

        # Plain bytes are shown as a bare integer; larger units get one decimal.
        if unit_index == 0:
            return str(int(size))
        else:
            return f"{size:.1f}{units[unit_index]}"

    @staticmethod
    def format_date(dt: datetime) -> str:
        """Format datetime as 'Jan 6, 2026 16:34'.

        BUGFIX: the original used the '%-d' strftime directive, which is a
        glibc extension — it raises/misbehaves on Windows and other libc
        implementations. Using dt.day directly is portable.
        """
        return f"{dt.strftime('%b')} {dt.day}, {dt.year} {dt.strftime('%H:%M')}"

    def generate_page(self, current_path: str, directories: List[Dict],
                      files: List[Dict]) -> str:
        """Render HTML page for a directory.

        Args:
            current_path: Directory being rendered (e.g. '/', '/foo/').
            directories: Dicts with 'name' and datetime 'last_modified'.
            files: Dicts with 'name', 'size', datetime 'last_modified', 'url'.

        Returns:
            Rendered HTML as a string.

        Raises:
            Exception: Re-raises Jinja2 rendering failures after logging.
        """
        # Normalize path for display
        display_path = current_path if current_path != '/' else '/'

        # The root page has no parent to link to.
        show_parent = current_path != '/'

        # Format directories (human-readable timestamps)
        formatted_dirs = []
        for d in directories:
            formatted_dirs.append({
                'name': d['name'],
                'last_modified': self.format_date(d['last_modified'])
            })

        # Format files (human-readable timestamps and sizes)
        formatted_files = []
        for f in files:
            formatted_files.append({
                'name': f['name'],
                'url': f['url'],
                'last_modified': self.format_date(f['last_modified']),
                'size': self.format_size(f['size'])
            })

        # Render template
        try:
            html = self.template.render(
                current_path=display_path,
                parent_link=show_parent,
                directories=formatted_dirs,
                files=formatted_files
            )
            return html
        except Exception as e:
            logger.error(f"Error rendering template for {current_path}: {e}")
            raise
|
|
|
|
|
|
def load_config() -> Dict[str, str]:
    """Load configuration from environment variables.

    Reads S3 connection settings, accepting AWS_* or S3_* names for the
    credentials. Exits the process with status 1 if any required value
    is missing.

    Returns:
        Dict with keys: endpoint, bucket, region, access_key, secret_key.
    """
    env = os.getenv
    config = {
        'endpoint': env('S3_ENDPOINT_URL'),
        'bucket': env('S3_BUCKET'),
        'region': env('S3_REGION', 'us-east-1'),
        # Accept either the standard AWS names or the S3_* aliases.
        'access_key': env('AWS_ACCESS_KEY_ID') or env('S3_ACCESS_KEY'),
        'secret_key': env('AWS_SECRET_ACCESS_KEY') or env('S3_SECRET_KEY'),
    }

    # Region has a default; everything else is mandatory.
    missing = [
        name for name in ('endpoint', 'bucket', 'access_key', 'secret_key')
        if not config.get(name)
    ]

    if missing:
        logger.error(f"Missing required environment variables: {', '.join(missing)}")
        logger.error("Required: S3_ENDPOINT_URL, S3_BUCKET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
        sys.exit(1)

    return config
|
|
|
|
|
|
def cleanup_output_dir(output_dir: str) -> None:
    """Clean output directory before generation.

    Removes the directory tree if it already exists, then recreates it
    empty (including any missing parent directories).

    Args:
        output_dir: Path of the directory to reset.
    """
    target = Path(output_dir)
    if target.exists():
        logger.info(f"Cleaning output directory: {output_dir}")
        shutil.rmtree(output_dir)

    target.mkdir(parents=True, exist_ok=True)
    logger.debug(f"Output directory created: {output_dir}")
|
|
|
|
|
|
def main():
    """Main entry point for the S3 static page generator.

    Pipeline: load config → connect to S3 → list all objects → build the
    directory tree → render one index.html per directory into the output
    directory. Exits with status 1 on any fatal error.
    """
    logger.info("=" * 60)
    logger.info("S3 Static Page Generator for happyDomain")
    logger.info("=" * 60)

    # 1. Load configuration
    config = load_config()
    logger.info("Configuration loaded:")
    logger.info(f"  Endpoint: {config['endpoint']}")
    logger.info(f"  Bucket: {config['bucket']}")
    logger.info(f"  Region: {config['region']}")

    # Paths are overridable via environment; defaults match the previous
    # hard-coded values, so existing deployments are unaffected.
    template_path = os.getenv('TEMPLATE_PATH', 'templates/index_template.html')
    output_dir = os.getenv('OUTPUT_DIR', 'output')

    # 2. Initialize S3 client
    try:
        s3_client = S3Client(
            endpoint_url=config['endpoint'],
            region_name=config['region'],
            aws_access_key_id=config['access_key'],
            aws_secret_access_key=config['secret_key']
        )
    except Exception as e:
        logger.error(f"Failed to initialize S3 client: {e}")
        sys.exit(1)

    # 3. Validate connection before doing any real work
    if not s3_client.validate_connection(config['bucket']):
        logger.error("Failed to connect to S3 bucket")
        sys.exit(1)

    # 4. List all objects
    logger.info("Fetching object list from S3...")
    try:
        objects = s3_client.list_all_objects(config['bucket'])
    except Exception as e:
        logger.error(f"Failed to list objects: {e}")
        sys.exit(1)

    logger.info(f"Found {len(objects)} objects in bucket")

    # 5. Build directory tree
    tree = DirectoryTree()
    tree.build_tree(objects)
    all_paths = tree.get_all_paths()
    logger.info(f"Identified {len(all_paths)} unique directories")

    # 6. Initialize HTML generator
    try:
        html_gen = HTMLGenerator(template_path)
    except Exception as e:
        logger.error(f"Failed to initialize HTML generator: {e}")
        sys.exit(1)

    # 7. Clean output directory
    cleanup_output_dir(output_dir)

    # 8. Generate index.html for each directory
    logger.info("Generating HTML pages...")
    for i, path in enumerate(sorted(all_paths), 1):
        logger.info(f"[{i}/{len(all_paths)}] Generating {path}index.html")

        try:
            dirs, files = tree.get_directory_listing(path)
            html_content = html_gen.generate_page(
                current_path=path,
                directories=dirs,
                files=files
            )

            # The root page lives directly in the output dir; every other
            # page mirrors the bucket hierarchy beneath it.
            if path == '/':
                output_path = os.path.join(output_dir, 'index.html')
            else:
                output_path = os.path.join(output_dir, path.strip('/'), 'index.html')

            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(html_content)

            logger.debug(f"Written: {output_path}")

        except Exception as e:
            logger.error(f"Error generating page for {path}: {e}")
            sys.exit(1)

    logger.info("=" * 60)
    logger.info("Generation complete!")
    logger.info(f"Output directory: {os.path.abspath(output_dir)}")
    logger.info("=" * 60)
|
|
|
|
|
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: exit quietly but with a non-zero status so callers
        # (CI, cron) see the run as incomplete.
        logger.info("Interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Last-resort handler: log with full traceback and fail the process.
        logger.error(f"Unexpected error: {e}", exc_info=True)
        sys.exit(1)
|