"""Backend to parse patents from the United States Patent Office (USPTO). The parsers included in this module can handle patent grants pubished since 1976 and patent applications since 2001. The original files can be found in https://bulkdata.uspto.gov. """ import html import logging import re import xml.sax import xml.sax.xmlreader from abc import ABC, abstractmethod from enum import Enum, unique from io import BytesIO from pathlib import Path from typing import Any, Final, Optional, Union from bs4 import BeautifulSoup, Tag from docling_core.types.doc import ( DocItem, DocItemLabel, DoclingDocument, DocumentOrigin, TableCell, TableData, TextItem, ) from docling_core.types.doc.document import LevelNumber from pydantic import NonNegativeInt from typing_extensions import Self, TypedDict, override from docling.backend.abstract_backend import DeclarativeDocumentBackend from docling.datamodel.base_models import InputFormat from docling.datamodel.document import InputDocument _log = logging.getLogger(__name__) XML_DECLARATION: Final = '' @unique class PatentHeading(Enum): """Text of docling headings for tagged sections in USPTO patent documents.""" ABSTRACT = "ABSTRACT", 2 CLAIMS = "CLAIMS", 2 @override def __new__(cls, value: str, _) -> Self: obj = object.__new__(cls) obj._value_ = value return obj @override def __init__(self, _, level: LevelNumber) -> None: self.level: LevelNumber = level class PatentUsptoDocumentBackend(DeclarativeDocumentBackend): @override def __init__( self, in_doc: InputDocument, path_or_stream: Union[BytesIO, Path] ) -> None: super().__init__(in_doc, path_or_stream) self.patent_content: str = "" self.parser: Optional[PatentUspto] = None try: if isinstance(self.path_or_stream, BytesIO): while line := self.path_or_stream.readline().decode("utf-8"): if line.startswith(" None: doctype_line = doctype.lower() if doctype == "PATN\n": self.parser = PatentUsptoGrantAps() elif "us-patent-application-v4" in doctype_line: self.parser = PatentUsptoIce() elif "us-patent-grant-v4" in doctype_line: self.parser = PatentUsptoIce() elif "us-grant-025" in doctype_line: self.parser = PatentUsptoGrantV2() elif all( item in doctype_line for item in ("patent-application-publication", "pap-v1") ): self.parser = PatentUsptoAppV1() else: self.parser = None @override def is_valid(self) -> bool: return bool(self.patent_content) and bool(self.parser) @classmethod @override def supports_pagination(cls) -> bool: return False @override def unload(self) -> None: return @classmethod @override def supported_formats(cls) -> set[InputFormat]: return {InputFormat.XML_USPTO} @override def convert(self) -> DoclingDocument: if self.parser is not None: doc = self.parser.parse(self.patent_content) if doc is None: raise RuntimeError( f"Failed to convert doc (hash={self.document_hash}, " f"name={self.file.name})." ) doc.name = self.file.name or "file" mime_type = ( "text/plain" if isinstance(self.parser, PatentUsptoGrantAps) else "application/xml" ) doc.origin = DocumentOrigin( mimetype=mime_type, binary_hash=self.document_hash, filename=self.file.name or "file", ) return doc else: raise RuntimeError( f"Cannot convert doc (hash={self.document_hash}, " f"name={self.file.name}) because the backend failed to init." ) class PatentUspto(ABC): """Parser of patent documents from the US Patent Office.""" @abstractmethod def parse(self, patent_content: str) -> Optional[DoclingDocument]: """Parse a USPTO patent. Parameters: patent_content: The content of a single patent in a USPTO file. Returns: The patent parsed as a docling document. """ pass class PatentUsptoIce(PatentUspto): """Parser of patent documents from the US Patent Office (ICE). The compatible formats are: - Patent Grant Full Text Data/XML Version 4.x ICE (from January 2005) - Patent Application Full Text Data/XML Version 4.x ICE (from January 2005) """ def __init__(self) -> None: """Build an instance of PatentUsptoIce class.""" self.handler = PatentUsptoIce.PatentHandler() self.pattern = re.compile(r"^()", re.MULTILINE | re.DOTALL) def parse(self, patent_content: str) -> Optional[DoclingDocument]: try: xml.sax.parseString(patent_content, self.handler) except xml.sax._exceptions.SAXParseException as exc_sax: _log.error(f"Error in parsing USPTO document: {exc_sax}") return None doc = self.handler.doc if doc: raw_tables = re.findall(self.pattern, patent_content) parsed_tables: list[TableData] = [] _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.") for table in raw_tables: table_parser = XmlTable(XML_DECLARATION + "\n" + table) try: table_data = table_parser.parse() if table_data: parsed_tables.append(table_data) except Exception as exc_table: _log.error(f"Error in parsing USPTO tables: {exc_table}") if len(parsed_tables) != len(doc.tables): _log.error( f"Number of referenced ({len(doc.tables)}) and parsed " f"({len(parsed_tables)}) tables differ." ) else: for idx, item in enumerate(parsed_tables): doc.tables[idx].data = item return doc class PatentHandler(xml.sax.handler.ContentHandler): """SAX ContentHandler for patent documents.""" APP_DOC_ELEMENT: Final = "us-patent-application" GRANT_DOC_ELEMENT: Final = "us-patent-grant" @unique class Element(Enum): """Represents an element of interest in the patent application document.""" ABSTRACT = "abstract", True TITLE = "invention-title", True CLAIMS = "claims", False CLAIM = "claim", False CLAIM_TEXT = "claim-text", True PARAGRAPH = "p", True HEADING = "heading", True DESCRIPTION = "description", False TABLE = "table", False # to track its position, without text DRAWINGS = "description-of-drawings", True STYLE_SUPERSCRIPT = "sup", True STYLE_SUBSCRIPT = "sub", True MATHS = "maths", False # to avoid keeping formulas @override def __new__(cls, value: str, _) -> Self: obj = object.__new__(cls) obj._value_ = value return obj @override def __init__(self, _, is_text: bool) -> None: self.is_text: bool = is_text @override def __init__(self) -> None: """Build an instance of the patent handler.""" # Current patent being parsed self.doc: Optional[DoclingDocument] = None # Keep track of docling hierarchy level self.level: LevelNumber = 1 # Keep track of docling parents by level self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None} # Content to retain for the current patent self.property: list[str] self.claim: str self.claims: list[str] self.abstract: str self.text: str self._clean_data() # To handle mathematical styling self.style_html = HtmlEntity() @override def startElement(self, tag, attributes): # noqa: N802 """Signal the start of an element. Args: tag: The element tag. attributes: The element attributes. """ if tag in ( self.APP_DOC_ELEMENT, self.GRANT_DOC_ELEMENT, ): self.doc = DoclingDocument(name="file") self.text = "" self._start_registered_elements(tag, attributes) @override def skippedEntity(self, name): # noqa: N802 """Receive notification of a skipped entity. HTML entities will be skipped by the parser. This method will unescape them and add them to the text. Args: name: Entity name. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: escaped = self.style_html.get_greek_from_iso8879(f"&{name};") unescaped = html.unescape(escaped) if unescaped == escaped: _log.debug(f"Unrecognized HTML entity: {name}") return if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(unescaped, elm_val) else: self.text += unescaped @override def endElement(self, tag): # noqa: N802 """Signal the end of an element. Args: tag: The element tag. """ if tag in ( self.APP_DOC_ELEMENT, self.GRANT_DOC_ELEMENT, ): self._clean_data() self._end_registered_element(tag) @override def characters(self, content): """Receive notification of character data. Args: content: Data reported by the handler. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(content, elm_val) else: self.text += content def _start_registered_elements( self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl ) -> None: if tag in [member.value for member in self.Element]: # special case for claims: claim lines may start before the # previous one is closed if ( tag == self.Element.CLAIM_TEXT.value and self.property and self.property[-1] == tag and self.text.strip() ): self.claim += " " + self.text.strip() self.text = "" elif tag == self.Element.HEADING.value: level_attr: str = attributes.get("level", "") new_level: int = int(level_attr) if level_attr.isnumeric() else 1 max_level = min(self.parents.keys()) # increase heading level with 1 for title, if any self.level = ( new_level + 1 if (new_level + 1) in self.parents else max_level ) self.property.append(tag) def _end_registered_element(self, tag: str) -> None: if tag in [item.value for item in self.Element] and self.property: current_tag = self.property.pop() self._add_property(current_tag, self.text.strip()) def _add_property(self, name: str, text: str) -> None: if not name or not self.doc: return if name == self.Element.TITLE.value: if text: self.parents[self.level + 1] = self.doc.add_title( parent=self.parents[self.level], text=text, ) self.level += 1 self.text = "" elif name == self.Element.ABSTRACT.value: if self.abstract: heading_text = PatentHeading.ABSTRACT.value heading_level = ( PatentHeading.ABSTRACT.level if PatentHeading.ABSTRACT.level in self.parents else 1 ) abstract_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=self.abstract, parent=abstract_item, ) elif name == self.Element.CLAIM_TEXT.value: text = re.sub("\\s+", " ", text).strip() if text: self.claim += " " + text self.text = "" elif name == self.Element.CLAIM.value and self.claim: self.claims.append(self.claim.strip()) self.claim = "" elif name == self.Element.CLAIMS.value and self.claims: heading_text = PatentHeading.CLAIMS.value heading_level = ( PatentHeading.CLAIMS.level if PatentHeading.CLAIMS.level in self.parents else 1 ) claims_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) for text in self.claims: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=text, parent=claims_item ) elif name == self.Element.PARAGRAPH.value and text: # remmove blank spaces added in paragraphs text = re.sub("\\s+", " ", text) if self.Element.ABSTRACT.value in self.property: self.abstract = ( (self.abstract + " " + text) if self.abstract else text ) else: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=text, parent=self.parents[self.level], ) self.text = "" elif name == self.Element.HEADING.value and text: self.parents[self.level + 1] = self.doc.add_heading( text=text, level=self.level, parent=self.parents[self.level], ) self.level += 1 self.text = "" elif name == self.Element.TABLE.value: # set an empty table as placeholder empty_table = TableData(num_rows=0, num_cols=0, table_cells=[]) self.doc.add_table( data=empty_table, parent=self.parents[self.level], ) def _apply_style(self, text: str, style_tag: str) -> str: """Apply an HTML style to text. Args: text: A string containing plain text. style_tag: An HTML tag name for styling text. If the tag name is not recognized as one of the supported styles, the method will return the original `text`. Returns: A string after applying the style. """ formatted = text if style_tag == self.Element.STYLE_SUPERSCRIPT.value: formatted = html.unescape(self.style_html.get_superscript(text)) elif style_tag == self.Element.STYLE_SUBSCRIPT.value: formatted = html.unescape(self.style_html.get_subscript(text)) return formatted def _clean_data(self) -> None: """Reset the variables from stream data.""" self.property = [] self.claim = "" self.claims = [] self.abstract = "" class PatentUsptoGrantV2(PatentUspto): """Parser of patent documents from the US Patent Office (grants v2.5). The compatible format is: - Patent Grant Full Text Data/XML Version 2.5 (from January 2002 till December 2004) """ @override def __init__(self) -> None: """Build an instance of PatentUsptoGrantV2 class.""" self.handler = PatentUsptoGrantV2.PatentHandler() self.pattern = re.compile(r"^(
)", re.MULTILINE | re.DOTALL) @override def parse(self, patent_content: str) -> Optional[DoclingDocument]: try: xml.sax.parseString(patent_content, self.handler) except xml.sax._exceptions.SAXParseException as exc_sax: _log.error(f"Error in parsing USPTO document: {exc_sax}") return None doc = self.handler.doc if doc: raw_tables = re.findall(self.pattern, patent_content) parsed_tables: list[TableData] = [] _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.") for table in raw_tables: table_parser = XmlTable(XML_DECLARATION + "\n" + table) try: table_data = table_parser.parse() if table_data: parsed_tables.append(table_data) except Exception as exc_table: _log.error(f"Error in parsing USPTO tables: {exc_table}") if len(parsed_tables) != len(doc.tables): _log.error( f"Number of referenced ({len(doc.tables)}) and parsed " f"({len(parsed_tables)}) tables differ." ) else: for idx, item in enumerate(parsed_tables): doc.tables[idx].data = item return doc class PatentHandler(xml.sax.handler.ContentHandler): """SAX ContentHandler for patent documents.""" GRANT_DOC_ELEMENT: Final = "PATDOC" CLAIM_STATEMENT: Final = "What is claimed is:" @unique class Element(Enum): """Represents an element of interest in the patent application document.""" PDAT = "PDAT", True # any type of data ABSTRACT = ("SDOAB", False) SDOCL = ("SDOCL", False) TITLE = ("B540", False) CLAIMS = ("CL", False) CLAIM = ("CLM", False) PARAGRAPH = ("PARA", True) HEADING = ("H", True) DRAWINGS = ("DRWDESC", False) STYLE_SUPERSCRIPT = ("SP", False) STYLE_SUBSCRIPT = ("SB", False) STYLE_ITALIC = ("ITALIC", False) CWU = ("CWU", False) # avoid tables, chemicals, formulas TABLE = ("table", False) # to keep track of table positions @override def __new__(cls, value: str, _) -> Self: obj = object.__new__(cls) obj._value_ = value return obj @override def __init__(self, _, is_text: bool) -> None: self.is_text: bool = is_text @override def __init__(self) -> None: """Build an instance of the patent handler.""" # Current patent being parsed self.doc: Optional[DoclingDocument] = None # Keep track of docling hierarchy level self.level: LevelNumber = 1 # Keep track of docling parents by level self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None} # Content to retain for the current patent self.property: list[str] self.claim: str self.claims: list[str] self.paragraph: str self.abstract: str self._clean_data() # To handle mathematical styling self.style_html = HtmlEntity() @override def startElement(self, tag, attributes): # noqa: N802 """Signal the start of an element. Args: tag: The element tag. attributes: The element attributes. """ if tag == self.GRANT_DOC_ELEMENT: self.doc = DoclingDocument(name="file") self.text = "" self._start_registered_elements(tag, attributes) @override def skippedEntity(self, name): # noqa: N802 """Receive notification of a skipped entity. HTML entities will be skipped by the parser. This method will unescape them and add them to the text. Args: name: Entity name. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: escaped = self.style_html.get_greek_from_iso8879(f"&{name};") unescaped = html.unescape(escaped) if unescaped == escaped: logging.debug("Unrecognized HTML entity: " + name) return if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(unescaped, elm_val) else: self.text += unescaped @override def endElement(self, tag): # noqa: N802 """Signal the end of an element. Args: tag: The element tag. """ if tag == self.GRANT_DOC_ELEMENT: self._clean_data() self._end_registered_element(tag) @override def characters(self, content): """Receive notification of character data. Args: content: Data reported by the handler. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(content, elm_val) else: self.text += content def _start_registered_elements( self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl ) -> None: if tag in [member.value for member in self.Element]: if ( tag == self.Element.HEADING.value and not self.Element.SDOCL.value in self.property ): level_attr: str = attributes.get("LVL", "") new_level: int = int(level_attr) if level_attr.isnumeric() else 1 max_level = min(self.parents.keys()) # increase heading level with 1 for title, if any self.level = ( new_level + 1 if (new_level + 1) in self.parents else max_level ) self.property.append(tag) def _end_registered_element(self, tag: str) -> None: if tag in [elm.value for elm in self.Element] and self.property: current_tag = self.property.pop() self._add_property(current_tag, self.text) def _add_property(self, name: str, text: str) -> None: if not name or not self.doc: return if name == self.Element.PDAT.value and text: if not self.property: self.text = "" return wrapper = self.property[-1] text = self._apply_style(text, wrapper) if self.Element.TITLE.value in self.property and text.strip(): title = text.strip() self.parents[self.level + 1] = self.doc.add_title( parent=self.parents[self.level], text=title, ) self.level += 1 elif self.Element.ABSTRACT.value in self.property: self.abstract += text elif self.Element.CLAIM.value in self.property: self.claim += text # Paragraph text not in claims or abstract elif ( self.Element.PARAGRAPH.value in self.property and self.Element.CLAIM.value not in self.property and self.Element.ABSTRACT.value not in self.property ): self.paragraph += text # headers except claims statement elif ( self.Element.HEADING.value in self.property and not self.Element.SDOCL.value in self.property and text.strip() ): self.parents[self.level + 1] = self.doc.add_heading( text=text.strip(), level=self.level, parent=self.parents[self.level], ) self.level += 1 self.text = "" elif name == self.Element.CLAIM.value and self.claim.strip(): self.claims.append(self.claim.strip()) self.claim = "" elif name == self.Element.CLAIMS.value and self.claims: heading_text = PatentHeading.CLAIMS.value heading_level = ( PatentHeading.CLAIMS.level if PatentHeading.CLAIMS.level in self.parents else 1 ) claims_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) for text in self.claims: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=text, parent=claims_item ) elif name == self.Element.ABSTRACT.value and self.abstract.strip(): abstract = self.abstract.strip() heading_text = PatentHeading.ABSTRACT.value heading_level = ( PatentHeading.ABSTRACT.level if PatentHeading.ABSTRACT.level in self.parents else 1 ) abstract_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=abstract, parent=abstract_item ) elif name == self.Element.PARAGRAPH.value: paragraph = self.paragraph.strip() if paragraph and self.Element.CLAIM.value not in self.property: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=paragraph, parent=self.parents[self.level], ) elif self.Element.CLAIM.value in self.property: # we may need a space after a paragraph in claim text self.claim += " " self.paragraph = "" elif name == self.Element.TABLE.value: # set an empty table as placeholder empty_table = TableData(num_rows=0, num_cols=0, table_cells=[]) self.doc.add_table( data=empty_table, parent=self.parents[self.level], ) def _apply_style(self, text: str, style_tag: str) -> str: """Apply an HTML style to text. Args: text: A string containing plain text. style_tag: An HTML tag name for styling text. If the tag name is not recognized as one of the supported styles, the method will return the original `text`. Returns: A string after applying the style. """ formatted = text if style_tag == self.Element.STYLE_SUPERSCRIPT.value: formatted = html.unescape(self.style_html.get_superscript(text)) elif style_tag == self.Element.STYLE_SUBSCRIPT.value: formatted = html.unescape(self.style_html.get_subscript(text)) elif style_tag == self.Element.STYLE_ITALIC.value: formatted = html.unescape(self.style_html.get_math_italic(text)) return formatted def _clean_data(self) -> None: """Reset the variables from stream data.""" self.text = "" self.property = [] self.claim = "" self.claims = [] self.paragraph = "" self.abstract = "" class PatentUsptoGrantAps(PatentUspto): """Parser of patents documents from the US Patent Office (grants APS). The compatible format is: - Patent Grant Full Text Data/APS (from January 1976 till December 2001) """ @unique class Section(Enum): """Represent a section in a patent APS document.""" ABSTRACT = "ABST" SUMMARY = "BSUM" DETAILS = "DETD" CLAIMS = "CLMS" DRAWINGS = "DRWD" @unique class Field(Enum): """Represent a field in a patent APS document.""" DOC_NUMBER = "WKU" TITLE = "TTL" PARAGRAPH = "PAR" PARAGRAPH_1 = "PA1" PARAGRAPH_2 = "PA2" PARAGRAPH_3 = "PA3" TEXT = "PAL" CAPTION = "PAC" NUMBER = "NUM" NAME = "NAM" IPC = "ICL" ISSUED = "ISD" FILED = "APD" PATENT_NUMBER = "PNO" APPLICATION_NUMBER = "APN" APPLICATION_TYPE = "APT" COUNTRY = "CNT" @override def __init__(self) -> None: """Build an instance of PatentUsptoGrantAps class.""" self.doc: Optional[DoclingDocument] = None # Keep track of docling hierarchy level self.level: LevelNumber = 1 # Keep track of docling parents by level self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None} def get_last_text_item(self) -> Optional[TextItem]: """Get the last text item at the current document level. Returns: The text item or None, if the current level parent has no children.""" if self.doc: parent = self.parents[self.level] children = parent.children if parent is not None else [] else: return None text_list: list[TextItem] = [ item for item in self.doc.texts if isinstance(item, TextItem) and item.get_ref() in children ] if text_list: return text_list[-1] else: return None def store_section(self, section: str) -> None: """Store the section heading in the docling document. Only the predefined sections from PatentHeading will be handled. The other sections are created by the Field.CAPTION field. Args: section: A patent section name.""" heading: PatentHeading if self.doc is None: return elif section == self.Section.ABSTRACT.value: heading = PatentHeading.ABSTRACT elif section == self.Section.CLAIMS.value: heading = PatentHeading.CLAIMS else: return None self.level = heading.level if heading.level in self.parents else 1 self.parents[self.level + 1] = self.doc.add_heading( heading.value, level=self.level, parent=self.parents[self.level], ) self.level += 1 def store_content(self, section: str, field: str, value: str) -> None: """Store the key value within a document section in the docling document. Args: section: A patent section name. field: A field name. value: A field value name. """ if ( not self.doc or not field or field not in [item.value for item in PatentUsptoGrantAps.Field] ): return if field == self.Field.TITLE.value: self.parents[self.level + 1] = self.doc.add_title( parent=self.parents[self.level], text=value ) self.level += 1 elif field == self.Field.TEXT.value and section == self.Section.ABSTRACT.value: abst_item = self.get_last_text_item() if abst_item: abst_item.text += " " + value else: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=value, parent=self.parents[self.level], ) elif field == self.Field.NUMBER.value and section == self.Section.CLAIMS.value: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text="", parent=self.parents[self.level], ) elif ( field in ( self.Field.PARAGRAPH.value, self.Field.PARAGRAPH_1.value, self.Field.PARAGRAPH_2.value, self.Field.PARAGRAPH_3.value, ) and section == self.Section.CLAIMS.value ): last_claim = self.get_last_text_item() if last_claim is None: last_claim = self.doc.add_text( label=DocItemLabel.PARAGRAPH, text="", parent=self.parents[self.level], ) last_claim.text += f" {value}" if last_claim.text else value elif field == self.Field.CAPTION.value and section in ( self.Section.SUMMARY.value, self.Section.DETAILS.value, self.Section.DRAWINGS.value, ): # captions are siblings of abstract since no level info is provided head_item = PatentHeading.ABSTRACT self.level = head_item.level if head_item.level in self.parents else 1 self.parents[self.level + 1] = self.doc.add_heading( value, level=self.level, parent=self.parents[self.level], ) self.level += 1 elif field in ( self.Field.PARAGRAPH.value, self.Field.PARAGRAPH_1.value, self.Field.PARAGRAPH_2.value, self.Field.PARAGRAPH_3.value, ) and section in ( self.Section.SUMMARY.value, self.Section.DETAILS.value, self.Section.DRAWINGS.value, ): self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=value, parent=self.parents[self.level], ) def parse(self, patent_content: str) -> Optional[DoclingDocument]: self.doc = self.doc = DoclingDocument(name="file") section: str = "" key: str = "" value: str = "" line_num = 0 for line in patent_content.splitlines(): cols = re.split("\\s{2,}", line, maxsplit=1) if key and value and (len(cols) == 1 or (len(cols) == 2 and cols[0])): self.store_content(section, key, value) key = "" value = "" if len(cols) == 1: # section title section = cols[0] self.store_section(section) _log.debug(f"Parsing section {section}") elif len(cols) == 2: # key value if cols[0]: # key present key = cols[0] value = cols[1] elif not re.match(r"^##STR\d+##$", cols[1]): # line continues value += " " + cols[1] line_num += 1 if key and value: self.store_content(section, key, value) # TODO: parse tables return self.doc class PatentUsptoAppV1(PatentUspto): """Parser of patent documents from the US Patent Office (applications v1.x) The compatible format is: - Patent Application Full Text Data/XML Version 1.x (from March 2001 till December 2004) """ @override def __init__(self) -> None: """Build an instance of PatentUsptoAppV1 class.""" self.handler = PatentUsptoAppV1.PatentHandler() self.pattern = re.compile(r"^(
)", re.MULTILINE | re.DOTALL) @override def parse(self, patent_content: str) -> Optional[DoclingDocument]: try: xml.sax.parseString(patent_content, self.handler) except xml.sax._exceptions.SAXParseException as exc_sax: _log.error(f"Error in parsing USPTO document: {exc_sax}") return None doc = self.handler.doc if doc: raw_tables = re.findall(self.pattern, patent_content) parsed_tables: list[TableData] = [] _log.debug(f"Found {len(raw_tables)} tables to be parsed with XmlTable.") for table in raw_tables: table_parser = XmlTable(XML_DECLARATION + "\n" + table) try: table_data = table_parser.parse() if table_data: parsed_tables.append(table_data) except Exception as exc_table: _log.error(f"Error in parsing USPTO tables: {exc_table}") if len(parsed_tables) != len(doc.tables): _log.error( f"Number of referenced ({len(doc.tables)}) and parsed " f"({len(parsed_tables)}) tables differ." ) else: for idx, item in enumerate(parsed_tables): doc.tables[idx].data = item return doc class PatentHandler(xml.sax.handler.ContentHandler): """SAX ContentHandler for patent documents.""" APP_DOC_ELEMENT: Final = "patent-application-publication" @unique class Element(Enum): """Represents an element of interest in the patent application document.""" DRAWINGS = "brief-description-of-drawings", False ABSTRACT = "subdoc-abstract", False TITLE = "title-of-invention", True CLAIMS = "subdoc-claims", False CLAIM = "claim", False CLAIM_TEXT = "claim-text", True NUMBER = ("number", False) PARAGRAPH = "paragraph", True HEADING = "heading", True STYLE_SUPERSCRIPT = "superscript", True STYLE_SUBSCRIPT = "subscript", True # do not store text of a table, since it can be within paragraph TABLE = "table", False # do not store text of a formula, since it can be within paragraph MATH = "math-cwu", False @override def __new__(cls, value: str, _) -> Self: obj = object.__new__(cls) obj._value_ = value return obj @override def __init__(self, _, is_text: bool) -> None: self.is_text: bool = is_text @override def __init__(self) -> None: """Build an instance of the patent handler.""" # Current patent being parsed self.doc: Optional[DoclingDocument] = None # Keep track of docling hierarchy level self.level: LevelNumber = 1 # Keep track of docling parents by level self.parents: dict[LevelNumber, Optional[DocItem]] = {1: None} # Content to retain for the current patent self.property: list[str] self.claim: str self.claims: list[str] self.abstract: str self.text: str self._clean_data() # To handle mathematical styling self.style_html = HtmlEntity() @override def startElement(self, tag, attributes): # noqa: N802 """Signal the start of an element. Args: tag: The element tag. attributes: The element attributes. """ if tag == self.APP_DOC_ELEMENT: self.doc = DoclingDocument(name="file") self.text = "" self._start_registered_elements(tag, attributes) @override def skippedEntity(self, name): # noqa: N802 """Receive notification of a skipped entity. HTML entities will be skipped by the parser. This method will unescape them and add them to the text. Args: name: Entity name. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: escaped = self.style_html.get_greek_from_iso8879(f"&{name};") unescaped = html.unescape(escaped) if unescaped == escaped: logging.debug("Unrecognized HTML entity: " + name) return if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(unescaped, elm_val) else: self.text += unescaped @override def endElement(self, tag): # noqa: N802 """Signal the end of an element. Args: tag: The element tag. """ if tag == self.APP_DOC_ELEMENT: self._clean_data() self._end_registered_element(tag) @override def characters(self, content): """Receive notification of character data. Args: content: Data reported by the handler. """ if self.property: elm_val = self.property[-1] element = self.Element(elm_val) if element.is_text: if element in ( self.Element.STYLE_SUPERSCRIPT, self.Element.STYLE_SUBSCRIPT, ): # superscripts and subscripts need to be under text elements if len(self.property) < 2: return parent_val = self.property[-2] parent = self.Element(parent_val) if parent.is_text: self.text += self._apply_style(content, elm_val) else: self.text += content def _start_registered_elements( self, tag: str, attributes: xml.sax.xmlreader.AttributesImpl ) -> None: if tag in [member.value for member in self.Element]: # special case for claims: claim lines may start before the # previous one is closed if ( tag == self.Element.CLAIM_TEXT.value and self.property and self.property[-1] == tag and self.text.strip() ): self.claim += " " + self.text.strip("\n") self.text = "" elif tag == self.Element.HEADING.value: level_attr: str = attributes.get("lvl", "") new_level: int = int(level_attr) if level_attr.isnumeric() else 1 max_level = min(self.parents.keys()) # increase heading level with 1 for title, if any self.level = ( new_level + 1 if (new_level + 1) in self.parents else max_level ) self.property.append(tag) def _end_registered_element(self, tag: str) -> None: if tag in [elm.value for elm in self.Element] and self.property: current_tag = self.property.pop() self._add_property(current_tag, self.text) def _add_property(self, name: str, text: str) -> None: if not name or not self.doc: return if name == self.Element.TITLE.value: title = text.strip() if title: self.parents[self.level + 1] = self.doc.add_text( parent=self.parents[self.level], label=DocItemLabel.TITLE, text=title, ) self.level += 1 self.text = "" elif name == self.Element.ABSTRACT.value: abstract = self.abstract.strip() if abstract: heading_text = PatentHeading.ABSTRACT.value heading_level = ( PatentHeading.ABSTRACT.level if PatentHeading.ABSTRACT.level in self.parents else 1 ) abstract_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=self.abstract, parent=abstract_item, ) self.abstract = "" self.text = "" elif name == self.Element.CLAIM_TEXT.value: if text: self.claim += self.text.strip("\n") self.text = "" elif name == self.Element.CLAIM.value: claim = self.claim.strip() if claim: self.claims.append(claim) self.claim = "" elif name == self.Element.CLAIMS.value and self.claims: heading_text = PatentHeading.CLAIMS.value heading_level = ( PatentHeading.CLAIMS.level if PatentHeading.CLAIMS.level in self.parents else 1 ) claims_item = self.doc.add_heading( heading_text, level=heading_level, parent=self.parents[heading_level], ) for text in self.claims: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=text, parent=claims_item ) elif name in ( self.Element.PARAGRAPH.value, self.Element.HEADING.value, ): if text and self.Element.ABSTRACT.value in self.property: self.abstract = (self.abstract + text) if self.abstract else text elif text.strip(): text = re.sub("\\s+", " ", text).strip() if name == self.Element.HEADING.value: self.parents[self.level + 1] = self.doc.add_heading( text=text, level=self.level, parent=self.parents[self.level], ) self.level += 1 else: self.doc.add_text( label=DocItemLabel.PARAGRAPH, text=text, parent=self.parents[self.level], ) self.text = "" elif name == self.Element.TABLE.value: # set an empty table as placeholder empty_table = TableData(num_rows=0, num_cols=0, table_cells=[]) self.doc.add_table( data=empty_table, parent=self.parents[self.level], ) def _apply_style(self, text: str, style_tag: str) -> str: """Apply an HTML style to text. Args: text: A string containing plain text. style_tag: An HTML tag name for styling text. If the tag name is not recognized as one of the supported styles, the method will return the original `text`. Returns: A string after applying the style. """ formatted = html.unescape(text) if style_tag == self.Element.STYLE_SUPERSCRIPT.value: formatted = html.unescape(self.style_html.get_superscript(formatted)) elif style_tag == self.Element.STYLE_SUBSCRIPT.value: formatted = html.unescape(self.style_html.get_subscript(formatted)) return formatted def _clean_data(self): """Reset the variables from stream data.""" self.property = [] self.abstract = "" self.claim = "" self.claims = [] self.text = "" class XmlTable: """Provide a table parser for xml tables in USPTO patent documents. The OASIS Open XML Exchange Table Model can be downloaded from: http://oasis-open.org/specs/soextblx.dtd """ class MinColInfoType(TypedDict): offset: list[int] colwidth: list[int] class ColInfoType(MinColInfoType): cell_range: list[int] cell_offst: list[int] def __init__(self, input: str) -> None: """Initialize the table parser with the xml content. Args: input: The xml content. """ self.max_nbr_messages = 2 self.nbr_messages = 0 self.empty_text = "" self._soup = BeautifulSoup(input, features="xml") def _create_tg_range(self, tgs: list[dict[str, Any]]) -> dict[int, ColInfoType]: """Create a unified range along the table groups. Args: tgs: Table group column specifications. Returns: Unified group column specifications. """ colinfo: dict[int, XmlTable.ColInfoType] = {} if len(tgs) == 0: return colinfo for itg, tg in enumerate(tgs): colinfo[itg] = { "offset": [], "colwidth": [], "cell_range": [], "cell_offst": [0], } offst = 0 for info in tg["colinfo"]: cw = info["colwidth"] cw = re.sub("pt", "", cw, flags=re.I) cw = re.sub("mm", "", cw, flags=re.I) try: cw = int(cw) except BaseException: cw = float(cw) colinfo[itg]["colwidth"].append(cw) colinfo[itg]["offset"].append(offst) offst += cw colinfo[itg]["offset"].append(offst) min_colinfo: XmlTable.MinColInfoType = {"offset": [], "colwidth": []} min_colinfo["offset"] = colinfo[0]["offset"] offset_w0 = [] for itg, col in colinfo.items(): # keep track of col with 0 width for ic, cw in enumerate(col["colwidth"]): if cw == 0: offset_w0.append(col["offset"][ic]) min_colinfo["offset"] = sorted( list(set(col["offset"] + min_colinfo["offset"])) ) # add back the 0 width cols to offset list offset_w0 = list(set(offset_w0)) min_colinfo["offset"] = sorted(min_colinfo["offset"] + offset_w0) for i in range(len(min_colinfo["offset"]) - 1): min_colinfo["colwidth"].append( min_colinfo["offset"][i + 1] - min_colinfo["offset"][i] ) for itg, col in colinfo.items(): i = 1 range_ = 1 for min_i in range(1, len(min_colinfo["offset"])): min_offst = min_colinfo["offset"][min_i] offst = col["offset"][i] if min_offst == offst: if ( len(col["offset"]) == i + 1 and len(min_colinfo["offset"]) > min_i + 1 ): range_ += 1 else: col["cell_range"].append(range_) col["cell_offst"].append(col["cell_offst"][-1] + range_) range_ = 1 i += 1 elif min_offst < offst: range_ += 1 else: _log.debug("A USPTO XML table has wrong offsets.") return {} return colinfo def _get_max_ncols(self, tgs_info: dict[int, ColInfoType]) -> NonNegativeInt: """Get the maximum number of columns across table groups. Args: tgs_info: Unified group column specifications. Return: The maximum number of columns. """ ncols_max = 0 for rowinfo in tgs_info.values(): ncols_max = max(ncols_max, len(rowinfo["colwidth"])) return ncols_max def _parse_table(self, table: Tag) -> TableData: """Parse the content of a table tag. Args: The table element. Returns: A docling table object. """ tgs_align = [] tg_secs = table.find_all("tgroup") if tg_secs: for tg_sec in tg_secs: ncols = tg_sec.get("cols", None) if ncols: ncols = int(ncols) tg_align = {"ncols": ncols, "colinfo": []} cs_secs = tg_sec.find_all("colspec") if cs_secs: for cs_sec in cs_secs: colname = cs_sec.get("colname", None) colwidth = cs_sec.get("colwidth", None) tg_align["colinfo"].append( {"colname": colname, "colwidth": colwidth} ) tgs_align.append(tg_align) # create unified range along the table groups tgs_range = self._create_tg_range(tgs_align) # if the structure is broken, return an empty table if not tgs_range: dl_table = TableData(num_rows=0, num_cols=0, table_cells=[]) return dl_table ncols_max = self._get_max_ncols(tgs_range) # extract table data table_data: list[TableCell] = [] i_row_global = 0 is_row_empty: bool = True tg_secs = table.find_all("tgroup") if tg_secs: for itg, tg_sec in enumerate(tg_secs): tg_range = tgs_range[itg] row_secs = tg_sec.find_all(["row", "tr"]) if row_secs: for row_sec in row_secs: entry_secs = row_sec.find_all(["entry", "td"]) is_header: bool = row_sec.parent.name in ["thead"] ncols = 0 local_row: list[TableCell] = [] is_row_empty = True if entry_secs: wrong_nbr_cols = False for ientry, entry_sec in enumerate(entry_secs): text = entry_sec.get_text().strip() # start-end namest = entry_sec.attrs.get("namest", None) nameend = entry_sec.attrs.get("nameend", None) if isinstance(namest, str) and namest.isnumeric(): namest = int(namest) else: namest = ientry + 1 if isinstance(nameend, str) and nameend.isnumeric(): nameend = int(nameend) shift = 0 else: nameend = ientry + 2 shift = 1 if nameend > len(tg_range["cell_offst"]): wrong_nbr_cols = True self.nbr_messages += 1 if self.nbr_messages <= self.max_nbr_messages: _log.debug( "USPTO table has # entries != # columns" ) break range_ = [ tg_range["cell_offst"][namest - 1], tg_range["cell_offst"][nameend - 1] - shift, ] # add row and replicate cell if needed cell_text = text if text else self.empty_text if cell_text != self.empty_text: is_row_empty = False for irep in range(range_[0], range_[1] + 1): ncols += 1 local_row.append( TableCell( column_header=is_header, text=cell_text, start_row_offset_idx=i_row_global, end_row_offset_idx=i_row_global + 1, row_span=1, start_col_offset_idx=range_[0], end_col_offset_idx=range_[1] + 1, col_span=range_[1] - range_[0] + 1, ) ) if wrong_nbr_cols: # keep empty text, not to introduce noise local_row = [] ncols = 0 # add empty cell up to ncols_max for irep in range(ncols, ncols_max): local_row.append( TableCell( column_header=is_header, text=self.empty_text, start_row_offset_idx=i_row_global, end_row_offset_idx=i_row_global + 1, row_span=1, start_col_offset_idx=irep, end_col_offset_idx=irep + 1, col_span=1, ) ) # do not add empty rows if not is_row_empty: table_data.extend(local_row) i_row_global += 1 dl_table = TableData( num_rows=i_row_global, num_cols=ncols_max, table_cells=table_data ) return dl_table def parse(self) -> Optional[TableData]: """Parse the first table from an xml content. Returns: A docling table data. """ section = self._soup.find("table") if section is not None: table = self._parse_table(section) if table.num_rows == 0 or table.num_cols == 0: _log.warning("The parsed USPTO table is empty") return table else: return None class HtmlEntity: """Provide utility functions to get the HTML entities of styled characters. This class has been developped from: https://unicode-table.com/en/html-entities/ https://www.w3.org/TR/WD-math-970515/table03.html """ def __init__(self): """Initialize this class by loading the HTML entity dictionaries.""" self.superscript = str.maketrans( { "1": "¹", "2": "²", "3": "³", "4": "⁴", "5": "⁵", "6": "⁶", "7": "⁷", "8": "⁸", "9": "⁹", "0": "⁰", "+": "⁺", "-": "⁻", "−": "⁻", "=": "⁼", "(": "⁽", ")": "⁾", "a": "ª", "o": "º", "i": "ⁱ", "n": "ⁿ", } ) self.subscript = str.maketrans( { "1": "₁", "2": "₂", "3": "₃", "4": "₄", "5": "₅", "6": "₆", "7": "₇", "8": "₈", "9": "₉", "0": "₀", "+": "₊", "-": "₋", "−": "₋", "=": "₌", "(": "₍", ")": "₎", "a": "ₐ", "e": "ₑ", "o": "ₒ", "x": "ₓ", } ) self.mathematical_italic = str.maketrans( { "A": "𝐴", "B": "𝐵", "C": "𝐶", "D": "𝐷", "E": "𝐸", "F": "𝐹", "G": "𝐺", "H": "𝐻", "I": "𝐼", "J": "𝐽", "K": "𝐾", "L": "𝐿", "M": "𝑀", "N": "𝑁", "O": "𝑂", "P": "𝑃", "Q": "𝑄", "R": "𝑅", "S": "𝑆", "T": "𝑇", "U": "𝑈", "V": "𝑉", "W": "𝑊", "Y": "𝑌", "Z": "𝑍", "a": "𝑎", "b": "𝑏", "c": "𝑐", "d": "𝑑", "e": "𝑒", "f": "𝑓", "g": "𝑔", "h": "𝑕", "i": "𝑖", "j": "𝑗", "k": "𝑘", "l": "𝑙", "m": "𝑚", "n": "𝑛", "o": "𝑜", "p": "𝑝", "q": "𝑞", "r": "𝑟", "s": "𝑠", "t": "𝑡", "u": "𝑢", "v": "𝑣", "w": "𝑤", "x": "𝑥", "y": "𝑦", "z": "𝑧", } ) self.lookup_iso8879 = { "&Agr;": "Α", "&Bgr;": "Β", "&Ggr;": "Γ", "&Dgr;": "Δ", "&Egr;": "Ε", "&Zgr;": "Ζ", "&EEgr;": "Η", "&THgr;": "Θ", "&Igr;": "Ι", "&Kgr;": "Κ", "&Lgr;": "Λ", "&Mgr;": "Μ", "&Ngr;": "Ν", "&Xgr;": "Ξ", "&Ogr;": "Ο", "&Pgr;": "Π", "&Rgr;": "Ρ", "&Sgr;": "Σ", "&Tgr;": "Τ", "&Ugr;": "Υ", "&PHgr;": "Φ", "&KHgr;": "Χ", "&PSgr;": "Ψ", "&OHgr;": "Ω", "&agr;": "α", "&bgr;": "β", "&ggr;": "γ", "&dgr;": "δ", "&egr;": "ε", "&zgr;": "ζ", "&eegr;": "η", "&thgr;": "θ", "&igr;": "ι", "&kgr;": "κ", "&lgr;": "λ", "&mgr;": "μ", "&ngr;": "ν", "&xgr;": "ξ", "&ogr;": "ο", "&pgr;": "π", "&rgr;": "ρ", "&sgr;": "ς", "&tgr;": "τ", "&ugr;": "υ", "&phgr;": "φ", "&khgr;": "χ", "&psgr;": "ψ", "&ohgr;": "ω", } def get_superscript(self, text: str) -> str: """Get a text in superscript as HTML entities. Args: text: The text to transform. Returns: The text in superscript as HTML entities. """ return text.translate(self.superscript) def get_subscript(self, text: str) -> str: """Get a text in subscript as HTML entities. Args: The text to transform. Returns: The text in subscript as HTML entities. """ return text.translate(self.subscript) def get_math_italic(self, text: str) -> str: """Get a text in italic as HTML entities. Args: The text to transform. Returns: The text in italics as HTML entities. """ return text.translate(self.mathematical_italic) def get_greek_from_iso8879(self, text: str) -> str: """Get an HTML entity of a greek letter in ISO 8879. Args: The text to transform, as an ISO 8879 entitiy. Returns: The HTML entity representing a greek letter. If the input text is not supported, the original text is returned. """ return self.lookup_iso8879.get(text, text)