Docling/docling/document_converter.py
Peter W. J. Staar 1557e7ce3e
feat: Support audio input (#1763)
* scaffolding in place

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* doing scaffolding for audio pipeline

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* WIP: got first transcription working

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* all working, time to start cleaning up

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* first working ASR pipeline

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* added openai-whisper as a first transcription model

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* updating with asr_options

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* finalised the first working ASR pipeline with Whisper

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* use whisper from the latest git commit

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>

* Update docling/datamodel/pipeline_options.py

Co-authored-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>
Signed-off-by: Peter W. J. Staar <91719829+PeterStaar-IBM@users.noreply.github.com>

* Update docling/datamodel/pipeline_options.py

Co-authored-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>
Signed-off-by: Peter W. J. Staar <91719829+PeterStaar-IBM@users.noreply.github.com>

* updated comment

Signed-off-by: Peter Staar <taa@zurich.ibm.com>

* AudioBackend -> DummyBackend

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* file rename

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Rename to NoOpBackend, add test for ASR pipeline

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Support every format in NoOpBackend

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Add missing audio file and test

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Install ffmpeg system dependency for ASR test

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

---------

Signed-off-by: Peter Staar <taa@zurich.ibm.com>
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Signed-off-by: Peter W. J. Staar <91719829+PeterStaar-IBM@users.noreply.github.com>
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Co-authored-by: Michele Dolfi <dol@zurich.ibm.com>
Co-authored-by: Michele Dolfi <97102151+dolfim-ibm@users.noreply.github.com>
Co-authored-by: Christoph Auer <cau@zurich.ibm.com>
2025-06-23 14:47:26 +02:00

385 lines
14 KiB
Python

import hashlib
import logging
import sys
import time
from collections.abc import Iterable, Iterator
from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, Union
from pydantic import BaseModel, ConfigDict, model_validator, validate_call
from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.asciidoc_backend import AsciiDocBackend
from docling.backend.csv_backend import CsvDocumentBackend
from docling.backend.docling_parse_v4_backend import DoclingParseV4DocumentBackend
from docling.backend.html_backend import HTMLDocumentBackend
from docling.backend.json.docling_json_backend import DoclingJSONBackend
from docling.backend.md_backend import MarkdownDocumentBackend
from docling.backend.msexcel_backend import MsExcelDocumentBackend
from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend
from docling.backend.msword_backend import MsWordDocumentBackend
from docling.backend.noop_backend import NoOpBackend
from docling.backend.xml.jats_backend import JatsDocumentBackend
from docling.backend.xml.uspto_backend import PatentUsptoDocumentBackend
from docling.datamodel.base_models import (
ConversionStatus,
DoclingComponentType,
DocumentStream,
ErrorItem,
InputFormat,
)
from docling.datamodel.document import (
ConversionResult,
InputDocument,
_DocumentConversionInput,
)
from docling.datamodel.pipeline_options import PipelineOptions
from docling.datamodel.settings import (
DEFAULT_PAGE_RANGE,
DocumentLimits,
PageRange,
settings,
)
from docling.exceptions import ConversionError
from docling.pipeline.asr_pipeline import AsrPipeline
from docling.pipeline.base_pipeline import BasePipeline
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.utils.utils import chunkify
_log = logging.getLogger(__name__)
class FormatOption(BaseModel):
    """Pipeline class, pipeline options and backend class used for one input format.

    ``pipeline_options`` may be omitted; after validation it is replaced by
    ``pipeline_cls.get_default_options()``.
    """

    # Needed because the fields hold arbitrary (non-pydantic) classes.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    pipeline_cls: Type[BasePipeline]
    pipeline_options: Optional[PipelineOptions] = None
    backend: Type[AbstractDocumentBackend]

    @model_validator(mode="after")
    def set_optional_field_default(self) -> "FormatOption":
        """Fill in the pipeline's own default options when none were supplied."""
        options_missing = self.pipeline_options is None
        if options_missing:
            self.pipeline_options = self.pipeline_cls.get_default_options()
        return self
class CsvFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``CsvDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = CsvDocumentBackend
class ExcelFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``MsExcelDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend
class WordFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``MsWordDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend
class PowerpointFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``MsPowerpointDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsPowerpointDocumentBackend
class MarkdownFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``MarkdownDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MarkdownDocumentBackend
class AsciiDocFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``AsciiDocBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = AsciiDocBackend
class HTMLFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``HTMLDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = HTMLDocumentBackend
class PatentUsptoFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``PatentUsptoDocumentBackend``."""

    pipeline_cls: Type = SimplePipeline
    # NOTE(review): annotated with the concrete backend type, whereas the sibling
    # presets use Type[AbstractDocumentBackend] — confirm the narrowing is intended.
    backend: Type[PatentUsptoDocumentBackend] = PatentUsptoDocumentBackend
class XMLJatsFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``SimplePipeline`` + ``JatsDocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = JatsDocumentBackend
class ImageFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``StandardPdfPipeline`` + ``DoclingParseV4DocumentBackend``."""

    # Same pipeline/backend pair as PdfFormatOption.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV4DocumentBackend
class PdfFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``StandardPdfPipeline`` + ``DoclingParseV4DocumentBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV4DocumentBackend
class AudioFormatOption(FormatOption):
    """``FormatOption`` preset defaulting to ``AsrPipeline`` + ``NoOpBackend``."""

    # Both fields are required on the base class; here they only gain defaults.
    pipeline_cls: Type = AsrPipeline
    backend: Type[AbstractDocumentBackend] = NoOpBackend
def _get_default_option(format: InputFormat) -> FormatOption:
    """Build the default ``FormatOption`` for one input format.

    Only the option for the requested format is instantiated. Constructing a
    ``FormatOption`` without explicit options triggers its validator, which
    calls ``pipeline_cls.get_default_options()``; building an option for every
    known format on each call (and discarding all but one) is wasted work.

    Args:
        format: The input format to look up.

    Returns:
        A freshly constructed ``FormatOption`` for ``format``.

    Raises:
        RuntimeError: If no default is configured for ``format``.
    """
    # (pipeline class, backend class) per format. The table holds classes
    # only, so creating it instantiates nothing.
    default_components: Dict[
        InputFormat, Tuple[Type[BasePipeline], Type[AbstractDocumentBackend]]
    ] = {
        InputFormat.CSV: (SimplePipeline, CsvDocumentBackend),
        InputFormat.XLSX: (SimplePipeline, MsExcelDocumentBackend),
        InputFormat.DOCX: (SimplePipeline, MsWordDocumentBackend),
        InputFormat.PPTX: (SimplePipeline, MsPowerpointDocumentBackend),
        InputFormat.MD: (SimplePipeline, MarkdownDocumentBackend),
        InputFormat.ASCIIDOC: (SimplePipeline, AsciiDocBackend),
        InputFormat.HTML: (SimplePipeline, HTMLDocumentBackend),
        InputFormat.XML_USPTO: (SimplePipeline, PatentUsptoDocumentBackend),
        InputFormat.XML_JATS: (SimplePipeline, JatsDocumentBackend),
        InputFormat.IMAGE: (StandardPdfPipeline, DoclingParseV4DocumentBackend),
        InputFormat.PDF: (StandardPdfPipeline, DoclingParseV4DocumentBackend),
        InputFormat.JSON_DOCLING: (SimplePipeline, DoclingJSONBackend),
        InputFormat.AUDIO: (AsrPipeline, NoOpBackend),
    }

    components = default_components.get(format)
    if components is None:
        raise RuntimeError(f"No default options configured for {format}")

    pipeline_cls, backend = components
    return FormatOption(pipeline_cls=pipeline_cls, backend=backend)
class DocumentConverter:
    """Convert input documents by dispatching each to a format-specific pipeline.

    Each allowed ``InputFormat`` is mapped to a ``FormatOption`` (pipeline
    class, pipeline options, backend class). Pipeline instances are created on
    first use and cached per (pipeline class, options hash), so formats that
    share the same class and options share one pipeline instance.
    """

    _default_download_filename = "file"

    def __init__(
        self,
        allowed_formats: Optional[List[InputFormat]] = None,
        format_options: Optional[Dict[InputFormat, FormatOption]] = None,
    ):
        """Set up the format-to-option mapping and an empty pipeline cache.

        Args:
            allowed_formats: Formats this converter accepts. ``None`` means
                every member of ``InputFormat``.
            format_options: Per-format overrides. Allowed formats without an
                override get the result of ``_get_default_option``. Overrides
                for formats that are not allowed are ignored.
        """
        self.allowed_formats = (
            allowed_formats if allowed_formats is not None else list(InputFormat)
        )
        custom_options = format_options or {}
        self.format_to_options: Dict[InputFormat, FormatOption] = {
            format: (
                _get_default_option(format=format)
                if (custom_option := custom_options.get(format)) is None
                else custom_option
            )
            for format in self.allowed_formats
        }
        # Cache keyed by (pipeline class, hash of its options).
        self.initialized_pipelines: Dict[
            Tuple[Type[BasePipeline], str], BasePipeline
        ] = {}

    def _get_initialized_pipelines(
        self,
    ) -> dict[tuple[Type[BasePipeline], str], BasePipeline]:
        """Return the live pipeline cache (the dict itself, not a copy)."""
        return self.initialized_pipelines

    def _get_pipeline_options_hash(self, pipeline_options: PipelineOptions) -> str:
        """Generate a hash of pipeline options to use as part of the cache key."""
        options_str = str(pipeline_options.model_dump())
        # MD5 serves purely as a cache fingerprint, hence usedforsecurity=False.
        return hashlib.md5(
            options_str.encode("utf-8"), usedforsecurity=False
        ).hexdigest()

    def initialize_pipeline(self, format: InputFormat) -> None:
        """Initialize the conversion pipeline for the selected format.

        Raises:
            ConversionError: If no pipeline is available for ``format``.
        """
        pipeline = self._get_pipeline(doc_format=format)
        if pipeline is None:
            raise ConversionError(
                f"No pipeline could be initialized for format {format}"
            )

    @validate_call(config=ConfigDict(strict=True))
    def convert(
        self,
        source: Union[Path, str, DocumentStream],  # TODO review naming
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> ConversionResult:
        """Convert a single source and return its result.

        Delegates to ``convert_all`` with a one-element list and returns the
        first result it yields; see ``convert_all`` for the parameters.

        Raises:
            ConversionError: As raised by ``convert_all`` when
                ``raises_on_error`` is true.
            StopIteration: If ``convert_all`` yields nothing, which it can
                only do silently when ``raises_on_error`` is false.
        """
        all_res = self.convert_all(
            source=[source],
            raises_on_error=raises_on_error,
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            headers=headers,
            page_range=page_range,
        )
        return next(all_res)

    @validate_call(config=ConfigDict(strict=True))
    def convert_all(
        self,
        source: Iterable[Union[Path, str, DocumentStream]],  # TODO review naming
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,  # True: raises on first conversion error; False: does not raise on conv error
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> Iterator[ConversionResult]:
        """Lazily convert several sources, yielding one result per document.

        Args:
            source: Paths, strings or ``DocumentStream`` objects to convert.
            headers: Forwarded to the conversion input unchanged
                (presumably HTTP headers for remote sources — TODO confirm).
            raises_on_error: If true, raise on the first result whose status
                is neither SUCCESS nor PARTIAL_SUCCESS, and also when no
                result was produced at all. If false, every result is yielded
                regardless of status.
            max_num_pages: Page-count limit stored in ``DocumentLimits``.
            max_file_size: File-size limit stored in ``DocumentLimits``.
            page_range: Page range stored in ``DocumentLimits``.

        Yields:
            One ``ConversionResult`` per processed document.

        Raises:
            ConversionError: See ``raises_on_error``.
        """
        limits = DocumentLimits(
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            page_range=page_range,
        )
        conv_input = _DocumentConversionInput(
            path_or_stream_iterator=source, limits=limits, headers=headers
        )
        conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error)

        acceptable_statuses = {
            ConversionStatus.SUCCESS,
            ConversionStatus.PARTIAL_SUCCESS,
        }
        had_result = False
        for conv_res in conv_res_iter:
            had_result = True
            if raises_on_error and conv_res.status not in acceptable_statuses:
                raise ConversionError(
                    f"Conversion failed for: {conv_res.input.file} with status: {conv_res.status}"
                )
            yield conv_res

        if not had_result and raises_on_error:
            raise ConversionError(
                "Conversion failed because the provided file has no recognizable format or it wasn't in the list of allowed formats."
            )

    def _convert(
        self, conv_input: _DocumentConversionInput, raises_on_error: bool
    ) -> Iterator[ConversionResult]:
        """Process the input documents batch by batch, yielding each result.

        Documents within a batch are processed sequentially; the elapsed time
        per document is logged before the result is yielded.
        """
        start_time = time.monotonic()
        for input_batch in chunkify(
            conv_input.docs(self.format_to_options),  # pass format_options
            settings.perf.doc_batch_size,
        ):
            _log.info("Going to convert document batch...")

            # parallel processing only within input_batch
            # with ThreadPoolExecutor(
            #    max_workers=settings.perf.doc_batch_concurrency
            # ) as pool:
            #   yield from pool.map(self.process_document, input_batch)
            # Note: PDF backends are not thread-safe, thread pool usage was disabled.

            for item in map(
                partial(self._process_document, raises_on_error=raises_on_error),
                input_batch,
            ):
                elapsed = time.monotonic() - start_time
                _log.info(
                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
                )
                yield item
                # Restart the clock only after the consumer hands control
                # back, so time it spent on this result is not billed to the
                # next document.
                start_time = time.monotonic()

    def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
        """Retrieve or initialize a pipeline, reusing instances based on class and options.

        Returns:
            The cached or newly created pipeline, or ``None`` when
            ``doc_format`` has no configured option or that option carries no
            pipeline options.
        """
        fopt = self.format_to_options.get(doc_format)

        if fopt is None or fopt.pipeline_options is None:
            return None

        pipeline_class = fopt.pipeline_cls
        pipeline_options = fopt.pipeline_options
        options_hash = self._get_pipeline_options_hash(pipeline_options)

        # Use a composite key to cache pipelines
        cache_key = (pipeline_class, options_hash)

        if cache_key not in self.initialized_pipelines:
            _log.info(
                f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}"
            )
            self.initialized_pipelines[cache_key] = pipeline_class(
                pipeline_options=pipeline_options
            )
        else:
            _log.debug(
                f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
            )

        return self.initialized_pipelines[cache_key]

    def _process_document(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Run the pipeline for ``in_doc`` if its format is allowed.

        A document whose format is not allowed raises ``ConversionError`` when
        ``raises_on_error`` is true; otherwise a SKIPPED result carrying an
        ``ErrorItem`` is returned.
        """
        valid = (
            self.allowed_formats is not None and in_doc.format in self.allowed_formats
        )
        if valid:
            return self._execute_pipeline(in_doc, raises_on_error=raises_on_error)

        error_message = f"File format not allowed: {in_doc.file}"
        if raises_on_error:
            raise ConversionError(error_message)

        error_item = ErrorItem(
            component_type=DoclingComponentType.USER_INPUT,
            module_name="",
            error_message=error_message,
        )
        return ConversionResult(
            input=in_doc, status=ConversionStatus.SKIPPED, errors=[error_item]
        )

    def _execute_pipeline(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Execute the pipeline matching ``in_doc.format``.

        Failure cases (invalid input document, or no pipeline available)
        raise ``ConversionError`` when ``raises_on_error`` is true. Otherwise
        they are logged and returned as a FAILURE result whose ``errors`` list
        states the reason, mirroring how ``_process_document`` reports skipped
        documents.
        """
        if not in_doc.valid:
            # invalid doc or not of desired format
            error_message = f"Input document {in_doc.file} is not valid."
        else:
            pipeline = self._get_pipeline(in_doc.format)
            if pipeline is not None:
                return pipeline.execute(in_doc, raises_on_error=raises_on_error)
            error_message = f"No pipeline could be initialized for {in_doc.file}."

        if raises_on_error:
            raise ConversionError(error_message)

        _log.warning(error_message)
        # NOTE(review): USER_INPUT is used for both failure reasons, matching
        # _process_document — confirm it is the right component type for the
        # "no pipeline" case.
        error_item = ErrorItem(
            component_type=DoclingComponentType.USER_INPUT,
            module_name="",
            error_message=error_message,
        )
        return ConversionResult(
            input=in_doc,
            status=ConversionStatus.FAILURE,
            errors=[error_item],
        )