#!/usr/bin/env python3
"""
EPUB Converter - Compiles translated HTML files into EPUB format
Supports extraction of translated titles from chapter content
"""
import os
import sys
import io
import json
import mimetypes
import re
import zipfile
import unicodedata
import html as html_module
from xml.etree import ElementTree as ET
from typing import Dict, List, Tuple, Optional, Callable
from ebooklib import epub, ITEM_DOCUMENT
from bs4 import BeautifulSoup
from metadata_batch_translator import enhance_epub_compiler
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
from unified_api_client import UnifiedClient
except ImportError:
UnifiedClient = None
try:
from translate_headers_standalone import run_translation as run_standalone_header_translation
except ImportError:
run_standalone_header_translation = None
# Configure stdout for UTF-8
def configure_utf8_output():
    """Make ``sys.stdout`` emit UTF-8, ignoring characters it cannot encode.

    Three situations are handled:

    * ``sys.stdout`` is ``None``: it is replaced by a UTF-8 text wrapper
      around ``os.devnull`` so later ``print`` calls have a stream to write to.
    * The stream offers ``reconfigure``: it is reconfigured in place.
    * Otherwise (or if ``reconfigure`` raises ``AttributeError``): the
      stream's binary ``buffer``, when present, is wrapped in a new UTF-8
      ``TextIOWrapper``. A failure while wrapping is ignored and the
      existing stream is kept.
    """
    stream = sys.stdout

    if stream is None:
        # hasattr(None, ...) simply returns False and never raises, so the
        # missing-stdout case has to be tested explicitly.
        # The devnull handle is deliberately left open: it backs sys.stdout.
        devnull = open(os.devnull, "wb")
        sys.stdout = io.TextIOWrapper(devnull, encoding='utf-8', errors='ignore')
        return

    if hasattr(stream, 'reconfigure'):
        try:
            stream.reconfigure(encoding='utf-8', errors='ignore')
            return
        except AttributeError:
            # Fall through to the buffer-wrapping fallback below.
            pass

    if hasattr(stream, 'buffer'):
        try:
            sys.stdout = io.TextIOWrapper(stream.buffer, encoding='utf-8', errors='ignore')
        except Exception:
            # Best effort only: keep whatever stdout we already have.
            pass
# Global configuration
configure_utf8_output()
# Module-level state shared by the helpers below.
_global_log_callback = None  # optional callable that receives every log message
_stop_flag = False           # value last passed to set_stop_flag()


def set_stop_flag(value: bool):
    """Store ``value`` as the converter's stop flag."""
    global _stop_flag
    _stop_flag = value


def is_stop_requested() -> bool:
    """Return the value most recently stored by ``set_stop_flag``."""
    return _stop_flag


def set_global_log_callback(callback: Optional[Callable]):
    """Install ``callback`` as the destination used by ``log``.

    Passing ``None`` (or any falsy value) makes ``log`` fall back to ``print``.
    """
    global _global_log_callback
    _global_log_callback = callback


def log(message: str):
    """Send ``message`` to the installed callback, or print it if none is set."""
    sink = _global_log_callback or print
    sink(message)
class HTMLEntityDecoder:
"""Handles comprehensive HTML entity decoding with full Unicode support"""
# Comprehensive entity replacement dictionary
ENTITY_MAP = {
# Quotation marks and apostrophes
'"': '"', '"': '"',
''': "'", ''': "'",
'‘': '\u2018', '’': '\u2019',
'“': '\u201c', '”': '\u201d',
'‚': '‚', '„': '„',
'‹': '‹', '›': '›',
'«': '«', '»': '»',
# Spaces and dashes
' ': ' ', ' ': ' ',
' ': ' ', ' ': ' ',
' ': ' ', '': '\u200c',
'': '\u200d', '': '\u200e',
'': '\u200f',
'–': '–', '—': '—',
'−': '−', '‐': '‐',
# Common symbols
'…': '…', '…': '…',
'•': '•', '•': '•',
'·': '·', '·': '·',
'§': '§', '¶': '¶',
'†': '†', '‡': '‡',
'◊': '◊', '♦': '♦',
'♣': '♣', '♥': '♥',
'♠': '♠',
# Currency symbols
'¢': '¢', '£': '£',
'¥': '¥', '€': '€',
'¤': '¤',
# Mathematical symbols
'±': '±', '×': '×',
'÷': '÷', '⁄': '⁄',
'‰': '‰', '‱': '‱',
'′': '\u2032', '″': '\u2033',
'∞': '∞', '∅': '∅',
'∇': '∇', '&partial;': '∂',
'∑': '∑', '∏': '∏',
'∫': '∫', '√': '√',
'≈': '≈', '≠': '≠',
'≡': '≡', '≤': '≤',
'≥': '≥', '⊂': '⊂',
'⊃': '⊃', '⊄': '⊄',
'⊆': '⊆', '⊇': '⊇',
# Intellectual property
'©': '©', '©': '©',
'®': '®', '®': '®',
'™': '™', '™': '™',
}
# Common encoding fixes
ENCODING_FIXES = {
# UTF-8 decoded as Latin-1
'’': "'", 'â€Å"': '"', '�': '"',
'–': '–', 'â€â€': '—',
' ': ' ', 'ÂÂ': '',
'â': 'â', 'é': 'é', 'è': 'è',
'ä': 'ä', 'ö': 'ö', 'ü': 'ü',
'ñ': 'ñ', 'ç': 'ç',
# Common mojibake patterns
'’': "'", '“': '"', 'â€': '"',
'â€"': '—', 'â€"': '–',
'…': '…', '•': '•',
'â„¢': '™', '©': '©', '®': '®',
# Windows-1252 interpreted as UTF-8
'‘': '\u2018', '’': '\u2019',
'“': '\u201c', 'â€': '\u201d',
'•': '•', 'â€"': '–', 'â€"': '—',
}
@classmethod
def decode(cls, text: str) -> str:
"""Comprehensive HTML entity decoding - PRESERVES UNICODE"""
if text is None:
return ""
if not isinstance(text, str):
text = str(text)
if not text:
return text
# Fix common encoding issues first
for bad, good in cls.ENCODING_FIXES.items():
text = text.replace(bad, good)
# Multiple passes to handle nested/double-encoded entities
max_passes = 3
for _ in range(max_passes):
prev_text = text
# Use html module for standard decoding (this handles <, >, etc.)
text = html_module.unescape(text)
if text == prev_text:
break
# Apply any remaining entity replacements
for entity, char in cls.ENTITY_MAP.items():
text = text.replace(entity, char)
return text
@staticmethod
def _decode_decimal(match):
"""Decode decimal HTML entity"""
try:
code = int(match.group(1))
if XMLValidator.is_valid_char_code(code):
return chr(code)
except:
pass
return match.group(0)
@staticmethod
def _decode_hex(match):
"""Decode hexadecimal HTML entity"""
try:
code = int(match.group(1), 16)
if XMLValidator.is_valid_char_code(code):
return chr(code)
except:
pass
return match.group(0)
class XMLValidator:
    """Handles XML validation and character checking"""

    # Matches every character that is_valid_char_code() rejects, so a whole
    # string can be cleaned in one pass instead of one Python call per character.
    _INVALID_XML_CHARS = re.compile(
        r'[^\x09\x0A\x0D\x20-\uD7FF\uE000-\uFFFD\U00010000-\U0010FFFF]'
    )

    @staticmethod
    def is_valid_char_code(codepoint: int) -> bool:
        """Return True when ``codepoint`` is tab, LF, CR, or lies in one of the
        ranges 0x20-0xD7FF, 0xE000-0xFFFD or 0x10000-0x10FFFF."""
        return (
            codepoint == 0x9 or
            codepoint == 0xA or
            codepoint == 0xD or
            (0x20 <= codepoint <= 0xD7FF) or
            (0xE000 <= codepoint <= 0xFFFD) or
            (0x10000 <= codepoint <= 0x10FFFF)
        )

    @staticmethod
    def is_valid_char(c: str) -> bool:
        """Return True when the single character ``c`` has an accepted code point."""
        return XMLValidator.is_valid_char_code(ord(c))

    @staticmethod
    def clean_for_xml(text: str) -> str:
        """Return ``text`` with every character rejected by
        ``is_valid_char_code`` removed.

        ``None`` and the empty string both yield ``""``, matching how the
        other text helpers in this module treat missing input.
        """
        if not text:
            return ""
        return XMLValidator._INVALID_XML_CHARS.sub('', text)
class ContentProcessor:
    """Handles content cleaning and processing - UPDATED WITH UNICODE PRESERVATION"""

    @staticmethod
    def safe_escape(text: str) -> str:
        """Escape XML special characters for use in XHTML titles/attributes.

        ``None`` yields ``""``. Any other non-string is converted with ``str``;
        if that conversion itself fails, ``""`` is returned.

        Returns the text with ``&``, ``<``, ``>``, ``"`` and ``'`` escaped.
        """
        if text is None:
            return ""
        if not isinstance(text, str):
            try:
                text = str(text)
            except Exception:
                return ""
        # html.escape(quote=True) already rewrites the apostrophe (as &#x27;)
        # along with & < > and ", so no further quote handling is needed.
        return html_module.escape(text, quote=True)
class TitleExtractor:
    """Handles extraction of titles from HTML content - UPDATED WITH UNICODE PRESERVATION"""

    # Header strategies, tried in this order:
    # (tag name, exclusive text-length limit, confidence of the first match,
    #  confidence of the second/third match).
    _HEADER_STRATEGIES = (
        ('h1', 300, 0.9, 0.85),
        ('h2', 250, 0.8, 0.75),
        ('h3', 200, 0.7, 0.65),
    )

    # Titles that carry no information when they are the whole title.
    _PLACEHOLDER_TITLES = frozenset({'untitled', 'chapter', 'document'})

    # Navigation/boilerplate phrases that disqualify a title.
    _FILLER_PHRASES = (
        'click here', 'read more', 'continue reading', 'next chapter',
        'previous chapter', 'table of contents', 'back to top',
    )

    @staticmethod
    def _fallback_title(chapter_num: Optional[int],
                        allow_generic_chapter_fallback: bool) -> Tuple[str, float]:
        """Return the generic "Chapter N" title when permitted, else "Untitled Chapter"."""
        if allow_generic_chapter_fallback and chapter_num:
            return f"Chapter {chapter_num}", 0.1
        return "Untitled Chapter", 0.0

    @staticmethod
    def extract_from_html(html_content: str, chapter_num: Optional[int] = None,
                          filename: Optional[str] = None, allow_paragraph_fallback: bool = True,
                          allow_generic_chapter_fallback: bool = True) -> Tuple[str, float]:
        """Extract title from HTML content with confidence score - KEEP ALL HEADERS INCLUDING NUMBERS

        Candidates are gathered from several sources, cleaned with
        ``clean_title``, de-duplicated (keeping the highest confidence per
        title) and the highest-confidence one wins.

        Args:
            html_content: Chapter markup; entities are decoded before parsing.
            chapter_num: Used only for the generic "Chapter N" fallback.
            filename: If it matches ``response_<digits>_<name>.html``, the
                ``<name>`` part becomes a low-confidence candidate.
            allow_paragraph_fallback: When False, ``<p>`` elements are not
                used as title sources (bold, centered, all-caps and
                first-paragraph strategies skip them).
            allow_generic_chapter_fallback: When False, "Chapter N" is never
                returned; "Untitled Chapter" is used instead.

        Returns:
            ``(title, confidence)``. Confidence is 0.1 for the generic
            "Chapter N" fallback and 0.0 for "Untitled Chapter". Any
            exception is logged and answered with the same fallback.
        """
        try:
            # Decode entities first - PRESERVES UNICODE
            html_content = HTMLEntityDecoder.decode(html_content)
            # decode() always returns str, and BeautifulSoup ignores
            # from_encoding (with a warning) for str markup, so it is not passed.
            soup = BeautifulSoup(html_content, 'lxml')
            candidates = []

            # Strategy 1: <title> tag (highest confidence)
            title_tag = soup.find('title')
            if title_tag and title_tag.string:
                title_text = HTMLEntityDecoder.decode(title_tag.string.strip())
                if title_text and len(title_text) > 0 and title_text.lower() not in ['untitled', 'chapter', 'document']:
                    candidates.append((title_text, 0.95, "title_tag"))

            # Strategies 2-4: h1 / h2 / h3 tags (first three of each kind)
            for tag_name, max_len, first_conf, later_conf in TitleExtractor._HEADER_STRATEGIES:
                for i, header in enumerate(soup.find_all(tag_name)[:3]):
                    text = HTMLEntityDecoder.decode(header.get_text(strip=True))
                    if text and len(text) < max_len:
                        # The first header of its kind gets the higher confidence
                        confidence = first_conf if i == 0 else later_conf
                        candidates.append((text, confidence, f"{tag_name}_tag_{i+1}"))

            # Strategy 5: Bold text in first elements (lower confidence)
            # If paragraph fallback is disabled, avoid using <p> tags as title sources
            body_title_tags = ['div'] if not allow_paragraph_fallback else ['p', 'div']
            first_elements = soup.find_all(body_title_tags)[:5]
            for elem in first_elements:
                for bold in elem.find_all(['b', 'strong'])[:2]:  # Limit to first 2 bold items
                    bold_text = HTMLEntityDecoder.decode(bold.get_text(strip=True))
                    if bold_text and 2 <= len(bold_text) <= 150:
                        candidates.append((bold_text, 0.6, "bold_text"))

            # Strategy 6: Center-aligned text (common for chapter titles)
            center_tags = ['center', 'div'] if not allow_paragraph_fallback else ['center', 'div', 'p']
            center_elements = soup.find_all(center_tags,
                                            attrs={'align': 'center'}) or \
                soup.find_all(center_tags if not allow_paragraph_fallback else ['div', 'p'],
                              style=lambda x: x and 'text-align' in x and 'center' in x)
            for center in center_elements[:3]:  # Check first 3 centered elements
                text = HTMLEntityDecoder.decode(center.get_text(strip=True))
                if text and 2 <= len(text) <= 200:
                    candidates.append((text, 0.65, "centered_text"))

            # Strategy 7: All-caps text (common for titles in older books)
            all_caps_tags = ['h1', 'h2', 'h3', 'div'] if not allow_paragraph_fallback else ['h1', 'h2', 'h3', 'p', 'div']
            for elem in soup.find_all(all_caps_tags)[:10]:
                text = elem.get_text(strip=True)
                # Check if text is mostly uppercase
                if text and len(text) > 2 and text.isupper():
                    decoded_text = HTMLEntityDecoder.decode(text)
                    # Keep it as-is (don't convert to title case automatically)
                    candidates.append((decoded_text, 0.55, "all_caps_text"))

            # Strategy 8: Patterns in first paragraph (optional)
            if allow_paragraph_fallback:
                first_p = soup.find('p')
                if first_p:
                    p_text = HTMLEntityDecoder.decode(first_p.get_text(strip=True))
                    # Look for "Chapter X: Title" patterns
                    chapter_pattern = re.match(
                        r'^(Chapter\s+[\dIVXLCDM]+\s*[:\-\u2013\u2014]\s*)(.{2,100})(?:\.|$)',
                        p_text, re.IGNORECASE
                    )
                    if chapter_pattern:
                        # Extract just the title part after "Chapter X:"
                        title_part = chapter_pattern.group(2).strip()
                        if title_part:
                            candidates.append((title_part, 0.8, "paragraph_pattern_title"))
                        # Also add the full "Chapter X: Title" as a lower confidence option
                        full_title = chapter_pattern.group(0).strip().rstrip('.')
                        candidates.append((full_title, 0.75, "paragraph_pattern_full"))
                    elif len(p_text) <= 100 and len(p_text) > 2:
                        # Short first paragraph might be the title
                        candidates.append((p_text, 0.4, "paragraph_standalone"))

            # Strategy 9: Filename
            if filename:
                filename_match = re.search(r'response_\d+_(.+?)\.html', filename)
                if filename_match:
                    filename_title = filename_match.group(1).replace('_', ' ').title()
                    if len(filename_title) > 2:
                        candidates.append((filename_title, 0.3, "filename"))

            # Filter and rank candidates
            if candidates:
                unique_candidates = {}
                for title, confidence, source in candidates:
                    # Clean the title but keep roman numerals and short titles
                    title = TitleExtractor.clean_title(title)
                    # Don't reject short titles (like "III", "IX") - they're valid!
                    if title and len(title) > 0:
                        # Don't apply is_valid_title check too strictly
                        # Roman numerals and chapter numbers are valid titles
                        if title not in unique_candidates or unique_candidates[title][1] < confidence:
                            unique_candidates[title] = (title, confidence, source)

                if unique_candidates:
                    # max() keeps the earliest entry among equal confidences,
                    # the same winner a stable descending sort would put first.
                    best_title, best_confidence, best_source = max(
                        unique_candidates.values(), key=lambda item: item[1]
                    )
                    # Log what we found for debugging (only if debug mode is enabled)
                    debug_mode_enabled = os.environ.get('DEBUG_MODE', '0') == '1'
                    if debug_mode_enabled:
                        log(f"[DEBUG] Best title candidate: '{best_title}' (confidence: {best_confidence:.2f}, source: {best_source})")
                    return best_title, best_confidence

            # Fallback - only use generic chapter number if allowed and nothing found
            return TitleExtractor._fallback_title(chapter_num, allow_generic_chapter_fallback)
        except Exception as e:
            log(f"[WARNING] Error extracting title: {e}")
            return TitleExtractor._fallback_title(chapter_num, allow_generic_chapter_fallback)

    @staticmethod
    def clean_title(title: str) -> str:
        """Clean and normalize extracted title - trust HTML headers, minimal cleanup only

        Decodes entities, strips tags, collapses whitespace, normalizes to
        NFC, removes zero-width characters and truncates to at most 200
        characters (ending in "..."). A falsy input yields ``""``.
        """
        if not title:
            return ""
        # Decode HTML entities - PRESERVES UNICODE
        title = HTMLEntityDecoder.decode(title)
        # Remove HTML tags only
        title = re.sub(r'<[^>]+>', '', title)
        # Normalize whitespace (convert non-breaking spaces, multiple spaces, etc.)
        title = re.sub(r'[\xa0\u2000-\u200a\u202f\u205f\u3000]+', ' ', title)
        title = re.sub(r'\s+', ' ', title).strip()
        # Normalize Unicode to NFC form
        title = unicodedata.normalize('NFC', title)
        # Remove invisible zero-width characters only
        title = re.sub(r'[\u200b\u200c\u200d\u200e\u200f\ufeff]', '', title)
        # Final whitespace cleanup
        title = ' '.join(title.split())
        # Truncate if excessively long (safety check only)
        if len(title) > 200:
            truncated = title[:197]
            # Prefer cutting at a word boundary, but only if it is not too early
            last_space = truncated.rfind(' ')
            if last_space > 150:
                truncated = truncated[:last_space]
            title = truncated + "..."
        return title

    @staticmethod
    def is_valid_title(title: str) -> bool:
        """Check if extracted title is valid - ACCEPT SHORT TITLES LIKE ROMAN NUMERALS

        Rejects empty titles, titles that are exactly "untitled", "chapter"
        or "document" (case-insensitive, surrounding whitespace ignored), and
        titles containing a navigation filler phrase. Everything else,
        including roman numerals and very short titles, is accepted.
        """
        if not title:
            return False
        title_lower = title.lower().strip()
        if title_lower in TitleExtractor._PLACEHOLDER_TITLES:
            return False
        # Skip obvious filler phrases
        if any(phrase in title_lower for phrase in TitleExtractor._FILLER_PHRASES):
            return False
        # Accept everything else, including roman numerals and short titles
        return True
class XHTMLConverter:
"""Handles XHTML conversion and compliance"""
# Default language for generated XHTML (used for html[@lang] and html[@xml:lang])
# This will be synchronized with the EPUB book language by EPUBCompiler.
DEFAULT_LANG = "en"
@classmethod
def set_default_language(cls, lang_code: str):
"""Set default language code used for html lang/xml:lang attributes"""
if not lang_code:
return
try:
cls.DEFAULT_LANG = str(lang_code).strip() or "en"
except Exception:
cls.DEFAULT_LANG = "en"
@staticmethod
def ensure_compliance(html_content: str, title: str = "Chapter",
css_links: Optional[List[str]] = None) -> str:
"""Ensure HTML content is XHTML-compliant while PRESERVING story tags"""
try:
import html
import re
# Unescape HTML entities but PRESERVE < and > so fake angle brackets in narrative
# text don't become real tags (which breaks parsing across paragraphs like the sample).
if any(ent in html_content for ent in ['&', '"', '', '<', '>']):
# Temporarily protect < and > (both cases) from unescaping
placeholder_lt = '\ue000'
placeholder_gt = '\ue001'
html_content = html_content.replace('<', placeholder_lt).replace('<', placeholder_lt)
html_content = html_content.replace('>', placeholder_gt).replace('>', placeholder_gt)
# Unescape remaining entities
html_content = html.unescape(html_content)
# Restore protected angle bracket entities
html_content = html_content.replace(placeholder_lt, '<').replace(placeholder_gt, '>')
# Strip out ANY existing DOCTYPE, XML declaration, or html wrapper
# We only want the body content
# Try to extract just body content
body_match = re.search(r'
]*>(.*?)', html_content, re.DOTALL | re.IGNORECASE)
if body_match:
html_content = body_match.group(1)
else:
# No body tags, strip any DOCTYPE/html tags if present
html_content = re.sub(r'<\?xml[^>]*\?>', '', html_content)
html_content = re.sub(r']*>', '', html_content)
html_content = re.sub(r'?html[^>]*>', '', html_content)
html_content = re.sub(r']*>.*?', '', html_content, flags=re.DOTALL)
# Now process the content normally
# Fix broken attributes with ="" pattern
def fix_broken_attributes_only(match):
tag_content = match.group(0)
if '=""' in tag_content and tag_content.count('=""') > 2:
tag_match = re.match(r'<(\w+)', tag_content)
if tag_match:
tag_name = tag_match.group(1)
words = re.findall(r'(\w+)=""', tag_content)
if words:
content = ' '.join(words)
return f'<{tag_name}>{content}{tag_name}>'
return ''
return tag_content
# Fix
"Text... and orphaned < inside
def _fix_malformed_p_tags(text: str) -> str:
# Part 1: Fix
"Text...
# This handles the specific case where the opening p tag is malformed as
]*?
)', r'"\1', text, flags=re.IGNORECASE | re.DOTALL)
# Part 1.5: Fix p>Text... ->
Text...
# Handles cases where the opening bracket is missing for the p tag
# Only matches if it starts a line or follows a closing tag, to be safe
text = re.sub(r'(^|>)\s*p>([^<]*?
)', r'\1\2', text, flags=re.IGNORECASE | re.DOTALL)
# Part 2: Fix orphaned < inside
tags (e.g.
next_gt = content.find('>', i)
should_escape = False
if next_gt == -1:
# No closing >, definitely orphaned
should_escape = True
else:
# Has closing >. Check interior.
inner_text = content[i+1:next_gt]
# If inner text contains <, then the first < is likely text
# (e.g. "x < y and y < z") unless it's nested tags which is rare inside simple P
# But simpler check: valid tag names start with alpha or /alpha
# and generally don't contain spaces immediately unless attributes
# Check if it looks like a tag: or
# Strip whitespace to handle
inner_stripped = inner_text.strip()
tag_match = re.match(r'^/?([a-zA-Z0-9]+)', inner_stripped)
if not tag_match:
# < 3 or < . or < ? (though is processing instruction)
should_escape = True
else:
# It has a tag-like name.
# Check against known HTML tags if we want to be strict,
# but user said "if it's anything else, then it's a whole sentence"
# The case . We handled that above.
# What if we have ?
# User's example was ).
# If we have , it is technically a tag "Yeah".
# But let's assume if it's not a standard HTML tag, it might be text?
# The user didn't explicitly ask to fix , only orphaned ones.
# So we stick to the "orphaned" logic (missing >) OR "obviously not a tag" logic.
# For the purpose of this fix, the "missing >" check handles the user's ".
# One edge case: Text Text
# has > and matches [a-zA-Z]. Kept.
pass
if should_escape:
new_content.append('<')
last_pos = i + 1
else:
# It's a tag (or looks enough like one), keep the <
new_content.append('<')
last_pos = i + 1
i += 1
new_content.append(content[last_pos:])
return f"{open_tag}{''.join(new_content)}{close_tag}"
return re.sub(r'(]*>)(.*?)(
)', _process_p_content, text, flags=re.IGNORECASE | re.DOTALL)
html_content = _fix_malformed_p_tags(html_content)
html_content = re.sub(r'<[^>]*?=\"\"[^>]*?>', fix_broken_attributes_only, html_content)
# Sanitize attributes that contain a colon (:) but are NOT valid namespaces.
# Example:
# becomes:
def _sanitize_colon_attributes_in_tags(text: str) -> str:
# Process only inside start tags; skip closing tags, comments, doctypes, processing instructions
def _process_tag(tag_match):
tag = tag_match.group(0)
if tag.startswith('') or tag.startswith(']+>', _process_tag, text)
html_content = _sanitize_colon_attributes_in_tags(html_content)
# Convert only "story tags" whose TAG NAME contains a colon (e.g., ),
# but DO NOT touch valid HTML/SVG tags where colons appear in attributes (e.g., style="color:red" or xlink:href)
# and DO NOT touch namespaced tags like .
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
# NEW: Fix for "Empty Attribute Tags" (e.g. )
# Transforms hallucinated patterns like into
if os.getenv('FIX_EMPTY_ATTR_TAGS_EPUB', '0') == '1':
def _escape_empty_attr_tags(text: str) -> str:
# Known HTML tags to preserve
known_tags = {
'html','head','body','title','meta','link','style','script','noscript',
'p','div','span','br','hr','img','a','h1','h2','h3','h4','h5','h6',
'ul','ol','li','dl','dt','dd',
'pre','code','em','strong','b','i','u','s','strike','del','ins','mark','small','sub','sup',
'table','thead','tbody','tr','td','th','caption','col','colgroup',
'blockquote','q','cite',
'section','article','header','footer','nav','main','aside','details','summary',
'figure','figcaption',
'form','input','button','select','option','textarea','label','fieldset','legend',
'iframe','canvas','svg','math',
'video','audio','source','track','embed','object','param',
'map','area',
'center', 'font', 'base'
}
# Transform: Content --> <Tag Attr>Content
# This removes the empty attribute value and the closing tag, creating a visual "token" style representation
def _repl_pair(m):
tagname = m.group(1)
if tagname.lower() in known_tags:
return m.group(0)
attrname = m.group(2)
content = m.group(3)
return f"<{tagname} {attrname}>{content}"
# Match (allow whitespace and content)
text = re.sub(r'<([a-zA-Z0-9_\-]+)\s+([a-zA-Z0-9_\-]+)=""\s*>(.*?)\1>', _repl_pair, text, flags=re.DOTALL)
return text
html_content = _escape_empty_attr_tags(html_content)
def _escape_story_tag(match):
full_tag = match.group(0) # Entire <...> or
tag_name = match.group(1) # The tag name possibly containing ':'
prefix = tag_name.split(':', 1)[0].lower()
# If this is a known namespace prefix (e.g., svg:rect), leave it alone
if prefix in allowed_ns_prefixes:
return full_tag
# Otherwise, treat as a story/fake tag and replace angle brackets with Chinese brackets
return full_tag.replace('<', '《').replace('>', '》')
# Escape invalid story tags (tag names containing ':') so they render literally with angle brackets.
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
def _escape_story_tag_entities(m):
tagname = m.group(1)
prefix = tagname.split(':', 1)[0].lower()
if prefix in allowed_ns_prefixes:
return m.group(0)
tag_text = m.group(0)
return tag_text.replace('<', '<').replace('>', '>')
# Apply in order: self-closing, opening, closing
html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, html_content)
html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, html_content)
html_content = re.sub(r'([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, html_content)
# PREVENT malformed "fake tags" like from being parsed as tags
# We only target angle-bracketed text that has spaces and NO '=' (so it's not real attributes)
# and ends with either '>' or the entity '>'.
def _escape_plaintext_angle_brackets(txt: str) -> str:
def repl(m):
inner = m.group(1)
# If looks like a real tag (has '=' or '/') keep it
# Check for start chars /!? or if it has attributes (=)
if '=' in inner or inner.strip().startswith(('/', '!', '?')):
return m.group(0)
# If the first token is a known HTML tag name, keep it
tokens = inner.strip().split()
if not tokens:
return m.group(0)
first = tokens[0].lower()
# Handle self-closing tags like by removing trailing slash
if first.endswith('/'):
first = first[:-1]
known = {
'html','head','body','title','meta','link','style','script','noscript',
'p','div','span','br','hr','img','a','h1','h2','h3','h4','h5','h6',
'ul','ol','li','dl','dt','dd',
'pre','code','em','strong','b','i','u','s','strike','del','ins','mark','small','sub','sup',
'table','thead','tbody','tr','td','th','caption','col','colgroup',
'blockquote','q','cite',
'section','article','header','footer','nav','main','aside','details','summary',
'figure','figcaption',
'form','input','button','select','option','textarea','label','fieldset','legend',
'iframe','canvas','svg','math',
'video','audio','source','track','embed','object','param',
'map','area',
'center', 'font', 'base'
}
if first in known:
return m.group(0)
# Otherwise, treat as narrative text in angle brackets and escape
return f'<{inner}>'
# Match <...> where content matches non-brackets.
# Allow single words without spaces (e.g. )
pattern = r'<([^<>]+)>'
txt = re.sub(pattern, repl, txt)
# Also handle cases where closing bracket is already an entity.
# IMPORTANT: Don't let this match across *real tags* like:
# ...
# because it will convert the *start tag* into literal text (<a ...), breaking TOC links.
def repl_gt(m):
inner = m.group(1)
if '=' in inner or inner.strip().startswith(('/', '!', '?')):
return m.group(0)
tokens = inner.strip().split()
if not tokens:
return m.group(0)
first = tokens[0].lower()
if first.endswith('/'):
first = first[:-1]
known = {
'html','head','body','title','meta','link','style','script','noscript',
'p','div','span','br','hr','img','a','h1','h2','h3','h4','h5','h6',
'ul','ol','li','dl','dt','dd',
'pre','code','em','strong','b','i','u','s','strike','del','ins','mark','small','sub','sup',
'table','thead','tbody','tr','td','th','caption','col','colgroup',
'blockquote','q','cite',
'section','article','header','footer','nav','main','aside','details','summary',
'figure','figcaption',
'form','input','button','select','option','textarea','label','fieldset','legend',
'iframe','canvas','svg','math',
'video','audio','source','track','embed','object','param',
'map','area',
'center', 'font', 'base'
}
if first in known:
return m.group(0)
return f'<{inner}>'
pattern_gt = r'<([^<>]+)>'
txt = re.sub(pattern_gt, repl_gt, txt)
return txt
html_content = _escape_plaintext_angle_brackets(html_content)
# Parse with lxml
from lxml import html as lxml_html, etree
parser = lxml_html.HTMLParser(recover=True)
doc = lxml_html.document_fromstring(f"{html_content}
", parser=parser)
# Fix common malformed link attributes produced by some EPUB sources/LLM output:
# href="" -> href="part0008.html#id"
# href="<part0008.html#id>" -> href="part0008.html#id"
# If left as-is, XML serialization will escape the angle brackets, breaking navigation.
def _strip_angle_wrapped_url(v: str) -> str:
try:
if v is None:
return v
s = str(v).strip()
if not s:
return s
sl = s.lower()
# Full wrapper (entities)
if sl.startswith('<') and sl.endswith('>') and len(s) >= 8:
return s[4:-4].strip()
# Full wrapper (literal)
if s.startswith('<') and s.endswith('>') and len(s) >= 2:
return s[1:-1].strip()
# One-sided wrappers (best-effort)
if sl.startswith('<'):
s = s[4:]
if sl.endswith('>') and len(s) >= 4:
s = s[:-4]
if s.startswith('<'):
s = s[1:]
if s.endswith('>'):
s = s[:-1]
return s.strip()
except Exception:
return v
try:
for el in doc.iter():
# Common link attributes
for attr in ('href', 'src'):
try:
if attr in el.attrib:
old = el.attrib.get(attr)
new = _strip_angle_wrapped_url(old)
if new != old:
el.attrib[attr] = new
except Exception:
pass
# Namespaced link attributes (e.g., SVG)
try:
for attr in ("{http://www.w3.org/1999/xlink}href", 'xlink:href'):
if attr in el.attrib:
old = el.attrib.get(attr)
new = _strip_angle_wrapped_url(old)
if new != old:
el.attrib[attr] = new
except Exception:
pass
except Exception:
pass
# Get the content back
# Use HTML method if enabled (better whitespace preservation for buggy readers like Freda)
# but may reduce XHTML compliance. Default: xml (strict XHTML)
serialize_method = 'html' if os.getenv('EPUB_USE_HTML_METHOD', '0') == '1' else 'xml'
body_xhtml = etree.tostring(doc, method=serialize_method, encoding='unicode')
# Remove the wrapper div we added
body_xhtml = re.sub(r'^]*>|
$', '', body_xhtml)
# Optionally replace angle-bracket entities with Chinese brackets
# Default behavior: keep them as entities (< >) so the output preserves the original text
bracket_style = os.getenv('ANGLE_BRACKET_OUTPUT', 'entity').lower()
if '<' in body_xhtml or '>' in body_xhtml:
if bracket_style in ('cjk', 'chinese', 'cjk_brackets'):
body_xhtml = body_xhtml.replace('<', '《').replace('>', '》')
# else: keep as entities
# Build our own clean XHTML document
return XHTMLConverter._build_xhtml(title, body_xhtml, css_links)
except Exception as e:
log(f"[WARNING] Failed to ensure XHTML compliance: {e}")
import traceback
log(f"[DEBUG] Full traceback:\n{traceback.format_exc()}")
log(f"[DEBUG] Failed chapter title: {title}")
log(f"[DEBUG] First 500 chars of input: {html_content[:500] if html_content else 'EMPTY'}")
return XHTMLConverter._build_fallback_xhtml(title)
@staticmethod
def _build_xhtml(title: str, body_content: str, css_links: Optional[List[str]] = None) -> str:
"""Build XHTML document"""
if not body_content.strip():
body_content = 'Empty chapter
'
title = ContentProcessor.safe_escape(title)
body_content = XHTMLConverter._ensure_xml_safe_readable(body_content)
xml_declaration = ''
doctype = ''
# Use class-level default language for html element language attributes
lang = getattr(XHTMLConverter, "DEFAULT_LANG", "en")
xhtml_parts = [
xml_declaration,
doctype,
f'',
'',
' ',
f'{title} '
]
if css_links:
for css_link in css_links:
if css_link.startswith(' ')
xhtml_parts.extend([
'',
'',
body_content,
'',
''
])
return '\n'.join(xhtml_parts)
@staticmethod
def _ensure_xml_safe_readable(content: str) -> str:
"""Ensure content is XML-safe"""
content = re.sub(
r'&(?!(?:'
r'[a-zA-Z][a-zA-Z0-9]{0,30};|'
r'#[0-9]{1,7};|'
r'#x[0-9a-fA-F]{1,6};'
r'))',
'&',
content
)
return content
@staticmethod
def _build_fallback_xhtml(title: str) -> str:
"""Build minimal fallback XHTML"""
safe_title = re.sub(r'[<>&"\']+', '', str(title))
if not safe_title:
safe_title = "Chapter"
lang = getattr(XHTMLConverter, "DEFAULT_LANG", "en")
return f'''
{ContentProcessor.safe_escape(safe_title)}
Error processing content. Please check the source file.
'''
@staticmethod
def validate(content: str) -> str:
"""Validate and fix XHTML content - WITH DEBUGGING"""
import re
# Ensure XML declaration
if not content.strip().startswith('\n' + content
# Remove control characters
content = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', content)
# Fix unescaped ampersands
content = re.sub(
r'&(?!(?:'
r'amp|lt|gt|quot|apos|'
r'[a-zA-Z][a-zA-Z0-9]{1,31}|'
r'#[0-9]{1,7}|'
r'#x[0-9a-fA-F]{1,6}'
r');)',
'&',
content
)
# Fix unquoted attributes
try:
content = re.sub(r'<([^>]+)\s+(\w+)=([^\s"\'>]+)([>\s])', r'<\1 \2="\3"\4', content)
except re.error:
pass # Skip if regex fails
# Sanitize invalid colon-containing attribute names (preserve XML/xlink/epub/xmlns)
def _sanitize_colon_attrs_in_content(text: str) -> str:
def _process_tag(m):
tag = m.group(0)
if tag.startswith('') or tag.startswith(']+>', _process_tag, text)
content = _sanitize_colon_attrs_in_content(content)
# Escape invalid story tags so they render literally with angle brackets in output
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
def _escape_story_tag_entities(m):
tagname = m.group(1)
prefix = tagname.split(':', 1)[0].lower()
if prefix in allowed_ns_prefixes:
return m.group(0)
tag_text = m.group(0)
return tag_text.replace('<', '<').replace('>', '>')
# Apply in order: self-closing, opening, closing
content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, content)
content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, content)
content = re.sub(r'([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, content)
# Clean for XML
content = XMLValidator.clean_for_xml(content)
# Try to parse for validation
try:
ET.fromstring(content.encode('utf-8'))
except ET.ParseError as e:
log(f"[WARNING] XHTML validation failed: {e}")
# DEBUG: Show what's at the error location
import re
match = re.search(r'line (\d+), column (\d+)', str(e))
if match:
line_num = int(match.group(1))
col_num = int(match.group(2))
lines = content.split('\n')
log(f"[DEBUG] Error at line {line_num}, column {col_num}")
log(f"[DEBUG] Total lines in content: {len(lines)}")
if line_num <= len(lines):
problem_line = lines[line_num - 1]
log(f"[DEBUG] Full problem line: {problem_line!r}")
# Show the problem area
if col_num <= len(problem_line):
# Show 40 characters before and after
start = max(0, col_num - 40)
end = min(len(problem_line), col_num + 40)
log(f"[DEBUG] Context around error: {problem_line[start:end]!r}")
log(f"[DEBUG] Character at column {col_num}: {problem_line[col_num-1]!r} (U+{ord(problem_line[col_num-1]):04X})")
# Show 5 characters before and after with hex
for i in range(max(0, col_num-5), min(len(problem_line), col_num+5)):
char = problem_line[i]
marker = " <-- ERROR" if i == col_num-1 else ""
log(f"[DEBUG] Col {i+1}: {char!r} (U+{ord(char):04X}){marker}")
else:
log(f"[DEBUG] Column {col_num} is beyond line length {len(problem_line)}")
else:
log(f"[DEBUG] Line {line_num} doesn't exist (only {len(lines)} lines)")
# Show last few lines
for i in range(max(0, len(lines)-3), len(lines)):
log(f"[DEBUG] Line {i+1}: {lines[i][:100]!r}...")
# Try to recover
content = XHTMLConverter._attempt_recovery(content, e)
return content
@staticmethod
def _attempt_recovery(content: str, error: ET.ParseError) -> str:
"""Try to repair XHTML that failed XML parsing.

The content is re-parsed with BeautifulSoup's ``lxml`` parser, ``html``,
``head`` and ``body`` elements are added when missing, and the serialized
result is validated with ``ET.fromstring``. If any step raises, a warning
is logged and ``XHTMLConverter._build_fallback_xhtml("Chapter")`` is
returned instead.

Args:
content: The XHTML text that failed validation.
error: The parse error that triggered recovery (not read by the
visible code in this method).

Returns:
The recovered XHTML string, or the fallback document on failure.
"""
try:
# Use BeautifulSoup to fix structure
soup = BeautifulSoup(content, 'lxml')
# Ensure we have proper XHTML structure
if not soup.find('html'):
# Use default XHTML namespace and language attributes
# DEFAULT_LANG is optional on XHTMLConverter; "en" is used when it is absent
lang = getattr(XHTMLConverter, "DEFAULT_LANG", "en")
new_soup = BeautifulSoup(f'', 'lxml')
html_tag = new_soup.html
# Iterate over a snapshot because append() moves nodes out of `soup`
for child in list(soup.children):
html_tag.append(child)
soup = new_soup
# Ensure we have head and body
if not soup.find('head'):
head = soup.new_tag('head')
meta = soup.new_tag('meta')
meta['http-equiv'] = 'Content-Type'
meta['content'] = 'text/html; charset=utf-8'
head.append(meta)
# A generic placeholder title is used for the synthesized head
title_tag = soup.new_tag('title')
title_tag.string = 'Chapter'
head.append(title_tag)
if soup.html:
soup.html.insert(0, head)
if not soup.find('body'):
body = soup.new_tag('body')
if soup.html:
# Move every child of <html> that is not head/body into the new body
for child in list(soup.html.children):
if child.name not in ['head', 'body']:
body.append(child.extract())
soup.html.append(body)
# Convert back to string
recovered = str(soup)
# Ensure proper XML declaration
if not recovered.strip().startswith('\n' + recovered
# Add DOCTYPE if missing
if '')
recovered = '\n'.join(lines)
# Final validation
# Raises if the repaired markup is still not well-formed, which routes to the fallback below
ET.fromstring(recovered.encode('utf-8'))
log(f"[INFO] Successfully recovered XHTML")
return recovered
except Exception as recovery_error:
log(f"[WARNING] Recovery attempt failed: {recovery_error}")
# Last resort: use fallback
return XHTMLConverter._build_fallback_xhtml("Chapter")
class FileUtils:
"""File handling utilities"""
@staticmethod
def sanitize_filename(filename: str, allow_unicode: bool = False) -> str:
"""Return a filesystem-safe version of ``filename``.

With ``allow_unicode`` the name is NFC-normalized and only path
separators, reserved punctuation and NUL are replaced with ``_``.
Otherwise the name is NFKD-normalized, reduced to ASCII, and a wider set
of characters (whitespace, ``&``, ``#``) is replaced as well.

The stem returned by ``os.path.splitext`` is capped at 100 characters; an
empty stem (or a lone ``_``) becomes ``file``. The extension is appended
unchanged.
"""
if allow_unicode:
filename = unicodedata.normalize('NFC', filename)
replacements = {
'/': '_', '\\': '_', ':': '_', '*': '_',
'?': '_', '"': '_', '<': '_', '>': '_',
'|': '_', '\0': '_',
}
for old, new in replacements.items():
filename = filename.replace(old, new)
# Drop control characters below U+0020, but keep TAB (U+0009)
filename = ''.join(char for char in filename if ord(char) >= 32 or ord(char) == 9)
else:
# NFKD splits accented letters into base + combining mark, so the ASCII
# encode below keeps the base letter and drops the mark
filename = unicodedata.normalize('NFKD', filename)
try:
filename = filename.encode('ascii', 'ignore').decode('ascii')
except:
filename = ''.join(c if ord(c) < 128 else '_' for c in filename)
replacements = {
'/': '_', '\\': '_', ':': '_', '*': '_',
'?': '_', '"': '_', '<': '_', '>': '_',
'|': '_', '\n': '_', '\r': '_', '\t': '_',
'&': '_and_', '#': '_num_', ' ': '_',
}
for old, new in replacements.items():
filename = filename.replace(old, new)
# Drop any remaining control characters below U+0020
filename = ''.join(char for char in filename if ord(char) >= 32)
# Collapse runs of underscores and trim them from both ends
filename = re.sub(r'_+', '_', filename)
filename = filename.strip('_')
# Limit length
# Only the stem is truncated; the extension keeps its full length
name, ext = os.path.splitext(filename)
if len(name) > 100:
name = name[:100]
if not name or name == '_':
name = 'file'
return name + ext
@staticmethod
def ensure_bytes(content) -> bytes:
"""Coerce ``content`` to ``bytes``.

``None`` becomes ``b''``, ``bytes`` is returned as-is, and anything else
is converted with ``str()`` (if it is not already a string) and encoded
as UTF-8.
"""
if content is None:
return b''
if isinstance(content, bytes):
return content
if not isinstance(content, str):
content = str(content)
return content.encode('utf-8')
class EPUBCompiler:
"""Main EPUB compilation class"""
def __init__(self, base_dir: str, log_callback: Optional[Callable] = None, stop_callback: Optional[Callable] = None):
    """Set up a compiler for the translated output stored in ``base_dir``.

    Most options are read from environment variables: layout mode
    (``EPUB_LAYOUT_MODE`` / ``LEGACY_EPUB_STRUCTURE``), toc.ncx handling
    (``USE_TOC_NCX`` / ``TRANSLATE_TOC_NCX``), worker count
    (``EXTRACTION_WORKERS``), SVG rasterization (``RASTERIZE_SVG_FALLBACK``)
    and translation (``TRANSLATE_BOOK_TITLE``, ``BATCH_TRANSLATE_HEADERS``,
    ``MODEL``, ``API_KEY``).

    Args:
        base_dir: Directory holding the translated HTML and resources; it is
            made absolute and also used as the output directory.
        log_callback: Optional callable receiving each log message; when
            omitted, messages are printed.
        stop_callback: Optional callable returning a truthy value when the
            user has requested a stop.
    """
    self.base_dir = os.path.abspath(base_dir)
    self.log_callback = log_callback
    self.stop_callback = stop_callback
    self.output_dir = self.base_dir
    self.images_dir = os.path.join(self.output_dir, "images")
    self.css_dir = os.path.join(self.output_dir, "css")
    self.fonts_dir = os.path.join(self.output_dir, "fonts")
    self.metadata_path = os.path.join(self.output_dir, "metadata.json")
    self.attach_css_to_chapters = os.getenv('ATTACH_CSS_TO_CHAPTERS', '0') == '1'  # Default to '0' (disabled)

    # EPUB layout mode: 'auto' (detect from source), 'epub2' (force OEBPS/Text), 'epub3' (flat OEBPS)
    _layout_mode = os.getenv('EPUB_LAYOUT_MODE', '').lower().strip()
    if _layout_mode == 'epub2':
        self.legacy_epub_structure = True
    elif _layout_mode == 'epub3':
        self.legacy_epub_structure = False
    elif _layout_mode == 'auto':
        # Auto-detect from source EPUB's content.opf manifest
        self.legacy_epub_structure = self._detect_epub2_layout()
    else:
        # Fallback to old boolean env var for backward compatibility
        self.legacy_epub_structure = os.getenv('LEGACY_EPUB_STRUCTURE', '0') == '1'

    # Source toc.ncx options
    self.use_toc_ncx = os.getenv('USE_TOC_NCX', '0') == '1'
    self.translate_toc_ncx = os.getenv('TRANSLATE_TOC_NCX', '0') == '1'
    if self.translate_toc_ncx and not self.use_toc_ncx:
        # Translation implies we must read the source toc.ncx
        self.use_toc_ncx = True

    # A malformed or non-positive EXTRACTION_WORKERS must not abort construction:
    # fall back to the default of 4 and never go below one worker.
    _raw_workers = os.environ.get("EXTRACTION_WORKERS", "4")
    try:
        self.max_workers = max(1, int(_raw_workers))
    except (TypeError, ValueError):
        self.log(f"[WARNING] Invalid EXTRACTION_WORKERS value {_raw_workers!r}; using 4")
        self.max_workers = 4
    self.log(f"[INFO] Using {self.max_workers} workers for parallel processing")

    _layout_label = _layout_mode if _layout_mode else ('legacy' if self.legacy_epub_structure else 'epub3')
    self.log(f"[INFO] EPUB layout mode: {_layout_label} → {'EPUB2 (OEBPS/Text)' if self.legacy_epub_structure else 'EPUB3 (flat OEBPS)'}")
    if self.use_toc_ncx:
        self.log("[INFO] Use toc.ncx enabled: TOC will be built from the source EPUB's toc.ncx")
    if self.translate_toc_ncx:
        self.log("[INFO] Translate toc.ncx enabled: TOC entries will be translated in one API call and cached to TOC.txt")

    # Track auxiliary (non-chapter) HTML files to include in spine but omit from TOC
    self.auxiliary_html_files: set[str] = set()

    # SVG rasterization settings
    self.rasterize_svg = os.getenv('RASTERIZE_SVG_FALLBACK', '1') == '1'
    try:
        import cairosvg  # noqa: F401
        self._cairosvg_available = True
    except Exception:
        # Kept broad on purpose: any import-time failure just disables the feature
        self._cairosvg_available = False

    # Set global log callback
    set_global_log_callback(log_callback)

    # translation features
    self.html_dir = self.output_dir  # For compatibility
    self.translate_titles = os.getenv('TRANSLATE_BOOK_TITLE', '1') == '1'

    # Initialize API client if needed
    self.api_client = None
    _needs_client = (
        self.translate_titles
        or os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1'
        or self.translate_toc_ncx
    )
    if _needs_client:
        model = os.getenv('MODEL')
        api_key = os.getenv('API_KEY')
        if model and api_key and UnifiedClient:
            self.api_client = UnifiedClient(api_key=api_key, model=model, output_dir=self.output_dir)
        elif model and api_key and not UnifiedClient:
            self.log("Warning: UnifiedClient module not available, translation features disabled")

    # Enhance with translation features
    enhance_epub_compiler(self)
def _detect_epub2_layout(self) -> bool:
"""Auto-detect whether the source EPUB used an EPUB2-style folder layout.
Checks the content.opf manifest for item hrefs that reference a ``Text/``
subdirectory, which is the hallmark of the classic ``OEBPS/Text/`` layout.
Returns ``True`` if an EPUB2 layout is detected, ``False`` otherwise
(including when content.opf doesn't exist – defaults to modern EPUB3).
"""
import xml.etree.ElementTree as _ET
opf_path = os.path.join(self.output_dir, 'content.opf')
if not os.path.exists(opf_path):
return False # no content.opf → default to EPUB3
try:
tree = _ET.parse(opf_path)
root = tree.getroot()
ns = {'opf': 'http://www.idpf.org/2007/opf'}
for item in root.findall('.//opf:manifest/opf:item', ns):
href = item.get('href', '')
if href.startswith('Text/') or href.startswith('text/'):
self.log("[INFO] Auto-detected EPUB2 layout (Text/ references in content.opf)")
return True
except Exception:
pass
return False
def log(self, message: str):
"""Log a message"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def is_stopped(self) -> bool:
    """Return True when either the module-level stop flag or the stop callback signals a stop."""
    # The global flag is consulted first; the callback is only invoked if it is clear
    if is_stop_requested():
        return True
    return bool(self.stop_callback and self.stop_callback())
def compile(self):
"""Main compilation method.

Runs the whole build in order: pre-flight check, chapter title analysis,
optional header translation (standalone first, then the sorted fallback),
metadata loading/translation, language selection, book creation, CSS,
fonts, images, cover, chapters, optional gallery, optional TOC from the
source toc.ncx, finalization, EPUB writing, optional PDF output, metadata
persistence and a final summary.

The stop flag is polled between stages; when a stop is requested the
method logs it and returns early. It also returns early, without raising,
when the pre-flight check fails.

Raises:
Exception: Any error that escapes the individual stages is logged and
re-raised, including the explicit "no chapters" failures raised here.
"""
try:
# Debug: Check what metadata enhancement was done
self.log("[DEBUG] Checking metadata translation setup...")
self.log(f"[DEBUG] Has api_client: {hasattr(self, 'api_client') and self.api_client is not None}")
self.log(f"[DEBUG] Has metadata_translator: {hasattr(self, 'metadata_translator')}")
self.log(f"[DEBUG] Has translate_metadata_fields: {hasattr(self, 'translate_metadata_fields')}")
if hasattr(self, 'translate_metadata_fields'):
self.log(f"[DEBUG] translate_metadata_fields content: {self.translate_metadata_fields}")
enabled_fields = [k for k, v in self.translate_metadata_fields.items() if v]
self.log(f"[DEBUG] Enabled metadata fields: {enabled_fields}")
# Pre-flight check
if not self._preflight_check():
return
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Analyze chapters FIRST to get the structure
chapter_titles_info = self._analyze_chapters()
# Check stop flag after chapter analysis
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Debug: Check if batch translation is enabled
self.log(f"[DEBUG] Batch translation enabled: {getattr(self, 'batch_translate_headers', False)}")
self.log(f"[DEBUG] Has header translator: {hasattr(self, 'header_translator')}")
self.log(f"[DEBUG] EPUB_PATH env: {os.getenv('EPUB_PATH', 'NOT SET')}")
self.log(f"[DEBUG] HTML dir: {self.html_dir}")
# PRIORITY: Try standalone header translation first
standalone_success = False
if (hasattr(self, 'batch_translate_headers') and self.batch_translate_headers and
run_standalone_header_translation is not None):
self.log("\n🔄 Attempting standalone header translation (content.opf based)...")
try:
# Check if translated_headers.txt already exists
translations_file = os.path.join(self.output_dir, "translated_headers.txt")
if os.path.exists(translations_file):
self.log("📁 Found existing translated_headers.txt - applying existing translations...")
self.log(f" Translation file: {translations_file}")
self.log(f" HTML directory: {self.html_dir}")
try:
# Import the functions from standalone module
from translate_headers_standalone import (
load_translations_from_file,
apply_existing_translations,
extract_source_chapters_with_opf_mapping,
match_output_to_source_chapters
)
self.log("✅ Successfully imported standalone module functions")
# Load existing translations
self.log("🔍 Loading translations from file...")
chapters_info, translated_headers, _ = load_translations_from_file(translations_file, self.log)
if translated_headers:
self.log(f"📋 Loaded {len(translated_headers)} existing translations:")
# Show first 3 translations for debugging
for num in list(translated_headers.keys())[:3]:
self.log(f" Chapter {num}: {translated_headers[num]}")
# Get the source EPUB path
source_epub_path = os.getenv('EPUB_PATH')
self.log(f" Source EPUB path: {source_epub_path}")
if source_epub_path and os.path.exists(source_epub_path):
self.log(f"🔄 Applying translations to HTML files in: {self.html_dir}")
# Apply translations using the standalone module's function
result = apply_existing_translations(
epub_path=source_epub_path,
output_dir=self.html_dir,
translations_file=translations_file,
update_html=getattr(self, 'update_html_headers', True),
log_callback=self.log
)
if result:
self.log(f"✅ Successfully applied translations to {len(result)} files:")
# Show first 3 updated files
for filename in list(result.keys())[:3]:
self.log(f" {filename}: {result[filename]}")
# CRITICAL: Update chapter_titles_info so TOC uses translated titles now
# `result` is looked up by file basename; matched entries get confidence >= 0.95
updated_for_toc = 0
for chap_num, (orig_title, conf, fname) in list(chapter_titles_info.items()):
base = os.path.basename(fname)
if base in result:
chapter_titles_info[chap_num] = (result[base], max(conf, 0.95), 'existing_translation')
updated_for_toc += 1
self.log(f"📝 Updated TOC titles from existing translations: {updated_for_toc} entries")
else:
self.log("⚠️ No files were updated with translations (result was empty)")
else:
self.log(f"⚠️ Source EPUB not found or doesn't exist: {source_epub_path}")
self.log(" Cannot apply translations without source EPUB for mapping")
else:
self.log("⚠️ No translations were loaded from file (empty result)")
standalone_success = True
except ImportError as e:
self.log(f"⚠️ Failed to import standalone module: {e}")
self.log(" Make sure translate_headers_standalone.py is in the same directory")
import traceback
self.log(traceback.format_exc())
standalone_success = False
except Exception as e:
self.log(f"⚠️ Failed to apply existing translations: {e}")
self.log(f" Exception type: {type(e).__name__}")
import traceback
self.log(traceback.format_exc())
standalone_success = False
else:
# Get the source EPUB path from environment
source_epub_path = os.getenv('EPUB_PATH')
if source_epub_path and os.path.exists(source_epub_path):
self.log(f"📚 Source EPUB: {os.path.basename(source_epub_path)}")
self.log(f"📂 Output HTML dir: {self.html_dir}")
# Run standalone header translation
result = run_standalone_header_translation(
source_epub_path=source_epub_path,
output_html_dir=self.html_dir,
log_callback=self.log
)
if result:
self.log("✅ Standalone header translation completed successfully")
standalone_success = True
# CRITICAL: Update chapter_titles_info so TOC uses translated titles
# result is a dict mapping filename -> translated_title
updated_for_toc = 0
for chap_num, (orig_title, conf, fname) in list(chapter_titles_info.items()):
base = os.path.basename(fname)
if base in result:
chapter_titles_info[chap_num] = (result[base], max(conf, 0.95), 'standalone_translation')
updated_for_toc += 1
self.log(f"📝 Updated TOC titles from standalone translation: {updated_for_toc} entries")
else:
self.log("⚠️ Standalone header translation returned no result")
else:
self.log(f"⚠️ Source EPUB not found: {source_epub_path}")
except Exception as e:
self.log(f"⚠️ Standalone header translation failed: {e}")
import traceback
self.log(traceback.format_exc())
# FALLBACK: Extract source headers AND current titles if batch translation is enabled
# Only run if standalone translation was not successful AND fallback is enabled
source_headers = {}
current_titles = {}
use_fallback = os.getenv('USE_SORTED_FALLBACK', '0') == '1'
if (not standalone_success and use_fallback and
hasattr(self, 'batch_translate_headers') and self.batch_translate_headers and
hasattr(self, 'header_translator') and self.header_translator):
self.log("\n🔄 Using fallback header translation method...")
# Check if the extraction method exists
if hasattr(self, '_extract_source_headers_and_current_titles'):
# Use the new extraction method
source_headers, current_titles = self._extract_source_headers_and_current_titles()
self.log(f"[DEBUG] Extraction complete: {len(source_headers)} source, {len(current_titles)} current")
else:
self.log("⚠️ Missing _extract_source_headers_and_current_titles method!")
# Batch translate headers if we have source headers (fallback only)
translated_headers = {}
if (not standalone_success and source_headers and
hasattr(self, 'header_translator') and self.header_translator):
# Check if translated_headers.txt already exists
translations_file = os.path.join(self.output_dir, "translated_headers.txt")
if os.path.exists(translations_file):
# File exists - load and apply existing translations
self.log("📁 Found existing translated_headers.txt - applying existing translations...")
try:
# Import the functions from standalone module
from translate_headers_standalone import load_translations_from_file
# Load existing translations
_, translated_headers, _ = load_translations_from_file(translations_file, self.log)
if translated_headers:
self.log(f"📋 Loaded {len(translated_headers)} existing translations")
# Apply translations to HTML files using the same method
if hasattr(self, 'update_html_headers') and self.update_html_headers:
self.header_translator._update_html_headers_exact(
self.html_dir,
translated_headers,
current_titles
)
self.log("✅ Applied existing translations to HTML files")
# CRITICAL: Update chapter_titles_info so TOC gets translated titles on this run
updated_for_toc = 0
# Build filename -> translated mapping from current_titles' indices
file_to_trans = {}
for num, info in current_titles.items():
if num in translated_headers:
base = os.path.basename(info.get('filename', ''))
if base:
file_to_trans[base] = translated_headers[num]
for chap_num, (orig_title, conf, fname) in list(chapter_titles_info.items()):
base = os.path.basename(fname)
if base in file_to_trans:
chapter_titles_info[chap_num] = (file_to_trans[base], max(conf, 0.95), 'existing_translation')
updated_for_toc += 1
self.log(f"📝 Updated TOC titles from existing translations (fallback path): {updated_for_toc} entries")
# Update toc.ncx if it exists
toc_path = os.path.join(self.output_dir, 'toc.ncx')
if os.path.exists(toc_path):
from translate_headers_standalone import update_toc_ncx
update_toc_ncx(toc_path, translated_headers, current_titles, self.log)
else:
self.log("⚠️ No translations found in existing file")
except Exception as e:
self.log(f"⚠️ Failed to apply existing translations: {e}")
import traceback
self.log(traceback.format_exc())
else:
# No existing file - proceed with translation
self.log("🌐 Batch translating chapter headers...")
try:
# Check if the translator has been initialized properly
if not hasattr(self.header_translator, 'client') or not self.header_translator.client:
self.log("⚠️ Header translator not properly initialized, skipping batch translation")
else:
self.log(f"📚 Found {len(source_headers)} headers to translate")
self.log(f"📚 Found {len(current_titles)} current titles in HTML files")
# Debug: Show a few examples
for num in list(source_headers.keys())[:3]:
self.log(f" Example - Chapter {num}: {source_headers[num]}")
# A non-positive headers_per_batch (default -1) is passed on as batch_size=None
_hpb = getattr(self, 'headers_per_batch', -1)
translated_headers = self.header_translator.translate_and_save_headers(
html_dir=self.html_dir,
headers_dict=source_headers,
batch_size=_hpb if _hpb > 0 else None,
output_dir=self.output_dir,
update_html=getattr(self, 'update_html_headers', True),
save_to_file=getattr(self, 'save_header_translations', True),
current_titles=current_titles # Pass current titles for exact replacement
)
# Update chapter_titles_info with translations
if translated_headers:
self.log("\n📝 Updating chapter titles in EPUB structure...")
for chapter_num, translated_title in translated_headers.items():
if chapter_num in chapter_titles_info:
# Keep the original confidence and method, just update the title
orig_title, confidence, method = chapter_titles_info[chapter_num]
chapter_titles_info[chapter_num] = (translated_title, confidence, method)
self.log(f"✓ Chapter {chapter_num}: {source_headers.get(chapter_num, 'Unknown')} → {translated_title}")
else:
# Add new entry if not in chapter_titles_info
chapter_titles_info[chapter_num] = (translated_title, 1.0, 'batch_translation')
self.log(f"✓ Added Chapter {chapter_num}: {translated_title}")
except Exception as e:
self.log(f"⚠️ Batch translation failed: {e}")
import traceback
self.log(traceback.format_exc())
# Continue with compilation even if translation fails
elif not standalone_success and not use_fallback and hasattr(self, 'batch_translate_headers') and self.batch_translate_headers:
# Standalone failed but fallback is disabled (only warn if batch translation is enabled)
self.log("⚠️ Standalone header translation failed and sorted fallback is disabled")
self.log(" Enable 'Use Sorted Fallback' in Other Settings if needed")
else:
if not standalone_success and not source_headers:
self.log("⚠️ No source headers found, skipping batch translation")
elif not hasattr(self, 'header_translator'):
self.log("⚠️ No header translator available")
# Find HTML files
html_files = self._find_html_files()
if not html_files:
raise Exception("No translated chapters found to compile into EPUB")
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Load metadata
metadata = self._load_metadata()
# Translate metadata if configured
if hasattr(self, 'metadata_translator') and self.metadata_translator:
if hasattr(self, 'translate_metadata_fields') and any(self.translate_metadata_fields.values()):
self.log("🌐 Translating metadata fields...")
try:
translated_metadata = self.metadata_translator.translate_metadata(
metadata,
self.translate_metadata_fields,
mode=getattr(self, 'metadata_translation_mode', 'together')
)
# Preserve original values and mark fields as translated so we don't
# keep re-translating metadata on every EPUB rebuild.
for field, should_translate in self.translate_metadata_fields.items():
if not should_translate:
continue
if field in metadata and field in translated_metadata:
if metadata[field] != translated_metadata[field]:
# Store original value (if not already present)
original_key = f'original_{field}'
if original_key not in translated_metadata:
translated_metadata[original_key] = metadata[field]
# Mark as translated so future runs can detect it
translated_metadata[f'{field}_translated'] = True
metadata = translated_metadata
except Exception as e:
self.log(f"⚠️ Metadata translation failed: {e}")
# Continue with original metadata
# Decide language for EPUB based primarily on GUI's OUTPUT_LANGUAGE
try:
detected_lang = self._detect_primary_language_from_html(html_files)
if detected_lang:
current_lang = metadata.get("language")
# Compared case-insensitively, so a casing-only difference does not overwrite metadata
if not current_lang or current_lang.lower() != detected_lang.lower():
self.log(f"[INFO] Setting metadata language from OUTPUT_LANGUAGE: {current_lang!r} -> '{detected_lang}'")
metadata["language"] = detected_lang
except Exception as e:
self.log(f"[WARNING] Failed to determine EPUB language from OUTPUT_LANGUAGE: {e}")
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Create EPUB book
book = self._create_book(metadata)
# Process all components
spine = []
toc = []
# Add CSS
css_items = self._add_css_files(book)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Add fonts
self._add_fonts(book)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Process images and cover
processed_images, cover_file = self._process_images()
# Compress images if enabled (before adding to EPUB)
if os.environ.get('ENABLE_IMAGE_COMPRESSION', '0') == '1':
processed_images, cover_file = self._compress_images(processed_images, cover_file)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Add images to book
self._add_images_to_book(book, processed_images, cover_file)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Add cover page if exists
if cover_file:
cover_page = self._create_cover_page(book, cover_file, processed_images, css_items, metadata)
if cover_page:
spine.insert(0, cover_page)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Build OPF filename map for restoring original names inside the EPUB
self._opf_filename_map = self._build_opf_filename_map()
if self._opf_filename_map:
self.log(f"✅ Loaded {len(self._opf_filename_map)} original filenames from content.opf")
# Process chapters with updated titles
chapters_added = self._process_chapters(
book, html_files, chapter_titles_info,
css_items, processed_images, spine, toc, metadata
)
if chapters_added == 0:
raise Exception("No chapters could be added to the EPUB")
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Add optional gallery (unless disabled)
disable_gallery = os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1'
if disable_gallery:
self.log("📷 Image gallery disabled by user preference")
else:
# The cover image is excluded from the gallery
gallery_images = [img for img in processed_images.values() if img != cover_file]
if gallery_images:
self.log(f"📷 Creating image gallery with {len(gallery_images)} images...")
gallery_page = self._create_gallery_page(book, gallery_images, css_items, metadata)
spine.append(gallery_page)
toc.append(gallery_page)
else:
self.log("📷 No images found for gallery")
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Optional: Build TOC from the source EPUB's toc.ncx (and optionally translate it)
if getattr(self, 'use_toc_ncx', False):
try:
toc = self._build_toc_from_source_toc_ncx(
spine=spine,
existing_toc=toc,
metadata=metadata
)
except Exception as e:
self.log(f"⚠️ Failed to build TOC from source toc.ncx: {e}")
# Finalize book
self._finalize_book(book, spine, toc, cover_file)
# Check stop flag
if self.is_stopped():
self.log("🛑 EPUB converter stopped by user")
return
# Write EPUB
self._write_epub(book, metadata)
# Build PDF TOC exclusively from TOC.txt if it exists;
# entries not in TOC.txt are excluded entirely.
toc_txt_path = os.path.join(self.output_dir, 'TOC.txt')
if os.path.exists(toc_txt_path):
try:
_, _toc_trans, _toc_files = self._load_toc_translations_file(toc_txt_path)
if _toc_trans:
# Build output-filename -> translated title lookup
# TOC.txt stores extensionless names, so strip extensions when matching
_file_to_title = {}
for _idx, _ttitle in _toc_trans.items():
_out = _toc_files.get(_idx, '')
if _out:
_file_to_title[os.path.basename(_out)] = _ttitle
# Rebuild chapter_titles_info keeping only TOC.txt entries
_pdf_titles = {}
for _cn, (_ot, _cf, _fn) in chapter_titles_info.items():
_base = os.path.basename(_fn) if _fn else ''
# Try exact match first, then extensionless match
if _base in _file_to_title:
_pdf_titles[_cn] = (_file_to_title[_base], 1.0, _fn)
else:
# Strip extensions for matching (TOC.txt stores extensionless names)
# Peel known markup extensions repeatedly, so stacked suffixes are all removed
_base_no_ext = _base
while True:
_name, _ext = os.path.splitext(_base_no_ext)
if _ext and _ext.lower() in ['.html', '.xhtml', '.htm', '.xml']:
_base_no_ext = _name
else:
break
if _base_no_ext in _file_to_title:
_pdf_titles[_cn] = (_file_to_title[_base_no_ext], 1.0, _fn)
if _pdf_titles:
chapter_titles_info = _pdf_titles
self.log(f"📝 PDF TOC will use {len(_pdf_titles)} entries from TOC.txt")
except Exception as _e:
self.log(f"⚠️ Could not load TOC.txt for PDF: {_e}")
# Generate PDF if enabled
if os.environ.get('ENABLE_PDF_OUTPUT', '0') == '1':
if self.is_stopped():
self.log("🛑 PDF generation skipped - stop requested")
else:
try:
self._generate_pdf(html_files, chapter_titles_info, processed_images, cover_file, metadata)
# BaseException (not only Exception) is caught here: any PDF failure is logged and not re-raised
except BaseException as e:
import traceback
self.log(f"⚠️ PDF generation failed: {type(e).__name__}: {e}")
self.log(f"[DEBUG] {traceback.format_exc()}")
# Persist updated metadata (including translated fields/language)
try:
self._save_metadata(metadata)
except Exception as e:
self.log(f"[WARNING] Failed to save updated metadata.json: {e}")
# Show summary
self._show_summary(chapter_titles_info, css_items)
except Exception as e:
# Log the failure, then re-raise so the caller still receives the original exception
self.log(f"❌ EPUB compilation failed: {e}")
raise
def _fix_encoding_issues(self, content: str) -> str:
"""Convert smart quotes and other Unicode punctuation to ASCII."""
# Convert smart quotes to regular quotes and other punctuation
fixes = {
'’': "'", # Right single quotation mark
'‘': "'", # Left single quotation mark
'“': '"', # Left double quotation mark
'”': '"', # Right double quotation mark
'–': '-', # En dash to hyphen
'…': '...', # Ellipsis to three dots
}
for bad, good in fixes.items():
if bad in content:
content = content.replace(bad, good)
#self.log(f"[DEBUG] Replaced {bad!r} with {good!r}")
return content
def _preflight_check(self) -> bool:
"""Pre-flight check before compilation with progressive fallback"""
# Check if we have standard files
if self._has_standard_files():
# Use original strict check
return self._preflight_check_strict()
else:
# Use progressive check for non-standard files
result = self._preflight_check_progressive()
return result is not None
def _has_standard_files(self) -> bool:
"""Check if directory contains standard response_ files"""
if not os.path.exists(self.base_dir):
return False
html_exts = ('.html', '.xhtml', '.htm')
html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
response_files = [f for f in html_files if f.startswith('response_')]
return len(response_files) > 0
def _preflight_check_strict(self) -> bool:
"""Original strict pre-flight check - for standard files"""
self.log("\n📋 Pre-flight Check")
self.log("=" * 50)
issues = []
if not os.path.exists(self.base_dir):
issues.append(f"Directory does not exist: {self.base_dir}")
return False
html_files = [f for f in os.listdir(self.base_dir) if f.endswith('.html')]
response_files = [f for f in html_files if f.startswith('response_')]
if not html_files:
issues.append("No HTML files found in directory")
elif not response_files:
issues.append(f"Found {len(html_files)} HTML files but none start with 'response_'")
else:
self.log(f"✅ Found {len(response_files)} chapter files")
if not os.path.exists(self.metadata_path):
self.log("⚠️ No metadata.json found (will use defaults)")
else:
self.log("✅ Found metadata.json")
for subdir in ['css', 'images', 'fonts']:
path = os.path.join(self.base_dir, subdir)
if os.path.exists(path):
count = len(os.listdir(path))
self.log(f"✅ Found {subdir}/ with {count} files")
if issues:
self.log("\n❌ Pre-flight check FAILED:")
for issue in issues:
self.log(f" • {issue}")
return False
self.log("\n✅ Pre-flight check PASSED")
return True
def _preflight_check_progressive(self) -> dict:
"""Progressive pre-flight check for non-standard files"""
self.log("\n📋 Starting Progressive Pre-flight Check")
self.log("=" * 50)
# Critical check - always required
if not os.path.exists(self.base_dir):
self.log(f"❌ CRITICAL: Directory does not exist: {self.base_dir}")
return None
# Phase 1: Try strict mode (response_ files) - already checked in caller
# Phase 2: Try relaxed mode (any HTML files)
self.log("\n[Phase 2] Checking for any HTML files...")
html_exts = ('.html', '.xhtml', '.htm')
html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
if html_files:
self.log(f"✅ Found {len(html_files)} HTML files:")
# Show first 5 files as examples
for i, f in enumerate(html_files[:5]):
self.log(f" • {f}")
if len(html_files) > 5:
self.log(f" ... and {len(html_files) - 5} more")
self._check_optional_resources()
self.log("\n⚠️ Pre-flight check PASSED with warnings (relaxed mode)")
return {'success': True, 'mode': 'relaxed'}
# Phase 3: No HTML files at all
self.log("❌ No HTML files found in directory")
self.log("\n[Phase 3] Checking directory contents...")
all_files = os.listdir(self.base_dir)
self.log(f"📁 Directory contains {len(all_files)} total files")
# Look for any potential content
potential_content = [f for f in all_files if not f.startswith('.')]
if potential_content:
self.log("⚠️ Found non-HTML files:")
for i, f in enumerate(potential_content[:5]):
self.log(f" • {f}")
if len(potential_content) > 5:
self.log(f" ... and {len(potential_content) - 5} more")
self.log("\n⚠️ BYPASSING standard checks - compilation may fail!")
return {'success': True, 'mode': 'bypass'}
self.log("\n❌ Directory appears to be empty")
return None
def _check_optional_resources(self):
"""Check for optional resources (metadata, CSS, images, fonts)"""
self.log("\n📁 Checking optional resources:")
if os.path.exists(self.metadata_path):
self.log("✅ Found metadata.json")
else:
self.log("⚠️ No metadata.json found (will use defaults)")
resources_found = False
for subdir in ['css', 'images', 'fonts']:
path = os.path.join(self.base_dir, subdir)
if os.path.exists(path):
items = os.listdir(path)
if items:
self.log(f"✅ Found {subdir}/ with {len(items)} files")
resources_found = True
else:
self.log(f"📁 Found {subdir}/ (empty)")
if not resources_found:
self.log("⚠️ No resource directories found (CSS/images/fonts)")
def _detect_primary_language_from_html(self, html_files: List[str]) -> Optional[str]:
    """Resolve the EPUB language code from the GUI's OUTPUT_LANGUAGE setting.

    The HTML files are not inspected; ``html_files`` is accepted only so the
    signature stays compatible with existing callers.

    Resolution order:
      1) OUTPUT_LANGUAGE holds a known language name ("English",
         "Spanish", ...; matched case-insensitively) -> the mapped code.
      2) OUTPUT_LANGUAGE holds anything else -> the stripped value is
         returned unchanged, so raw codes such as "pt-BR" keep their casing.
      3) OUTPUT_LANGUAGE is unset or blank -> None, leaving the fallback
         (metadata language, "en", ...) to the caller.

    Returns:
        A language code such as "en", "es", "zh-CN", or None.
    """
    raw_value = os.getenv('OUTPUT_LANGUAGE', '').strip()
    if not raw_value:
        # Nothing from the GUI: leave the decision to the caller
        return None

    gui_name_to_code = {
        # Matches translator_gui.py target_lang_combo entries
        'english': 'en',
        'spanish': 'es',
        'french': 'fr',
        'german': 'de',
        'italian': 'it',
        'portuguese': 'pt',
        'russian': 'ru',
        'arabic': 'ar',
        'hindi': 'hi',
        'chinese (simplified)': 'zh-CN',
        'chinese (traditional)': 'zh-TW',
        'chinese': 'zh-CN',
        'japanese': 'ja',
        'korean': 'ko',
        'turkish': 'tr',
    }

    output_lang_name = raw_value.lower()
    code = gui_name_to_code.get(output_lang_name)
    if code:
        self.log(f"[DEBUG] Using OUTPUT_LANGUAGE from GUI for EPUB language: '{output_lang_name}' -> '{code}'")
        return code

    # The user typed a raw code like "es" or "pt-BR": pass it through untouched.
    # Only the lookup key is lowercased, never the returned value.
    self.log(f"[DEBUG] OUTPUT_LANGUAGE not in predefined map, using raw value: '{raw_value}'")
    return raw_value
def _analyze_chapters(self) -> Dict[int, Tuple[str, float, str]]:
    """Extract a title for every chapter file using a thread pool.

    Files are taken in the order returned by ``self._find_html_files()``;
    the position in that list is the chapter key.

    Returns:
        Dict mapping position -> ``(title, confidence, filename)``. A file
        that cannot be analysed is recorded as ``("Chapter {idx}", 0.0,
        filename)``. If a stop is requested the partial result collected so
        far is returned.
    """
    self.log("\n📖 Extracting translated titles from chapter files...")
    chapter_info = {}
    sorted_files = self._find_html_files()
    if not sorted_files:
        self.log("⚠️ No translated chapter files found!")
        return chapter_info

    total_files = len(sorted_files)
    self.log(f"📖 Analyzing {total_files} translated chapter files for titles...")
    self.log(f"🔧 Using {self.max_workers} parallel workers")

    # These toggles do not change during a run, so read them once instead
    # of once per file / per completed future.
    allow_p_fallback = os.getenv('USE_P_TAG_TOC_FALLBACK', '0') == '1'
    debug_mode_enabled = os.environ.get('DEBUG_MODE', '0') == '1'

    def analyze_single_file(idx_filename):
        """Worker: return (idx, info) on success or (idx, info, error) on failure."""
        idx, filename = idx_filename
        file_path = os.path.join(self.output_dir, filename)
        try:
            # Read and process file
            with open(file_path, 'r', encoding='utf-8') as f:
                raw_html_content = f.read()
            # Decode HTML entities
            import html
            html_content = html.unescape(raw_html_content)
            html_content = self._fix_encoding_issues(html_content)
            html_content = HTMLEntityDecoder.decode(html_content)
            # Extract title
            title, confidence = TitleExtractor.extract_from_html(
                html_content, idx, filename,
                allow_paragraph_fallback=allow_p_fallback,
                allow_generic_chapter_fallback=allow_p_fallback
            )
            return idx, (title, confidence, filename)
        except Exception as e:
            # A 3-tuple marks the failure; the caller logs it and keeps the placeholder
            return idx, (f"Chapter {idx}", 0.0, filename), str(e)

    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures = {
            executor.submit(analyze_single_file, (idx, filename)): idx
            for idx, filename in enumerate(sorted_files)
        }
        completed = 0
        for future in as_completed(futures):
            if self.is_stopped():
                self.log("🛑 Chapter analysis stopped by user")
                # Leaving the ``with`` block waits for every submitted task;
                # cancel the ones that have not started so a stop request
                # does not still process the whole backlog.
                for pending in futures:
                    pending.cancel()
                break
            # Count every finished future so progress numbers stay consistent
            completed += 1
            try:
                result = future.result()
                if len(result) == 2:  # Success
                    idx, info = result
                    chapter_info[idx] = info
                    title, confidence, filename = info
                    # Low-confidence titles are always reported; the rest only in debug mode
                    if confidence <= 0.4:
                        indicator = "🔴"
                        self.log(f" [{completed}/{total_files}] {indicator} Chapter {idx}: '{title}' (confidence: {confidence:.2f})")
                    elif debug_mode_enabled:
                        indicator = "✅" if confidence > 0.7 else "🟡"
                        self.log(f" [{completed}/{total_files}] {indicator} Chapter {idx}: '{title}' (confidence: {confidence:.2f})")
                else:  # Error reported by the worker
                    idx, info, error = result
                    chapter_info[idx] = info
                    self.log(f"❌ [{completed}/{total_files}] Error processing chapter {idx}: {error}")
            except Exception as e:
                idx = futures[future]
                self.log(f"❌ Failed to process chapter {idx}: {e}")
                chapter_info[idx] = (f"Chapter {idx}", 0.0, sorted_files[idx])
    return chapter_info
def _process_chapters(self, book: epub.EpubBook, html_files: List[str],
                      chapter_titles_info: Dict[int, Tuple[str, float, str]],
                      css_items: List[epub.EpubItem], processed_images: Dict[str, str],
                      spine: List, toc: List, metadata: dict) -> int:
    """Convert chapter files to XHTML in parallel, then add them to the book.

    Phase 1 (thread pool): each file is read, decoded, encoding-fixed,
    reduced to its main content (only for files not prefixed ``response_``),
    given a title, converted and validated through ``XHTMLConverter`` and
    finally parsed as XML; a parse error replaces the content with the
    converter's fallback XHTML. 0-byte files are marked as skipped.

    Phase 2 (sequential): results are sorted by chapter number and added to
    ``book`` and ``spine``. A chapter also goes into ``toc`` unless it is an
    auxiliary file or its title is "untitled"/"untitled chapter". Failed
    chapters get an error placeholder; skipped chapters are left out.

    Args:
        book: EPUB book receiving the chapter items.
        html_files: Chapter file names (relative to ``self.output_dir``) in reading order.
        chapter_titles_info: Position -> (title, confidence, filename) from title analysis.
        css_items: Stylesheet items; their base names are linked from each chapter.
        processed_images: Passed through to ``self._process_chapter_images``.
        spine: List extended in place with every added chapter.
        toc: List extended in place with chapters that belong in the TOC.
        metadata: Book metadata; ``metadata["language"]`` (default "en") sets the chapter language.

    Returns:
        Number of chapters added, including error placeholders.

    NOTE(review): chapters numbered 49-56 get extra debug logging via a
    hard-coded range that looks like a leftover from a specific investigation.
    """
    chapters_added = 0
    self.log(f"\n{'='*80}")
    self.log(f"📚 STARTING CHAPTER PROCESSING")
    self.log(f"📚 Total files to process: {len(html_files)}")
    self.log(f"🔧 Using {self.max_workers} parallel workers")
    self.log(f"📂 Output directory: {self.output_dir}")
    self.log(f"{'='*80}")
    # Debug chapter titles info (third tuple element is the filename, logged here as "method")
    self.log(f"\n[DEBUG] Chapter titles info has {len(chapter_titles_info)} entries")
    for num in list(chapter_titles_info.keys())[:5]:
        title, conf, method = chapter_titles_info[num]
        self.log(f" Chapter {num}: {title[:50]}... (conf: {conf}, method: {method})")
    # Prepare chapter data as (chapter_num, filename) pairs
    chapter_data = []
    for idx, filename in enumerate(html_files):
        chapter_num = idx
        # If the titles dict has no entry for this position but has one for the
        # next position, use that key instead (tolerates 1-based title keys)
        if chapter_num not in chapter_titles_info and (chapter_num + 1) in chapter_titles_info:
            chapter_num = idx + 1
        chapter_data.append((chapter_num, filename))
        # Debug specific problem chapters
        if 49 <= chapter_num <= 56:
            self.log(f"[DEBUG] Problem chapter found: {chapter_num} -> {filename}")

    def process_chapter_content(data):
        """Worker: turn one (chapter_num, filename) pair into a result dict.

        The dict always has 'num', 'filename', 'title' and 'success'. On
        success it carries 'content'; on failure 'error' (plus 'traceback'
        for exceptions, or 'skipped'/'skip_reason' for 0-byte files).
        Exceptions are converted into a failure dict, never propagated.
        """
        chapter_num, filename = data
        path = os.path.join(self.output_dir, filename)
        # Debug tracking for problem chapters
        is_problem_chapter = 49 <= chapter_num <= 56
        try:
            if is_problem_chapter:
                self.log(f"\n[DEBUG] {'*'*60}")
                self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {chapter_num}: {filename}")
                self.log(f"[DEBUG] Full path: {path}")
            # Check file exists
            if not os.path.exists(path):
                error_msg = f"File does not exist: {path}"
                self.log(f"[ERROR] {error_msg}")
                raise FileNotFoundError(error_msg)
            # Get file size
            file_size = os.path.getsize(path)
            if is_problem_chapter:
                self.log(f"[DEBUG] File size: {file_size} bytes")
            # Skip truly empty (0-byte) junk files entirely (do not generate error placeholders)
            if file_size == 0:
                title = chapter_titles_info.get(chapter_num, (f"Chapter {chapter_num}", 0, ""))[0]
                return {
                    'num': chapter_num,
                    'filename': filename,
                    'title': title,
                    'error': f"Skipped 0-byte file: {filename}",
                    'success': False,
                    'skipped': True,
                    'skip_reason': 'zero_byte_file'
                }
            # Read and decode
            raw_content = self._read_and_decode_html_file(path)
            if is_problem_chapter:
                self.log(f"[DEBUG] Raw content length after reading: {len(raw_content) if raw_content else 'NULL'}")
                if raw_content:
                    self.log(f"[DEBUG] First 200 chars: {raw_content[:200]}")
            # Fix encoding
            raw_content = self._fix_encoding_issues(raw_content)
            if is_problem_chapter:
                self.log(f"[DEBUG] Content length after encoding fix: {len(raw_content) if raw_content else 'NULL'}")
            if not raw_content or not raw_content.strip():
                error_msg = f"Empty content after reading/decoding: {filename}"
                if is_problem_chapter:
                    self.log(f"[ERROR] {error_msg}")
                raise ValueError(error_msg)
            # Extract main content (only for files that are not response_ output)
            if not filename.startswith('response_'):
                before_len = len(raw_content)
                raw_content = self._extract_main_content(raw_content, filename)
                if is_problem_chapter:
                    self.log(f"[DEBUG] Content extraction: {before_len} -> {len(raw_content)} chars")
            # Get title
            title = self._get_chapter_title(chapter_num, filename, raw_content, chapter_titles_info)
            if is_problem_chapter:
                self.log(f"[DEBUG] Chapter title: {title}")
            # Prepare CSS links: base name of each stylesheet under css/
            css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items]
            if is_problem_chapter:
                self.log(f"[DEBUG] CSS links: {css_links}")
            # XHTML conversion - THE CRITICAL PART
            if is_problem_chapter:
                self.log(f"[DEBUG] Starting XHTML conversion...")
            xhtml_content = XHTMLConverter.ensure_compliance(raw_content, title, css_links)
            if is_problem_chapter:
                self.log(f"[DEBUG] XHTML content length: {len(xhtml_content) if xhtml_content else 'NULL'}")
                if xhtml_content:
                    self.log(f"[DEBUG] XHTML first 300 chars: {xhtml_content[:300]}")
            # Process images
            xhtml_content = self._process_chapter_images(xhtml_content, processed_images)
            # Validate
            if is_problem_chapter:
                self.log(f"[DEBUG] Starting validation...")
            final_content = XHTMLConverter.validate(xhtml_content)
            if is_problem_chapter:
                self.log(f"[DEBUG] Final content length: {len(final_content)}")
            # Final XML validation: the content must parse as XML, otherwise
            # it is replaced by the fallback document below
            try:
                ET.fromstring(final_content.encode('utf-8'))
                if is_problem_chapter:
                    self.log(f"[DEBUG] XML validation PASSED")
            except ET.ParseError as e:
                if is_problem_chapter:
                    self.log(f"[ERROR] XML validation FAILED: {e}")
                    # Show the exact error location (parsed from the ParseError message)
                    lines = final_content.split('\n')
                    import re
                    match = re.search(r'line (\d+), column (\d+)', str(e))
                    if match:
                        line_num = int(match.group(1))
                        if line_num <= len(lines):
                            self.log(f"[ERROR] Problem line {line_num}: {lines[line_num-1][:100]}")
                # Create fallback
                final_content = XHTMLConverter._build_fallback_xhtml(title)
                if is_problem_chapter:
                    self.log(f"[DEBUG] Using fallback XHTML")
            if is_problem_chapter:
                self.log(f"[DEBUG] Chapter processing SUCCESSFUL")
                self.log(f"[DEBUG] {'*'*60}\n")
            return {
                'num': chapter_num,
                'filename': filename,
                'title': title,
                'content': final_content,
                'success': True
            }
        except Exception as e:
            # Any failure becomes a result dict so the caller can add a placeholder
            import traceback
            tb = traceback.format_exc()
            if is_problem_chapter:
                self.log(f"[ERROR] {'!'*60}")
                self.log(f"[ERROR] CHAPTER {chapter_num} PROCESSING FAILED")
                self.log(f"[ERROR] Exception type: {type(e).__name__}")
                self.log(f"[ERROR] Exception: {e}")
                self.log(f"[ERROR] Full traceback:\n{tb}")
                self.log(f"[ERROR] {'!'*60}\n")
            return {
                'num': chapter_num,
                'filename': filename,
                'title': chapter_titles_info.get(chapter_num, (f"Chapter {chapter_num}", 0, ""))[0],
                'error': str(e),
                'traceback': tb,
                'success': False
            }

    # Process in parallel
    processed_chapters = []
    completed = 0
    total_chapters = len(chapter_data)
    # Use reduced logging for large EPUBs (roughly every 5% once above 50 chapters)
    use_reduced_logging = total_chapters > 50
    log_interval = max(1, total_chapters // 20) if use_reduced_logging else 1
    self.log(f"\n[DEBUG] Starting parallel processing...")
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        # Map each future back to its chapter number for error reporting
        futures = {
            executor.submit(process_chapter_content, data): data[0]
            for data in chapter_data
        }
        for future in as_completed(futures):
            # Check stop flag
            # NOTE(review): breaking here does not cancel already-submitted
            # work; leaving the executor's `with` block still waits for it.
            if self.is_stopped():
                self.log("🛑 Chapter processing stopped by user")
                break
            try:
                result = future.result()
                if result:
                    processed_chapters.append(result)
                    completed += 1
                    # Always log failures
                    if not result['success']:
                        self.log(f" [{completed}/{total_chapters}] ❌ Failed: {result['filename']} - {result['error']}")
                    # For successes: log at intervals for large EPUBs, or always for small ones
                    elif not use_reduced_logging or completed == 1 or completed % log_interval == 0 or completed == total_chapters:
                        current_percent = (completed * 100) // total_chapters
                        self.log(f" [{completed}/{total_chapters}] ({current_percent}%) ✅")
            except Exception as e:
                completed += 1
                chapter_num = futures[future]
                self.log(f" [{completed}/{total_chapters}] ❌ Exception processing chapter {chapter_num}: {e}")
                import traceback
                self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}")
    # Sort by chapter number to maintain order (results arrive in completion order)
    processed_chapters.sort(key=lambda x: x['num'])
    # Debug what we have
    self.log(f"\n[DEBUG] Processed {len(processed_chapters)} chapters")
    skipped_chapters = [c for c in processed_chapters if (not c.get('success')) and c.get('skipped')]
    failed_chapters = [c for c in processed_chapters if (not c.get('success')) and (not c.get('skipped'))]
    if skipped_chapters:
        self.log(f"[INFO] {len(skipped_chapters)} chapters skipped (0-byte junk files):")
        for sc in skipped_chapters:
            self.log(f" - Chapter {sc['num']}: {sc['filename']} - {sc.get('skip_reason', 'skipped')}")
    if failed_chapters:
        self.log(f"[WARNING] {len(failed_chapters)} chapters failed:")
        for fc in failed_chapters:
            self.log(f" - Chapter {fc['num']}: {fc['filename']} - {fc.get('error', 'Unknown error')}")
    # Add chapters to book in order (this must be sequential)
    self.log("\n📦 Adding chapters to EPUB structure...")
    # Skip 0-byte junk chapters entirely; only add real chapters + placeholders for true failures
    chapters_to_add = [c for c in processed_chapters if not c.get('skipped')]
    # Use reduced logging for large EPUBs
    total_to_add = len(chapters_to_add)
    use_reduced_logging = total_to_add > 50
    log_interval = max(1, total_to_add // 20) if use_reduced_logging else 1
    # NOTE: from here on `chapter_data` is rebound to one result dict per
    # iteration, shadowing the list of (chapter_num, filename) pairs above.
    for idx, chapter_data in enumerate(chapters_to_add, 1):
        if chapter_data['success']:
            try:
                # Create EPUB chapter
                import html
                text_dirname = "Text" if getattr(self, 'legacy_epub_structure', False) else ""
                # Restore original OPF filename (strips response_ prefix, restores source extension)
                opf_map = getattr(self, '_opf_filename_map', {})
                chapter_file_name = self._restore_opf_filename(chapter_data['filename'], opf_map)
                if text_dirname:
                    chapter_file_name = f"{text_dirname}/{chapter_file_name}"
                chapter = epub.EpubHtml(
                    title=html.unescape(chapter_data['title']),
                    file_name=chapter_file_name,
                    lang=metadata.get("language", "en")
                )
                chapter.content = FileUtils.ensure_bytes(chapter_data['content'])
                if self.attach_css_to_chapters:
                    for css_item in css_items:
                        chapter.add_item(css_item)
                # Add to book
                book.add_item(chapter)
                spine.append(chapter)
                # Include auxiliary files in spine but omit from TOC
                base_name = os.path.basename(chapter_data['filename'])
                if hasattr(self, 'auxiliary_html_files') and base_name in self.auxiliary_html_files:
                    self.log(f" 🛈 Added auxiliary page to spine (not in TOC): {base_name}")
                else:
                    title_lower = str(chapter_data.get('title', '')).strip().lower()
                    if title_lower in ('untitled chapter', 'untitled'):
                        self.log(f" 🛈 Skipped TOC entry for untitled chapter: {base_name}")
                    else:
                        toc.append(chapter)
                chapters_added += 1
                # Log auxiliary files always, or at intervals for regular chapters
                if base_name in getattr(self, 'auxiliary_html_files', set()):
                    self.log(f" ✅ Added auxiliary page (spine only): '{base_name}'")
                elif not use_reduced_logging or idx == 1 or idx % log_interval == 0 or idx == total_to_add:
                    current_percent = (idx * 100) // total_to_add
                    self.log(f" [{idx}/{total_to_add}] ({current_percent}%) ✅")
            except Exception as e:
                self.log(f" ❌ Failed to add chapter {chapter_data['num']} to book: {e}")
                import traceback
                self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}")
                # Add error placeholder
                self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata)
                chapters_added += 1
        else:
            # Only add placeholders for real processing failures.
            # 0-byte junk files are skipped earlier and not present in chapters_to_add.
            self.log(f" ⚠️ Adding error placeholder for chapter {chapter_data['num']}")
            self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata)
            chapters_added += 1
    self.log(f"\n{'='*80}")
    self.log(f"✅ CHAPTER PROCESSING COMPLETE")
    self.log(f"✅ Added {chapters_added} chapters to EPUB")
    self.log(f"{'='*80}\n")
    return chapters_added
def _add_error_chapter_from_data(self, book, chapter_data, spine, toc, metadata):
    """Add a placeholder page for a chapter that could not be processed.

    The page is a small, well-formed XHTML document naming the source file
    and the error. It is added to ``book`` and ``spine``, and to ``toc``
    unless the title is "untitled"/"untitled chapter".

    Args:
        book: EPUB book receiving the placeholder item.
        chapter_data: Result dict with at least 'num'; 'title', 'filename'
            and 'error' are used when present.
        spine: List extended in place with the placeholder.
        toc: List extended in place unless the chapter is untitled.
        metadata: Book metadata; ``metadata["language"]`` defaults to "en".

    Any exception is logged and swallowed so one bad placeholder cannot
    abort the compilation.
    """
    try:
        import html

        title = chapter_data.get('title', f"Chapter {chapter_data['num']}")
        lang = metadata.get("language", "en")
        text_dirname = "Text" if getattr(self, 'legacy_epub_structure', False) else ""
        err_file = f"chapter_{chapter_data['num']:03d}.xhtml"
        if text_dirname:
            err_file = f"{text_dirname}/{err_file}"
        chapter = epub.EpubHtml(
            title=title,
            file_name=err_file,
            lang=lang
        )

        # File names and exception text are arbitrary strings and may contain
        # markup characters; escape everything interpolated into the page.
        safe_title = ContentProcessor.safe_escape(title)
        safe_file = ContentProcessor.safe_escape(str(chapter_data.get('filename', 'unknown')))
        safe_error = ContentProcessor.safe_escape(str(chapter_data.get('error', 'unknown error')))
        lang_attr = html.escape(str(lang), quote=True)

        error_content = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<!DOCTYPE html>\n'
            f'<html xmlns="http://www.w3.org/1999/xhtml" lang="{lang_attr}" xml:lang="{lang_attr}">\n'
            f'<head><title>{safe_title}</title></head>\n'
            '<body>\n'
            f'<h1>{safe_title}</h1>\n'
            '<p>Error loading chapter content.</p>\n'
            f'<p>File: {safe_file}</p>\n'
            f'<p>Error: {safe_error}</p>\n'
            '</body>\n'
            '</html>\n'
        )
        chapter.content = error_content.encode('utf-8')
        book.add_item(chapter)
        spine.append(chapter)

        title_lower = str(title).strip().lower()
        if title_lower in ('untitled chapter', 'untitled'):
            self.log(f" 🛈 Skipped TOC entry for untitled error chapter: {err_file}")
        else:
            toc.append(chapter)
    except Exception as e:
        self.log(f" ❌ Failed to add error placeholder: {e}")
def _get_chapter_order_from_opf(self) -> Dict[str, int]:
    """Locate a source of chapter ordering and return its mapping.

    Sources are tried in this order, and the first one present wins:
      1. ``content.opf`` inside ``self.output_dir``
      2. the source EPUB named by the ``EPUB_PATH`` environment variable
      3. ``translation_progress.json`` inside ``self.output_dir``

    Returns whatever the matching helper returns (a mapping of original
    filename -> chapter number for the OPF case), or None when no source
    exists.
    """
    local_opf = os.path.join(self.output_dir, "content.opf")
    if os.path.exists(local_opf):
        self.log("✅ Found content.opf - using for chapter ordering")
        return self._parse_opf_file(local_opf)

    epub_source = os.getenv('EPUB_PATH')
    if epub_source and os.path.exists(epub_source):
        self.log(f"📚 Extracting chapter order from source EPUB: {epub_source}")
        return self._extract_order_from_epub(epub_source)

    progress_json = os.path.join(self.output_dir, "translation_progress.json")
    if not os.path.exists(progress_json):
        return None
    self.log("📄 Using translation_progress.json for chapter order")
    return self._get_order_from_progress_file(progress_json)
def _parse_opf_file(self, opf_path: str) -> Optional[Dict[str, int]]:
    """Parse an OPF package file and number its chapters in spine order.

    Only manifest items that look like HTML (by media type or file
    extension) are considered. Walking the spine, a file is numbered when
    its name contains a digit, or when it does not match the special-file
    skip list (nav/toc/contents/cover). The skip list is disabled when
    ``TRANSLATE_SPECIAL_FILES`` or the legacy ``TRANSLATE_COVER_HTML``
    environment variable is "1".

    Args:
        opf_path: Path to the OPF file on disk.

    Returns:
        Dict mapping file base name -> 0-based chapter number, or None if
        the file cannot be read or parsed (the error is logged).
    """
    try:
        root = ET.parse(opf_path).getroot()

        # Use the document's own default namespace when it declares one
        ns = {'opf': 'http://www.idpf.org/2007/opf'}
        if root.tag.startswith('{'):
            ns = {'opf': root.tag[1:root.tag.index('}')]}

        # Manifest: item id -> base file name, HTML/XHTML items only
        html_exts = ('.html', '.xhtml', '.htm')
        manifest = {}
        for item in root.findall('.//opf:manifest/opf:item', ns):
            item_id = item.get('id')
            href = item.get('href')
            media_type = item.get('media-type', '')
            if not item_id or not href:
                continue
            # Extension check is case-insensitive so "CH01.XHTML" is recognised too
            if 'html' in media_type.lower() or href.lower().endswith(html_exts):
                manifest[item_id] = os.path.basename(href)

        filename_to_order = {}
        spine = root.find('.//opf:spine', ns)
        if spine is None:
            return filename_to_order

        # Build dynamic skip list based on TRANSLATE_SPECIAL_FILES toggle
        # (TRANSLATE_COVER_HTML is the older name, kept for compatibility)
        translate_special = (
            os.environ.get('TRANSLATE_SPECIAL_FILES', '0') == '1'
            or os.environ.get('TRANSLATE_COVER_HTML', '0') == '1'
        )
        if translate_special:
            skip_list = []
            self.log(" 📝 Special files mode ENABLED - including all files in TOC")
        else:
            skip_list = ['nav', 'toc', 'contents', 'cover']
            self.log(" 📝 Special files mode DISABLED - excluding navigation files")

        itemrefs = spine.findall('opf:itemref', ns)
        total_items = len(itemrefs)
        # Large books only log about every 5% of the spine
        use_reduced_logging = total_items > 50
        log_interval = max(1, total_items // 20) if use_reduced_logging else 1

        chapter_num = 0  # Start from 0 for array indexing
        for idx, itemref in enumerate(itemrefs):
            idref = itemref.get('idref')
            if not idref or idref not in manifest:
                continue
            filename = manifest[idref]
            lowered = filename.lower()
            name_without_ext = os.path.splitext(lowered)[0]

            # Files with numbers are always regular chapters, regardless of
            # keywords. The pattern must be r'\d' (a digit); r'\\d' would
            # look for a literal backslash followed by "d" and never match.
            has_numbers = re.search(r'\d', name_without_ext) is not None
            is_special = any(skip in lowered for skip in skip_list)

            if not has_numbers and is_special:
                # Always log skipped files (these are rare)
                self.log(f" Skipping special file (no numbers): {filename}")
                continue

            filename_to_order[filename] = chapter_num
            if not use_reduced_logging or idx % log_interval == 0 or idx == total_items - 1:
                if use_reduced_logging:
                    percent = (idx * 100) // total_items
                    self.log(f" [{idx}/{total_items}] ({percent}%) ✅")
                elif has_numbers:
                    self.log(f" Chapter {chapter_num}: {filename} (numbered)")
                else:
                    self.log(f" Chapter {chapter_num}: {filename}")
            chapter_num += 1

        return filename_to_order
    except Exception as e:
        self.log(f"⚠️ Error parsing content.opf: {e}")
        import traceback
        self.log(traceback.format_exc())
        return None
def _extract_order_from_epub(self, epub_path: str) -> Optional[Dict[str, int]]:
    """Read the OPF out of a source EPUB and return its chapter ordering.

    The OPF is located by looking for an archive member whose name ends in
    ``content.opf``; if none exists, ``META-INF/container.xml`` is consulted
    for the rootfile path. The OPF bytes are written to a temporary file in
    ``self.output_dir`` and handed to ``self._parse_opf_file``.

    Args:
        epub_path: Path to the source EPUB (a zip archive).

    Returns:
        Whatever ``self._parse_opf_file`` returns for the extracted OPF, or
        None when no OPF can be located or any step fails (errors are logged).
    """
    try:
        with zipfile.ZipFile(epub_path, 'r') as zf:
            # Find content.opf (might be in different locations)
            opf_file = next(
                (name for name in zf.namelist() if name.endswith('content.opf')),
                None,
            )
            if not opf_file:
                # Ask META-INF/container.xml where the package document lives
                try:
                    container_tree = ET.fromstring(zf.read('META-INF/container.xml'))
                    rootfile = container_tree.find(
                        './/{urn:oasis:names:tc:opendocument:xmlns:container}rootfile'
                    )
                    if rootfile is not None:
                        opf_file = rootfile.get('full-path')
                except (KeyError, ET.ParseError) as e:
                    # Missing or malformed container.xml: best effort, carry on without it
                    self.log(f"⚠️ Could not read META-INF/container.xml: {e}")
            if not opf_file:
                return None
            opf_content = zf.read(opf_file)

        # _parse_opf_file works on a path, so stage the bytes on disk
        temp_opf = os.path.join(self.output_dir, "temp_content.opf")
        try:
            with open(temp_opf, 'wb') as f:
                f.write(opf_content)
            return self._parse_opf_file(temp_opf)
        finally:
            # Remove the staging file even when parsing raises
            if os.path.exists(temp_opf):
                os.remove(temp_opf)
    except Exception as e:
        self.log(f"⚠️ Error extracting from EPUB: {e}")
    return None
def _find_html_files(self) -> List[str]:
    """Return the chapter files of ``self.output_dir`` in reading order.

    When ``self._get_chapter_order_from_opf()`` yields an ordering, output
    files are matched to it by core name (``response_`` prefix and all
    html/htm/xhtml/xml extensions removed); unmatched files are appended
    alphabetically. Otherwise ``response_`` files are sorted by their
    embedded number and other files are appended, or, with no ``response_``
    files at all, everything is sorted with ``self.get_robust_sort_key``.

    Side effect: sets ``self.auxiliary_html_files`` to the set of appended
    non-``response_`` files (empty when the special-files override is on),
    except when no HTML files exist at all.
    """
    self.log(f"\n[DEBUG] Scanning directory: {self.output_dir}")

    known_suffixes = ('.html', '.htm', '.xhtml')
    html_files = [
        entry for entry in os.listdir(self.output_dir)
        if entry.lower().endswith(known_suffixes)
    ]
    if not html_files:
        self.log("[ERROR] No HTML files found!")
        return []

    def special_files_enabled() -> bool:
        # TRANSLATE_COVER_HTML is the older name of the same toggle
        return (
            os.environ.get('TRANSLATE_SPECIAL_FILES', '0') == '1'
            or os.environ.get('TRANSLATE_COVER_HTML', '0') == '1'
        )

    def strip_all_ext(name: str) -> str:
        # Peel trailing known extensions one at a time ("a.html.xhtml" -> "a")
        strippable = ('html', 'htm', 'xhtml', 'xml')
        stem, dot, ext = name.rpartition('.')
        while dot and ext.lower() in strippable:
            name = stem
            stem, dot, ext = name.rpartition('.')
        return name

    # Try to get authoritative order from OPF/EPUB
    opf_order = self._get_chapter_order_from_opf()
    if opf_order:
        self.log("✅ Using authoritative chapter order from OPF/EPUB")
        verbose = os.environ.get('DEBUG_MODE', '0') == '1'
        if verbose:
            self.log(f"[DEBUG] OPF entries (first 5): {list(opf_order.items())[:5]}")

        mapped = []
        unmapped = []
        for candidate in html_files:
            core_name = candidate[9:] if candidate.startswith('response_') else candidate
            core_name = strip_all_ext(core_name)
            for opf_name, position in opf_order.items():
                if strip_all_ext(opf_name.split('/')[-1]) != core_name:
                    continue
                mapped.append((position, candidate))
                if verbose:
                    self.log(f" Mapped: {candidate} -> {opf_name} (order: {position})")
                break
            else:
                unmapped.append(candidate)
                # Always log unmapped files (these are warnings)
                self.log(f" ⚠️ Could not map: {candidate} (core: {core_name})")

        if not mapped:
            self.log("⚠️ Could not map any files using OPF order, falling back to pattern matching")
        else:
            # Stable sort on the OPF position only
            mapped.sort(key=lambda pair: pair[0])
            final_order = [name for _, name in mapped]
            if unmapped:
                self.log(f"⚠️ Adding {len(unmapped)} unmapped files at the end")
                final_order.extend(sorted(unmapped))
                # Non-response unmapped files are auxiliary (omitted from the
                # TOC) unless the special-files override is enabled
                if special_files_enabled():
                    self.auxiliary_html_files = set()
                else:
                    self.auxiliary_html_files = {
                        name for name in unmapped if not name.startswith('response_')
                    }
            else:
                self.auxiliary_html_files = set()
            self.log(f"✅ Successfully ordered {len(final_order)} chapters using OPF")
            return final_order

    # Fallback to filename pattern matching
    self.log("⚠️ No OPF/EPUB found or mapping failed, using filename pattern matching")
    response_files = [name for name in html_files if name.startswith('response_')]
    if not response_files:
        # Progressive sorting for non-standard files; none are auxiliary
        html_files.sort(key=self.get_robust_sort_key)
        self.auxiliary_html_files = set()
        return html_files

    self.log(f"[DEBUG] Found {len(response_files)} response_ files")
    if any('-h-' in name for name in response_files):
        def sort_key(name):
            found = re.search(r'-h-(\d+)', name)
            return int(found.group(1)) if found else 999999
    else:
        def sort_key(name):
            found = re.match(r'response_(\d+)_', name)
            return int(found.group(1)) if found else 0
    main_files = sorted(response_files, key=sort_key)

    # Append non-response files as auxiliary pages (not in TOC)
    aux_files = sorted(name for name in html_files if not name.startswith('response_'))
    if aux_files:
        aux_set = set() if special_files_enabled() else set(aux_files)
        self.auxiliary_html_files = aux_set
        self.log(f"[DEBUG] Appending {len(aux_set)} auxiliary HTML file(s) (not in TOC): {list(aux_set)[:5]}")
    else:
        self.auxiliary_html_files = set()
    return main_files + aux_files
def _read_and_decode_html_file(self, file_path: str) -> str:
    """Read an HTML file and decode its entities, keeping < and > escaped.

    Every entity is decoded with ``html.unescape`` except the ones that
    stand for ``<`` and ``>`` (named, decimal and hexadecimal forms). Those
    are normalised to ``&lt;`` / ``&gt;`` so angle-bracket text in the
    narrative stays literal text instead of turning into bogus tags. Real
    tags in the file are left untouched. Decoding is repeated (at most five
    passes) to unwind double-encoded entities such as ``&amp;lt;``.

    Args:
        file_path: Path of a UTF-8 encoded HTML file.

    Returns:
        The decoded text; an empty file yields an empty string.
    """
    import html
    import re

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    if not content:
        return content

    # Private-use code points that html.unescape leaves alone
    LT_PLACEHOLDER = "\ue000"
    GT_PLACEHOLDER = "\ue001"

    # Only *entity* spellings of < and > are matched; literal "<" / ">"
    # characters (actual markup) must never be replaced.
    lt_entity = re.compile(r'&(?:lt|LT|#0*60|#[xX]0*3[cC]);')
    gt_entity = re.compile(r'&(?:gt|GT|#0*62|#[xX]0*3[eE]);')

    max_iterations = 5
    for _ in range(max_iterations):
        prev_content = content
        # Protect before each pass in case of double-encoded entities
        content = lt_entity.sub(LT_PLACEHOLDER, content)
        content = gt_entity.sub(GT_PLACEHOLDER, content)
        # html.unescape handles all remaining standard HTML entities
        content = html.unescape(content)
        if content == prev_content:
            break

    # Restore placeholders as entities so they remain literal text in XHTML
    return content.replace(LT_PLACEHOLDER, '&lt;').replace(GT_PLACEHOLDER, '&gt;')
def _process_single_chapter(self, book: epub.EpubBook, num: int, filename: str,
chapter_titles_info: Dict[int, Tuple[str, float, str]],
css_items: List[epub.EpubItem], processed_images: Dict[str, str],
spine: List, toc: List, metadata: dict) -> bool:
"""Process a single chapter with COMPREHENSIVE debugging"""
path = os.path.join(self.output_dir, filename)
# Flag for extra debugging on problem chapters
is_problem_chapter = 49 <= num <= 56
is_response_file = filename.startswith('response_')
try:
if is_problem_chapter:
self.log(f"\n{'='*70}")
self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {num}")
self.log(f"[DEBUG] Filename: {filename}")
self.log(f"[DEBUG] Is response file: {is_response_file}")
self.log(f"[DEBUG] Full path: {path}")
# Check file exists and size
if not os.path.exists(path):
self.log(f"[ERROR] File does not exist: {path}")
return False
file_size = os.path.getsize(path)
if is_problem_chapter:
self.log(f"[DEBUG] File size: {file_size} bytes")
if file_size == 0:
self.log(f"[ERROR] File is empty (0 bytes): {filename}")
return False
# Read and decode
if is_problem_chapter:
self.log(f"[DEBUG] Reading and decoding file...")
raw_content = self._read_and_decode_html_file(path)
if is_problem_chapter:
self.log(f"[DEBUG] Raw content length: {len(raw_content) if raw_content else 'NULL'}")
if raw_content:
# Show first and last parts
self.log(f"[DEBUG] First 300 chars of raw content:")
self.log(f" {raw_content[:300]!r}")
self.log(f"[DEBUG] Last 300 chars of raw content:")
self.log(f" {raw_content[-300:]!r}")
# Check for common issues
if '<' in raw_content[:500]:
self.log(f"[DEBUG] Found < entities in content")
if '>' in raw_content[:500]:
self.log(f"[DEBUG] Found > entities in content")
if ' {after_fix} chars")
if before_fix != after_fix:
self.log(f"[DEBUG] Content changed during encoding fix")
if not raw_content or not raw_content.strip():
self.log(f"[WARNING] Chapter {num} is empty after decoding/encoding fix")
if is_problem_chapter:
self.log(f"[ERROR] Problem chapter {num} has no content!")
return False
# Extract main content if needed
if not filename.startswith('response_'):
if is_problem_chapter:
self.log(f"[DEBUG] Extracting main content (not a response file)...")
before_extract = len(raw_content)
raw_content = self._extract_main_content(raw_content, filename)
after_extract = len(raw_content)
if is_problem_chapter:
self.log(f"[DEBUG] Content extraction: {before_extract} -> {after_extract} chars")
if after_extract < before_extract / 2:
self.log(f"[WARNING] Lost more than 50% of content during extraction!")
self.log(f"[DEBUG] Content after extraction (first 300 chars):")
self.log(f" {raw_content[:300]!r}")
else:
if is_problem_chapter:
self.log(f"[DEBUG] Skipping content extraction for response file")
self.log(f"[DEBUG] Response file content structure:")
# Check what's in a response file
if '' in raw_content:
self.log(f" Has tag")
if '' in raw_content:
self.log(f" Has tag")
if ' str:
"""Get chapter title with fallbacks - uses position-based numbering"""
title = None
confidence = 0.0
# Primary source: pre-analyzed title using position-based number
if num in chapter_titles_info:
title, confidence, stored_filename = chapter_titles_info[num]
# Re-extract if low confidence or missing
if not title or confidence < 0.5:
allow_p_fallback = os.getenv('USE_P_TAG_TOC_FALLBACK', '0') == '1'
backup_title, backup_confidence = TitleExtractor.extract_from_html(
content, num, filename,
allow_paragraph_fallback=allow_p_fallback,
allow_generic_chapter_fallback=allow_p_fallback
)
if backup_confidence > confidence:
title = backup_title
confidence = backup_confidence
# Clean and validate
if title:
title = TitleExtractor.clean_title(title)
if not TitleExtractor.is_valid_title(title):
title = None
# Fallback for non-standard files
if not title and not filename.startswith('response_'):
# Try enhanced extraction methods for web-scraped content
title = self._fallback_title_extraction(content, filename, num)
# Final fallback - use position-based chapter number only if toggle allows it
if not title:
if os.getenv('USE_P_TAG_TOC_FALLBACK', '0') == '1':
title = f"Chapter {num}"
else:
# Avoid generic Chapter N titles; use filename stem if available
base = os.path.splitext(os.path.basename(filename))[0] if filename else ""
title = base or "Untitled Chapter"
return title
def get_robust_sort_key(self, filename):
    """Return a ``(rank, value)`` sort key derived from a chapter file name.

    Patterns are tried in priority order and the first one that matches wins.
    ``rank`` is the pattern number (1-15) and ``value`` the integer it
    captured, so keys produced by an earlier pattern sort ahead of keys from a
    later one. Names that carry no usable number get ``(99, filename)`` and
    therefore sort alphabetically after every numbered name.

    Args:
        filename: Bare file name, e.g. ``"chapter-12.html"``.

    Returns:
        ``(int, int)`` for numbered names, ``(99, str)`` for the fallback.
    """
    ic = re.IGNORECASE
    # (rank, matcher, pattern, flags) in priority order.
    numbered_patterns = (
        (1, re.search, r'-h-(\d+)', 0),                                   # -h-NUMBER
        (2, re.search, r'chapter[-_\s]?(\d+)', ic),                       # chapter-NUMBER
        (3, re.search, r'\bch[-_\s]?(\d+)\b', ic),                        # ch-NUMBER
        (4, re.match, r'response_(\d+)[-_]', 0),                          # response_NUMBER_ prefix
        (5, re.search, r'(?:book|story|part|section)[-_\s]?(\d+)', ic),   # book/story/part/section
        (6, re.search, r'split_(\d+)', 0),                                # Calibre split_NUMBER
        (7, re.match, r'^(\d+)\.(?:html?|xhtml)$', 0),                    # NUMBER.html
        (8, re.search, r'-(\d+)\.(?:html?|xhtml)$', 0),                   # -NUMBER.html
        (9, re.search, r'_(\d+)\.(?:html?|xhtml)$', 0),                   # _NUMBER.html
        (10, re.search, r'\((\d+)\)', 0),                                 # (NUMBER)
        (11, re.search, r'\[(\d+)\]', 0),                                 # [NUMBER]
        (12, re.search, r'(?:page|pg?)[-_\s]?(\d+)', ic),                 # page/pg/p NUMBER
        (13, re.search, r'(\d+)\.(?:html?|xhtml)$', 0),                   # ...NUMBER.html
    )
    for rank, matcher, pattern, flags in numbered_patterns:
        match = matcher(pattern, filename, flags)
        if match:
            return (rank, int(match.group(1)))

    # Pattern 14: Roman numerals (I, II, III, IV, etc.).
    # Every component of the Roman-numeral grammar is optional, so searching
    # for it directly succeeds with an EMPTY match at the first word boundary
    # and hides everything after it. Pick out whole upper-case candidate words
    # first, then validate each one against the grammar.
    roman_grammar = r'M{0,3}(?:CM|CD|D?C{0,3})(?:XC|XL|L?X{0,3})(?:IX|IV|V?I{0,3})'
    roman_values = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
    for word in re.findall(r'\b[MDCLXVI]+\b', filename):
        if not re.fullmatch(roman_grammar, word):
            continue
        total = 0
        previous = 0
        for symbol in word:
            current = roman_values[symbol]
            if previous and current > previous:
                # Subtractive pair (e.g. IV): undo the smaller symbol that was
                # already added, then subtract it from the larger one.
                total += current - 2 * previous
            else:
                total += current
            previous = current
        return (14, total)

    # Pattern 15: first significant number found anywhere in the name.
    digit_runs = re.findall(r'\d+', filename)
    if digit_runs:
        # Skip year-like numbers (1900-2099) unless nothing else is available.
        non_years = [int(run) for run in digit_runs if not 1900 <= int(run) <= 2099]
        return (15, non_years[0] if non_years else int(digit_runs[0]))

    # Final fallback: alphabetical.
    return (99, filename)
def _extract_chapter_number(self, filename: str, default_idx: int) -> int:
"""Extract chapter number using multiple patterns"""
# FIXED: Pattern 1 - Check -h-NUMBER FIRST (YOUR FILES USE THIS!)
match = re.search(r'-h-(\d+)', filename)
if match:
return int(match.group(1))
# Pattern 2: response_NUMBER_ (standard pattern)
match = re.match(r"response_(\d+)_", filename)
if match:
return int(match.group(1))
# Pattern 3: chapter-NUMBER, chapter_NUMBER, chapterNUMBER
match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE)
if match:
return int(match.group(1))
# Pattern 4: ch-NUMBER, ch_NUMBER, chNUMBER
match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE)
if match:
return int(match.group(1))
# Pattern 5: Just NUMBER.html (like 127.html)
match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 6: _NUMBER at end before extension
match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 7: -NUMBER at end before extension
match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 8: (NUMBER) in parentheses
match = re.search(r'\((\d+)\)', filename)
if match:
return int(match.group(1))
# Pattern 9: [NUMBER] in brackets
match = re.search(r'\[(\d+)\]', filename)
if match:
return int(match.group(1))
# Pattern 10: Use the sort key logic
sort_key = self.get_robust_sort_key(filename)
if isinstance(sort_key[1], int) and sort_key[1] > 0:
return sort_key[1]
# Final fallback: use position + 1
return default_idx + 1
def _extract_main_content(self, html_content: str, filename: str) -> str:
"""Extract main content from web-scraped HTML pages
This method tries to find the actual chapter content within a full webpage

Each (pattern, flags) pair in ``content_patterns`` is searched in order and the
first captured group whose stripped length exceeds 100 characters is returned.
When no pattern qualifies, the first group of ``body_match`` is returned if that
match succeeded; otherwise the input is returned unchanged. Any exception is
logged and the original ``html_content`` is returned.

Args:
html_content: HTML text to search.
filename: Used only in the log messages.

Returns:
The extracted fragment, or ``html_content`` itself when nothing was extracted.
"""
try:
# For web-scraped content, try to extract just the chapter part
# Common patterns for chapter content containers
# NOTE(review): the regex literals below (and the body check further down) look
# truncated in this view - confirm the opening/closing tag parts against the full file.
content_patterns = [
# Look for specific class names commonly used for content
(r']*class="[^"]*(?:chapter-content|entry-content|epcontent|post-content|content-area|main-content)[^"]*"[^>]*>(.*?)
', re.DOTALL | re.IGNORECASE),
# Look for article tags with content
(r']*>(.*?) ', re.DOTALL | re.IGNORECASE),
# Look for main tags
(r']*>(.*?) ', re.DOTALL | re.IGNORECASE),
# Look for specific id patterns
(r']*id="[^"]*(?:content|chapter|post)[^"]*"[^>]*>(.*?)
', re.DOTALL | re.IGNORECASE),
]
for pattern, flags in content_patterns:
match = re.search(pattern, html_content, flags)
if match:
extracted = match.group(1)
# Make sure we got something substantial
# (short captures are ignored and the next pattern is tried)
if len(extracted.strip()) > 100:
self.log(f"📄 Extracted main content using pattern for {filename}")
return extracted
# If no patterns matched, check if this looks like a full webpage
if ']*>(.*?)', html_content, re.DOTALL | re.IGNORECASE)
if body_match:
self.log(f"📄 Extracted body content for {filename}")
return body_match.group(1)
# If all else fails, return original content
self.log(f"📄 Using original content for {filename}")
return html_content
except Exception as e:
# Best effort: extraction problems must not lose the chapter, so hand back the input
self.log(f"⚠️ Content extraction failed for {filename}: {e}")
return html_content
def _fallback_title_extraction(self, content: str, filename: str, num: int) -> Optional[str]:
"""Fallback title extraction for when TitleExtractor fails
This handles web-scraped pages and other non-standard formats
"""
# Try filename-based extraction first (often more reliable for web scrapes)
filename_title = self._extract_title_from_filename_fallback(filename, num)
if filename_title:
return filename_title
# Try HTML content extraction with patterns TitleExtractor might miss
html_title = self._extract_title_from_html_fallback(content, num)
if html_title:
return html_title
return None
def _extract_title_from_html_fallback(self, content: str, num: int) -> Optional[str]:
"""Fallback HTML title extraction for web-scraped content

Searches ``content`` with each regex in ``patterns`` (case-insensitive, in order).
The first capture that, after entity decoding and whitespace normalisation, is
longer than 3 and shorter than 200 characters and is not "untitled" is logged
and returned. ``num`` is not used by the code visible here.

Returns:
The cleaned title, or None when no pattern yields an acceptable value.
"""
# Look for title patterns that TitleExtractor might miss
# Specifically for web-scraped novel sites
# NOTE(review): the pattern literals below look truncated in this view (leading tag
# names missing, one literal split across two lines) - confirm against the full file.
patterns = [
# Title tags with site separators
r']*>([^|–\-]+?)(?:\s*[|–\-]\s*[^<]+)? ',
# Specific class patterns from novel sites
r']*class="[^"]*cat-series[^"]*"[^>]*>([^<]+)
',
r']*class="[^"]*entry-title[^"]*"[^>]*>([^<]+) ',
r']*class="[^"]*chapter-title[^"]*"[^>]*>([^<]+) ',
# Meta property patterns
r' ]*property="og:title"[^>]*content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
title = match.group(1).strip()
# Decode HTML entities
title = HTMLEntityDecoder.decode(title)
# Additional cleanup for web-scraped content
title = re.sub(r'\s+', ' ', title) # Normalize whitespace
title = title.strip()
# Validate it's reasonable
# (a rejected candidate does not stop the search; the next pattern is tried)
if 3 < len(title) < 200 and title.lower() != 'untitled':
self.log(f"📝 Fallback extracted title from HTML: '{title}'")
return title
return None
def _extract_title_from_filename_fallback(self, filename: str, num: int) -> Optional[str]:
"""Fallback filename title extraction"""
# Remove extension
base_name = re.sub(r'\.(html?|xhtml)$', '', filename, flags=re.IGNORECASE)
# Web-scraped filename patterns
patterns = [
# "theend-chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7"
r'(?:theend|story|novel)[-_]chapter[-_](\d+)[-_](.+)',
# "chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7"
r'chapter[-_](\d+)[-_](.+)',
# "ch127-title" -> "Chapter 127 - Title"
r'ch[-_]?(\d+)[-_](.+)',
# Just the title part after number
r'^\d+[-_](.+)',
]
for pattern in patterns:
match = re.search(pattern, base_name, re.IGNORECASE)
if match:
if match.lastindex == 2: # Pattern with chapter number and title
chapter_num = match.group(1)
title_part = match.group(2)
else: # Pattern with just title
chapter_num = str(num)
title_part = match.group(1)
# Clean up the title part
title_part = title_part.replace('-', ' ').replace('_', ' ')
# Capitalize properly
words = title_part.split()
title_part = ' '.join(word.capitalize() if len(word) > 2 else word for word in words)
title = f"Chapter {chapter_num} - {title_part}"
self.log(f"📝 Fallback extracted title from filename: '{title}'")
return title
return None
def _load_metadata(self) -> dict:
"""Load metadata from JSON file"""
if os.path.exists(self.metadata_path):
try:
import html
with open(self.metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
self.log("[DEBUG] Metadata loaded successfully")
return metadata
except Exception as e:
self.log(f"[WARNING] Failed to load metadata.json: {e}")
else:
self.log("[WARNING] metadata.json not found, using defaults")
return {}
def _save_metadata(self, metadata: dict) -> None:
"""Persist metadata.json alongside the translated HTML output.
This ensures that once metadata fields (title, creator, subject, description, etc.)
have been translated, future EPUB rebuilds can reuse them instead of sending the
same fields to the API again.
"""
if not isinstance(metadata, dict):
return
try:
os.makedirs(os.path.dirname(self.metadata_path), exist_ok=True)
with open(self.metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
self.log("[DEBUG] Saved updated metadata.json")
except Exception as e:
# Let caller decide how to log/handle, but don't crash compilation here
raise e
def _create_book(self, metadata: dict) -> epub.EpubBook:
"""Create and configure EPUB book with complete metadata

Builds a new ``epub.EpubBook`` and copies the recognised keys of ``metadata``
onto it (identifier, title, language, author, description, publisher, date,
rights, subject, series, translator, source, ISBN, coverage and a few extra
Dublin Core fields), logging each value that is set.

Side effects:
``metadata['title']`` and ``metadata['original_title']`` are replaced in place
with the result of ``self._fix_encoding_issues``. The book language is also
pushed to ``XHTMLConverter.set_default_language``; a failure there is ignored.

Args:
metadata: Parsed metadata mapping. Keys with falsy values are skipped;
``language`` defaults to "en" and ``identifier`` to
``translated-<basename of self.base_dir>``.

Returns:
The configured ``epub.EpubBook``.
"""
book = epub.EpubBook()
# Set identifier
book.set_identifier(metadata.get("identifier", f"translated-{os.path.basename(self.base_dir)}"))
# Fix encoding issues in titles before using them
if metadata.get('title'):
metadata['title'] = self._fix_encoding_issues(metadata['title'])
if metadata.get('original_title'):
metadata['original_title'] = self._fix_encoding_issues(metadata['original_title'])
# Determine title
book_title = self._determine_book_title(metadata)
book.set_title(book_title)
# Set language (dc:language)
language_code = metadata.get("language", "en")
book.set_language(language_code)
# Keep XHTMLConverter in sync so generated html/xml:lang attributes match book language
try:
XHTMLConverter.set_default_language(language_code)
except Exception:
pass
# Store original title as alternative metadata (not as another dc:title)
# This prevents EPUB readers from getting confused about which title to display
if metadata.get('original_title') and metadata.get('original_title') != book_title:
# Use 'alternative' field instead of 'title' to avoid display issues
book.add_metadata('DC', 'alternative', metadata['original_title'])
# Also store in a custom field for reference
book.add_metadata('calibre', 'original_title', metadata['original_title'])
self.log(f"[INFO] Stored original title as alternative: {metadata['original_title']}")
# Set author/creator
if metadata.get("creator"):
book.add_author(metadata["creator"])
self.log(f"[INFO] Set author: {metadata['creator']}")
# ADD DESCRIPTION - This is what Calibre looks for
if metadata.get("description"):
# Clean the description of any HTML entities
description = HTMLEntityDecoder.decode(str(metadata["description"]))
book.add_metadata('DC', 'description', description)
# The full description is stored; only the log line is shortened to 100 characters
self.log(f"[INFO] Set description: {description[:100]}..." if len(description) > 100 else f"[INFO] Set description: {description}")
# Add publisher
if metadata.get("publisher"):
book.add_metadata('DC', 'publisher', metadata["publisher"])
self.log(f"[INFO] Set publisher: {metadata['publisher']}")
# Add publication date
if metadata.get("date"):
book.add_metadata('DC', 'date', metadata["date"])
self.log(f"[INFO] Set date: {metadata['date']}")
# Add rights/copyright
if metadata.get("rights"):
book.add_metadata('DC', 'rights', metadata["rights"])
self.log(f"[INFO] Set rights: {metadata['rights']}")
# Add subject/genre/tags
# (a list yields one dc:subject entry per item; any other value is stored as a single entry)
if metadata.get("subject"):
if isinstance(metadata["subject"], list):
for subject in metadata["subject"]:
book.add_metadata('DC', 'subject', subject)
self.log(f"[INFO] Added subject: {subject}")
else:
book.add_metadata('DC', 'subject', metadata["subject"])
self.log(f"[INFO] Set subject: {metadata['subject']}")
# Add series information if available
if metadata.get("series"):
# Calibre uses a custom metadata field for series
book.add_metadata('calibre', 'series', metadata["series"])
self.log(f"[INFO] Set series: {metadata['series']}")
# Add series index if available
# NOTE(review): a series_index of 0 is falsy and is therefore skipped - confirm that is intended
if metadata.get("series_index"):
book.add_metadata('calibre', 'series_index', str(metadata["series_index"]))
self.log(f"[INFO] Set series index: {metadata['series_index']}")
# Add custom metadata for translator info
if metadata.get("translator"):
book.add_metadata('DC', 'contributor', metadata["translator"], {'role': 'translator'})
self.log(f"[INFO] Set translator: {metadata['translator']}")
# Add source information
if metadata.get("source"):
book.add_metadata('DC', 'source', metadata["source"])
self.log(f"[INFO] Set source: {metadata['source']}")
# Add any ISBN if available
if metadata.get("isbn"):
book.add_metadata('DC', 'identifier', f"ISBN:{metadata['isbn']}", {'scheme': 'ISBN'})
self.log(f"[INFO] Set ISBN: {metadata['isbn']}")
# Add coverage (geographic/temporal scope) if available
if metadata.get("coverage"):
book.add_metadata('DC', 'coverage', metadata["coverage"])
self.log(f"[INFO] Set coverage: {metadata['coverage']}")
# Add any custom metadata that might be in the JSON
# This handles any additional fields that might be present
custom_metadata_fields = [
'contributor', 'format', 'relation', 'type'
]
for field in custom_metadata_fields:
if metadata.get(field):
book.add_metadata('DC', field, metadata[field])
self.log(f"[INFO] Set {field}: {metadata[field]}")
return book
def _determine_book_title(self, metadata: dict) -> str:
"""Determine the book title from metadata"""
# Try translated title
if metadata.get('title') and str(metadata['title']).strip():
title = str(metadata['title']).strip()
self.log(f"✅ Using translated title: '{title}'")
return title
# Try original title
if metadata.get('original_title') and str(metadata['original_title']).strip():
title = str(metadata['original_title']).strip()
self.log(f"⚠️ Using original title: '{title}'")
return title
# Fallback to directory name
title = os.path.basename(self.base_dir)
self.log(f"📁 Using directory name: '{title}'")
return title
def _create_default_css(self) -> str:
"""Create default CSS for proper chapter formatting

Returns the built-in stylesheet as a string literal; no instance state is read.
The rules cover body text, headings, paragraphs, word spacing and images, and
force ``position: static`` / ``z-index: auto`` on every element as well as
``float: none`` and a transparent background on title-like classes.

Returns:
The CSS source text.
"""
return """
/* Default EPUB CSS */
body {
margin: 1em;
padding: 0;
font-family: serif;
line-height: 1.6;
}
h1, h2, h3, h4, h5, h6 {
font-weight: bold;
margin-top: 1em;
margin-bottom: 0.5em;
page-break-after: avoid;
}
h1 {
font-size: 1.5em;
text-align: center;
margin-top: 2em;
margin-bottom: 2em;
}
p {
margin: 1em 0;
text-indent: 0;
white-space: normal;
}
/* Ensure proper word spacing for readers like Freda */
body, p, div, span {
word-spacing: normal;
}
img {
max-width: 100%;
height: auto;
display: block;
margin: 1em auto;
}
/* Prevent any overlay issues */
* {
position: static !important;
z-index: auto !important;
}
/* Remove any floating elements */
.title, [class*="title"] {
position: static !important;
float: none !important;
background: transparent !important;
}
"""
def _add_css_files(self, book: epub.EpubBook) -> List[epub.EpubItem]:
    """Register stylesheets on ``book`` and return the items that were added.

    Behaviour:
      * The built-in ``css/default.css`` is always added first.
      * When ``EPUB_CSS_OVERRIDE_PATH`` is set and ``self.attach_css_to_chapters``
        is enabled, the override file is added as ``css/override.css`` and the
        stylesheets in ``self.css_dir`` are skipped. An existing ``styles.css``
        in ``self.output_dir`` is overwritten with the override content too.
      * If the override is missing or unreadable, or no override applies, every
        ``.css`` file in ``self.css_dir`` is added (sorted by name).

    Failures on individual files are logged and do not abort the method.

    Args:
        book: The EPUB being assembled.

    Returns:
        The CSS items added to the book, default stylesheet first.
    """
    def make_css_item(uid, file_name, css_text):
        # All stylesheets share the same media type and byte conversion.
        return epub.EpubItem(
            uid=uid,
            file_name=file_name,
            media_type="text/css",
            content=FileUtils.ensure_bytes(css_text)
        )

    # The built-in stylesheet always comes first.
    builtin_item = make_css_item("css_default", "css/default.css", self._create_default_css())
    book.add_item(builtin_item)
    css_items = [builtin_item]
    self.log("✅ Added default CSS")

    # An explicit override from the GUI only applies when CSS is attached to chapters.
    override_path = os.getenv('EPUB_CSS_OVERRIDE_PATH', '').strip()
    if override_path and self.attach_css_to_chapters:
        try:
            if not os.path.isfile(override_path):
                self.log(f"[WARNING] EPUB_CSS_OVERRIDE_PATH does not exist: {override_path}")
            else:
                self.log(f"[INFO] Using override CSS for EPUB: {override_path}")
                with open(override_path, 'r', encoding='utf-8') as source:
                    override_css = source.read()

                # IMPORTANT: keep the PDF-generated styles.css in sync when it exists.
                styles_css_path = os.path.join(self.output_dir, 'styles.css')
                if os.path.exists(styles_css_path):
                    try:
                        with open(styles_css_path, 'w', encoding='utf-8') as target:
                            target.write(override_css)
                        self.log(f"✅ Overwrote PDF-generated styles.css with loaded CSS")
                    except Exception as e:
                        self.log(f"[WARNING] Failed to overwrite styles.css: {e}")

                override_item = make_css_item("css_override", "css/override.css", override_css)
                book.add_item(override_item)
                css_items.append(override_item)
                # Override in effect: the css/ directory is deliberately skipped.
                return css_items
        except Exception as e:
            # Fall through to the directory-based behaviour below.
            self.log(f"[WARNING] Failed to load override CSS '{override_path}': {e}")
    elif override_path:
        self.log(f"[INFO] CSS override set but 'Attach CSS to Chapters' is disabled - ignoring override")

    # Original behaviour: add the user stylesheets found in the css/ directory.
    if not os.path.isdir(self.css_dir):
        return css_items

    css_files = [name for name in sorted(os.listdir(self.css_dir)) if name.endswith('.css')]
    self.log(f"[DEBUG] Found {len(css_files)} CSS files")

    for css_file in css_files:
        css_path = os.path.join(self.css_dir, css_file)
        try:
            with open(css_path, 'r', encoding='utf-8') as source:
                sheet_text = source.read()
            sheet_item = make_css_item(f"css_{css_file}", f"css/{css_file}", sheet_text)
            book.add_item(sheet_item)
            css_items.append(sheet_item)
            self.log(f"✅ Added CSS: {css_file}")
        except Exception as e:
            self.log(f"[WARNING] Failed to add CSS {css_file}: {e}")

    return css_items
def _add_fonts(self, book: epub.EpubBook):
    """Embed every regular file found in ``self.fonts_dir`` under ``fonts/``.

    Nothing happens when the directory does not exist. The media type is
    chosen from the file-name suffix (``.ttf``, ``.otf``, ``.woff2``); any
    other name is registered as ``application/font-woff``. A failure on one
    file is logged and the remaining files are still processed.

    Args:
        book: The EPUB being assembled.
    """
    if not os.path.isdir(self.fonts_dir):
        return

    suffix_to_mime = (
        ('.ttf', 'font/ttf'),
        ('.otf', 'font/otf'),
        ('.woff2', 'font/woff2'),
    )

    for font_file in os.listdir(self.fonts_dir):
        font_path = os.path.join(self.fonts_dir, font_file)
        # Sub-directories and other non-files are ignored.
        if os.path.isfile(font_path):
            try:
                mime_type = next(
                    (mime for suffix, mime in suffix_to_mime if font_file.endswith(suffix)),
                    'application/font-woff'
                )
                with open(font_path, 'rb') as handle:
                    payload = handle.read()
                book.add_item(epub.EpubItem(
                    uid=f"font_{font_file}",
                    file_name=f"fonts/{font_file}",
                    media_type=mime_type,
                    content=payload
                ))
                self.log(f"✅ Added font: {font_file}")
            except Exception as e:
                self.log(f"[WARNING] Failed to add font {font_file}: {e}")
def _process_images(self) -> Tuple[Dict[str, str], Optional[str]]:
"""Process images using parallel processing

Picks the first non-empty directory among ``self.images_dir``,
``<self.base_dir>/images`` and ``<self.output_dir>/images``, stores it back on
``self.images_dir`` and classifies every entry in it on a thread pool with
``self.max_workers`` workers. Files whose MIME type (guessed, or looked up by
extension) starts with "image" are accepted and given a sanitized name; SVGs
may be rasterized to a PNG written next to them when ``self.rasterize_svg`` and
``self._cairosvg_available`` are set.

A cover is then chosen: the first accepted name starting with "cover" or
"front" (case-insensitive), otherwise the first name in natural numeric order.
The prefix search runs only when images were accepted and
``DISABLE_AUTOMATIC_COVER_CREATION`` is not "1".

Returns:
``(processed_images, cover_file)`` - a dict mapping each accepted file name
(the PNG name for a rasterized SVG) to its safe name, and the safe name of
the cover or None. Errors are logged, not raised; whatever was collected up
to that point is returned.
"""
processed_images = {}
cover_file = None
try:
# Find the images directory
actual_images_dir = None
possible_dirs = [
self.images_dir,
os.path.join(self.base_dir, "images"),
os.path.join(self.output_dir, "images"),
]
for test_dir in possible_dirs:
self.log(f"[DEBUG] Checking for images in: {test_dir}")
if os.path.isdir(test_dir):
files = os.listdir(test_dir)
if files:
self.log(f"[DEBUG] Found {len(files)} files in {test_dir}")
actual_images_dir = test_dir
break
if not actual_images_dir:
self.log("[WARNING] No images directory found or directory is empty")
return processed_images, cover_file
# Later steps read images through self.images_dir, so remember the directory that was found
self.images_dir = actual_images_dir
self.log(f"[INFO] Using images directory: {self.images_dir}")
# Get list of files to process
image_files = sorted(os.listdir(self.images_dir))
self.log(f"🖼️ Processing {len(image_files)} potential images with {self.max_workers} workers")
def process_single_image(img):
"""Worker function to process a single image

Returns ``(original_name, safe_name, mime_type)`` for an image file, or None
for directories and files that are not recognised as images.
"""
path = os.path.join(self.images_dir, img)
if not os.path.isfile(path):
return None
# Check MIME type
ctype, _ = mimetypes.guess_type(path)
# If MIME type detection fails, check extension
if not ctype:
ext = os.path.splitext(img)[1].lower()
mime_map = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.webp': 'image/webp',
'.svg': 'image/svg+xml'
}
ctype = mime_map.get(ext)
if ctype and ctype.startswith("image"):
safe_name = FileUtils.sanitize_filename(img, allow_unicode=False)
# Ensure extension
# (restore it from the original name, or derive it from the MIME type, if sanitizing dropped it)
if not os.path.splitext(safe_name)[1]:
ext = os.path.splitext(img)[1]
if ext:
safe_name += ext
elif ctype == 'image/jpeg':
safe_name += '.jpg'
elif ctype == 'image/png':
safe_name += '.png'
# Special handling for SVG: rasterize to PNG fallback for reader compatibility
if ctype == 'image/svg+xml' and self.rasterize_svg and self._cairosvg_available:
try:
from cairosvg import svg2png
png_name = os.path.splitext(safe_name)[0] + '.png'
png_path = os.path.join(self.images_dir, png_name)
# Generate PNG only if not already present
if not os.path.exists(png_path):
svg2png(url=path, write_to=png_path)
self.log(f" 🖼️ Rasterized SVG → PNG: {img} -> {png_name}")
# Return the PNG as the image to include
# (both the "original" and the safe name are the PNG name here, not the SVG's)
return (png_name, png_name, 'image/png')
except Exception as e:
self.log(f"[WARNING] SVG rasterization failed for {img}: {e}")
# Fall back to adding the raw SVG
return (img, safe_name, ctype)
return (img, safe_name, ctype)
else:
return None
# Process images in parallel
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(process_single_image, img) for img in image_files]
completed = 0
# Only log periodically for large image sets to avoid GUI lag
use_reduced_logging = len(image_files) > 50
# Log at 5% intervals for large sets (20 updates total)
log_interval = max(1, len(image_files) // 20) if use_reduced_logging else 1
last_logged_percent = -1
for future in as_completed(futures):
# Check stop flag
if self.is_stopped():
self.log("🛑 Image processing stopped by user")
break
try:
result = future.result()
completed += 1
if result:
original, safe, ctype = result
processed_images[original] = safe
# Log based on image count
if use_reduced_logging:
# For large sets: log at percentage milestones or interval
current_percent = (completed * 100) // len(image_files)
should_log = (completed % log_interval == 0 or completed == 1 or completed == len(image_files))
# Also log when percentage changes for better feedback
if should_log or (current_percent != last_logged_percent and current_percent % 5 == 0):
self.log(f" [{completed}/{len(image_files)}] ({current_percent}%) ✅")
last_logged_percent = current_percent
else:
# For small sets: log every image
self.log(f" [{completed}/{len(image_files)}] ✅ Processed: {original} -> {safe}")
else:
# Log skipped files based on image count
if not use_reduced_logging or completed % log_interval == 0:
self.log(f" [{completed}/{len(image_files)}] ⏭️ Skipped non-image file")
except Exception as e:
completed += 1
# Always log errors
self.log(f" [{completed}/{len(image_files)}] ❌ Failed to process image: {e}")
# Find cover (sequential - quick operation)
# Respect user preference to disable automatic cover creation
disable_auto_cover = os.environ.get('DISABLE_AUTOMATIC_COVER_CREATION', '0') == '1'
if processed_images and not disable_auto_cover:
cover_prefixes = ['cover', 'front']
for original_name, safe_name in processed_images.items():
name_lower = original_name.lower()
if any(name_lower.startswith(prefix) for prefix in cover_prefixes):
cover_file = safe_name
self.log(f"📔 Found cover image: {original_name} -> {cover_file}")
break
if not cover_file:
# Sort numerically so e.g. "2.jpg" comes before "10.jpg"
import re as _re
def _numeric_key(name):
# Split into digit / non-digit runs so that digit runs compare as integers
parts = _re.split(r'(\d+)', name.lower())
return [int(p) if p.isdigit() else p for p in parts]
first_original = sorted(processed_images.keys(), key=_numeric_key)[0]
cover_file = processed_images[first_original]
self.log(f"📔 Using first image (numerically sorted) as cover: {cover_file}")
self.log(f"✅ Processed {len(processed_images)} images successfully")
except Exception as e:
self.log(f"[ERROR] Error processing images: {e}")
import traceback
self.log(f"[DEBUG] Traceback: {traceback.format_exc()}")
return processed_images, cover_file
def _add_images_to_book(self, book: epub.EpubBook, processed_images: Dict[str, str],
cover_file: Optional[str]):
"""Add images to book using parallel processing for reading files

Every entry of ``processed_images`` except the one whose safe name equals
``cover_file`` is read from ``self.images_dir`` on a thread pool with
``self.max_workers`` workers, and the successfully read files are then added
to ``book`` one by one as ``images/<safe name>``. Read and add failures are
logged and skipped. Both phases stop early when ``self.is_stopped()`` is true.

Args:
book: The EPUB being assembled.
processed_images: Mapping of original file name to safe file name.
cover_file: Safe name of the cover image, which is left out here.
"""
# Filter out cover image
images_to_add = [(orig, safe) for orig, safe in processed_images.items()
if safe != cover_file]
if not images_to_add:
self.log("No images to add (besides cover)")
return
self.log(f"📚 Adding {len(images_to_add)} images to EPUB with {self.max_workers} workers")
def read_image_file(image_data):
"""Worker function to read image file

Takes an ``(original_name, safe_name)`` pair and returns a dict with the keys
'original', 'safe' and 'success', plus 'ctype' and 'content' on success or
'error' on failure. Exceptions are captured in the dict rather than raised.
"""
original_name, safe_name = image_data
img_path = os.path.join(self.images_dir, original_name)
# If original was compressed to a different format (e.g. png→webp),
# the original file may have been deleted. Fall back to the
# compressed file whose extension matches safe_name.
if not os.path.isfile(img_path):
safe_ext = os.path.splitext(safe_name)[1]
if safe_ext:
alt_path = os.path.join(
self.images_dir,
os.path.splitext(original_name)[0] + safe_ext
)
if os.path.isfile(alt_path):
img_path = alt_path
try:
# The media type is guessed from the path that is actually read
ctype, _ = mimetypes.guess_type(img_path)
if not ctype:
ctype = "image/jpeg" # Default fallback
with open(img_path, 'rb') as f:
content = f.read()
return {
'original': original_name,
'safe': safe_name,
'ctype': ctype,
'content': content,
'success': True
}
except Exception as e:
return {
'original': original_name,
'safe': safe_name,
'error': str(e),
'success': False
}
# Read all images in parallel
image_data_list = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(read_image_file, img_data) for img_data in images_to_add]
completed = 0
# Only log periodically for large image sets to avoid GUI lag
use_reduced_logging = len(images_to_add) > 50
# Log at 5% intervals for large sets (20 updates total)
log_interval = max(1, len(images_to_add) // 20) if use_reduced_logging else 1
last_logged_percent = -1
for future in as_completed(futures):
# Check stop flag
if self.is_stopped():
self.log("🛑 Image reading stopped by user")
break
try:
result = future.result()
completed += 1
if result['success']:
image_data_list.append(result)
# Log based on image count
if use_reduced_logging:
# For large sets: log at percentage milestones or interval
current_percent = (completed * 100) // len(images_to_add)
should_log = (completed % log_interval == 0 or completed == 1 or completed == len(images_to_add))
# Also log when percentage changes for better feedback
if should_log or (current_percent != last_logged_percent and current_percent % 5 == 0):
self.log(f" [{completed}/{len(images_to_add)}] ({current_percent}%) ✅")
last_logged_percent = current_percent
else:
# For small sets: log every image
self.log(f" [{completed}/{len(images_to_add)}] ✅ Read: {result['original']}")
else:
# Always log failures
self.log(f" [{completed}/{len(images_to_add)}] ❌ Failed: {result['original']} - {result['error']}")
except Exception as e:
completed += 1
# Always log exceptions
self.log(f" [{completed}/{len(images_to_add)}] ❌ Exception reading image: {e}")
# Add images to book sequentially (required by ebooklib)
self.log("\n📦 Adding images to EPUB structure...")
added = 0
# Progress thresholds are recomputed for the number of images that were actually read
use_reduced_logging = len(image_data_list) > 50
log_interval = max(1, len(image_data_list) // 20) if use_reduced_logging else 1
for idx, img_data in enumerate(image_data_list, 1):
# Check stop flag
if self.is_stopped():
self.log(f"🛑 Image addition stopped by user ({added}/{len(image_data_list)} images added)")
break
try:
book.add_item(epub.EpubItem(
uid=img_data['safe'],
file_name=f"images/{img_data['safe']}",
media_type=img_data['ctype'],
content=img_data['content']
))
added += 1
# Only log periodically for large sets
if use_reduced_logging:
if idx % log_interval == 0 or idx == 1 or idx == len(image_data_list):
percent = (idx * 100) // len(image_data_list)
self.log(f" [{idx}/{len(image_data_list)}] ({percent}%) ✅")
else:
self.log(f" ✅ Added: {img_data['original']}")
except Exception as e:
self.log(f" ❌ Failed to add {img_data['original']} to EPUB: {e}")
# The summary counts against the requested total, so read failures show up as a shortfall
if self.is_stopped():
self.log(f"⚠️ Image addition incomplete: {added}/{len(images_to_add)} images were added before stopping")
else:
self.log(f"✅ Successfully added {added}/{len(images_to_add)} images to EPUB")
def _create_cover_page(self, book: epub.EpubBook, cover_file: str,
processed_images: Dict[str, str], css_items: List[epub.EpubItem],
metadata: dict) -> Optional[epub.EpubHtml]:
"""Create cover page

Looks up the original file name behind ``cover_file`` in ``processed_images``,
reads that file from ``self.images_dir``, adds it to ``book`` as the item with
uid "cover-image" and creates a ``cover.xhtml`` page (under ``Text/`` when
``self.legacy_epub_structure`` is set). The CSS items are attached to the page
when ``self.attach_css_to_chapters`` is true.

Args:
book: The EPUB being assembled.
cover_file: Safe name of the cover image.
processed_images: Mapping of original file name to safe file name.
css_items: Stylesheet items that may be linked from the page.
metadata: Parsed metadata; only ``language`` is read (default "en").

Returns:
The cover page, or None when ``cover_file`` is not among the processed
images or when reading/adding the cover fails (the failure is logged).
"""
# Find original filename
original_cover = None
for orig, safe in processed_images.items():
if safe == cover_file:
original_cover = orig
break
if not original_cover:
return None
cover_path = os.path.join(self.images_dir, original_cover)
# If original was compressed to a different format (e.g. jpg→webp),
# the original file may have been deleted. Fall back to the
# compressed file whose extension matches cover_file.
if not os.path.isfile(cover_path):
cover_ext = os.path.splitext(cover_file)[1]
if cover_ext:
alt_path = os.path.join(
self.images_dir,
os.path.splitext(original_cover)[0] + cover_ext
)
if os.path.isfile(alt_path):
cover_path = alt_path
try:
with open(cover_path, 'rb') as f:
cover_data = f.read()
# Add cover image
cover_img = epub.EpubItem(
uid="cover-image",
file_name=f"images/{cover_file}",
media_type=mimetypes.guess_type(cover_path)[0] or "image/jpeg",
content=cover_data
)
book.add_item(cover_img)
# Set cover metadata
cover_img.properties = ["cover-image"]
book.add_metadata('http://purl.org/dc/elements/1.1/', 'cover', 'cover-image')
# Create cover page
text_dirname = "Text" if getattr(self, 'legacy_epub_structure', False) else ""
cover_page_name = f"{text_dirname}/cover.xhtml" if text_dirname else "cover.xhtml"
cover_page = epub.EpubHtml(
title="Cover",
file_name=cover_page_name,
lang=metadata.get("language", "en")
)
# Build cover HTML directly without going through ensure_compliance
# Since it's simple and controlled, we can build it directly
lang = metadata.get("language", "en")
# With the legacy layout the page lives in Text/, so the image path has to climb one level
img_href = f"../images/{cover_file}" if getattr(self, 'legacy_epub_structure', False) else f"images/{cover_file}"
# NOTE(review): the template below looks truncated in this view - `lang` and `img_href`
# are computed above but not referenced by the visible text; confirm against the full file.
cover_content = f'''
Cover
'''
cover_page.content = cover_content.encode('utf-8')
# Associate CSS with cover page if needed
if self.attach_css_to_chapters:
for css_item in css_items:
cover_page.add_item(css_item)
book.add_item(cover_page)
self.log(f"✅ Set cover image: {cover_file}")
return cover_page
except Exception as e:
self.log(f"[WARNING] Failed to add cover: {e}")
return None
def _process_chapter_images(self, xhtml_content: str, processed_images: Dict[str, str]) -> str:
"""Process image paths and inline SVG in chapter content.
- Rewrites to use images/ paths and prefers PNG fallback for SVGs.
- Converts inline elements to when CairoSVG is available.
"""
try:
soup = BeautifulSoup(xhtml_content, 'lxml')
changed = False
# Track statistics for summary
total_images = 0
found_images = 0
missing_images = []
# 1) Handle tags that reference files
for img in soup.find_all('img'):
src = img.get('src', '')
if not src:
self.log(f"[WARNING] Image tag with no src attribute found")
continue
total_images += 1
# Get the base filename - handle various path formats
# Remove query parameters first
clean_src = src.split('?')[0]
basename = os.path.basename(clean_src)
# Look up the safe name
if basename in processed_images:
safe_name = processed_images[basename]
img_prefix = "../images/" if getattr(self, 'legacy_epub_structure', False) else "images/"
new_src = f"{img_prefix}{safe_name}"
if src != new_src:
img['src'] = new_src
changed = True
found_images += 1
else:
# Try without extension variations
name_without_ext = os.path.splitext(basename)[0]
found = False
for original_name, safe_name in processed_images.items():
if os.path.splitext(original_name)[0] == name_without_ext:
img_prefix = "../images/" if getattr(self, 'legacy_epub_structure', False) else "images/"
new_src = f"{img_prefix}{safe_name}"
img['src'] = new_src
changed = True
found = True
found_images += 1
break
if not found:
missing_images.append(basename)
# Still update the path to use images/ prefix if it doesn't have it
img_prefix = "../images/" if getattr(self, 'legacy_epub_structure', False) else "images/"
if not src.startswith(('images/', '../images/')):
img['src'] = f"{img_prefix}{basename}"
changed = True
# Ensure alt attribute exists (required for XHTML)
if not img.get('alt'):
img['alt'] = ''
changed = True
# 2) Convert inline SVG wrappers that point to raster images into plain
# Example:
for svg_tag in soup.find_all('svg'):
try:
image_child = svg_tag.find('image')
if image_child:
href = (
image_child.get('xlink:href') or
image_child.get('href') or
image_child.get('{http://www.w3.org/1999/xlink}href')
)
if href:
clean_href = href.split('?')[0]
basename = os.path.basename(clean_href)
# Map to processed image name
if basename in processed_images:
safe_name = processed_images[basename]
else:
name_wo = os.path.splitext(basename)[0]
safe_name = None
for orig, safe in processed_images.items():
if os.path.splitext(orig)[0] == name_wo:
safe_name = safe
break
img_prefix = "../images/" if getattr(self, 'legacy_epub_structure', False) else "images/"
new_src = f"{img_prefix}{safe_name}" if safe_name else f"{img_prefix}{basename}"
new_img = soup.new_tag('img')
new_img['src'] = new_src
new_img['alt'] = svg_tag.get('aria-label') or svg_tag.get('title') or ''
new_img['style'] = 'width:100%; height:auto; display:block;'
svg_tag.replace_with(new_img)
changed = True
self.log(f"[DEBUG] Rewrote inline SVG to ")
except Exception as e:
self.log(f"[WARNING] Failed to rewrite inline SVG wrapper: {e}")
# 3) Convert remaining inline (complex vector art) to PNG data URIs if possible
if self.rasterize_svg and self._cairosvg_available:
try:
from cairosvg import svg2png
import base64
for svg_tag in soup.find_all('svg'):
try:
svg_markup = str(svg_tag)
png_bytes = svg2png(bytestring=svg_markup.encode('utf-8'))
b64 = base64.b64encode(png_bytes).decode('ascii')
alt_text = svg_tag.get('aria-label') or svg_tag.get('title') or ''
new_img = soup.new_tag('img')
new_img['src'] = f'data:image/png;base64,{b64}'
new_img['alt'] = alt_text
new_img['style'] = 'width:100%; height:auto; display:block;'
svg_tag.replace_with(new_img)
changed = True
self.log("[DEBUG] Converted inline to PNG data URI")
except Exception as e:
self.log(f"[WARNING] Failed to rasterize inline SVG: {e}")
except Exception:
pass
# Log summary only if there are issues
if total_images > 0 and missing_images:
self.log(f"[WARNING] Chapter images: {found_images}/{total_images} found. Missing: {missing_images[:5]}{'...' if len(missing_images) > 5 else ''}")
if changed:
# Return the modified content
return str(soup)
return xhtml_content
except Exception as e:
self.log(f"[WARNING] Failed to process images in chapter: {e}")
return xhtml_content
def _create_gallery_page(self, book: epub.EpubBook, images: List[str],
                         css_items: List[epub.EpubItem], metadata: dict) -> epub.EpubHtml:
    """Build the image gallery page, register it on *book* and return it.

    The document is assembled by hand as a string, passed through
    ``XHTMLConverter.validate`` and stored on the page as bytes.

    Args:
        book: Book the gallery page is added to.
        images: Image filenames, iterated in order to build the body.
        css_items: Stylesheet items; their basenames are turned into
            CSS link targets and, when ``self.attach_css_to_chapters`` is
            truthy, each item is also attached to the page.
        metadata: Book metadata; only ``"language"`` is read (default "en").

    Returns:
        The ``epub.EpubHtml`` gallery page.
    """
    # Legacy layout places pages under "Text/"; otherwise they sit at the root.
    text_dirname = "Text" if getattr(self, 'legacy_epub_structure', False) else ""
    gallery_page_name = f"{text_dirname}/gallery.xhtml" if text_dirname else "gallery.xhtml"
    gallery_page = epub.EpubHtml(
        title="Gallery",
        file_name=gallery_page_name,
        lang=metadata.get("language", "en")
    )
    # Build the gallery body content
    gallery_body_parts = ['Image Gallery ']
    # Relative prefix from the gallery page to the images folder.
    img_prefix = "../images/" if getattr(self, 'legacy_epub_structure', False) else "images/"
    for img in images:
        gallery_body_parts.append(
            f''
            f'
'
            f'
'
        )
    gallery_body_content = '\n'.join(gallery_body_parts)
    # Build XHTML directly without going through ensure_compliance
    # which might escape our HTML tags
    css_prefix = "../css/" if getattr(self, 'legacy_epub_structure', False) else "css/"
    # Only the basename of each stylesheet is kept; the folder comes from css_prefix.
    css_links = [f"{css_prefix}{item.file_name.split('/')[-1]}" for item in css_items]
    # Build the complete XHTML document manually
    lang = metadata.get("language", "en")
    xhtml_content = f'''
Gallery '''
    # Add CSS links
    for css_link in css_links:
        xhtml_content += f'\n '
    xhtml_content += f'''
{gallery_body_content}
'''
    # Validate the XHTML
    validated_content = XHTMLConverter.validate(xhtml_content)
    # Set the content
    gallery_page.content = FileUtils.ensure_bytes(validated_content)
    # Associate CSS with gallery page
    if self.attach_css_to_chapters:
        for css_item in css_items:
            gallery_page.add_item(css_item)
    book.add_item(gallery_page)
    return gallery_page
# --- toc.ncx support (source TOC) ---
def _strip_all_ext(self, name: str) -> str:
core = name
while True:
base, ext = os.path.splitext(core)
if ext and ext.lower() in ['.html', '.htm', '.xhtml', '.xml']:
core = base
else:
break
return core
def _build_opf_filename_map(self) -> dict:
"""Build a mapping from core_name → original OPF basename from content.opf.
Returns an empty dict if content.opf does not exist or is unparseable.
The keys are lower-cased core names (extensions and 'response_' stripped).
The values are the original basenames exactly as they appear in the OPF manifest.
"""
opf_path = os.path.join(self.output_dir, 'content.opf')
if not os.path.exists(opf_path):
return {}
try:
tree = ET.parse(opf_path)
root = tree.getroot()
ns_uri = ''
if root.tag.startswith('{'):
ns_uri = root.tag[1:root.tag.index('}')]
ns = {'opf': ns_uri} if ns_uri else {}
mapping = {}
xpath = './/opf:manifest/opf:item' if ns else \
'.//{http://www.idpf.org/2007/opf}manifest/{http://www.idpf.org/2007/opf}item'
for item in root.findall(xpath, ns if ns else None):
href = item.get('href', '')
media = item.get('media-type', '')
if not href:
continue
if 'html' not in media.lower() and \
not href.lower().endswith(('.html', '.xhtml', '.htm')):
continue
basename = os.path.basename(href)
core = self._strip_all_ext(basename).lower().strip()
if core:
mapping[core] = basename
return mapping
except Exception:
return {}
def _restore_opf_filename(self, disk_filename: str, opf_map: dict) -> str:
"""Return the original OPF basename for *disk_filename*, or the
basename unchanged if it cannot be resolved.
Handles ``response_`` prefix and stacked extensions automatically.
"""
base = os.path.basename(disk_filename)
core = base
if core.startswith('response_'):
core = core[9:]
core = self._strip_all_ext(core).lower().strip()
return opf_map.get(core, base)
def _normalize_core_name(self, filename_or_href: str) -> str:
"""Normalize a filename/href for matching (strip fragment, response_ prefix, and all extensions)."""
if not filename_or_href:
return ''
href = str(filename_or_href)
if '#' in href:
href = href.split('#', 1)[0]
base = os.path.basename(href)
if base.startswith('response_'):
base = base[9:]
base = self._strip_all_ext(base)
return base.lower().strip()
def _extract_source_toc_ncx_entries(self, source_epub_path: str) -> List[Dict[str, str]]:
"""Extract ordered navPoint entries from the source EPUB's toc.ncx.
Returns list of dicts: {'label': str, 'src': str}
"""
entries: List[Dict[str, str]] = []
if not source_epub_path or not os.path.exists(source_epub_path):
return entries
try:
import zipfile
with zipfile.ZipFile(source_epub_path, 'r') as zf:
ncx_path = None
# 1) Try container.xml -> OPF -> manifest item media-type application/x-dtbncx+xml
opf_path = None
try:
container = zf.read('META-INF/container.xml')
tree = ET.fromstring(container)
rootfile = tree.find('.//{urn:oasis:names:tc:opendocument:xmlns:container}rootfile')
if rootfile is not None:
opf_path = rootfile.get('full-path')
except Exception:
opf_path = None
if not opf_path:
for name in zf.namelist():
if name.lower().endswith('.opf'):
opf_path = name
break
if opf_path:
try:
opf_bytes = zf.read(opf_path)
root = ET.fromstring(opf_bytes)
ns = {'opf': 'http://www.idpf.org/2007/opf'}
if root.tag.startswith('{'):
default_ns = root.tag[1:root.tag.index('}')]
ns = {'opf': default_ns}
for item in root.findall('.//opf:manifest/opf:item', ns):
mt = (item.get('media-type') or '').strip().lower()
item_id = (item.get('id') or '').strip().lower()
href = item.get('href')
if not href:
continue
if mt == 'application/x-dtbncx+xml' or item_id == 'ncx':
base_dir = os.path.dirname(opf_path)
candidate = os.path.join(base_dir, href).replace('\\', '/') if base_dir else href
if candidate in zf.namelist():
ncx_path = candidate
break
except Exception:
ncx_path = None
# 2) Fallback: find toc.ncx by name
if not ncx_path:
for name in zf.namelist():
if name.lower().endswith('toc.ncx'):
ncx_path = name
break
if not ncx_path:
return entries
ncx_bytes = zf.read(ncx_path)
# Parse outside zip context
root = ET.fromstring(ncx_bytes)
ns_uri = ''
if root.tag.startswith('{'):
ns_uri = root.tag[1:root.tag.index('}')]
ns = {'ncx': ns_uri} if ns_uri else {}
navpoints = root.findall('.//ncx:navPoint', ns) if ns else root.findall('.//navPoint')
for np in navpoints:
label = ''
src = ''
nav_text = np.find('ncx:navLabel/ncx:text', ns) if ns else np.find('navLabel/text')
if nav_text is not None and nav_text.text:
label = nav_text.text.strip()
content = np.find('ncx:content', ns) if ns else np.find('content')
if content is not None:
src = (content.get('src') or '').strip()
if label or src:
entries.append({'label': label, 'src': src})
except Exception as e:
self.log(f"⚠️ Failed to parse source toc.ncx: {e}")
return []
return entries
def _save_toc_translations_file(self, output_path: str, original: Dict[int, str], translated: Dict[int, str], refs: Dict[int, str]):
"""Save TOC translations in the same block format as translated_headers.txt (robust parsing)."""
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write("TOC Translations\n")
f.write("=" * 50 + "\n\n")
nums = list(original.keys())
for num in nums:
orig = original.get(num, "")
trans = translated.get(num, orig)
f.write(f"Chapter {num}:\n")
f.write(f" Original: {orig}\n")
f.write(f" Translated: {trans}\n")
if refs and num in refs and refs[num]:
f.write(f" Output File: {refs[num]}\n")
if num not in translated:
f.write(" Status: ⚠️ Using original (translation failed)\n")
f.write("-" * 40 + "\n")
f.write("\nSummary:\n")
f.write(f"Total entries: {len(nums)}\n")
if nums:
f.write(f"Entry range: {min(nums)} to {max(nums)}\n")
f.write(f"Successfully translated: {len(translated)}\n")
except Exception as e:
self.log(f"⚠️ Failed to save TOC.txt: {e}")
def _load_toc_translations_file(self, toc_txt_path: str) -> Tuple[Dict[int, str], Dict[int, str], Dict[int, str]]:
"""Load TOC translations from TOC.txt (same parser as translated_headers.txt)."""
try:
from translate_headers_standalone import load_translations_from_file
source_headers, translated, output_files = load_translations_from_file(toc_txt_path, self.log)
return source_headers or {}, translated or {}, output_files or {}
except Exception:
return {}, {}, {}
def _build_toc_from_source_toc_ncx(self, spine: List, existing_toc: List, metadata: dict) -> List:
    """Build the TOC from the source EPUB's toc.ncx.

    Steps, in order:

    1. Read navPoint entries from the EPUB named by the ``EPUB_PATH``
       environment variable; fall back to *existing_toc* when the path or
       the entries are missing.
    2. Map normalised core names to spine hrefs (cover and gallery pages
       excluded), with an OPF-order fallback for names not matched directly.
    3. When ``self.translate_toc_ncx`` is truthy, load label translations
       from ``TOC.txt`` in ``self.output_dir`` if it exists, otherwise
       translate them through ``self.api_client`` and save ``TOC.txt``.
    4. Emit one ``epub.Link`` per entry whose target maps to a spine item
       and whose file can be found on disk, then append any gallery entry
       from *existing_toc*.

    Args:
        spine: Spine items of the book being built.
        existing_toc: The generated TOC, used as fallback and as the
            source of the gallery entry.
        metadata: Not referenced in this method.

    Returns:
        The new TOC list, or *existing_toc* when the source NCX is unusable.
    """
    source_epub_path = os.getenv('EPUB_PATH')
    if not source_epub_path or not os.path.exists(source_epub_path):
        self.log("⚠️ USE_TOC_NCX enabled but EPUB_PATH is missing or invalid; using generated TOC")
        return existing_toc
    entries = self._extract_source_toc_ncx_entries(source_epub_path)
    if not entries:
        self.log("⚠️ USE_TOC_NCX enabled but no toc.ncx entries were found; using generated TOC")
        return existing_toc
    # Build mapping from normalized core name -> actual spine href
    spine_href_by_core: Dict[str, str] = {}
    spine_items_for_order = []
    for it in spine:
        if not hasattr(it, 'file_name'):
            continue
        # Skip cover for mapping
        if hasattr(it, 'title') and str(getattr(it, 'title', '')).strip().lower() == 'cover':
            continue
        # Skip gallery for mapping
        if os.path.basename(it.file_name).lower() == 'gallery.xhtml':
            continue
        spine_items_for_order.append(it)
        core = self._normalize_core_name(it.file_name)
        # First spine item wins when two share a core name.
        if core and core not in spine_href_by_core:
            spine_href_by_core[core] = it.file_name
    # Fallback: map by OPF order (source filename order -> spine chapter order)
    try:
        opf_order = self._get_chapter_order_from_opf() or {}
        if opf_order and spine_items_for_order:
            ordered_source = [fn for fn, _ in sorted(opf_order.items(), key=lambda x: x[1])]
            # Pair positions only as far as both sequences reach.
            limit = min(len(ordered_source), len(spine_items_for_order))
            for i in range(limit):
                src_core = self._normalize_core_name(ordered_source[i])
                # Never override a direct name match made above.
                if src_core and src_core not in spine_href_by_core:
                    spine_href_by_core[src_core] = spine_items_for_order[i].file_name
    except Exception:
        pass
    # Optional translation (single API call) with caching to TOC.txt
    translations: Dict[int, str] = {}
    toc_filter_nums: Optional[Set[int]] = None
    original: Dict[int, str] = {}
    refs: Dict[int, str] = {}
    # Entries are numbered from 1 in NCX order; these numbers key every dict below.
    for idx, ent in enumerate(entries, 1):
        # toc_filter_nums is still None here (it is only assigned further
        # down), so this check never skips anything in this loop.
        if toc_filter_nums is not None and idx not in toc_filter_nums:
            # Entry was removed from TOC.txt by user; skip it entirely
            continue
        original[idx] = ent.get('label', '') or ''
        refs[idx] = ent.get('src', '') or ''
    if getattr(self, 'translate_toc_ncx', False):
        toc_txt_path = os.path.join(self.output_dir, 'TOC.txt')
        if os.path.exists(toc_txt_path):
            self.log("📁 Found existing TOC.txt - using cached toc.ncx translations")
            toc_source_headers, translations, _ = self._load_toc_translations_file(toc_txt_path)
            # If TOC.txt exists, treat it as authoritative: only include entries present in the file.
            if toc_source_headers:
                toc_filter_nums = set(toc_source_headers.keys())
        else:
            if not getattr(self, 'api_client', None):
                self.log("⚠️ TRANSLATE_TOC_NCX enabled but API client is not initialized; using original toc.ncx labels")
                translations = {}
            else:
                # A non-positive TOC_NCX_PER_BATCH (default -1) selects the automatic size.
                toc_ncx_per_batch = int(os.environ.get('TOC_NCX_PER_BATCH', '-1'))
                if toc_ncx_per_batch <= 0:
                    # Auto-calculate based on output token limit / compression factor
                    max_output_tokens = int(os.environ.get('MAX_OUTPUT_TOKENS', '8192'))
                    compression_factor = float(os.environ.get('COMPRESSION_FACTOR', '3.0'))
                    # Guard the division below against a zero or negative factor.
                    if compression_factor <= 0:
                        compression_factor = 3.0
                    available_tokens = int(max_output_tokens / compression_factor)
                    # ~50 tokens per TOC entry (title text + JSON key/formatting overhead)
                    toc_ncx_per_batch = max(10, available_tokens // 50)
                    self.log(f"📐 Auto batch size: {toc_ncx_per_batch} entries "
                             f"({max_output_tokens:,} output / {compression_factor:.1f}x compression = "
                             f"{available_tokens:,} available tokens, ~50 tok/entry)")
                self.log(f"🌐 Translating {len(original)} toc.ncx entries in chunks of {toc_ncx_per_batch}...")
                try:
                    from metadata_batch_translator import BatchHeaderTranslator
                    tr = BatchHeaderTranslator(self.api_client, {})
                    skip_dup_translate = os.environ.get('SKIP_DUPLICATE_TOC_TRANSLATION', '0') == '1'
                    if skip_dup_translate:
                        # Translate each distinct (trimmed) label once, keyed by
                        # the first index it appears at.
                        unique_original: Dict[int, str] = {}
                        first_idx_by_label: Dict[str, int] = {}
                        for idx, label in original.items():
                            key = (label or '').strip()
                            if key in first_idx_by_label:
                                continue
                            first_idx_by_label[key] = idx
                            unique_original[idx] = label
                        self.log(f"🔁 Skip duplicate translation enabled: {len(unique_original)}/{len(original)} unique labels")
                        unique_translations = tr.translate_headers_batch(unique_original, batch_size=toc_ncx_per_batch, translation_type='toc') or {}
                        # Fan the unique results back out to every index sharing the label.
                        translations = {}
                        for idx, label in original.items():
                            key = (label or '').strip()
                            first_idx = first_idx_by_label.get(key, idx)
                            if first_idx in unique_translations:
                                translations[idx] = unique_translations[first_idx]
                    else:
                        translations = tr.translate_headers_batch(original, batch_size=toc_ncx_per_batch, translation_type='toc') or {}
                    if translations:
                        # Pre-compute actual output paths so "Output File" shows
                        # the path in the built EPUB, not the raw NCX src.
                        # Strip file extensions to prevent double-extension issues.
                        output_refs: Dict[int, str] = {}
                        for _idx, _ent in enumerate(entries, 1):
                            _src = (_ent.get('src') or '').strip()
                            if not _src:
                                _fallback = refs.get(_idx, '')
                                # Strip extensions from fallback too
                                _fb_base = os.path.basename(_fallback) if _fallback else ''
                                while _fb_base:
                                    _name, _ext = os.path.splitext(_fb_base)
                                    if _ext and _ext.lower() in ['.html', '.xhtml', '.htm', '.xml']:
                                        _fb_base = _name
                                    else:
                                        break
                                output_refs[_idx] = _fb_base
                                continue
                            _src_base, _frag = (_src.split('#', 1) + [''])[:2] if '#' in _src else (_src, '')
                            _core = self._normalize_core_name(_src_base)
                            _target = spine_href_by_core.get(_core)
                            # Prefer the spine href; otherwise keep the NCX src without fragment.
                            _resolved = _target if _target else _src_base
                            # Strip extensions to prevent double-extension issues
                            _clean = os.path.basename(_resolved)
                            while True:
                                _name, _ext = os.path.splitext(_clean)
                                if _ext and _ext.lower() in ['.html', '.xhtml', '.htm', '.xml']:
                                    _clean = _name
                                else:
                                    break
                            output_refs[_idx] = _clean
                        self._save_toc_translations_file(toc_txt_path, original, translations, output_refs)
                        self.log(f"✅ Saved TOC translations cache: {toc_txt_path}")
                    else:
                        self.log(f"⚠️ toc.ncx translation returned no results - skipping cache (will retry next run)")
                except Exception as e:
                    self.log(f"⚠️ toc.ncx translation failed: {e}")
                    translations = {}
    # Build toc links in source toc.ncx order
    toc_links = []
    missing = 0
    seen_labels = set()
    dedup_enabled = os.environ.get('DEDUPLICATE_TOC', '0') == '1'
    dedup_use_translated = os.environ.get('DEDUPLICATE_TOC_USE_TRANSLATED', '0') == '1'
    for idx, ent in enumerate(entries, 1):
        if toc_filter_nums is not None and idx not in toc_filter_nums:
            # Entry was removed from TOC.txt by user; skip it entirely
            continue
        src = (ent.get('src') or '').strip()
        raw_label = original.get(idx) or (ent.get('label') or '').strip()
        # Translated label when available, otherwise the source label.
        label = translations.get(idx) or raw_label
        if not src:
            continue
        if dedup_enabled:
            # Duplicates are detected on the translated or the source label,
            # depending on DEDUPLICATE_TOC_USE_TRANSLATED.
            key = label if dedup_use_translated else raw_label
            if key in seen_labels:
                continue
            seen_labels.add(key)
        frag = ''
        src_base = src
        if '#' in src:
            src_base, frag = src.split('#', 1)
        core = self._normalize_core_name(src_base)
        target_base = spine_href_by_core.get(core)
        if not target_base:
            missing += 1
            continue
        # Ensure the target HTML actually exists before linking (dynamic fallback roots)
        # Note: this helper is redefined on every loop iteration.
        def _path_exists_with_fallback(rel_path: str) -> bool:
            if not rel_path:
                return False
            # Build all candidate relative paths:
            # - original, with/without Text/ prefix
            # - with response_ prefix and alternate extensions
            base_candidates = [rel_path]
            norm_rel = rel_path.replace('\\', '/')
            if norm_rel.lower().startswith('text/'):
                base_candidates.append(norm_rel[5:])
            else:
                base_candidates.append(f"Text/{norm_rel}")
            # Also try response_ prefix + alternate extensions
            _alt_exts = ['.html', '.xhtml', '.htm']
            expanded = []
            for cand in base_candidates:
                expanded.append(cand)
                cand_dir = os.path.dirname(cand)
                cand_base = os.path.basename(cand)
                cand_core, cand_ext = os.path.splitext(cand_base)
                # Strip stacked extensions
                while cand_ext and cand_ext.lower() in ('.html', '.xhtml', '.htm', '.xml'):
                    cand_core_next, cand_ext_next = os.path.splitext(cand_core)
                    if cand_ext_next and cand_ext_next.lower() in ('.html', '.xhtml', '.htm', '.xml'):
                        cand_core = cand_core_next
                        cand_ext = cand_ext_next
                    else:
                        break
                # Try with response_ prefix
                if not cand_base.startswith('response_'):
                    for ext in _alt_exts:
                        resp_name = f"response_{cand_core}{ext}"
                        expanded.append(os.path.join(cand_dir, resp_name) if cand_dir else resp_name)
                # Try without response_ prefix + alternate extensions
                bare_core = cand_core
                if bare_core.startswith('response_'):
                    bare_core = bare_core[9:]
                for ext in _alt_exts:
                    if ext != cand_ext:
                        expanded.append(os.path.join(cand_dir, f"{bare_core}{ext}") if cand_dir else f"{bare_core}{ext}")
            # Direct path check
            for cand in expanded:
                direct = os.path.normpath(os.path.join(self.output_dir, cand))
                if os.path.exists(direct):
                    return True
            # Dynamically discover likely roots by locating any HTML/XHTML files
            roots = set()
            try:
                for root_dir, _, files in os.walk(self.output_dir):
                    if any(f.lower().endswith(('.html', '.htm', '.xhtml')) for f in files):
                        roots.add(root_dir)
                # Prefer shorter paths first
                candidate_roots = sorted(roots, key=lambda p: len(p))
            except Exception:
                candidate_roots = []
            for root_dir in candidate_roots:
                for cand in expanded:
                    candidate = os.path.normpath(os.path.join(root_dir, cand))
                    if os.path.exists(candidate):
                        return True
            # Final fallback: scan output dir for any HTML file matching core name
            # This catches double extensions like .htm.xhtml, .html.html, etc.
            _html_exts_set = {'.html', '.xhtml', '.htm', '.xml'}
            def _core_of(fn):
                # Same normalisation as the matching above: drop response_,
                # strip stacked markup extensions, lower-case, trim.
                n = fn
                if n.startswith('response_'):
                    n = n[9:]
                while True:
                    b, e = os.path.splitext(n)
                    if e and e.lower() in _html_exts_set:
                        n = b
                    else:
                        break
                return n.lower().strip()
            target_core = _core_of(os.path.basename(rel_path))
            if target_core:
                try:
                    for f in os.listdir(self.output_dir):
                        if f.lower().endswith(('.html', '.htm', '.xhtml')) and os.path.isfile(os.path.join(self.output_dir, f)):
                            if _core_of(f) == target_core:
                                return True
                except Exception:
                    pass
            return False
        if not _path_exists_with_fallback(target_base):
            missing += 1
            continue
        # Re-attach the original fragment to the resolved spine href.
        target_href = target_base
        if frag:
            target_href = f"{target_href}#{frag}"
        try:
            toc_links.append(epub.Link(target_href, label or os.path.basename(target_base), f"toc_{idx}"))
        except Exception:
            # If Link construction fails for any reason, skip
            missing += 1
    # Preserve any non-chapter TOC entries we added (e.g., gallery)
    extras = []
    for it in existing_toc:
        try:
            if hasattr(it, 'file_name') and os.path.basename(it.file_name).lower() == 'gallery.xhtml':
                extras.append(it)
        except Exception:
            continue
    if missing:
        self.log(f"⚠️ toc.ncx mapping: skipped {missing} entry(ies) that couldn't be matched to output chapters")
    self.log(f"✅ Built TOC from source toc.ncx: {len(toc_links)} entries")
    return toc_links + extras
def _create_nav_content(self, toc_items, book_title="Book"):
    """Assemble the navigation document text by hand.

    Args:
        toc_items: TOC entries, used in the order given. An entry
            contributes a line when it has a ``title`` plus either a
            ``file_name`` or an ``href``, the target is truthy and the
            title is not None.
        book_title: Accepted for the caller's convenience.

    Returns:
        The navigation content as a string.
    """
    # Use the same primary language as the rest of the book for nav.xhtml
    # We read from XHTMLConverter.DEFAULT_LANG, which is kept in sync with book language.
    lang = getattr(XHTMLConverter, "DEFAULT_LANG", "en")
    nav_content = f'''
Table of Contents
Table of Contents
'''
    # The toc_items are already sorted properly by _finalize_book
    # Don't re-sort them here - just use them as-is
    for item in toc_items:
        href = None
        title = None
        # EpubHtml
        if hasattr(item, 'title') and hasattr(item, 'file_name'):
            href = item.file_name
            title = item.title
        # epub.Link
        elif hasattr(item, 'title') and hasattr(item, 'href'):
            href = item.href
            title = item.title
        # Titles are escaped through ContentProcessor.safe_escape before insertion.
        if href and title is not None:
            nav_content += f'\n{ContentProcessor.safe_escape(str(title))} '
    nav_content += '''
'''
    return nav_content
def _get_order_from_progress_file(self, progress_file: str) -> Dict[str, int]:
"""Get chapter order from translation_progress.json
Returns dict mapping original_filename -> chapter_number
"""
try:
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
filename_to_order = {}
# Extract chapter order from progress data
chapters = progress_data.get('chapters', {})
for chapter_key, chapter_info in chapters.items():
# Get the original basename from progress data
original_basename = chapter_info.get('original_basename', '')
if original_basename:
# Map to chapter position (key is usually the chapter number)
try:
chapter_num = int(chapter_key)
filename_to_order[original_basename] = chapter_num - 1 # Convert to 0-based
self.log(f" Progress mapping: {original_basename} -> Chapter {chapter_num}")
except (ValueError, TypeError):
pass
return filename_to_order if filename_to_order else None
except Exception as e:
self.log(f"⚠️ Error reading translation_progress.json: {e}")
return None
def _finalize_book(self, book: epub.EpubBook, spine: List, toc: List,
                   cover_file: Optional[str]):
    """Order the TOC, add navigation items and set the final spine.

    The TOC is re-ordered to follow the spine (entries that match no spine
    item go last), optionally de-duplicated by title when
    ``DEDUPLICATE_TOC=1``, and assigned to ``book.toc``. An NCX is always
    added. Unless the legacy structure or ``FORCE_NCX_ONLY=1`` is active,
    a hand-built nav document is added too and placed in the spine right
    after the cover.

    Args:
        book: Book being finalized.
        spine: Reading-order items; a first item titled "Cover" is treated
            as the cover page.
        toc: TOC entries (items with ``file_name`` or links with ``href``).
        cover_file: Not referenced in this method.
    """
    legacy = getattr(self, 'legacy_epub_structure', False)
    ncx_only = legacy or (os.environ.get('FORCE_NCX_ONLY', '0') == '1')

    # Detach a leading cover page so it never takes part in TOC ordering.
    cover_item = None
    if spine:
        leading = spine[0]
        if hasattr(leading, 'title') and leading.title == "Cover":
            cover_item = leading
            spine = spine[1:]
    has_cover = cover_item is not None

    def strip_fragment(target: str) -> str:
        return target.split('#', 1)[0] if target else ''

    def toc_target(entry):
        # EpubHtml exposes file_name, epub.Link exposes href.
        for attr in ('file_name', 'href'):
            if hasattr(entry, attr):
                return getattr(entry, attr, '')
        return ''

    def log_toc(entries):
        for pos, entry in enumerate(entries):
            target = toc_target(entry)
            if target:
                self.log(f" TOC[{pos}]: {target} -> {getattr(entry, 'title', '')}")

    debug_mode_enabled = os.environ.get('DEBUG_MODE', '0') == '1'
    if debug_mode_enabled:
        self.log("\n[DEBUG] Before sorting TOC:")
        self.log("Spine order:")
        for pos, entry in enumerate(spine):
            if hasattr(entry, 'file_name') and hasattr(entry, 'title'):
                self.log(f" Spine[{pos}]: {entry.file_name} -> {entry.title}")
        self.log("TOC order:")
        log_toc(toc)

    # Spine position by full path and by basename (later items overwrite earlier ones).
    position_by_path = {}
    position_by_name = {}
    for pos, entry in enumerate(spine):
        path = getattr(entry, 'file_name', '')
        if not path:
            continue
        path = strip_fragment(path)
        position_by_path[path] = pos
        position_by_name[os.path.basename(path)] = pos

    # Split TOC entries into those with a spine position and the rest.
    placed = []
    trailing = []
    for entry in toc:
        target = strip_fragment(toc_target(entry))
        if target and target in position_by_path:
            placed.append((position_by_path[target], entry))
        elif target and os.path.basename(target) in position_by_name:
            placed.append((position_by_name[os.path.basename(target)], entry))
        else:
            # Items not in spine (like gallery) go at the end
            trailing.append(entry)
    placed.sort(key=lambda pair: pair[0])
    final_toc = [entry for _, entry in placed] + trailing

    # Optional de-duplication on the trimmed title; untitled entries are always kept.
    if os.environ.get('DEDUPLICATE_TOC', '0') == '1':
        seen_titles = set()
        kept = []
        for entry in final_toc:
            key = (getattr(entry, 'title', '') or '').strip()
            if key and key in seen_titles:
                continue
            if key:
                seen_titles.add(key)
            kept.append(entry)
        removed = len(final_toc) - len(kept)
        if removed:
            self.log(f"🔁 Deduplicated EPUB TOC: removed {removed} duplicate entries ({len(kept)} remaining)")
        final_toc = kept

    if debug_mode_enabled:
        self.log("\nTOC order (after sorting to match spine):")
        log_toc(final_toc)

    book.toc = final_toc

    # The NCX is added in every mode.
    book.add_item(epub.EpubNcx())

    final_spine = [cover_item] if has_cover else []
    if ncx_only:
        self.log(f"[INFO] NCX-only navigation forced - {len(final_toc)} chapters")
        # Cover (if exists) -> Chapters
        final_spine.extend(spine)
        book.spine = final_spine
        if legacy:
            self.log("📖 Using EPUB2 (legacy OEBPS/Text structure)")
        else:
            self.log("📖 Using EPUB 3.3 with NCX navigation only")
        if has_cover:
            self.log("📖 Reading order: Cover → Chapters")
        else:
            self.log("📖 Reading order: Chapters")
        return

    # Normal EPUB3 processing with Nav
    self.log(f"[INFO] EPUB3 format - {len(final_toc)} chapters")
    nav = epub.EpubNav()
    nav.content = self._create_nav_content(final_toc, book.title).encode('utf-8')
    nav.uid = 'nav'
    nav.file_name = 'nav.xhtml'
    book.add_item(nav)

    # Cover (if exists) -> Nav -> Chapters
    final_spine.append(nav)
    final_spine.extend(spine)
    book.spine = final_spine
    self.log("📖 Using EPUB3 format with full navigation")
    if has_cover:
        self.log("📖 Reading order: Cover → Table of Contents → Chapters")
    else:
        self.log("📖 Reading order: Table of Contents → Chapters")
def _write_epub(self, book: epub.EpubBook, metadata: dict):
    """Write *book* to an ``.epub`` file inside ``self.output_dir``.

    The file is named after the sanitized book title, or after the output
    directory when the title is empty or equal to the directory name. The
    book is written as EPUB2 when ``self.legacy_epub_structure`` is truthy
    and as EPUB3 otherwise. While the write runs, a background thread logs
    the elapsed time every five seconds; it is always signalled to stop,
    including when the write fails.

    Args:
        book: The fully assembled book.
        metadata: Not referenced in this method.

    Raises:
        Exception: If the target file is locked or inaccessible, if the
            write itself fails (re-raised), or if the resulting file is
            missing or empty.
    """
    import time
    import threading

    # Determine output filename
    book_title = book.title
    if book_title and book_title != os.path.basename(self.output_dir):
        safe_filename = FileUtils.sanitize_filename(book_title, allow_unicode=True)
        out_path = os.path.join(self.output_dir, f"{safe_filename}.epub")
    else:
        base_name = os.path.basename(self.output_dir)
        out_path = os.path.join(self.output_dir, f"{base_name}.epub")

    # Check stop flag before starting the write operation
    if self.is_stopped():
        self.log("🛑 EPUB write cancelled - stop requested before write started")
        return
    self.log(f"\n[DEBUG] Writing EPUB to: {out_path}")

    # Test if we can write to the target file BEFORE generating EPUB.
    # Append mode leaves any existing content untouched.
    try:
        with open(out_path, 'ab'):
            pass
    except PermissionError as e:
        self.log(f"[ERROR] Cannot write to file - it may be opened in another program")
        self.log(f"[ERROR] File: {out_path}")
        self.log(f"[ERROR] Details: {e}")
        raise Exception(f"File is locked or inaccessible: {out_path}") from e
    except Exception as e:
        self.log(f"[ERROR] Cannot access file for writing: {e}")
        raise

    self.log("⏳ Writing EPUB file... (this may take a while for large files)")
    start_time = time.time()
    write_completed = threading.Event()

    def progress_logger():
        """Log progress every 5 seconds during write"""
        while not write_completed.is_set():
            if write_completed.wait(5):  # Wait 5 seconds or until completed
                break
            elapsed = time.time() - start_time
            if self.is_stopped():
                self.log(f"⏳ Still writing... ({elapsed:.0f}s elapsed) - Stop requested, will finish current write operation")
            else:
                self.log(f"⏳ Still writing... ({elapsed:.0f}s elapsed)")

    logger_thread = threading.Thread(target=progress_logger, daemon=True)
    logger_thread.start()

    # Write as EPUB2 when legacy structure is enabled, otherwise EPUB3
    legacy = getattr(self, 'legacy_epub_structure', False)
    try:
        epub.write_epub(out_path, book, {'epub3': (not legacy)})
    except Exception as e:
        self.log(f"[ERROR] Write failed: {e}")
        raise
    finally:
        # Always release the progress thread; without this a failed write
        # would leave it logging "Still writing..." indefinitely.
        write_completed.set()
        logger_thread.join(timeout=1)

    elapsed = time.time() - start_time
    format_name = "EPUB2" if legacy else "EPUB 3.3"
    if self.is_stopped():
        # Stop was requested while the write was in flight.
        self.log(f"[SUCCESS] Written as {format_name} (took {elapsed:.1f}s) - Write completed before stop")
        self.log("🛑 Note: Stop was requested but write operation finished normally")
    else:
        self.log(f"[SUCCESS] Written as {format_name} (took {elapsed:.1f}s)")

    # Verify the file
    if not os.path.exists(out_path):
        raise Exception("EPUB file was not created")
    file_size = os.path.getsize(out_path)
    if file_size <= 0:
        raise Exception("EPUB file is empty")
    self.log(f"✅ EPUB created: {out_path}")
    self.log(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")
    if getattr(self, 'legacy_epub_structure', False):
        self.log("📝 Format: EPUB2 (legacy OEBPS/Text structure)")
    else:
        self.log("📝 Format: EPUB 3.3")
def _show_summary(self, chapter_titles_info: Dict[int, Tuple[str, float, str]],
css_items: List[epub.EpubItem]):
"""Show compilation summary"""
if chapter_titles_info:
high = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf > 0.7)
medium = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if 0.4 < conf <= 0.7)
low = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf <= 0.4)
self.log(f"\n📊 Title Extraction Summary:")
self.log(f" • High confidence: {high} chapters")
self.log(f" • Medium confidence: {medium} chapters")
self.log(f" • Low confidence: {low} chapters")
if css_items:
self.log(f"\n✅ Successfully embedded {len(css_items)} CSS files")
# Gallery status
if os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1':
self.log("\n📷 Image Gallery: Disabled by user preference")
self.log("\n📱 Compatibility Notes:")
self.log(" • XHTML 1.1 compliant")
self.log(" • All tags properly closed")
self.log(" • Special characters escaped")
self.log(" • Extracted translated titles")
self.log(" • Enhanced entity decoding")
def _compress_images(self, processed_images: Dict[str, str], cover_file: Optional[str]) -> Tuple[Dict[str, str], Optional[str]]:
"""Compress images: convert to .webp, with configurable cover/GIF exclusion and quality"""
try:
from PIL import Image
except ImportError:
self.log("⚠️ Pillow not installed - image compression disabled. Install with: pip install Pillow")
return processed_images, cover_file
# Read compression settings
quality = int(os.environ.get('IMAGE_COMPRESSION_QUALITY', '80'))
exclude_cover = os.environ.get('EXCLUDE_COVER_COMPRESSION', '1') == '1'
exclude_gif = os.environ.get('EXCLUDE_GIF_COMPRESSION', '1') == '1'
self.log(f"\n🗜️ Compressing images (quality: {quality}%, exclude cover: {exclude_cover}, exclude GIF: {exclude_gif})...")
new_processed = {}
new_cover = cover_file
compressed_count = 0
skipped_count = 0
for original_name, safe_name in processed_images.items():
if self.is_stopped():
self.log("🛑 Image compression stopped by user")
break
img_path = os.path.join(self.images_dir, original_name)
if not os.path.isfile(img_path):
new_processed[original_name] = safe_name
continue
ext = os.path.splitext(original_name)[1].lower()
is_cover = (safe_name == cover_file)
is_gif = (ext == '.gif')
# Skip cover page if excluded
if is_cover and exclude_cover:
self.log(f" ⏭️ Skipping cover: {original_name}")
new_processed[original_name] = safe_name
skipped_count += 1
continue
# Skip GIF if excluded
if is_gif and exclude_gif:
self.log(f" ⏭️ Skipping GIF: {original_name}")
new_processed[original_name] = safe_name
skipped_count += 1
continue
try:
if is_gif:
# Compress GIF in place - optimize without changing format
im = Image.open(img_path)
if hasattr(im, 'n_frames') and im.n_frames > 1:
# Animated GIF: save with optimization
frames = []
try:
while True:
frames.append(im.copy())
im.seek(im.tell() + 1)
except EOFError:
pass
if frames:
frames[0].save(
img_path, save_all=True, append_images=frames[1:],
optimize=True, loop=im.info.get('loop', 0)
)
else:
# Static GIF: optimize
im.save(img_path, optimize=True)
im.close()
new_processed[original_name] = safe_name
compressed_count += 1
else:
# Convert to .webp
webp_name = os.path.splitext(safe_name)[0] + '.webp'
webp_path = os.path.join(self.images_dir, os.path.splitext(original_name)[0] + '.webp')
im = Image.open(img_path)
# Convert RGBA/P modes for webp compatibility
if im.mode in ('RGBA', 'LA') or (im.mode == 'P' and 'transparency' in im.info):
im = im.convert('RGBA')
elif im.mode != 'RGB':
im = im.convert('RGB')
im.save(webp_path, 'WEBP', quality=quality, method=4)
im.close()
# Remove original if webp was created successfully and is different file
if os.path.exists(webp_path) and webp_path != img_path:
try:
os.remove(img_path)
except Exception:
pass
# Keep original key so HTML image references still resolve correctly.
# _add_images_to_book and _create_cover_page have fallback logic to
# find the compressed .webp file on disk when the original is gone.
new_processed[original_name] = webp_name
compressed_count += 1
# Update cover reference if this image was the cover
if is_cover:
new_cover = webp_name
except Exception as e:
self.log(f" ⚠️ Failed to compress {original_name}: {e}")
new_processed[original_name] = safe_name
skipped_count += 1
self.log(f"✅ Image compression complete: {compressed_count} compressed, {skipped_count} skipped")
return new_processed, new_cover
def _generate_pdf(self, html_files: List[str], chapter_titles_info: Dict[int, Tuple[str, float, str]],
processed_images: Dict[str, str], cover_file: Optional[str], metadata: dict):
# Overall flow, as implemented below:
#   1. Make sure a fontconfig file is configured (only when FONTCONFIG_FILE is unset).
#   2. Import WeasyPrint; log a hint and return when it is not installed.
#   3. Read the PDF settings from environment variables and derive the output path
#      from metadata['title'] (falling back to the output directory name).
#   4. Render an optional cover page, do a TOC dry run, then build every chapter
#      file into ONE combined HTML document and render it.
#   5. Map chapter anchors to page numbers, render the final TOC, merge all pages
#      and write the PDF into self.output_dir.
# Parameters:
#   html_files          - chapter file names, resolved against self.output_dir.
#   chapter_titles_info - chapter number -> (title, confidence, source file name).
#   processed_images    - not referenced anywhere in this method body.
#   cover_file          - image file name looked up in self.images_dir, or None.
#   metadata            - dict; only the 'title' key is read here.
# Returns None on every path. A failure inside write_pdf is logged and re-raised;
# most other failures are logged and skipped.
# NOTE(review): several string literals below (fontconfig XML, HTML wrappers,
# regex patterns, and two f-strings that span lines) look as if their markup was
# stripped at some point - confirm against version control before relying on them.
"""Generate PDF from the output folder using WeasyPrint"""
import time as _time
# Ensure fontconfig is available on Windows for WeasyPrint
import tempfile as _tempfile
if not os.environ.get("FONTCONFIG_FILE"):
# A fresh temp directory is created here, so the exists() check below is
# expected to be False on this path; the env vars set afterwards make later
# calls in the same process skip this whole branch.
_fc_dir = _tempfile.mkdtemp(prefix="fontconfig_")
_fc_path = os.path.join(_fc_dir, "fonts.conf")
if not os.path.exists(_fc_path):
with open(_fc_path, "w", encoding="utf-8") as _f:
_f.write('\n\n'
'WINDOWSFONTDIR '
'~/.cache/fontconfig \n')
os.environ["FONTCONFIG_FILE"] = _fc_path
os.environ["FONTCONFIG_PATH"] = _fc_dir
os.environ["FC_CONFIG_FILE"] = _fc_path
# WeasyPrint is imported lazily so the rest of the module works without it.
try:
from weasyprint import HTML as WeasyHTML
except ImportError:
self.log("⚠️ WeasyPrint not installed - PDF generation disabled. Install with: pip install weasyprint")
self.log(" Also requires GTK libraries. See: https://doc.courtbouillon.org/weasyprint/stable/first_steps.html")
return
self.log("\n📄 Generating PDF...")
start_time = _time.time()
# All switches default to enabled ('1'); alignment defaults to 'center'.
settings = {
'page_numbers': os.environ.get('PDF_PAGE_NUMBERS', '1') == '1',
'page_number_alignment': os.environ.get('PDF_PAGE_NUMBER_ALIGNMENT', 'center'),
'toc': os.environ.get('PDF_GENERATE_TOC', '1') == '1',
'toc_numbers': os.environ.get('PDF_TOC_PAGE_NUMBERS', '1') == '1',
}
# Determine PDF output path
book_title = metadata.get('title', os.path.basename(self.output_dir))
safe_title = FileUtils.sanitize_filename(book_title, allow_unicode=True)
pdf_path = os.path.join(self.output_dir, f"{safe_title}.pdf")
self.log(f" PDF output: {pdf_path}")
self.log(f" Settings: page_numbers={settings['page_numbers']}, toc={settings['toc']}, "
f"toc_numbers={settings['toc_numbers']}, alignment={settings['page_number_alignment']}")
# This block only logs the image settings; the values are re-read where used.
compression_enabled = os.environ.get('ENABLE_IMAGE_COMPRESSION', '0') == '1'
if compression_enabled:
pdf_img_fmt = os.environ.get('PDF_IMAGE_FORMAT', 'jpeg').upper()
if pdf_img_fmt == 'PNG':
img_detail = f"format=PNG"
else:
img_quality = os.environ.get('IMAGE_COMPRESSION_QUALITY', '80')
img_detail = f"format=JPEG, quality={img_quality}"
self.log(f" Image: compression=enabled, {img_detail}")
else:
self.log(f" Image: compression=disabled")
import base64
import re
# Build image lookup from images directory
# Extension -> MIME type, used only when mimetypes.guess_type() returns nothing.
_mime_fallback = {
'.webp': 'image/webp', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp',
'.svg': 'image/svg+xml',
}
# images_by_name maps both "name.ext" and "name" to (absolute path, MIME type).
# When two files share a stem, the extension-less key keeps whichever file
# os.listdir() yields last.
images_by_name = {}
if os.path.isdir(self.images_dir):
for fname in os.listdir(self.images_dir):
fpath = os.path.join(self.images_dir, fname)
if os.path.isfile(fpath):
ctype, _ = mimetypes.guess_type(fpath)
if not ctype:
ctype = _mime_fallback.get(os.path.splitext(fname)[1].lower())
if ctype and ctype.startswith('image/'):
images_by_name[fname] = (fpath, ctype)
# Also index without extension for flexible matching
base = os.path.splitext(fname)[0]
images_by_name[base] = (fpath, ctype)
# Returns (bytes, MIME type). Only image/webp inputs are converted; if Pillow is
# missing or the conversion raises, the raw file bytes and the original MIME
# type are returned instead.
def _read_image_as_pdf_compatible(fpath, ctype):
"""Read image bytes, converting webp to JPEG/PNG since WeasyPrint does not support webp.
When ENABLE_IMAGE_COMPRESSION is off, converts to lossless PNG to avoid quality loss.
When enabled, respects PDF_IMAGE_FORMAT, IMAGE_COMPRESSION_QUALITY, and PDF_PNG_OPTIMIZE settings."""
if ctype == 'image/webp':
try:
from PIL import Image
from io import BytesIO
_compression_on = os.environ.get('ENABLE_IMAGE_COMPRESSION', '0') == '1'
with Image.open(fpath) as img:
buf = BytesIO()
if not _compression_on:
# Compression disabled: convert to lossless PNG
if img.mode not in ('RGB', 'RGBA', 'L', 'LA'):
img = img.convert('RGBA')
img.save(buf, format='PNG')
return buf.getvalue(), 'image/png'
# Compression enabled: respect user format/quality settings
pdf_fmt = os.environ.get('PDF_IMAGE_FORMAT', 'jpeg').lower()
if pdf_fmt == 'png':
# PNG: preserves transparency
optimize = os.environ.get('PDF_PNG_OPTIMIZE', '1') == '1'
compress_level = int(os.environ.get('PDF_PNG_COMPRESS_LEVEL', '6'))
if img.mode not in ('RGB', 'RGBA', 'L', 'LA'):
img = img.convert('RGBA')
img.save(buf, format='PNG', optimize=optimize, compress_level=compress_level)
return buf.getvalue(), 'image/png'
else:
# JPEG: lossy with quality control, flatten transparency
quality = int(os.environ.get('IMAGE_COMPRESSION_QUALITY', '80'))
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
# Paste onto a white background, using the alpha channel as the mask.
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode != 'RGBA':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[3])
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
img.save(buf, format='JPEG', quality=quality, optimize=True)
return buf.getvalue(), 'image/jpeg'
except Exception:
# Deliberate best effort: fall through to returning the raw bytes below.
pass
with open(fpath, 'rb') as f:
return f.read(), ctype
# Returns a "data:<mime>;base64,..." string for a known image, or None when the
# value is empty, already a data URI, or does not match any indexed image.
def _data_uri_for_src(src_value):
"""Convert image src to data URI"""
if not src_value or src_value.startswith('data:'):
return None
from urllib.parse import unquote
raw = unquote(src_value).replace('\\', '/')
filename = os.path.basename(raw)
# Try exact match, then basename
for key in [raw, filename, os.path.splitext(filename)[0]]:
if key in images_by_name:
fpath, ctype = images_by_name[key]
try:
img_bytes, final_ctype = _read_image_as_pdf_compatible(fpath, ctype)
b64 = base64.b64encode(img_bytes).decode('utf-8')
return f"data:{final_ctype};base64,{b64}"
except Exception:
# Unreadable file: try the next candidate key.
pass
return None
# The pattern matches src, href and xlink:href attributes alike; an attribute is
# rewritten only when its value resolves to an indexed image, otherwise the
# original text is returned untouched.
def _embed_images_in_html(content):
"""Replace image src attributes with data URIs"""
def replace_attr(match):
attr = match.group(1)
quote = match.group(2)
src_value = match.group(3)
data_uri = _data_uri_for_src(src_value)
if data_uri:
return f'{attr}={quote}{data_uri}{quote}'
return match.group(0)
return re.sub(
r'(\b(?:src|href|xlink:href)\b)\s*=\s*([\'"])([^\'"]+)\2',
replace_attr, content, flags=re.IGNORECASE
)
# NOTE(review): the pattern below looks truncated (the element names appear to
# be missing) - confirm against the original file. When nothing matches, the
# input is returned unchanged.
def _extract_body_content(html_content):
"""Extract just the inner content from a full HTML document to avoid nested HTML."""
body_match = re.search(r']*>(.*)', html_content, re.DOTALL | re.IGNORECASE)
if body_match:
return body_match.group(1)
return html_content
# Only double-quoted href="..." attributes are considered by the final re.sub.
def _rewrite_epub_hrefs(content, epub_file_map):
"""Convert EPUB relative file hrefs to PDF internal anchors.
Uses #chapter-N (always exists) when the file is in the map.
Falls back to the raw #fragment only when the file is unknown.
"""
def _replace(m):
val = m.group(1)
# Leave absolute URLs, data URIs, and already-internal anchors unchanged
low = val.lower()
if low.startswith(('#', 'data:', 'http://', 'https://', 'mailto:', 'tel:')):
return m.group(0)
# Must look like a relative file reference
if not any(c in val for c in ('/', '.xhtml', '.html', '.htm')):
return m.group(0)
# Extract file basename (strip fragment and query string)
file_part = val.split('#')[0].split('?')[0]
basename = os.path.basename(file_part)
# Prefer the guaranteed #chapter-N anchor from the map
anchor = epub_file_map.get(basename)
if anchor:
return f'href="{anchor}"'
# File not in map — fall back to raw fragment if present
if '#' in val:
frag = val.split('#', 1)[1]
if frag:
return f'href="#{frag}"'
return m.group(0)
return re.sub(r'href="([^"]*)"', _replace, content, flags=re.IGNORECASE)
# Collect CSS
# Stylesheets are concatenated in sorted file-name order; unreadable files are skipped.
styles = ""
if os.path.isdir(self.css_dir):
for css_file in sorted(os.listdir(self.css_dir)):
if css_file.endswith('.css'):
try:
with open(os.path.join(self.css_dir, css_file), 'r', encoding='utf-8') as f:
styles += f.read() + "\n"
except Exception:
pass
# Override any @page rules from EPUB CSS that may set margin: 0 (which clips page number footers)
styles += " @page { margin: 15mm; } "
# Add image styles for PDF rendering
styles += " img { max-width: 100%; height: auto; display: block; margin: 10px auto; } "
# Suppress all heading-generated sidebar bookmarks; only .pdf-bm elements create them
styles += " h1, h2, h3, h4, h5, h6 { bookmark-level: none; } "
styles += " h1.pdf-bm { bookmark-level: 1; font-size: 0 !important; line-height: 0 !important; margin: 0 !important; padding: 0 !important; height: 0 !important; overflow: hidden !important; } "
# Page number CSS
alignment = settings['page_number_alignment']
# Both branches produce '@bottom-center' for 'center'; other values are used verbatim.
page_position = f'@bottom-{alignment}' if alignment != 'center' else '@bottom-center'
if settings['page_numbers']:
styles += f" @page {{ {page_position} {{ content: counter(page); color: rgba(0,0,0,0.4); font-size: 10pt; }} }} "
# Deduplication settings (match EPUB TOC behaviour)
_dedup_enabled = os.environ.get('DEDUPLICATE_TOC', '0') == '1'
_dedup_use_translated = os.environ.get('DEDUPLICATE_TOC_USE_TRANSLATED', '0') == '1'
_seen_bm_titles = set()
# Process chapters
# documents collects rendered WeasyPrint documents in output order.
# chapter_page_map is filled later with BOTH file-name and chapter-number keys.
# current_page is accumulated below but never read for any output in this method.
documents = []
chapter_page_map = {}
current_page = 0
# Render cover page first (TOC goes after cover)
if cover_file:
try:
# Find the actual cover file in the images directory
cover_path = None
for fname in os.listdir(self.images_dir):
fpath = os.path.join(self.images_dir, fname)
if os.path.isfile(fpath):
# Match by safe name or by base name
if fname == cover_file or os.path.splitext(fname)[0] == os.path.splitext(cover_file)[0]:
cover_path = fpath
break
if cover_path:
ctype, _ = mimetypes.guess_type(cover_path)
if not ctype:
ctype = _mime_fallback.get(os.path.splitext(cover_path)[1].lower())
if ctype and ctype.startswith('image/'):
cover_bytes, cover_ctype = _read_image_as_pdf_compatible(cover_path, ctype)
b64 = base64.b64encode(cover_bytes).decode('utf-8')
cover_data_uri = f"data:{cover_ctype};base64,{b64}"
# NOTE(review): cover_data_uri is not referenced by the literals below as
# they appear here - the cover markup looks stripped; confirm.
cover_html = (
f''
f' '
f''
)
cover_doc = WeasyHTML(string=cover_html, base_url=self.output_dir).render()
documents.append(cover_doc)
current_page += len(cover_doc.pages)
self.log(f" ✅ Added cover page to PDF")
except Exception as e:
# A cover failure is logged and the PDF continues without a cover.
self.log(f" ⚠️ Failed to add cover to PDF: {e}")
# TOC dry run to estimate page count (after cover, before chapters)
toc_page_count = 0
toc_insert_index = len(documents) # Insert TOC after whatever is already in documents (cover)
if settings['toc']:
self.log(" Calculating TOC size...")
# Page map is None for the dry run, so no page numbers are rendered yet.
dummy_toc_html = self._build_pdf_toc_html(chapter_titles_info, settings, None)
# TOC page should not have page numbers
# NOTE(review): toc_css is empty and replace("") is called with an empty search
# string as written - presumably a stripped style block and tag; confirm.
toc_css = ""
dummy_toc_html = dummy_toc_html.replace("", f"{toc_css}")
try:
toc_doc = WeasyHTML(string=dummy_toc_html, base_url=self.output_dir).render()
toc_page_count = len(toc_doc.pages)
except Exception as e:
self.log(f" ⚠️ TOC size estimation failed: {e}")
toc_page_count = 1
current_page += toc_page_count
self.log(f" Estimated TOC: {toc_page_count} pages")
# Create mapping from source filename to chapter number for TOC
source_to_chapter = {}
for chap_num, (title, conf, source) in chapter_titles_info.items():
if source:
source_to_chapter[source] = chap_num
# Build EPUB filename -> PDF chapter anchor map from the source EPUB OPF spine
# The source EPUB path comes from the EPUB_PATH environment variable; the map
# stays empty when it is unset, missing on disk, or parsing fails.
epub_file_map = {}
_epub_path = os.environ.get('EPUB_PATH', '')
if _epub_path and os.path.exists(_epub_path):
try:
import zipfile as _zf_mod
with _zf_mod.ZipFile(_epub_path) as _zf:
_container = _zf.read('META-INF/container.xml').decode('utf-8', errors='replace')
_opf_m = re.search(r'full-path="([^"]+)"', _container)
if _opf_m:
_opf_txt = _zf.read(_opf_m.group(1)).decode('utf-8', errors='replace')
# NOTE(review): the OPF regexes below look truncated/split across lines -
# confirm against the original file.
_manifest = {m.group(1): m.group(2)
for m in re.finditer(r'- ]*\bid="([^"]+)"[^>]*\bhref="([^"]+)"', _opf_txt)}
_spine_hrefs = []
for _iid in re.findall(r'
]*\bidref="([^"]+)"', _opf_txt):
_href = _manifest.get(_iid, '')
# Keep only (X)HTML spine entries and skip items whose properties contain 'nav'.
if not _href.lower().endswith(('.xhtml', '.html', '.htm')):
continue
_pm = re.search(rf'- ]*\bid="{re.escape(_iid)}"[^>]*\bproperties="([^"]*)"', _opf_txt)
if _pm and 'nav' in _pm.group(1):
continue
_spine_hrefs.append(_href)
# The i-th remaining spine entry is paired with the i-th entry of html_files;
# the chapter number falls back to the index when the file has no known title.
for _si, _sh in enumerate(_spine_hrefs):
if _si < len(html_files):
_hf = html_files[_si]
_cn = source_to_chapter.get(_hf, _si)
epub_file_map[os.path.basename(_sh)] = f'#chapter-{_cn}'
except Exception as _em:
self.log(f" ⚠️ Could not build EPUB href map: {_em}")
# Build all chapters as a single HTML document for continuous page numbering
self.log(f" Building combined chapter document ({len(html_files)} chapters)...")
all_chapters_html = ""
chapters_order = [] # Track chapter order for page mapping
for i, html_file in enumerate(html_files):
# A stop request abandons PDF generation without writing anything.
if self.is_stopped():
self.log("🛑 PDF generation stopped by user")
return
file_path = os.path.join(self.output_dir, html_file)
if not os.path.exists(file_path):
continue
# Determine chapter number (use i as fallback since chapters are 0-indexed)
chap_num = source_to_chapter.get(html_file, i)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
content = _embed_images_in_html(content)
# Extract just the body content to avoid nested structures
body_content = _extract_body_content(content)
# Convert EPUB-style relative hrefs to PDF internal anchors
body_content = _rewrite_epub_hrefs(body_content, epub_file_map)
# Add anchor ID for TOC linking and page break before each chapter (except first)
_bm_title = chapter_titles_info.get(chap_num, ('', 0, ''))[0]
# Skip untitled placeholders (matches EPUB TOC filtering)
if _bm_title and _bm_title.strip().lower() in ('untitled chapter', 'untitled'):
_bm_title = ''
# Deduplicate bookmark titles when enabled
if _bm_title and _dedup_enabled:
# NOTE(review): both branches read the same tuple slot that _bm_title came
# from, so _dedup_use_translated has no visible effect here - confirm intent.
_bm_key = _bm_title if _dedup_use_translated else chapter_titles_info.get(chap_num, ('', 0, ''))[0]
if _bm_key in _seen_bm_titles:
_bm_title = ''
else:
_seen_bm_titles.add(_bm_key)
# NOTE(review): the three f-strings below span lines inside single quotes as
# they appear here - the wrapper markup looks stripped; confirm.
_bm_h1 = (f'
{html_module.escape(str(_bm_title))} '
if _bm_title else '')
if i > 0:
body_content = f'{_bm_h1}{body_content}
'
else:
body_content = f'{_bm_h1}{body_content}
'
# Add to combined HTML
all_chapters_html += body_content
chapters_order.append((html_file, chap_num))
if (i + 1) % 10 == 0 or (i + 1) == len(html_files):
self.log(f" [{i+1}/{len(html_files)}] Added to document")
except Exception as e:
# A chapter that fails to load or process is logged and left out.
self.log(f" ⚠️ Failed to process {html_file}: {e}")
# Render all chapters as a single document
if all_chapters_html:
self.log(" Rendering combined chapter document...")
# NOTE(review): `styles` is not referenced by this literal as it appears here -
# presumably the document wrapper was stripped; confirm.
combined_html = f"{all_chapters_html}"
try:
chapters_doc = WeasyHTML(string=combined_html, base_url=self.output_dir).render()
documents.append(chapters_doc)
# Resolve accurate page numbers using WeasyPrint's Page.anchors.
# Each chapter div has id="chapter-{chap_num}"; iterate rendered
# pages and find which page each anchor lands on (1-indexed).
# The numbers are relative to the combined chapter document: cover and TOC
# pages are not added to them here.
_anchor_to_page = {}
for _pidx, _pg in enumerate(chapters_doc.pages):
for _aid in _pg.anchors:
if _aid not in _anchor_to_page:
_anchor_to_page[_aid] = _pidx + 1
# Chapters whose anchor was not found get a guess: one page after the last
# resolved (or guessed) chapter.
_fallback_page = 1
for _html_file, _chap_num in chapters_order:
_pg_num = _anchor_to_page.get(f'chapter-{_chap_num}')
if _pg_num is not None:
chapter_page_map[_html_file] = _pg_num
chapter_page_map[_chap_num] = _pg_num
_fallback_page = _pg_num + 1
else:
chapter_page_map[_html_file] = _fallback_page
chapter_page_map[_chap_num] = _fallback_page
_fallback_page += 1
current_page += len(chapters_doc.pages)
self.log(f" Combined document: {len(chapters_doc.pages)} pages")
except Exception as e:
self.log(f" ⚠️ Failed to render combined document: {e}")
# `documents` may still hold only the cover here; the check is on the list
# being empty, not on chapters specifically.
if not documents:
self.log("⚠️ No chapters rendered for PDF")
return
# Generate final TOC with real page numbers
if settings['toc'] and chapter_titles_info:
self.log(" Generating TOC with page numbers...")
real_toc_html = self._build_pdf_toc_html(chapter_titles_info, settings, chapter_page_map)
# TOC page should not have page numbers
toc_css = ""
real_toc_html = real_toc_html.replace("", f"{toc_css}")
try:
toc_doc = WeasyHTML(string=real_toc_html, base_url=self.output_dir).render()
# Insert at the index recorded before chapters were rendered (right after the cover).
documents.insert(toc_insert_index, toc_doc)
except Exception as e:
self.log(f" ⚠️ TOC generation failed: {e}")
# Merge and write PDF
self.log(" Merging all pages...")
all_pages = [page for doc in documents for page in doc.pages]
self.log(f" Total pages: {len(all_pages)}")
self.log(" Writing PDF to disk...")
try:
# When image compression is disabled, tell WeasyPrint to write
# an uncompressed PDF so it does not re-encode images to lossy
# JPEG internally (which would shrink a 10 MB cover to ~2 MB).
_pdf_write_kwargs = {}
if compression_enabled:
_pdf_quality = int(os.environ.get('IMAGE_COMPRESSION_QUALITY', '80'))
_pdf_write_kwargs['image_quality'] = _pdf_quality
self.log(f" PDF image quality: {_pdf_quality}%")
else:
_pdf_write_kwargs['uncompressed_pdf'] = True
self.log(" PDF images: uncompressed (preserving original quality)")
# The first rendered document is used as the container for the merged page list.
documents[0].copy(all_pages).write_pdf(pdf_path, **_pdf_write_kwargs)
except BaseException as e:
# BaseException also covers KeyboardInterrupt/SystemExit; everything is
# logged with a traceback and then re-raised to the caller.
import traceback
self.log(f" ❌ write_pdf failed: {type(e).__name__}: {e}")
self.log(f" [DEBUG] {traceback.format_exc()}")
raise
elapsed = _time.time() - start_time
# Final verification is log-only: a missing file is reported, not raised.
if os.path.exists(pdf_path):
file_size = os.path.getsize(pdf_path)
self.log(f"✅ PDF created: {pdf_path}")
self.log(f"📊 PDF size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")
self.log(f"⏱️ PDF generation took {elapsed:.1f}s")
else:
self.log("❌ PDF file was not created")
def _build_pdf_toc_html(self, chapter_titles_info: Dict[int, Tuple[str, float, str]],
settings: dict, chapter_page_map: Optional[Dict[str, int]]) -> str:
"""Build HTML for PDF table of contents with clickable links and page numbers"""
import html as _html
toc_html = (
''
'Table of Contents '
)
_dedup_on = os.environ.get('DEDUPLICATE_TOC', '0') == '1'
_dedup_translated = os.environ.get('DEDUPLICATE_TOC_USE_TRANSLATED', '0') == '1'
_seen_titles = set()
for chap_num in sorted(chapter_titles_info.keys()):
title, confidence, source = chapter_titles_info[chap_num]
# Skip untitled placeholders (matches EPUB TOC filtering)
if title.strip().lower() in ('untitled chapter', 'untitled'):
continue
# Deduplicate when enabled
if _dedup_on:
if title in _seen_titles:
continue
_seen_titles.add(title)
safe_title = _html.escape(title)
# Get page number - try chapter number key directly first
page_num = ""
if settings.get('toc_numbers') and chapter_page_map:
# Try chapter number directly (stored as integer key)
page_num = chapter_page_map.get(chap_num, "")
# Fallback to source filename
if not page_num and source:
page_num = chapter_page_map.get(source, "")
page_num = str(page_num) if page_num else ""
page_span = f'{page_num} ' if page_num else ''
# Create clickable link
toc_html += f'{safe_title} {page_span} '
toc_html += ' '
return toc_html
# Main entry point
def compile_epub(base_dir: str, log_callback: Optional[Callable] = None):
    """Build an EPUB from the translated HTML files under ``base_dir``.

    Clears the module-level stop flag, then creates an ``EPUBCompiler`` for
    ``base_dir`` with the optional ``log_callback`` and runs its ``compile()``.
    """
    # Start every compilation with the stop flag cleared
    set_stop_flag(False)
    EPUBCompiler(base_dir, log_callback).compile()
# Older name for compile_epub, kept as an alias
fallback_compile_epub = compile_epub

if __name__ == "__main__":
    from shutdown_utils import run_cli_main

    def _main():
        """Command-line entry: compile the directory given as the first argument.

        Returns 1 when the argument is missing or compilation raises,
        otherwise 0.
        """
        cli_args = sys.argv[1:]
        if not cli_args:
            print("Usage: python epub_converter.py ")
            return 1

        try:
            compile_epub(cli_args[0])
        except Exception as e:
            print(f"Error: {e}")
            return 1
        else:
            return 0

    run_cli_main(_main)