
* refactor: upgrade BeautifulSoup4 with type hints Upgrade dependency library BeautifulSoup4 to 4.13.3 (with type hints). Refactor backends using BeautifulSoup4 to comply with type hints. Apply style simplifications and improvements for consistency. Remove variables and functions that are never used. Remove code duplication between backends for parsing HTML tables. Signed-off-by: Cesar Berrospi Ramis <75900930+ceberam@users.noreply.github.com> * build: allow beautifulsoup4 version 4.12.3 Allow older version of beautifulsoup4 and ensure compatibility. Update library dependencies. Signed-off-by: Cesar Berrospi Ramis <75900930+ceberam@users.noreply.github.com> --------- Signed-off-by: Cesar Berrospi Ramis <75900930+ceberam@users.noreply.github.com>
446 lines
16 KiB
Python
446 lines
16 KiB
Python
import logging
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Optional, Union, cast
|
|
|
|
from bs4 import BeautifulSoup, NavigableString, PageElement, Tag
|
|
from docling_core.types.doc import (
|
|
DocItemLabel,
|
|
DoclingDocument,
|
|
DocumentOrigin,
|
|
GroupLabel,
|
|
TableCell,
|
|
TableData,
|
|
)
|
|
from typing_extensions import override
|
|
|
|
from docling.backend.abstract_backend import DeclarativeDocumentBackend
|
|
from docling.datamodel.base_models import InputFormat
|
|
from docling.datamodel.document import InputDocument
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class HTMLDocumentBackend(DeclarativeDocumentBackend):
|
|
@override
|
|
def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]):
|
|
super().__init__(in_doc, path_or_stream)
|
|
_log.debug("About to init HTML backend...")
|
|
self.soup: Optional[Tag] = None
|
|
# HTML file:
|
|
self.path_or_stream = path_or_stream
|
|
# Initialise the parents for the hierarchy
|
|
self.max_levels = 10
|
|
self.level = 0
|
|
self.parents = {} # type: ignore
|
|
for i in range(0, self.max_levels):
|
|
self.parents[i] = None
|
|
self.labels = {} # type: ignore
|
|
|
|
try:
|
|
if isinstance(self.path_or_stream, BytesIO):
|
|
text_stream = self.path_or_stream.getvalue()
|
|
self.soup = BeautifulSoup(text_stream, "html.parser")
|
|
if isinstance(self.path_or_stream, Path):
|
|
with open(self.path_or_stream, "rb") as f:
|
|
html_content = f.read()
|
|
self.soup = BeautifulSoup(html_content, "html.parser")
|
|
except Exception as e:
|
|
raise RuntimeError(
|
|
f"Could not initialize HTML backend for file with hash {self.document_hash}."
|
|
) from e
|
|
|
|
@override
|
|
def is_valid(self) -> bool:
|
|
return self.soup is not None
|
|
|
|
@classmethod
|
|
@override
|
|
def supports_pagination(cls) -> bool:
|
|
return False
|
|
|
|
@override
|
|
def unload(self):
|
|
if isinstance(self.path_or_stream, BytesIO):
|
|
self.path_or_stream.close()
|
|
|
|
self.path_or_stream = None
|
|
|
|
@classmethod
|
|
@override
|
|
def supported_formats(cls) -> set[InputFormat]:
|
|
return {InputFormat.HTML}
|
|
|
|
@override
|
|
def convert(self) -> DoclingDocument:
|
|
# access self.path_or_stream to load stuff
|
|
origin = DocumentOrigin(
|
|
filename=self.file.name or "file",
|
|
mimetype="text/html",
|
|
binary_hash=self.document_hash,
|
|
)
|
|
|
|
doc = DoclingDocument(name=self.file.stem or "file", origin=origin)
|
|
_log.debug("Trying to convert HTML...")
|
|
|
|
if self.is_valid():
|
|
assert self.soup is not None
|
|
content = self.soup.body or self.soup
|
|
# Replace <br> tags with newline characters
|
|
for br in content("br"):
|
|
br.replace_with(NavigableString("\n"))
|
|
self.walk(content, doc)
|
|
else:
|
|
raise RuntimeError(
|
|
f"Cannot convert doc with {self.document_hash} because the backend failed to init."
|
|
)
|
|
return doc
|
|
|
|
def walk(self, tag: Tag, doc: DoclingDocument) -> None:
|
|
# Iterate over elements in the body of the document
|
|
for element in tag.children:
|
|
if isinstance(element, Tag):
|
|
try:
|
|
self.analyze_tag(cast(Tag, element), doc)
|
|
except Exception as exc_child:
|
|
_log.error(
|
|
f"Error processing child from tag{tag.name}: {exc_child}"
|
|
)
|
|
raise exc_child
|
|
|
|
return
|
|
|
|
def analyze_tag(self, tag: Tag, doc: DoclingDocument) -> None:
|
|
if tag.name in self.labels:
|
|
self.labels[tag.name] += 1
|
|
else:
|
|
self.labels[tag.name] = 1
|
|
|
|
if tag.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
|
|
self.handle_header(tag, doc)
|
|
elif tag.name in ["p"]:
|
|
self.handle_paragraph(tag, doc)
|
|
elif tag.name in ["pre"]:
|
|
self.handle_code(tag, doc)
|
|
elif tag.name in ["ul", "ol"]:
|
|
self.handle_list(tag, doc)
|
|
elif tag.name in ["li"]:
|
|
self.handle_list_item(tag, doc)
|
|
elif tag.name == "table":
|
|
self.handle_table(tag, doc)
|
|
elif tag.name == "figure":
|
|
self.handle_figure(tag, doc)
|
|
elif tag.name == "img":
|
|
self.handle_image(doc)
|
|
else:
|
|
self.walk(tag, doc)
|
|
|
|
def get_text(self, item: PageElement) -> str:
|
|
"""Get the text content of a tag."""
|
|
parts: list[str] = self.extract_text_recursively(item)
|
|
|
|
return "".join(parts) + " "
|
|
|
|
# Function to recursively extract text from all child nodes
|
|
def extract_text_recursively(self, item: PageElement) -> list[str]:
|
|
result: list[str] = []
|
|
|
|
if isinstance(item, NavigableString):
|
|
return [item]
|
|
|
|
tag = cast(Tag, item)
|
|
if tag.name not in ["ul", "ol"]:
|
|
for child in tag:
|
|
# Recursively get the child's text content
|
|
result.extend(self.extract_text_recursively(child))
|
|
|
|
return ["".join(result) + " "]
|
|
|
|
def handle_header(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles header tags (h1, h2, etc.)."""
|
|
hlevel = int(element.name.replace("h", ""))
|
|
text = element.text.strip()
|
|
|
|
if hlevel == 1:
|
|
for key, val in self.parents.items():
|
|
self.parents[key] = None
|
|
|
|
self.level = 1
|
|
self.parents[self.level] = doc.add_text(
|
|
parent=self.parents[0], label=DocItemLabel.TITLE, text=text
|
|
)
|
|
else:
|
|
if hlevel > self.level:
|
|
|
|
# add invisible group
|
|
for i in range(self.level + 1, hlevel):
|
|
self.parents[i] = doc.add_group(
|
|
name=f"header-{i}",
|
|
label=GroupLabel.SECTION,
|
|
parent=self.parents[i - 1],
|
|
)
|
|
self.level = hlevel
|
|
|
|
elif hlevel < self.level:
|
|
|
|
# remove the tail
|
|
for key in self.parents.keys():
|
|
if key > hlevel:
|
|
self.parents[key] = None
|
|
self.level = hlevel
|
|
|
|
self.parents[hlevel] = doc.add_heading(
|
|
parent=self.parents[hlevel - 1],
|
|
text=text,
|
|
level=hlevel,
|
|
)
|
|
|
|
def handle_code(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles monospace code snippets (pre)."""
|
|
if element.text is None:
|
|
return
|
|
text = element.text.strip()
|
|
if text:
|
|
doc.add_code(parent=self.parents[self.level], text=text)
|
|
|
|
def handle_paragraph(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles paragraph tags (p)."""
|
|
if element.text is None:
|
|
return
|
|
text = element.text.strip()
|
|
label = DocItemLabel.PARAGRAPH
|
|
if text:
|
|
doc.add_text(parent=self.parents[self.level], label=label, text=text)
|
|
|
|
def handle_list(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles list tags (ul, ol) and their list items."""
|
|
|
|
if element.name == "ul":
|
|
# create a list group
|
|
self.parents[self.level + 1] = doc.add_group(
|
|
parent=self.parents[self.level], name="list", label=GroupLabel.LIST
|
|
)
|
|
elif element.name == "ol":
|
|
# create a list group
|
|
self.parents[self.level + 1] = doc.add_group(
|
|
parent=self.parents[self.level],
|
|
name="ordered list",
|
|
label=GroupLabel.ORDERED_LIST,
|
|
)
|
|
self.level += 1
|
|
|
|
self.walk(element, doc)
|
|
|
|
self.parents[self.level + 1] = None
|
|
self.level -= 1
|
|
|
|
def handle_list_item(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles listitem tags (li)."""
|
|
nested_list = element.find(["ul", "ol"])
|
|
|
|
parent_list_label = self.parents[self.level].label
|
|
index_in_list = len(self.parents[self.level].children) + 1
|
|
|
|
if nested_list:
|
|
# Text in list item can be hidden within hierarchy, hence
|
|
# we need to extract it recursively
|
|
text: str = self.get_text(element)
|
|
# Flatten text, remove break lines:
|
|
text = text.replace("\n", "").replace("\r", "")
|
|
text = " ".join(text.split()).strip()
|
|
|
|
marker = ""
|
|
enumerated = False
|
|
if parent_list_label == GroupLabel.ORDERED_LIST:
|
|
marker = str(index_in_list)
|
|
enumerated = True
|
|
|
|
if len(text) > 0:
|
|
# create a list-item
|
|
self.parents[self.level + 1] = doc.add_list_item(
|
|
text=text,
|
|
enumerated=enumerated,
|
|
marker=marker,
|
|
parent=self.parents[self.level],
|
|
)
|
|
self.level += 1
|
|
|
|
self.walk(element, doc)
|
|
|
|
self.parents[self.level + 1] = None
|
|
self.level -= 1
|
|
|
|
elif element.text.strip():
|
|
text = element.text.strip()
|
|
|
|
marker = ""
|
|
enumerated = False
|
|
if parent_list_label == GroupLabel.ORDERED_LIST:
|
|
marker = f"{str(index_in_list)}."
|
|
enumerated = True
|
|
doc.add_list_item(
|
|
text=text,
|
|
enumerated=enumerated,
|
|
marker=marker,
|
|
parent=self.parents[self.level],
|
|
)
|
|
else:
|
|
_log.warning(f"list-item has no text: {element}")
|
|
|
|
@staticmethod
|
|
def parse_table_data(element: Tag) -> Optional[TableData]:
|
|
nested_tables = element.find("table")
|
|
if nested_tables is not None:
|
|
_log.warning("Skipping nested table.")
|
|
return None
|
|
|
|
# Count the number of rows (number of <tr> elements)
|
|
num_rows = len(element("tr"))
|
|
|
|
# Find the number of columns (taking into account colspan)
|
|
num_cols = 0
|
|
for row in element("tr"):
|
|
col_count = 0
|
|
if not isinstance(row, Tag):
|
|
continue
|
|
for cell in row(["td", "th"]):
|
|
if not isinstance(row, Tag):
|
|
continue
|
|
val = cast(Tag, cell).get("colspan", "1")
|
|
colspan = int(val) if (isinstance(val, str) and val.isnumeric()) else 1
|
|
col_count += colspan
|
|
num_cols = max(num_cols, col_count)
|
|
|
|
grid: list = [[None for _ in range(num_cols)] for _ in range(num_rows)]
|
|
|
|
data = TableData(num_rows=num_rows, num_cols=num_cols, table_cells=[])
|
|
|
|
# Iterate over the rows in the table
|
|
for row_idx, row in enumerate(element("tr")):
|
|
if not isinstance(row, Tag):
|
|
continue
|
|
|
|
# For each row, find all the column cells (both <td> and <th>)
|
|
cells = row(["td", "th"])
|
|
|
|
# Check if each cell in the row is a header -> means it is a column header
|
|
col_header = True
|
|
for html_cell in cells:
|
|
if isinstance(html_cell, Tag) and html_cell.name == "td":
|
|
col_header = False
|
|
|
|
# Extract the text content of each cell
|
|
col_idx = 0
|
|
for html_cell in cells:
|
|
if not isinstance(html_cell, Tag):
|
|
continue
|
|
|
|
# extract inline formulas
|
|
for formula in html_cell("inline-formula"):
|
|
math_parts = formula.text.split("$$")
|
|
if len(math_parts) == 3:
|
|
math_formula = f"$${math_parts[1]}$$"
|
|
formula.replace_with(NavigableString(math_formula))
|
|
|
|
# TODO: extract content correctly from table-cells with lists
|
|
text = html_cell.text
|
|
|
|
# label = html_cell.name
|
|
col_val = html_cell.get("colspan", "1")
|
|
col_span = (
|
|
int(col_val)
|
|
if isinstance(col_val, str) and col_val.isnumeric()
|
|
else 1
|
|
)
|
|
row_val = html_cell.get("rowspan", "1")
|
|
row_span = (
|
|
int(row_val)
|
|
if isinstance(row_val, str) and row_val.isnumeric()
|
|
else 1
|
|
)
|
|
|
|
while grid[row_idx][col_idx] is not None:
|
|
col_idx += 1
|
|
for r in range(row_span):
|
|
for c in range(col_span):
|
|
grid[row_idx + r][col_idx + c] = text
|
|
|
|
table_cell = TableCell(
|
|
text=text,
|
|
row_span=row_span,
|
|
col_span=col_span,
|
|
start_row_offset_idx=row_idx,
|
|
end_row_offset_idx=row_idx + row_span,
|
|
start_col_offset_idx=col_idx,
|
|
end_col_offset_idx=col_idx + col_span,
|
|
col_header=col_header,
|
|
row_header=((not col_header) and html_cell.name == "th"),
|
|
)
|
|
data.table_cells.append(table_cell)
|
|
|
|
return data
|
|
|
|
def handle_table(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles table tags."""
|
|
|
|
table_data = HTMLDocumentBackend.parse_table_data(element)
|
|
|
|
if table_data is not None:
|
|
doc.add_table(data=table_data, parent=self.parents[self.level])
|
|
|
|
def get_list_text(self, list_element: Tag, level: int = 0) -> list[str]:
|
|
"""Recursively extract text from <ul> or <ol> with proper indentation."""
|
|
result = []
|
|
bullet_char = "*" # Default bullet character for unordered lists
|
|
|
|
if list_element.name == "ol": # For ordered lists, use numbers
|
|
for i, li in enumerate(list_element("li", recursive=False), 1):
|
|
if not isinstance(li, Tag):
|
|
continue
|
|
# Add numbering for ordered lists
|
|
result.append(f"{' ' * level}{i}. {li.get_text(strip=True)}")
|
|
# Handle nested lists
|
|
nested_list = li.find(["ul", "ol"])
|
|
if isinstance(nested_list, Tag):
|
|
result.extend(self.get_list_text(nested_list, level + 1))
|
|
elif list_element.name == "ul": # For unordered lists, use bullet points
|
|
for li in list_element("li", recursive=False):
|
|
if not isinstance(li, Tag):
|
|
continue
|
|
# Add bullet points for unordered lists
|
|
result.append(
|
|
f"{' ' * level}{bullet_char} {li.get_text(strip=True)}"
|
|
)
|
|
# Handle nested lists
|
|
nested_list = li.find(["ul", "ol"])
|
|
if isinstance(nested_list, Tag):
|
|
result.extend(self.get_list_text(nested_list, level + 1))
|
|
|
|
return result
|
|
|
|
def handle_figure(self, element: Tag, doc: DoclingDocument) -> None:
|
|
"""Handles image tags (img)."""
|
|
|
|
# Extract the image URI from the <img> tag
|
|
# image_uri = root.xpath('//figure//img/@src')[0]
|
|
|
|
contains_captions = element.find(["figcaption"])
|
|
if not isinstance(contains_captions, Tag):
|
|
doc.add_picture(parent=self.parents[self.level], caption=None)
|
|
else:
|
|
texts = []
|
|
for item in contains_captions:
|
|
texts.append(item.text)
|
|
|
|
fig_caption = doc.add_text(
|
|
label=DocItemLabel.CAPTION, text=("".join(texts)).strip()
|
|
)
|
|
doc.add_picture(
|
|
parent=self.parents[self.level],
|
|
caption=fig_caption,
|
|
)
|
|
|
|
def handle_image(self, doc: DoclingDocument) -> None:
|
|
"""Handles image tags (img)."""
|
|
doc.add_picture(parent=self.parents[self.level], caption=None)
|