
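"""High-level document conversion API.

Each supported InputFormat is mapped to a pipeline/backend pair through a
FormatOption; DocumentConverter resolves these options, caches initialized
pipelines, and converts single sources or batches.

Illustrative usage (the module path and source path shown are assumptions,
not taken from this file):

    from docling.document_converter import DocumentConverter

    converter = DocumentConverter()
    result = converter.convert("report.pdf")
"""
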
import hashlib
import logging
import sys
import time
from collections.abc import Iterable, Iterator
from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, Union

from pydantic import BaseModel, ConfigDict, model_validator, validate_call

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.asciidoc_backend import AsciiDocBackend
from docling.backend.csv_backend import CsvDocumentBackend
from docling.backend.docling_parse_v4_backend import DoclingParseV4DocumentBackend
from docling.backend.html_backend import HTMLDocumentBackend
from docling.backend.json.docling_json_backend import DoclingJSONBackend
from docling.backend.md_backend import MarkdownDocumentBackend
from docling.backend.msexcel_backend import MsExcelDocumentBackend
from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend
from docling.backend.msword_backend import MsWordDocumentBackend
from docling.backend.noop_backend import NoOpBackend
from docling.backend.xml.jats_backend import JatsDocumentBackend
from docling.backend.xml.uspto_backend import PatentUsptoDocumentBackend
from docling.datamodel.base_models import (
    ConversionStatus,
    DoclingComponentType,
    DocumentStream,
    ErrorItem,
    InputFormat,
)
from docling.datamodel.document import (
    ConversionResult,
    InputDocument,
    _DocumentConversionInput,
)
from docling.datamodel.pipeline_options import PipelineOptions
from docling.datamodel.settings import (
    DEFAULT_PAGE_RANGE,
    DocumentLimits,
    PageRange,
    settings,
)
from docling.exceptions import ConversionError
from docling.pipeline.asr_pipeline import AsrPipeline
from docling.pipeline.base_pipeline import BasePipeline
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.utils.utils import chunkify

_log = logging.getLogger(__name__)


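# A FormatOption binds an input format to the pipeline class that will process
# it and the backend that parses it. When pipeline_options is omitted, the
# validator below fills in the pipeline's own defaults.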
class FormatOption(BaseModel):
    pipeline_cls: Type[BasePipeline]
    pipeline_options: Optional[PipelineOptions] = None
    backend: Type[AbstractDocumentBackend]

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def set_optional_field_default(self) -> "FormatOption":
        if self.pipeline_options is None:
            self.pipeline_options = self.pipeline_cls.get_default_options()
        return self


class CsvFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = CsvDocumentBackend


class ExcelFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend


class WordFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend


class PowerpointFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsPowerpointDocumentBackend


class MarkdownFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MarkdownDocumentBackend


class AsciiDocFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = AsciiDocBackend


class HTMLFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = HTMLDocumentBackend


class PatentUsptoFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[PatentUsptoDocumentBackend] = PatentUsptoDocumentBackend


class XMLJatsFormatOption(FormatOption):
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = JatsDocumentBackend


class ImageFormatOption(FormatOption):
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV4DocumentBackend


class PdfFormatOption(FormatOption):
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV4DocumentBackend


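# Audio files are not parsed by a backend: NoOpBackend hands the input through
# untouched so that AsrPipeline can run speech recognition on the raw audio.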
class AudioFormatOption(FormatOption):
    pipeline_cls: Type = AsrPipeline
    backend: Type[AbstractDocumentBackend] = NoOpBackend


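# Built-in defaults: one pipeline/backend pair per supported InputFormat, used
# whenever the caller does not supply a custom FormatOption for that format.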
def _get_default_option(format: InputFormat) -> FormatOption:
    format_to_default_options = {
        InputFormat.CSV: FormatOption(
            pipeline_cls=SimplePipeline, backend=CsvDocumentBackend
        ),
        InputFormat.XLSX: FormatOption(
            pipeline_cls=SimplePipeline, backend=MsExcelDocumentBackend
        ),
        InputFormat.DOCX: FormatOption(
            pipeline_cls=SimplePipeline, backend=MsWordDocumentBackend
        ),
        InputFormat.PPTX: FormatOption(
            pipeline_cls=SimplePipeline, backend=MsPowerpointDocumentBackend
        ),
        InputFormat.MD: FormatOption(
            pipeline_cls=SimplePipeline, backend=MarkdownDocumentBackend
        ),
        InputFormat.ASCIIDOC: FormatOption(
            pipeline_cls=SimplePipeline, backend=AsciiDocBackend
        ),
        InputFormat.HTML: FormatOption(
            pipeline_cls=SimplePipeline, backend=HTMLDocumentBackend
        ),
        InputFormat.XML_USPTO: FormatOption(
            pipeline_cls=SimplePipeline, backend=PatentUsptoDocumentBackend
        ),
        InputFormat.XML_JATS: FormatOption(
            pipeline_cls=SimplePipeline, backend=JatsDocumentBackend
        ),
        InputFormat.IMAGE: FormatOption(
            pipeline_cls=StandardPdfPipeline, backend=DoclingParseV4DocumentBackend
        ),
        InputFormat.PDF: FormatOption(
            pipeline_cls=StandardPdfPipeline, backend=DoclingParseV4DocumentBackend
        ),
        InputFormat.JSON_DOCLING: FormatOption(
            pipeline_cls=SimplePipeline, backend=DoclingJSONBackend
        ),
        InputFormat.AUDIO: FormatOption(pipeline_cls=AsrPipeline, backend=NoOpBackend),
    }
    if (options := format_to_default_options.get(format)) is not None:
        return options
    else:
        raise RuntimeError(f"No default options configured for {format}")


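# DocumentConverter is the public entry point: it resolves per-format options,
# lazily initializes and caches pipelines, and converts single or batched
# sources. Illustrative override of a single format:
#
#     converter = DocumentConverter(
#         format_options={InputFormat.PDF: PdfFormatOption(...)}
#     )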
class DocumentConverter:
    _default_download_filename = "file"

    def __init__(
        self,
        allowed_formats: Optional[List[InputFormat]] = None,
        format_options: Optional[Dict[InputFormat, FormatOption]] = None,
    ):
        self.allowed_formats = (
            allowed_formats if allowed_formats is not None else list(InputFormat)
        )
        self.format_to_options = {
            format: (
                _get_default_option(format=format)
                if (custom_option := (format_options or {}).get(format)) is None
                else custom_option
            )
            for format in self.allowed_formats
        }
        self.initialized_pipelines: Dict[
            Tuple[Type[BasePipeline], str], BasePipeline
        ] = {}

    def _get_initialized_pipelines(
        self,
    ) -> dict[tuple[Type[BasePipeline], str], BasePipeline]:
        return self.initialized_pipelines

    def _get_pipeline_options_hash(self, pipeline_options: PipelineOptions) -> str:
        """Generate a hash of pipeline options to use as part of the cache key."""
        options_str = str(pipeline_options.model_dump())
        return hashlib.md5(
            options_str.encode("utf-8"), usedforsecurity=False
        ).hexdigest()

    def initialize_pipeline(self, format: InputFormat):
        """Initialize the conversion pipeline for the selected format."""
        pipeline = self._get_pipeline(doc_format=format)
        if pipeline is None:
            raise ConversionError(
                f"No pipeline could be initialized for format {format}"
            )

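    # Convenience wrapper: converts a single source by delegating to
    # convert_all and returning its only result.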
    @validate_call(config=ConfigDict(strict=True))
    def convert(
        self,
        source: Union[Path, str, DocumentStream],  # TODO review naming
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> ConversionResult:
        all_res = self.convert_all(
            source=[source],
            raises_on_error=raises_on_error,
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            headers=headers,
            page_range=page_range,
        )
        return next(all_res)

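    # Lazily converts an iterable of sources, yielding each result as it
    # completes. Illustrative batch usage ("a.pdf" and "b.docx" are
    # placeholder paths):
    #
    #     for res in converter.convert_all(["a.pdf", "b.docx"], raises_on_error=False):
    #         print(res.input.file, res.status)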
    @validate_call(config=ConfigDict(strict=True))
    def convert_all(
        self,
        source: Iterable[Union[Path, str, DocumentStream]],  # TODO review naming
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,  # True: raises on first conversion error; False: does not raise on conv error
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> Iterator[ConversionResult]:
        limits = DocumentLimits(
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            page_range=page_range,
        )
        conv_input = _DocumentConversionInput(
            path_or_stream_iterator=source, limits=limits, headers=headers
        )
        conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error)

        had_result = False
        for conv_res in conv_res_iter:
            had_result = True
            if raises_on_error and conv_res.status not in {
                ConversionStatus.SUCCESS,
                ConversionStatus.PARTIAL_SUCCESS,
            }:
                raise ConversionError(
                    f"Conversion failed for: {conv_res.input.file} with status: {conv_res.status}"
                )
            else:
                yield conv_res

        if not had_result and raises_on_error:
            raise ConversionError(
                "Conversion failed because the provided file has no recognizable format or it wasn't in the list of allowed formats."
            )

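    # Internal driver: groups inputs into batches of settings.perf.doc_batch_size
    # and converts them sequentially, logging per-document elapsed time.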
    def _convert(
        self, conv_input: _DocumentConversionInput, raises_on_error: bool
    ) -> Iterator[ConversionResult]:
        start_time = time.monotonic()

        for input_batch in chunkify(
            conv_input.docs(self.format_to_options),
            settings.perf.doc_batch_size,  # pass format_options
        ):
            _log.info("Going to convert document batch...")

            # parallel processing only within input_batch
            # with ThreadPoolExecutor(
            #     max_workers=settings.perf.doc_batch_concurrency
            # ) as pool:
            #     yield from pool.map(self.process_document, input_batch)
            # Note: PDF backends are not thread-safe, thread pool usage was disabled.

            for item in map(
                partial(self._process_document, raises_on_error=raises_on_error),
                input_batch,
            ):
                elapsed = time.monotonic() - start_time
                start_time = time.monotonic()
                _log.info(
                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
                )
                yield item

    def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
        """Retrieve or initialize a pipeline, reusing instances based on class and options."""
        fopt = self.format_to_options.get(doc_format)

        if fopt is None or fopt.pipeline_options is None:
            return None

        pipeline_class = fopt.pipeline_cls
        pipeline_options = fopt.pipeline_options
        options_hash = self._get_pipeline_options_hash(pipeline_options)

        # Use a composite key to cache pipelines
        cache_key = (pipeline_class, options_hash)

        if cache_key not in self.initialized_pipelines:
            _log.info(
                f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}"
            )
            self.initialized_pipelines[cache_key] = pipeline_class(
                pipeline_options=pipeline_options
            )
        else:
            _log.debug(
                f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
            )

        return self.initialized_pipelines[cache_key]

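    # Gatekeeper: documents whose format is not in allowed_formats are either
    # raised on or marked SKIPPED; all others are handed to their pipeline.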
    def _process_document(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        valid = (
            self.allowed_formats is not None and in_doc.format in self.allowed_formats
        )
        if valid:
            conv_res = self._execute_pipeline(in_doc, raises_on_error=raises_on_error)
        else:
            error_message = f"File format not allowed: {in_doc.file}"
            if raises_on_error:
                raise ConversionError(error_message)
            else:
                error_item = ErrorItem(
                    component_type=DoclingComponentType.USER_INPUT,
                    module_name="",
                    error_message=error_message,
                )
                conv_res = ConversionResult(
                    input=in_doc, status=ConversionStatus.SKIPPED, errors=[error_item]
                )

        return conv_res

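    # Runs the cached pipeline for a valid document; invalid documents either
    # raise or yield a FAILURE result, depending on raises_on_error.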
    def _execute_pipeline(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        if in_doc.valid:
            pipeline = self._get_pipeline(in_doc.format)
            if pipeline is not None:
                conv_res = pipeline.execute(in_doc, raises_on_error=raises_on_error)
            else:
                if raises_on_error:
                    raise ConversionError(
                        f"No pipeline could be initialized for {in_doc.file}."
                    )
                else:
                    conv_res = ConversionResult(
                        input=in_doc,
                        status=ConversionStatus.FAILURE,
                    )
        else:
            if raises_on_error:
                raise ConversionError(f"Input document {in_doc.file} is not valid.")
            else:
                # invalid doc or not of desired format
                conv_res = ConversionResult(
                    input=in_doc,
                    status=ConversionStatus.FAILURE,
                )
                # TODO add error log why it failed.

        return conv_res