import csv
import io
import logging
import os
import tempfile
from subprocess import DEVNULL, PIPE, Popen
from typing import Iterable, List, Optional, Tuple

import pandas as pd
from docling_core.types.doc import BoundingBox, CoordOrigin
from docling_core.types.doc.page import BoundingRectangle, TextCell

from docling.datamodel.base_models import Page
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import TesseractCliOcrOptions
from docling.datamodel.settings import settings
from docling.models.base_ocr_model import BaseOcrModel
from docling.utils.ocr_utils import map_tesseract_script
from docling.utils.profiling import TimeRecorder

_log = logging.getLogger(__name__)


class TesseractOcrCliModel(BaseOcrModel):
    """OCR model that runs the Tesseract command-line binary as a subprocess.

    Page regions are rendered to temporary PNG files, passed to the binary
    configured in ``options.tesseract_cmd``, and the TSV output is converted
    into ``TextCell`` objects.
    """

    def __init__(self, enabled: bool, options: TesseractCliOcrOptions):
        """Set up the model and, when enabled, probe the Tesseract binary.

        Args:
            enabled: Whether OCR should run at all.
            options: CLI options (command, languages, tessdata path).

        Raises:
            RuntimeError: If the model is enabled and querying the binary for
                its version or installed languages fails.
        """
        super().__init__(enabled=enabled, options=options)
        self.options: TesseractCliOcrOptions

        self.scale = 3  # multiplier for 72 dpi == 216 dpi.

        self._name: Optional[str] = None
        self._version: Optional[str] = None
        self._tesseract_languages: Optional[List[str]] = None
        self._script_prefix: Optional[str] = None

        if self.enabled:
            try:
                self._get_name_and_version()
                self._set_languages_and_prefix()

            except Exception as exc:
                # Chain the original error so the root cause stays visible.
                raise RuntimeError(
                    f"Tesseract is not available, aborting: {exc} "
                    "Install tesseract on your system and the tesseract binary is discoverable. "
                    "The actual command for Tesseract can be specified in `pipeline_options.ocr_options.tesseract_cmd='tesseract'`. "
                    "Alternatively, Docling has support for other OCR engines. See the documentation."
                ) from exc

    def _get_name_and_version(self) -> Tuple[str, str]:
        """Return the ``(name, version)`` pair reported by ``<cmd> --version``.

        The result is cached on the instance after the first successful call.

        Raises:
            ValueError: If the first output line has fewer than two
                whitespace-separated tokens.
        """
        if self._name is not None and self._version is not None:
            return self._name, self._version

        cmd = [self.options.tesseract_cmd, "--version"]

        proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
        # communicate() already waits for the process to terminate.
        stdout, stderr = proc.communicate()

        # HACK: Windows versions of Tesseract output the version to stdout, Linux versions
        # to stderr, so check both.
        version_line = (
            (stdout.decode("utf8").strip() or stderr.decode("utf8").strip())
            .split("\n")[0]
            .strip()
        )

        # If everything else fails...
        if not version_line:
            version_line = "tesseract XXX"

        # Only the first two tokens matter; tolerate repeated spaces and any
        # trailing text on the version line instead of failing to unpack.
        name, version = version_line.split()[:2]

        self._name = name
        self._version = version

        return name, version

    def _run_tesseract(self, ifilename: str) -> pd.DataFrame:
        """Run the Tesseract CLI on an image and return its non-empty TSV rows.

        Args:
            ifilename: Path of the image file to recognise.

        Returns:
            The parsed TSV output restricted to rows whose ``text`` column is
            neither null nor whitespace-only.
        """
        cmd = [self.options.tesseract_cmd]

        langs = self.options.lang
        # Guard against ``lang`` being None before testing membership.
        if langs and "auto" in langs:
            lang = self._detect_language(ifilename)
            if lang is not None:
                cmd.append("-l")
                cmd.append(lang)
        elif langs:
            cmd.append("-l")
            cmd.append("+".join(langs))

        if self.options.path is not None:
            cmd.append("--tessdata-dir")
            cmd.append(self.options.path)

        cmd += [ifilename, "stdout", "tsv"]
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()

        # Decode the byte string to a regular string
        decoded_data = output.decode("utf-8")

        # Read the TSV output generated by Tesseract; quoting is disabled so
        # quote characters in recognised text are kept verbatim.
        df = pd.read_csv(io.StringIO(decoded_data), quoting=csv.QUOTE_NONE, sep="\t")

        # Filter rows that contain actual text (ignore header or empty rows)
        has_text = df["text"].notnull() & (df["text"].apply(str).str.strip() != "")
        return df[has_text]

    def _detect_language(self, ifilename: str) -> Optional[str]:
        """Run Tesseract in PSM 0 (OSD) mode to detect the page language.

        Args:
            ifilename: Path of the image file to analyse.

        Returns:
            The Tesseract language name for the detected script, or ``None``
            when no script is reported or the language is not installed.
        """
        assert self._tesseract_languages is not None

        cmd = [self.options.tesseract_cmd]
        cmd.extend(["--psm", "0", "-l", "osd", ifilename, "stdout"])
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")

        # stderr is discarded, so a failed detection shows up as empty stdout;
        # pd.read_csv would raise EmptyDataError on that, so bail out early.
        if not decoded_data.strip():
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        df = pd.read_csv(
            io.StringIO(decoded_data), sep=":", header=None, names=["key", "value"]
        )
        scripts = df.loc[df["key"] == "Script"].value.tolist()
        if not scripts:
            _log.warning("Tesseract cannot detect the script of the page")
            return None

        script = map_tesseract_script(scripts[0].strip())
        lang = f"{self._script_prefix}{script}"

        # Check if the detected language has been installed
        if lang not in self._tesseract_languages:
            msg = f"Tesseract detected the script '{script}' and language '{lang}'."
            msg += " However this language is not installed in your system and will be ignored."
            _log.warning(msg)
            return None

        _log.debug(
            "Using tesseract model for the detected script '%s' and language '%s'",
            script,
            lang,
        )
        return lang

    def _set_languages_and_prefix(self) -> None:
        """Read the languages installed in Tesseract and decide the script prefix.

        Populates ``self._tesseract_languages`` from ``<cmd> --list-langs`` and
        sets ``self._script_prefix`` to ``"script/"`` when any listed language
        starts with that prefix, otherwise to the empty string.
        """
        # Get all languages
        cmd = [self.options.tesseract_cmd]
        cmd.append("--list-langs")
        _log.info("command: %s", " ".join(cmd))

        proc = Popen(cmd, stdout=PIPE, stderr=DEVNULL)
        output, _ = proc.communicate()
        decoded_data = output.decode("utf-8")

        df = pd.read_csv(io.StringIO(decoded_data), header=None)
        # The first output row is dropped; the remaining rows are the languages.
        self._tesseract_languages = df[0].tolist()[1:]

        # Decide the script prefix
        uses_script_dir = any(
            name.startswith("script/") for name in self._tesseract_languages
        )
        self._script_prefix = "script/" if uses_script_dir else ""

    def __call__(
        self, conv_res: ConversionResult, page_batch: Iterable[Page]
    ) -> Iterable[Page]:
        """Run OCR on each page of the batch and yield the pages.

        Pages are passed through untouched when the model is disabled or the
        page backend is not valid.

        Args:
            conv_res: Conversion result used for timing and debug drawing.
            page_batch: Pages to process.

        Yields:
            Each input page, with ``page.cells`` post-processed to include the
            OCR cells when OCR was run on it.
        """
        if not self.enabled:
            yield from page_batch
            return

        for page in page_batch:
            assert page._backend is not None
            if not page._backend.is_valid():
                yield page
                continue

            with TimeRecorder(conv_res, "ocr"):
                ocr_rects = self.get_ocr_rects(page)

                all_ocr_cells = []
                for ocr_rect in ocr_rects:
                    # Skip zero area boxes
                    if ocr_rect.area() == 0:
                        continue

                    high_res_image = page._backend.get_page_image(
                        scale=self.scale, cropbox=ocr_rect
                    )

                    # Bound before the try so the cleanup below cannot hit an
                    # unbound name if creating the temporary file fails.
                    fname: Optional[str] = None
                    try:
                        with tempfile.NamedTemporaryFile(
                            suffix=".png", mode="w+b", delete=False
                        ) as image_file:
                            fname = image_file.name
                            high_res_image.save(image_file)

                        # Run only after the file is closed so the image is
                        # fully written to disk.
                        df = self._run_tesseract(fname)
                    finally:
                        if fname is not None and os.path.exists(fname):
                            os.remove(fname)

                    for ix, row in df.iterrows():
                        text = row["text"]
                        conf = row["conf"]

                        left = float(row["left"])
                        top = float(row["top"])
                        right = left + float(row["width"])
                        bottom = top + float(row["height"])

                        # Tesseract reports coordinates in the scaled crop
                        # image; scale them back and offset by the crop origin.
                        cell = TextCell(
                            index=ix,
                            text=text,
                            orig=text,
                            from_ocr=True,
                            confidence=conf / 100.0,
                            rect=BoundingRectangle.from_bounding_box(
                                BoundingBox.from_tuple(
                                    coord=(
                                        (left / self.scale) + ocr_rect.l,
                                        (top / self.scale) + ocr_rect.t,
                                        (right / self.scale) + ocr_rect.l,
                                        (bottom / self.scale) + ocr_rect.t,
                                    ),
                                    origin=CoordOrigin.TOPLEFT,
                                )
                            ),
                        )
                        all_ocr_cells.append(cell)

                # Post-process the cells
                page.cells = self.post_process_cells(all_ocr_cells, page.cells)

            # DEBUG code:
            if settings.debug.visualize_ocr:
                self.draw_ocr_rects_and_cells(conv_res, page, ocr_rects)

            yield page