import logging
import re
from io import BytesIO
from pathlib import Path
from typing import Any, Optional, Union

from docling_core.types.doc import (
    DocItemLabel,
    DoclingDocument,
    DocumentOrigin,
    GroupLabel,
    ImageRef,
    NodeItem,
    TableCell,
    TableData,
)
from docx import Document
from docx.document import Document as DocxDocument
from docx.oxml.table import CT_Tc
from docx.oxml.xmlchemy import BaseOxmlElement
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from lxml import etree
from lxml.etree import XPath
from PIL import Image, UnidentifiedImageError
from typing_extensions import override

from docling.backend.abstract_backend import DeclarativeDocumentBackend
from docling.backend.docx.latex.omml import oMath2Latex
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import InputDocument

_log = logging.getLogger(__name__)


class MsWordDocumentBackend(DeclarativeDocumentBackend):
    """Declarative backend that converts a DOCX file into a DoclingDocument.

    The document body is walked linearly; paragraphs, list items, headings,
    equations, tables and inline pictures are appended to the output document.
    The current nesting is tracked in ``self.parents`` (level -> node) and the
    previously seen paragraph attributes in ``self.history``.
    """

    @override
    def __init__(
        self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]
    ) -> None:
        """Load the DOCX from a path or an in-memory stream.

        Raises:
            RuntimeError: if python-docx fails to open the document.
        """
        super().__init__(in_doc, path_or_stream)
        # Fully qualified name of the ``w:val`` attribute.
        self.XML_KEY = (
            "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val"
        )
        self.xml_namespaces = {
            "w": "http://schemas.microsoft.com/office/word/2003/wordml"
        }

        # Word file:
        self.path_or_stream: Union[BytesIO, Path] = path_or_stream
        self.valid: bool = False

        # Initialise the parents for the hierarchy (keys -1 .. max_levels - 1)
        self.max_levels: int = 10
        self.level_at_new_list: Optional[int] = None
        self.parents: dict[int, Optional[NodeItem]] = {}
        for i in range(-1, self.max_levels):
            self.parents[i] = None

        self.level = 0
        self.listIter = 0

        # Attributes of previously processed paragraphs; the last entry of
        # each list describes the most recent paragraph.
        self.history: dict[str, Any] = {
            "names": [None],
            "levels": [None],
            "numids": [None],
            "indents": [None],
        }

        self.docx_obj = None
        try:
            if isinstance(self.path_or_stream, BytesIO):
                self.docx_obj = Document(self.path_or_stream)
            elif isinstance(self.path_or_stream, Path):
                self.docx_obj = Document(str(self.path_or_stream))

            self.valid = True
        except Exception as e:
            raise RuntimeError(
                f"MsWordDocumentBackend could not load document with hash {self.document_hash}"
            ) from e

    @override
    def is_valid(self) -> bool:
        """Return True when the document was loaded without error."""
        return self.valid

    @classmethod
    @override
    def supports_pagination(cls) -> bool:
        """This backend does not report pagination support."""
        return False

    @override
    def unload(self):
        """Close the input stream (if any) and drop the reference to it."""
        if isinstance(self.path_or_stream, BytesIO):
            self.path_or_stream.close()

        self.path_or_stream = None

    @classmethod
    @override
    def supported_formats(cls) -> set[InputFormat]:
        """Return the input formats handled by this backend (DOCX only)."""
        return {InputFormat.DOCX}

    @override
    def convert(self) -> DoclingDocument:
        """Parses the DOCX into a structured document model.

        Returns:
            The parsed document.

        Raises:
            RuntimeError: if the backend is not valid.
        """
        origin = DocumentOrigin(
            filename=self.file.name or "file",
            mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            binary_hash=self.document_hash,
        )

        doc = DoclingDocument(name=self.file.stem or "file", origin=origin)
        if self.is_valid():
            assert self.docx_obj is not None
            doc = self.walk_linear(self.docx_obj.element.body, self.docx_obj, doc)
            return doc
        else:
            raise RuntimeError(
                f"Cannot convert doc with {self.document_hash} because the backend failed to init."
            )

    def update_history(
        self,
        name: str,
        level: Optional[int],
        numid: Optional[int],
        ilevel: Optional[int],
    ):
        """Record the attributes of the paragraph that was just processed."""
        self.history["names"].append(name)
        self.history["levels"].append(level)
        self.history["numids"].append(numid)
        self.history["indents"].append(ilevel)

    def prev_name(self) -> Optional[str]:
        """Return the most recently recorded style name."""
        return self.history["names"][-1]

    def prev_level(self) -> Optional[int]:
        """Return the most recently recorded style level."""
        return self.history["levels"][-1]

    def prev_numid(self) -> Optional[int]:
        """Return the most recently recorded list numbering id."""
        return self.history["numids"][-1]

    def prev_indent(self) -> Optional[int]:
        """Return the most recently recorded list indentation level."""
        return self.history["indents"][-1]

    def get_level(self) -> int:
        """Return the first non-negative key of ``parents`` holding None.

        Falls back to 0 when every non-negative level is occupied.
        """
        for k, v in self.parents.items():
            if k >= 0 and v is None:
                return k
        return 0

    def walk_linear(
        self,
        body: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> DoclingDocument:
        """Dispatch every child of ``body`` to the matching handler.

        Tables, elements containing an inline image blip, ``sdt`` containers
        and paragraphs are handled; any other tag is logged and ignored.

        Returns:
            The same ``doc`` instance, with the parsed items appended.
        """
        # Loop-invariant: namespaces and the compiled blip lookup.
        namespaces = {
            "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
            "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
            "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
        }
        find_blips = XPath(".//a:blip", namespaces=namespaces)

        for element in body:
            tag_name = etree.QName(element).localname

            # Check for Inline Images (blip elements)
            drawing_blip = find_blips(element)

            # Check for Tables
            if element.tag.endswith("tbl"):
                try:
                    self.handle_tables(element, docx_obj, doc)
                except Exception:
                    # Best effort: a broken table must not abort the whole
                    # conversion, but keep the cause available when debugging.
                    _log.debug(
                        "could not parse a table, broken docx table", exc_info=True
                    )

            elif drawing_blip:
                self.handle_pictures(docx_obj, drawing_blip, doc)

            # Check for the sdt containers, like table of contents
            elif tag_name in ["sdt"]:
                sdt_content = element.find(".//w:sdtContent", namespaces=namespaces)
                if sdt_content is not None:
                    # Process every paragraph found inside the sdtContent element.
                    paragraphs = sdt_content.findall(".//w:p", namespaces=namespaces)
                    for p in paragraphs:
                        self.handle_text_elements(p, docx_obj, doc)

            # Check for Text
            elif tag_name in ["p"]:
                # "tcPr", "sectPr"
                self.handle_text_elements(element, docx_obj, doc)
            else:
                _log.debug(f"Ignoring element in DOCX with tag: {tag_name}")
        return doc

    def str_to_int(self, s: Optional[str], default: Optional[int] = 0) -> Optional[int]:
        """Convert ``s`` to int; None stays None, unparsable text gives ``default``."""
        if s is None:
            return None
        try:
            return int(s)
        except ValueError:
            return default

    def split_text_and_number(self, input_string: str) -> list[str]:
        """Split e.g. ``"Heading1"`` or ``"1Heading"`` into text and number parts.

        Returns the input as a single-element list when it does not match.
        """
        match = re.match(r"(\D+)(\d+)$|^(\d+)(\D+)", input_string)
        if match:
            parts = list(filter(None, match.groups()))
            return parts
        else:
            return [input_string]

    def get_numId_and_ilvl(
        self, paragraph: Paragraph
    ) -> tuple[Optional[int], Optional[int]]:
        """Return the ``(numId, ilvl)`` values of the paragraph's ``w:numPr``.

        Both are None when the paragraph has no ``w:numPr`` element.
        """
        # Access the XML element of the paragraph
        nsmap = paragraph._element.nsmap
        numPr = paragraph._element.find(".//w:numPr", namespaces=nsmap)

        if numPr is not None:
            # Get the numId and ilvl elements and extract their values
            numId_elem = numPr.find("w:numId", namespaces=nsmap)
            ilvl_elem = numPr.find("w:ilvl", namespaces=nsmap)
            numId = numId_elem.get(self.XML_KEY) if numId_elem is not None else None
            ilvl = ilvl_elem.get(self.XML_KEY) if ilvl_elem is not None else None

            return self.str_to_int(numId, None), self.str_to_int(ilvl, None)

        return None, None  # If the paragraph is not part of a list

    def get_label_and_level(self, paragraph: Paragraph) -> tuple[str, Optional[int]]:
        """Derive a ``(label, level)`` pair from the paragraph's style id.

        ``"Normal", None`` is returned when no style or style id is available.
        A style id of the form ``"name:level"`` is split on the colon, and a
        style id containing ``"Heading"`` plus a number yields that number as
        the level.
        """
        if paragraph.style is None:
            return "Normal", None
        label = paragraph.style.style_id
        if label is None:
            return "Normal", None
        if ":" in label:
            parts = label.split(":")

            if len(parts) == 2:
                return parts[0], self.str_to_int(parts[1], None)

        parts = self.split_text_and_number(label)

        if "Heading" in label and len(parts) == 2:
            # Sorting makes the position of the text part independent of
            # whether the number came first or last in the style id.
            parts.sort()
            label_str: str = ""
            label_level: Optional[int] = 0
            if parts[0] == "Heading":
                label_str = parts[0]
                label_level = self.str_to_int(parts[1], None)
            if parts[1] == "Heading":
                label_str = parts[1]
                label_level = self.str_to_int(parts[0], None)
            return label_str, label_level
        else:
            return label, None

    def handle_equations_in_text(self, element, text):
        """Rebuild the paragraph text with equations rendered as LaTeX.

        Returns:
            ``(text_with_equations, equations)``. When the plain text runs
            cannot be reassembled into ``text``, ``(text, [])`` is returned
            so the caller falls back to the raw text.
        """
        only_texts = []
        only_equations = []
        texts_and_equations = []
        for subt in element.iter():
            tag_name = etree.QName(subt).localname
            if tag_name == "t" and "math" not in subt.tag:
                # An empty text element has no text (None); skip it so the
                # joins below never receive a non-string.
                if isinstance(subt.text, str):
                    only_texts.append(subt.text)
                    texts_and_equations.append(subt.text)
            elif "oMath" in subt.tag and "oMathPara" not in subt.tag:
                latex_equation = str(oMath2Latex(subt))
                only_equations.append(latex_equation)
                texts_and_equations.append(latex_equation)

        if "".join(only_texts).strip() != text.strip():
            # If we are not able to reconstruct the initial raw text
            # do not try to parse equations and return the original
            return text, []

        return "".join(texts_and_equations), only_equations

    def handle_text_elements(
        self,
        element: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> None:
        """Add one paragraph element to ``doc``.

        Depending on numbering and style, the paragraph becomes a list item,
        title, heading, formula (standalone or inline group) or plain
        paragraph. The paragraph's attributes are recorded in the history.
        """
        paragraph = Paragraph(element, docx_obj)

        raw_text = paragraph.text
        text, equations = self.handle_equations_in_text(element=element, text=raw_text)

        if text is None:
            return
        text = text.strip()

        # Common styles for bullet and numbered lists.
        # "List Bullet", "List Number", "List Paragraph"
        # Identify whether list is a numbered list or not
        # is_numbered = "List Bullet" not in paragraph.style.name
        is_numbered = False
        p_style_id, p_level = self.get_label_and_level(paragraph)
        numid, ilevel = self.get_numId_and_ilvl(paragraph)

        if numid == 0:
            numid = None

        # Handle lists
        if (
            numid is not None
            and ilevel is not None
            and p_style_id not in ["Title", "Heading"]
        ):
            self.add_listitem(
                doc,
                numid,
                ilevel,
                text,
                is_numbered,
            )
            self.update_history(p_style_id, p_level, numid, ilevel)
            return
        elif (
            numid is None
            and self.prev_numid() is not None
            and p_style_id not in ["Title", "Heading"]
        ):
            # Close list
            # NOTE: parents is keyed from -1, so range(len(self.parents))
            # also covers one index past the initially created keys.
            if self.level_at_new_list:
                for key in range(len(self.parents)):
                    if key >= self.level_at_new_list:
                        self.parents[key] = None
                self.level = self.level_at_new_list - 1
                self.level_at_new_list = None
            else:
                for key in range(len(self.parents)):
                    self.parents[key] = None
                self.level = 0

        if p_style_id in ["Title"]:
            for key in range(len(self.parents)):
                self.parents[key] = None

            self.parents[0] = doc.add_text(
                parent=None, label=DocItemLabel.TITLE, text=text
            )
        elif "Heading" in p_style_id:
            self.add_header(doc, p_level, text)

        elif len(equations) > 0:
            if (raw_text is None or len(raw_text) == 0) and len(text) > 0:
                # Standalone equation
                level = self.get_level()
                doc.add_text(
                    label=DocItemLabel.FORMULA,
                    parent=self.parents[level - 1],
                    text=text,
                )
            else:
                # Inline equation
                level = self.get_level()
                inline_equation = doc.add_group(
                    label=GroupLabel.INLINE, parent=self.parents[level - 1]
                )
                text_tmp = text
                for eq in equations:
                    if len(text_tmp) == 0:
                        break

                    # text was stripped above, so whitespace at the edge of
                    # an equation may be gone: fall back to the stripped form.
                    needle = eq if eq in text_tmp else eq.strip()
                    if not needle:
                        # Nothing to locate or to emit for an empty equation.
                        continue

                    pre_eq_text, found, remainder = text_tmp.partition(needle)
                    if found:
                        text_tmp = remainder
                    else:
                        # Equation not locatable: emit it without consuming
                        # text; the remaining text is added after the loop.
                        pre_eq_text = ""

                    if len(pre_eq_text) > 0:
                        doc.add_text(
                            label=DocItemLabel.PARAGRAPH,
                            parent=inline_equation,
                            text=pre_eq_text,
                        )
                    doc.add_text(
                        label=DocItemLabel.FORMULA,
                        parent=inline_equation,
                        text=eq,
                    )
                if len(text_tmp) > 0:
                    doc.add_text(
                        label=DocItemLabel.PARAGRAPH,
                        parent=inline_equation,
                        text=text_tmp,
                    )

        else:
            # Well-known text styles ("Paragraph", "Normal", "Subtitle",
            # "Author", "DefaultText", "ListParagraph", "ListBullet", "Quote")
            # and any user-defined style are all treated as plain text.
            level = self.get_level()
            doc.add_text(
                label=DocItemLabel.PARAGRAPH, parent=self.parents[level - 1], text=text
            )

        self.update_history(p_style_id, p_level, numid, ilevel)
        return

    def add_header(
        self, doc: DoclingDocument, curr_level: Optional[int], text: str
    ) -> None:
        """Add a heading at ``curr_level`` and register it as a parent.

        Missing intermediate levels are filled with section groups; deeper
        levels are reset. Without an integer level, a level-1 heading is
        added at ``self.level``.
        """
        level = self.get_level()
        if isinstance(curr_level, int):
            if curr_level > level:
                # add invisible group
                for i in range(level, curr_level):
                    self.parents[i] = doc.add_group(
                        parent=self.parents[i - 1],
                        label=GroupLabel.SECTION,
                        name=f"header-{i}",
                    )
            elif curr_level < level:
                # remove the tail
                for key in range(len(self.parents)):
                    if key >= curr_level:
                        self.parents[key] = None

            self.parents[curr_level] = doc.add_heading(
                parent=self.parents[curr_level - 1],
                text=text,
                level=curr_level,
            )
        else:
            self.parents[self.level] = doc.add_heading(
                parent=self.parents[self.level - 1],
                text=text,
                level=1,
            )
        return

    def _append_list_item(
        self,
        doc: DoclingDocument,
        parent: Optional[NodeItem],
        text: str,
        is_numbered: bool,
    ) -> None:
        """Advance the list counter and add one list item under ``parent``."""
        self.listIter += 1
        enum_marker = str(self.listIter) + "." if is_numbered else ""
        doc.add_list_item(
            marker=enum_marker,
            enumerated=is_numbered,
            parent=parent,
            text=text,
        )

    def add_listitem(
        self,
        doc: DoclingDocument,
        numid: int,
        ilevel: int,
        text: str,
        is_numbered: bool = False,
    ) -> None:
        """Add a list item, opening or closing list groups as needed.

        The previous paragraph's numbering id and indentation (from the
        history) decide whether a new list is opened, a nested list is
        opened, nested lists are closed, or the item continues the current
        list. No item is added when none of these cases applies.
        """
        level = self.get_level()
        prev_indent = self.prev_indent()
        if self.prev_numid() is None:  # Open new list
            self.level_at_new_list = level

            self.parents[level] = doc.add_group(
                label=GroupLabel.LIST, name="list", parent=self.parents[level - 1]
            )

            self._append_list_item(doc, self.parents[level], text, is_numbered)

        elif (
            self.prev_numid() == numid
            and self.level_at_new_list is not None
            and prev_indent is not None
            and prev_indent < ilevel
        ):  # Open indented list
            for i in range(
                self.level_at_new_list + prev_indent + 1,
                self.level_at_new_list + ilevel + 1,
            ):
                # Determine if this is an unordered list or an ordered list.
                # Set GroupLabel.ORDERED_LIST when it fits.
                self.listIter = 0
                if is_numbered:
                    self.parents[i] = doc.add_group(
                        label=GroupLabel.ORDERED_LIST,
                        name="list",
                        parent=self.parents[i - 1],
                    )
                else:
                    self.parents[i] = doc.add_group(
                        label=GroupLabel.LIST, name="list", parent=self.parents[i - 1]
                    )

            self._append_list_item(
                doc, self.parents[self.level_at_new_list + ilevel], text, is_numbered
            )

        elif (
            self.prev_numid() == numid
            and self.level_at_new_list is not None
            and prev_indent is not None
            and ilevel < prev_indent
        ):  # Close list
            for k in self.parents:
                if k > self.level_at_new_list + ilevel:
                    self.parents[k] = None

            self._append_list_item(
                doc, self.parents[self.level_at_new_list + ilevel], text, is_numbered
            )
            self.listIter = 0

        elif self.prev_numid() == numid or prev_indent == ilevel:
            self._append_list_item(doc, self.parents[level - 1], text, is_numbered)
        return

    def handle_tables(
        self,
        element: BaseOxmlElement,
        docx_obj: DocxDocument,
        doc: DoclingDocument,
    ) -> None:
        """Add a table element to ``doc``.

        A 1x1 table is not emitted as a table; its cell content is processed
        as if it were part of the document body. Otherwise every distinct
        cell is emitted once, with its row and column span.
        """
        table: Table = Table(element, docx_obj)
        num_rows = len(table.rows)
        num_cols = len(table.columns)
        _log.debug(f"Table grid with {num_rows} rows and {num_cols} columns")

        if num_rows == 1 and num_cols == 1:
            cell_element = table.rows[0].cells[0]
            # In case we have a table of only 1 cell, we consider it furniture
            # And proceed processing the content of the cell as though it's in the document body
            self.walk_linear(cell_element._element, docx_obj, doc)
            return

        data = TableData(num_rows=num_rows, num_cols=num_cols)
        # Underlying cell elements already emitted; merged cells are reported
        # once per grid position they cover.
        cell_set: set[CT_Tc] = set()
        for row_idx, row in enumerate(table.rows):
            # Read the row's cells once instead of on every column step.
            row_cells = row.cells
            _log.debug(f"Row index {row_idx} with {len(row_cells)} populated cells")
            col_idx = 0
            while col_idx < num_cols:
                cell: _Cell = row_cells[col_idx]
                _log.debug(
                    f" col {col_idx} grid_span {cell.grid_span} grid_cols_before {row.grid_cols_before}"
                )
                if cell is None or cell._tc in cell_set:
                    _log.debug(" skipped since repeated content")
                    col_idx += cell.grid_span
                    continue
                else:
                    cell_set.add(cell._tc)

                # Walk down the column while the same cell element repeats to
                # find the first row that is no longer part of this cell.
                spanned_idx = row_idx
                spanned_tc: Optional[CT_Tc] = cell._tc
                while spanned_tc == cell._tc:
                    spanned_idx += 1
                    spanned_tc = (
                        table.rows[spanned_idx].cells[col_idx]._tc
                        if spanned_idx < num_rows
                        else None
                    )
                _log.debug(f" spanned before row {spanned_idx}")

                table_cell = TableCell(
                    text=cell.text,
                    row_span=spanned_idx - row_idx,
                    col_span=cell.grid_span,
                    start_row_offset_idx=row.grid_cols_before + row_idx,
                    end_row_offset_idx=row.grid_cols_before + spanned_idx,
                    start_col_offset_idx=col_idx,
                    end_col_offset_idx=col_idx + cell.grid_span,
                    column_header=row.grid_cols_before + row_idx == 0,
                    row_header=False,
                )
                data.table_cells.append(table_cell)
                col_idx += cell.grid_span

        level = self.get_level()
        doc.add_table(data=data, parent=self.parents[level - 1])
        return

    def handle_pictures(
        self, docx_obj: DocxDocument, drawing_blip: Any, doc: DoclingDocument
    ) -> None:
        """Add the picture referenced by the first blip to ``doc``.

        When the image cannot be loaded, a picture item without image data
        is added instead and a warning is logged.
        """

        def get_docx_image(drawing_blip) -> Optional[bytes]:
            """Return the embedded image bytes, or None if unresolved."""
            rId = drawing_blip[0].get(
                "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}embed"
            )
            if rId in docx_obj.part.rels:
                # Access the image part using the relationship ID
                image_part = docx_obj.part.rels[rId].target_part
                return image_part.blob  # The binary image data
            return None

        level = self.get_level()
        # Open the BytesIO object with PIL to create an Image
        try:
            image_data = get_docx_image(drawing_blip)
            # An unresolved relationship gives an empty buffer, which Pillow
            # cannot identify, so the placeholder path below is taken.
            image_bytes = BytesIO(image_data or b"")
            pil_image = Image.open(image_bytes)
            doc.add_picture(
                parent=self.parents[level - 1],
                image=ImageRef.from_pil(image=pil_image, dpi=72),
                caption=None,
            )
        except (UnidentifiedImageError, OSError):
            _log.warning("Warning: image cannot be loaded by Pillow")
            doc.add_picture(
                parent=self.parents[level - 1],
                caption=None,
            )
        return