
import functools
import logging
import tempfile
import time
import traceback
from pathlib import Path
from typing import Iterable, Optional, Type, Union

import requests
from docling_core.types import Document
from PIL import ImageDraw
from pydantic import AnyHttpUrl, TypeAdapter, ValidationError

from docling.backend.abstract_backend import PdfDocumentBackend
from docling.datamodel.base_models import (
    AssembledUnit,
    AssembleOptions,
    ConversionStatus,
    DoclingComponentType,
    ErrorItem,
    Page,
    PipelineOptions,
)
from docling.datamodel.document import (
    ConvertedDocument,
    DocumentConversionInput,
    InputDocument,
)
from docling.datamodel.settings import settings
from docling.models.ds_glm_model import GlmModel
from docling.models.page_assemble_model import PageAssembleModel
from docling.pipeline.base_model_pipeline import BaseModelPipeline
from docling.pipeline.standard_model_pipeline import StandardModelPipeline
from docling.utils.utils import chunkify, create_hash

_log = logging.getLogger(__name__)


class DocumentConverter:
    _default_download_filename = "file.pdf"

    def __init__(
        self,
        artifacts_path: Optional[Union[Path, str]] = None,
        pipeline_options: PipelineOptions = PipelineOptions(),
        pdf_backend: Type[PdfDocumentBackend] = DocumentConversionInput.DEFAULT_BACKEND,
        pipeline_cls: Type[BaseModelPipeline] = StandardModelPipeline,
        assemble_options: AssembleOptions = AssembleOptions(),
    ):
        if not artifacts_path:
            artifacts_path = self.download_models_hf()

        artifacts_path = Path(artifacts_path)

        self.model_pipeline = pipeline_cls(
            artifacts_path=artifacts_path, pipeline_options=pipeline_options
        )

        self.page_assemble_model = PageAssembleModel(config={})
        self.glm_model = GlmModel(config={})
        self.pdf_backend = pdf_backend
        self.assemble_options = assemble_options
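
    # Illustrative usage sketch (not part of the original module; the option
    # value below is an assumption): constructing a converter that keeps page
    # images at a custom scale.
    #
    #     converter = DocumentConverter(
    #         assemble_options=AssembleOptions(images_scale=2.0),
    #     )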

    @staticmethod
    def download_models_hf(
        local_dir: Optional[Path] = None, force: bool = False
    ) -> Path:
        from huggingface_hub import snapshot_download

        download_path = snapshot_download(
            repo_id="ds4sd/docling-models", force_download=force, local_dir=local_dir
        )

        return Path(download_path)
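
    # Illustrative sketch (assumption, not original code): model weights can be
    # prefetched once into a fixed directory and reused across converters; the
    # "./models" path is a hypothetical placeholder.
    #
    #     artifacts = DocumentConverter.download_models_hf(local_dir=Path("./models"))
    #     converter = DocumentConverter(artifacts_path=artifacts)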

    def convert(self, input: DocumentConversionInput) -> Iterable[ConvertedDocument]:
        for input_batch in chunkify(
            input.docs(pdf_backend=self.pdf_backend), settings.perf.doc_batch_size
        ):
            _log.info("Going to convert document batch...")
            # parallel processing only within input_batch
            # with ThreadPoolExecutor(
            #     max_workers=settings.perf.doc_batch_concurrency
            # ) as pool:
            #     yield from pool.map(self.process_document, input_batch)

            # Note: the pdfium backend is not thread-safe, so thread-pool usage is disabled.
            yield from map(self.process_document, input_batch)
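
    # Illustrative sketch (assumption): batch conversion over several files;
    # the input paths are hypothetical placeholders.
    #
    #     conv_inp = DocumentConversionInput.from_paths(paths=[Path("a.pdf"), Path("b.pdf")])
    #     for converted in converter.convert(conv_inp):
    #         print(converted.status)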

    def convert_single(self, source: Path | AnyHttpUrl | str) -> Document:
        """Convert a single document.

        Args:
            source (Path | AnyHttpUrl | str): The PDF input source. Can be a path or URL.

        Raises:
            ValueError: If source is of unexpected type.
            RuntimeError: If conversion fails.

        Returns:
            Document: The converted document object.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                http_url: AnyHttpUrl = TypeAdapter(AnyHttpUrl).validate_python(source)
                res = requests.get(http_url, stream=True)
                res.raise_for_status()
                fname = None
                # try to get the filename from the response header
                if cont_disp := res.headers.get("Content-Disposition"):
                    for par in cont_disp.strip().split(";"):
                        # currently only handling directive "filename" (not "*filename")
                        if (split := par.split("=")) and split[0].strip() == "filename":
                            fname = "=".join(split[1:]).strip().strip("'\"") or None
                            break
                # otherwise, derive the name from the URL:
                if fname is None:
                    fname = Path(http_url.path).name or self._default_download_filename
                local_path = Path(temp_dir) / fname
                with open(local_path, "wb") as f:
                    for chunk in res.iter_content(chunk_size=1024):  # using 1-KB chunks
                        f.write(chunk)
            except ValidationError:
                try:
                    local_path = TypeAdapter(Path).validate_python(source)
                except ValidationError:
                    raise ValueError(
                        f"Unexpected file path type encountered: {type(source)}"
                    )
            conv_inp = DocumentConversionInput.from_paths(paths=[local_path])
            converted_docs_iter = self.convert(conv_inp)
            converted_doc: ConvertedDocument = next(converted_docs_iter)
        if converted_doc.status not in {
            ConversionStatus.SUCCESS,
            ConversionStatus.SUCCESS_WITH_ERRORS,
        }:
            raise RuntimeError(f"Conversion failed with status: {converted_doc.status}")
        doc = converted_doc.to_ds_document()
        return doc
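
    # Illustrative sketch (assumption): one-shot conversion from a URL; the URL
    # below is a hypothetical placeholder.
    #
    #     converter = DocumentConverter()
    #     ds_doc = converter.convert_single("https://example.com/sample.pdf")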

    def process_document(self, in_doc: InputDocument) -> ConvertedDocument:
        start_doc_time = time.time()
        converted_doc = ConvertedDocument(input=in_doc)

        _log.info(f"Processing document {in_doc.file.name}")

        if not in_doc.valid:
            converted_doc.status = ConversionStatus.FAILURE
            return converted_doc

        for i in range(in_doc.page_count):
            converted_doc.pages.append(Page(page_no=i))

        all_assembled_pages = []

        try:
            # Iterate over batches of pages (page_batch_size) in the document
            for page_batch in chunkify(
                converted_doc.pages, settings.perf.page_batch_size
            ):
                start_pb_time = time.time()

                # Pipeline

                # 1. Initialise the page resources
                init_pages = map(
                    functools.partial(self.initialize_page, in_doc), page_batch
                )

                # 2. Populate the page image
                pages_with_images = map(
                    functools.partial(self.populate_page_images, in_doc), init_pages
                )

                # 3. Populate the programmatic page cells
                pages_with_cells = map(
                    functools.partial(self.parse_page_cells, in_doc),
                    pages_with_images,
                )

                # 4. Run the pipeline stages
                pipeline_pages = self.model_pipeline.apply(pages_with_cells)

                # 5. Assemble page elements (per page)
                assembled_pages = self.page_assemble_model(pipeline_pages)

                # Exhaust assembled_pages, freeing up memory resources
                # before moving on to the next batch
                for assembled_page in assembled_pages:
                    # Remove page images (can be disabled)
                    if self.assemble_options.images_scale is None:
                        assembled_page._image_cache = {}

                    # Unload the page backend
                    assembled_page._backend.unload()

                    all_assembled_pages.append(assembled_page)

                end_pb_time = time.time() - start_pb_time
                _log.info(f"Finished converting page batch time={end_pb_time:.3f}")

            # Free up memory resources of the PDF backend
            in_doc._backend.unload()

            converted_doc.pages = all_assembled_pages
            self.assemble_doc(converted_doc)

            # Page-level error checks: record every page whose backend failed
            # to parse, and downgrade the status accordingly.
            status = ConversionStatus.SUCCESS
            for page in converted_doc.pages:
                if not page._backend.is_valid():
                    converted_doc.errors.append(
                        ErrorItem(
                            component_type=DoclingComponentType.PDF_BACKEND,
                            module_name=type(page._backend).__name__,
                            error_message=f"Page {page.page_no} failed to parse.",
                        )
                    )
                    status = ConversionStatus.PARTIAL_SUCCESS

            converted_doc.status = status

        except Exception as e:
            converted_doc.status = ConversionStatus.FAILURE
            trace = "\n".join(traceback.format_exception(e))
            _log.info(
                f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
                f"{trace}"
            )

        end_doc_time = time.time() - start_doc_time
        _log.info(
            f"Finished converting document time-pages={end_doc_time:.2f}/{in_doc.page_count}"
        )

        return converted_doc
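
    # Illustrative sketch (assumption): with the page-level error checks above,
    # callers can distinguish fully from partially parsed documents and inspect
    # the per-page error records.
    #
    #     converted = converter.process_document(in_doc)
    #     if converted.status == ConversionStatus.PARTIAL_SUCCESS:
    #         for err in converted.errors:
    #             print(err.error_message)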

    # Initialise and load resources for a page, before running the downstream
    # steps (populate images, cells, ...)
    def initialize_page(self, doc: InputDocument, page: Page) -> Page:
        page._backend = doc._backend.load_page(page.page_no)
        page.size = page._backend.get_size()
        page.page_hash = create_hash(doc.document_hash + ":" + str(page.page_no))

        return page

    # Generate the page image and store it in the page object
    def populate_page_images(self, doc: InputDocument, page: Page) -> Page:
        # default scale: puts the page image into the image cache at scale 1.0
        page.get_image(scale=1.0)

        # user-requested scales
        if self.assemble_options.images_scale is not None:
            page._default_image_scale = self.assemble_options.images_scale
            # this triggers storing the scaled image in the internal cache
            page.get_image(scale=self.assemble_options.images_scale)

        return page

    # Extract the programmatic page cells and store them in the page object
    def parse_page_cells(self, doc: InputDocument, page: Page) -> Page:
        page.cells = page._backend.get_text_cells()

        # DEBUG code: visualize the text cells on the page image
        def draw_text_boxes(image, cells):
            draw = ImageDraw.Draw(image)
            for c in cells:
                x0, y0, x1, y1 = c.bbox.as_tuple()
                draw.rectangle([(x0, y0), (x1, y1)], outline="red")
            image.show()

        # draw_text_boxes(page.get_image(scale=1.0), page.cells)

        return page

    def assemble_doc(self, converted_doc: ConvertedDocument):
        all_elements = []
        all_headers = []
        all_body = []

        for p in converted_doc.pages:
            for el in p.assembled.body:
                all_body.append(el)
            for el in p.assembled.headers:
                all_headers.append(el)
            for el in p.assembled.elements:
                all_elements.append(el)

        converted_doc.assembled = AssembledUnit(
            elements=all_elements, headers=all_headers, body=all_body
        )

        converted_doc.output = self.glm_model(converted_doc)