
* add factory for ocr engines Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * apply pre-commit after rebase Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * add picture description factory Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * fix enable option Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * switch to create methods Signed-off-by: Panos Vagenas <pva@zurich.ibm.com> * make `options` an explicit kwarg Signed-off-by: Panos Vagenas <pva@zurich.ibm.com> * keep old lock of docling-core Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * fix lock Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * add allow_external_plugins option Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> * add factory return and ignore options type Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> --------- Signed-off-by: Michele Dolfi <dol@zurich.ibm.com> Signed-off-by: Panos Vagenas <pva@zurich.ibm.com> Co-authored-by: Panos Vagenas <pva@zurich.ibm.com>
280 lines
9.8 KiB
Python
280 lines
9.8 KiB
Python
import csv
|
|
import io
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from pathlib import Path
|
|
from subprocess import DEVNULL, PIPE, Popen
|
|
from typing import Iterable, List, Optional, Tuple, Type
|
|
|
|
import pandas as pd
|
|
from docling_core.types.doc import BoundingBox, CoordOrigin
|
|
from docling_core.types.doc.page import BoundingRectangle, TextCell
|
|
|
|
from docling.datamodel.base_models import Page
|
|
from docling.datamodel.document import ConversionResult
|
|
from docling.datamodel.pipeline_options import (
|
|
AcceleratorOptions,
|
|
OcrOptions,
|
|
TesseractCliOcrOptions,
|
|
)
|
|
from docling.datamodel.settings import settings
|
|
from docling.models.base_ocr_model import BaseOcrModel
|
|
from docling.utils.ocr_utils import map_tesseract_script
|
|
from docling.utils.profiling import TimeRecorder
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class TesseractOcrCliModel(BaseOcrModel):
    """OCR model that drives the Tesseract command-line binary.

    Each OCR region of a page is rendered to a temporary PNG file, passed to
    the executable configured in ``options.tesseract_cmd`` with TSV output,
    and the recognised words are converted into ``TextCell`` objects.
    """

    def __init__(
        self,
        enabled: bool,
        artifacts_path: Optional[Path],
        options: TesseractCliOcrOptions,
        accelerator_options: AcceleratorOptions,
    ):
        """Initialise the model and, when enabled, probe the Tesseract binary.

        Args:
            enabled: Whether OCR should run; when false no subprocess is started.
            artifacts_path: Forwarded unchanged to the base class.
            options: Tesseract CLI options (command, languages, tessdata path).
            accelerator_options: Forwarded unchanged to the base class.

        Raises:
            RuntimeError: If the model is enabled and querying the binary for
                its version or its installed languages fails.
        """
        super().__init__(
            enabled=enabled,
            artifacts_path=artifacts_path,
            options=options,
            accelerator_options=accelerator_options,
        )
        self.options: TesseractCliOcrOptions

        self.scale = 3  # multiplier for 72 dpi == 216 dpi.

        # Lazily filled caches describing the local Tesseract installation.
        self._name: Optional[str] = None
        self._version: Optional[str] = None
        self._tesseract_languages: Optional[List[str]] = None
        self._script_prefix: Optional[str] = None

        if self.enabled:
            try:
                self._get_name_and_version()
                self._set_languages_and_prefix()
            except Exception as exc:
                # Chain the cause so the original traceback stays visible.
                raise RuntimeError(
                    f"Tesseract is not available, aborting: {exc} "
                    "Install tesseract on your system and the tesseract binary is discoverable. "
                    "The actual command for Tesseract can be specified in `pipeline_options.ocr_options.tesseract_cmd='tesseract'`. "
                    "Alternatively, Docling has support for other OCR engines. See the documentation."
                ) from exc

    def _get_name_and_version(self) -> Tuple[str, str]:
        """Return the program name and version reported by ``--version``.

        The result is cached on the instance, so the binary is only invoked
        on the first call.

        Raises:
            ValueError: If the first output line has fewer than two
                whitespace-separated tokens.
        """
        if self._name is not None and self._version is not None:
            return self._name, self._version

        cmd = [self.options.tesseract_cmd, "--version"]

        # communicate() already waits for the process to terminate.
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
        stdout, stderr = proc.communicate()

        # HACK: Windows versions of Tesseract output the version to stdout, Linux versions
        # to stderr, so check both.
        version_line = (
            (stdout.decode("utf8").strip() or stderr.decode("utf8").strip())
            .split("\n")[0]
            .strip()
        )

        # If everything else fails...
        if not version_line:
            version_line = "tesseract XXX"

        # Split on any whitespace and ignore trailing tokens, so that doubled
        # spaces or extra build information do not break the parsing.
        parts = version_line.split()
        if len(parts) < 2:
            raise ValueError(
                f"Unexpected output from `{cmd[0]} --version`: {version_line!r}"
            )
        name, version = parts[0], parts[1]

        self._name = name
        self._version = version

        return name, version

    def _run_tesseract(self, ifilename: str):
        """Run the Tesseract CLI on an image and return the recognised words.

        Args:
            ifilename: Path of the image file to recognise.

        Returns:
            A pandas DataFrame built from Tesseract's TSV output, restricted
            to the rows whose ``text`` column is non-null and non-blank.
        """
        cmd = [self.options.tesseract_cmd]

        # Treat a missing language list like an empty one.
        langs = self.options.lang or []
        if "auto" in langs:
            lang = self._detect_language(ifilename)
            if lang is not None:
                cmd += ["-l", lang]
        elif langs:
            cmd += ["-l", "+".join(langs)]

        if self.options.path is not None:
            cmd += ["--tessdata-dir", self.options.path]

        cmd += [ifilename, "stdout", "tsv"]
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()

        # Decode the byte string to a regular string
        decoded_data = output.decode("utf-8")

        # Read the TSV produced by Tesseract. The text column is pinned to str
        # so that purely numeric words are not turned into floats
        # (e.g. "2023" -> 2023.0) by pandas' type inference.
        df = pd.read_csv(
            io.StringIO(decoded_data),
            quoting=csv.QUOTE_NONE,
            sep="\t",
            dtype={"text": str},
        )

        # Filter rows that contain actual text (ignore header or empty rows)
        has_text = df["text"].notnull() & (df["text"].apply(str).str.strip() != "")
        return df[has_text]

    def _detect_language(self, ifilename: str):
        """Run Tesseract in PSM 0 mode to detect the script of an image.

        Args:
            ifilename: Path of the image file to analyse.

        Returns:
            The Tesseract language name to use for the detected script, or
            ``None`` when no script was reported or the corresponding
            language is not in the installed-languages list.
        """
        assert self._tesseract_languages is not None

        cmd = [self.options.tesseract_cmd]
        cmd.extend(["--psm", "0", "-l", "osd", ifilename, "stdout"])
        _log.info("command: %s", " ".join(cmd))
        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")

        try:
            osd = pd.read_csv(
                io.StringIO(decoded_data), sep=":", header=None, names=["key", "value"]
            )
        except pd.errors.EmptyDataError:
            # Nothing was printed on stdout (stderr is discarded), so there is
            # no script information to work with.
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        scripts = osd.loc[osd["key"] == "Script", "value"].tolist()
        if not scripts:
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        script = map_tesseract_script(scripts[0].strip())
        lang = f"{self._script_prefix}{script}"

        # Check if the detected language has been installed
        if lang not in self._tesseract_languages:
            msg = f"Tesseract detected the script '{script}' and language '{lang}'."
            msg += " However this language is not installed in your system and will be ignored."
            _log.warning(msg)
            return None

        _log.debug(
            f"Using tesseract model for the detected script '{script}' and language '{lang}'"
        )
        return lang

    def _set_languages_and_prefix(self):
        """Read the languages installed in Tesseract and decide the script prefix.

        Stores the output of ``--list-langs`` (minus its first line) in
        ``self._tesseract_languages`` and sets ``self._script_prefix`` to
        ``"script/"`` when any listed language starts with that prefix,
        otherwise to the empty string.
        """
        # Get all languages
        cmd = [self.options.tesseract_cmd]
        cmd.append("--list-langs")
        _log.info("command: %s", " ".join(cmd))
        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")
        df = pd.read_csv(io.StringIO(decoded_data), header=None)
        # The first line of the listing is skipped; the rest are the languages.
        self._tesseract_languages = df[0].tolist()[1:]

        # Decide the script prefix
        has_script_dir = any(
            name.startswith("script/") for name in self._tesseract_languages
        )
        self._script_prefix = "script/" if has_script_dir else ""

    def __call__(
        self, conv_res: ConversionResult, page_batch: Iterable[Page]
    ) -> Iterable[Page]:
        """Run OCR on every page of the batch and yield the pages.

        When the model is disabled, or a page's backend is not valid, the
        page is yielded unchanged. Otherwise every non-empty OCR rectangle is
        rendered at ``self.scale``, recognised with Tesseract, and the
        resulting cells are merged into ``page.cells``.

        Args:
            conv_res: Conversion result, passed to the time recorder and to
                the debug visualisation.
            page_batch: Pages to process.

        Yields:
            Each page of ``page_batch``, in order.
        """
        if not self.enabled:
            yield from page_batch
            return

        for page in page_batch:
            assert page._backend is not None
            if not page._backend.is_valid():
                yield page
            else:
                with TimeRecorder(conv_res, "ocr"):
                    ocr_rects = self.get_ocr_rects(page)

                    all_ocr_cells = []
                    for ocr_rect in ocr_rects:
                        # Skip zero area boxes
                        if ocr_rect.area() == 0:
                            continue
                        high_res_image = page._backend.get_page_image(
                            scale=self.scale, cropbox=ocr_rect
                        )

                        # Bound before the try block so the cleanup below never
                        # hits an undefined name if creating the file fails.
                        fname: Optional[str] = None
                        try:
                            # delete=False: the file has to outlive the `with`
                            # so the Tesseract subprocess can open it by name.
                            with tempfile.NamedTemporaryFile(
                                suffix=".png", mode="w+b", delete=False
                            ) as image_file:
                                fname = image_file.name
                                high_res_image.save(image_file)

                            df = self._run_tesseract(fname)
                        finally:
                            if fname is not None and os.path.exists(fname):
                                os.remove(fname)

                        # Convert each recognised row (bounding box and text)
                        for ix, row in df.iterrows():
                            text = row["text"]
                            conf = row["conf"]

                            # Box in pixels of the scaled crop, top-left origin.
                            left = float(row["left"])
                            top = float(row["top"])
                            right = left + float(row["width"])
                            bottom = top + float(row["height"])

                            cell = TextCell(
                                index=ix,
                                text=text,
                                orig=text,
                                from_ocr=True,
                                confidence=conf / 100.0,
                                rect=BoundingRectangle.from_bounding_box(
                                    BoundingBox.from_tuple(
                                        # Undo the render scale and shift by
                                        # the OCR rectangle's offset.
                                        coord=(
                                            (left / self.scale) + ocr_rect.l,
                                            (top / self.scale) + ocr_rect.t,
                                            (right / self.scale) + ocr_rect.l,
                                            (bottom / self.scale) + ocr_rect.t,
                                        ),
                                        origin=CoordOrigin.TOPLEFT,
                                    )
                                ),
                            )
                            all_ocr_cells.append(cell)

                    # Post-process the cells
                    page.cells = self.post_process_cells(all_ocr_cells, page.cells)

                # DEBUG code:
                if settings.debug.visualize_ocr:
                    self.draw_ocr_rects_and_cells(conv_res, page, ocr_rects)

                yield page

    @classmethod
    def get_options_type(cls) -> Type[OcrOptions]:
        """Return the options class this model is configured with."""
        return TesseractCliOcrOptions
|