Docling/docling/backend/msword_backend.py
Christoph Auer 3023f18ba0
feat: Support AsciiDoc and Markdown input format (#168)
* updated the base-model and added the asciidoc_backend

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* updated the asciidoc backend

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* Ensure all models work only on valid pages (#158)

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* ci: run ci also on forks (#160)


---------

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Signed-off-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>

* fix: fix legacy doc ref (#162)

Signed-off-by: Panos Vagenas <35837085+vagenas@users.noreply.github.com>

* docs: typo fix (#155)

* Docs: Typo fix

- Corrected spelling of invidual to automatic

Signed-off-by: ABHISHEK FADAKE <31249309+fadkeabhi@users.noreply.github.com>

* add synchronize event for forks

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

---------

Signed-off-by: ABHISHEK FADAKE <31249309+fadkeabhi@users.noreply.github.com>
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Co-authored-by: Michele Dolfi <dol@zurich.ibm.com>

* feat: add coverage_threshold to skip OCR for small images (#161)

* feat: add coverage_threshold to skip OCR for small images

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

* filter individual boxes

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

* rename option

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

---------

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

* chore: bump version to 2.1.0 [skip ci]

* adding tests for asciidocs

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* first working asciidoc parser

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* reformatted the code

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* fixed the mypy

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* adding test_02.asciidoc

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* Drafting Markdown backend via Marko library

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* work in progress on MD backend

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* md_backend produces docling document with headers, paragraphs, lists

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Improvements in md parsing

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Detecting and assembling tables in markdown in temporary buffers

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Added initial docling table support to md_backend

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Cleaned code, improved logging for MD

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Fixes MyPy requirements, and rest of pre-commit

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Fixed example run_md, added origin info to md_backend

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* working on asciidocs, struggling with ImageRef

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* able to parse the captions and image uri's

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* fixed the mypy

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* Update all backends with proper filename in DocumentOrigin

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Update to docling-core v2.1.0

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Fixes for MD Backend, to avoid duplicated text inserts into docling doc

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Fix styling

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Added support for code blocks and fenced code in MD

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* cleaned prints

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Added proper processing of in-line textual elements for MD backend

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Fixed issues with duplicated paragraphs and incorrect lists in pptx

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

* Fixed issue with group ordeering in pptx backend, added gebug log into run with formats

Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>

---------

Signed-off-by: Peter Staar <taa@zurich.ibm.com>
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Signed-off-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>
Signed-off-by: Panos Vagenas <35837085+vagenas@users.noreply.github.com>
Signed-off-by: ABHISHEK FADAKE <31249309+fadkeabhi@users.noreply.github.com>
Signed-off-by: Maksym Lysak <mly@zurich.ibm.com>
Co-authored-by: Peter Staar <taa@zurich.ibm.com>
Co-authored-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>
Co-authored-by: Panos Vagenas <35837085+vagenas@users.noreply.github.com>
Co-authored-by: ABHISHEK FADAKE <31249309+fadkeabhi@users.noreply.github.com>
Co-authored-by: Michele Dolfi <dol@zurich.ibm.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Maksym Lysak <mly@zurich.ibm.com>
2024-10-23 16:14:26 +02:00

503 lines
17 KiB
Python

import logging
from io import BytesIO
from pathlib import Path
from typing import Set, Union
import docx
from docling_core.types.doc import (
DocItemLabel,
DoclingDocument,
DocumentOrigin,
GroupLabel,
TableCell,
TableData,
)
from lxml import etree
from docling.backend.abstract_backend import DeclarativeDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import InputDocument
_log = logging.getLogger(__name__)
class MsWordDocumentBackend(DeclarativeDocumentBackend):
def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]):
super().__init__(in_doc, path_or_stream)
self.XML_KEY = (
"{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val"
)
self.xml_namespaces = {
"w": "http://schemas.microsoft.com/office/word/2003/wordml"
}
# self.initialise(path_or_stream)
# Word file:
self.path_or_stream = path_or_stream
self.valid = False
# Initialise the parents for the hierarchy
self.max_levels = 10
self.level_at_new_list = None
self.parents = {} # type: ignore
for i in range(-1, self.max_levels):
self.parents[i] = None
self.level = 0
self.listIter = 0
self.history = {
"names": [None],
"levels": [None],
"numids": [None],
"indents": [None],
}
self.docx_obj = None
try:
if isinstance(self.path_or_stream, BytesIO):
self.docx_obj = docx.Document(self.path_or_stream)
elif isinstance(self.path_or_stream, Path):
self.docx_obj = docx.Document(str(self.path_or_stream))
self.valid = True
except Exception as e:
raise RuntimeError(
f"MsPowerpointDocumentBackend could not load document with hash {self.document_hash}"
) from e
def is_valid(self) -> bool:
return self.valid
@classmethod
def supports_pagination(cls) -> bool:
return False
def unload(self):
if isinstance(self.path_or_stream, BytesIO):
self.path_or_stream.close()
self.path_or_stream = None
@classmethod
def supported_formats(cls) -> Set[InputFormat]:
return {InputFormat.DOCX}
def convert(self) -> DoclingDocument:
# Parses the DOCX into a structured document model.
origin = DocumentOrigin(
filename=self.file.name or "file",
mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
binary_hash=self.document_hash,
)
doc = DoclingDocument(name=self.file.stem or "file", origin=origin)
if self.is_valid():
assert self.docx_obj is not None
doc = self.walk_linear(self.docx_obj.element.body, self.docx_obj, doc)
return doc
else:
raise RuntimeError(
f"Cannot convert doc with {self.document_hash} because the backend failed to init."
)
def update_history(self, name, level, numid, ilevel):
self.history["names"].append(name)
self.history["levels"].append(level)
self.history["numids"].append(numid)
self.history["indents"].append(ilevel)
def prev_name(self):
return self.history["names"][-1]
def prev_level(self):
return self.history["levels"][-1]
def prev_numid(self):
return self.history["numids"][-1]
def prev_indent(self):
return self.history["indents"][-1]
def get_level(self) -> int:
"""Return the first None index."""
for k, v in self.parents.items():
if k >= 0 and v == None:
return k
return 0
def walk_linear(self, body, docx_obj, doc) -> DoclingDocument:
for element in body:
tag_name = etree.QName(element).localname
# Check for Inline Images (drawings or blip elements)
found_drawing = etree.ElementBase.xpath(
element, ".//w:drawing", namespaces=self.xml_namespaces
)
found_pict = etree.ElementBase.xpath(
element, ".//w:pict", namespaces=self.xml_namespaces
)
# Check for Tables
if element.tag.endswith("tbl"):
try:
self.handle_tables(element, docx_obj, doc)
except Exception:
_log.debug("could not parse a table, broken docx table")
elif found_drawing or found_pict:
self.handle_pictures(element, docx_obj, doc)
# Check for Text
elif tag_name in ["p"]:
self.handle_text_elements(element, docx_obj, doc)
else:
_log.debug(f"Ignoring element in DOCX with tag: {tag_name}")
return doc
def str_to_int(self, s, default=0):
if s is None:
return None
try:
return int(s)
except ValueError:
return default
def get_numId_and_ilvl(self, paragraph):
# Access the XML element of the paragraph
numPr = paragraph._element.find(
".//w:numPr", namespaces=paragraph._element.nsmap
)
if numPr is not None:
# Get the numId element and extract the value
numId_elem = numPr.find("w:numId", namespaces=paragraph._element.nsmap)
ilvl_elem = numPr.find("w:ilvl", namespaces=paragraph._element.nsmap)
numId = numId_elem.get(self.XML_KEY) if numId_elem is not None else None
ilvl = ilvl_elem.get(self.XML_KEY) if ilvl_elem is not None else None
return self.str_to_int(numId, default=None), self.str_to_int(
ilvl, default=None
)
return None, None # If the paragraph is not part of a list
def get_label_and_level(self, paragraph):
if paragraph.style is None:
return "Normal", None
label = paragraph.style.name
if label is None:
return "Normal", None
if ":" in label:
parts = label.split(":")
if len(parts) == 2:
return parts[0], int(parts[1])
parts = label.split(" ")
if "Heading" in label and len(parts) == 2:
parts.sort()
label_str = ""
label_level = 0
if parts[0] == "Heading":
# print("{} - {}".format(parts[0], parts[1]))
label_str = parts[0]
label_level = self.str_to_int(parts[1], default=None)
if parts[1] == "Heading":
label_str = parts[1]
label_level = self.str_to_int(parts[0], default=None)
return label_str, label_level
else:
return label, None
def handle_text_elements(self, element, docx_obj, doc):
paragraph = docx.text.paragraph.Paragraph(element, docx_obj)
if paragraph.text is None:
# _log.warn(f"paragraph has text==None")
return
text = paragraph.text.strip()
# if len(text)==0 # keep empty paragraphs, they seperate adjacent lists!
# Common styles for bullet and numbered lists.
# "List Bullet", "List Number", "List Paragraph"
# TODO: reliably identify wether list is a numbered list or not
# is_numbered = "List Bullet" not in paragraph.style.name
is_numbered = False
p_style_name, p_level = self.get_label_and_level(paragraph)
numid, ilevel = self.get_numId_and_ilvl(paragraph)
# print("numid: {}, ilevel: {}, text: {}".format(numid, ilevel, text))
if numid == 0:
numid = None
# Handle lists
if numid is not None and ilevel is not None:
self.add_listitem(
element,
docx_obj,
doc,
p_style_name,
p_level,
numid,
ilevel,
text,
is_numbered,
)
self.update_history(p_style_name, p_level, numid, ilevel)
return
elif numid is None and self.prev_numid() is not None: # Close list
for key, val in self.parents.items():
if key >= self.level_at_new_list:
self.parents[key] = None
self.level = self.level_at_new_list - 1
self.level_at_new_list = None
if p_style_name in ["Title"]:
for key, val in self.parents.items():
self.parents[key] = None
self.parents[0] = doc.add_text(
parent=None, label=DocItemLabel.TITLE, text=text
)
elif "Heading" in p_style_name:
self.add_header(element, docx_obj, doc, p_style_name, p_level, text)
elif p_style_name in [
"Paragraph",
"Normal",
"Subtitle",
"Author",
"Default Text",
"List Paragraph",
"List Bullet",
"Quote",
]:
level = self.get_level()
doc.add_text(
label=DocItemLabel.PARAGRAPH, parent=self.parents[level - 1], text=text
)
else:
# Text style names can, and will have, not only default values but user values too
# hence we treat all other labels as pure text
level = self.get_level()
doc.add_text(
label=DocItemLabel.PARAGRAPH, parent=self.parents[level - 1], text=text
)
self.update_history(p_style_name, p_level, numid, ilevel)
return
def add_header(self, element, docx_obj, doc, curr_name, curr_level, text: str):
level = self.get_level()
if isinstance(curr_level, int):
if curr_level == level:
self.parents[level] = doc.add_heading(
parent=self.parents[level - 1], text=text
)
elif curr_level > level:
# add invisible group
for i in range(level, curr_level):
self.parents[i] = doc.add_group(
parent=self.parents[i - 1],
label=GroupLabel.SECTION,
name=f"header-{i}",
)
self.parents[curr_level] = doc.add_heading(
parent=self.parents[curr_level - 1], text=text
)
elif curr_level < level:
# remove the tail
for key, val in self.parents.items():
if key >= curr_level:
self.parents[key] = None
self.parents[curr_level] = doc.add_heading(
parent=self.parents[curr_level - 1], text=text
)
else:
self.parents[self.level] = doc.add_heading(
parent=self.parents[self.level - 1], text=text
)
return
def add_listitem(
self,
element,
docx_obj,
doc,
p_style_name,
p_level,
numid,
ilevel,
text: str,
is_numbered=False,
):
# is_numbered = is_numbered
enum_marker = ""
level = self.get_level()
if self.prev_numid() is None: # Open new list
self.level_at_new_list = level # type: ignore
self.parents[level] = doc.add_group(
label=GroupLabel.LIST, name="list", parent=self.parents[level - 1]
)
# TODO: Set marker and enumerated arguments if this is an enumeration element.
self.listIter += 1
if is_numbered:
enum_marker = str(self.listIter) + "."
is_numbered = True
doc.add_list_item(
marker=enum_marker,
enumerated=is_numbered,
parent=self.parents[level],
text=text,
)
elif (
self.prev_numid() == numid and self.prev_indent() < ilevel
): # Open indented list
for i in range(
self.level_at_new_list + self.prev_indent() + 1,
self.level_at_new_list + ilevel + 1,
):
# TODO: determine if this is an unordered list or an ordered list.
# Set GroupLabel.ORDERED_LIST when it fits.
self.listIter = 0
if is_numbered:
self.parents[i] = doc.add_group(
label=GroupLabel.ORDERED_LIST,
name="list",
parent=self.parents[i - 1],
)
else:
self.parents[i] = doc.add_group(
label=GroupLabel.LIST, name="list", parent=self.parents[i - 1]
)
# TODO: Set marker and enumerated arguments if this is an enumeration element.
self.listIter += 1
if is_numbered:
enum_marker = str(self.listIter) + "."
is_numbered = True
doc.add_list_item(
marker=enum_marker,
enumerated=is_numbered,
parent=self.parents[self.level_at_new_list + ilevel],
text=text,
)
elif self.prev_numid() == numid and ilevel < self.prev_indent(): # Close list
for k, v in self.parents.items():
if k > self.level_at_new_list + ilevel:
self.parents[k] = None
# TODO: Set marker and enumerated arguments if this is an enumeration element.
self.listIter += 1
if is_numbered:
enum_marker = str(self.listIter) + "."
is_numbered = True
doc.add_list_item(
marker=enum_marker,
enumerated=is_numbered,
parent=self.parents[self.level_at_new_list + ilevel],
text=text,
)
self.listIter = 0
elif self.prev_numid() == numid or self.prev_indent() == ilevel:
# TODO: Set marker and enumerated arguments if this is an enumeration element.
self.listIter += 1
if is_numbered:
enum_marker = str(self.listIter) + "."
is_numbered = True
doc.add_list_item(
marker=enum_marker,
enumerated=is_numbered,
parent=self.parents[level - 1],
text=text,
)
return
def handle_tables(self, element, docx_obj, doc):
# Function to check if a cell has a colspan (gridSpan)
def get_colspan(cell):
grid_span = cell._element.xpath("@w:gridSpan")
if grid_span:
return int(grid_span[0]) # Return the number of columns spanned
return 1 # Default is 1 (no colspan)
# Function to check if a cell has a rowspan (vMerge)
def get_rowspan(cell):
v_merge = cell._element.xpath("@w:vMerge")
if v_merge:
return v_merge[
0
] # 'restart' indicates the beginning of a rowspan, others are continuation
return 1
table = docx.table.Table(element, docx_obj)
num_rows = len(table.rows)
num_cols = 0
for row in table.rows:
# Calculate the max number of columns
num_cols = max(num_cols, sum(get_colspan(cell) for cell in row.cells))
# if row.cells:
# num_cols = max(num_cols, len(row.cells))
# Initialize the table grid
table_grid = [[None for _ in range(num_cols)] for _ in range(num_rows)]
data = TableData(num_rows=num_rows, num_cols=num_cols, table_cells=[])
for row_idx, row in enumerate(table.rows):
col_idx = 0
for c, cell in enumerate(row.cells):
row_span = get_rowspan(cell)
col_span = get_colspan(cell)
# Find the next available column in the grid
while table_grid[row_idx][col_idx] is not None:
col_idx += 1
# Fill the grid with the cell value, considering rowspan and colspan
for i in range(row_span if row_span == "restart" else 1):
for j in range(col_span):
table_grid[row_idx + i][col_idx + j] = ""
cell = TableCell(
text=cell.text,
row_span=row_span,
col_span=col_span,
start_row_offset_idx=row_idx,
end_row_offset_idx=row_idx + row_span,
start_col_offset_idx=col_idx,
end_col_offset_idx=col_idx + col_span,
col_header=False, # col_header,
row_header=False, # ((not col_header) and html_cell.name=='th')
)
data.table_cells.append(cell)
level = self.get_level()
doc.add_table(data=data, parent=self.parents[level - 1])
return
def handle_pictures(self, element, docx_obj, doc):
doc.add_picture(parent=self.parents[self.level], caption=None)
return