| | import logging |
| | import math |
| | import sys |
| | import time |
| | from functools import partial |
| | from pathlib import Path |
| | from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union |
| |
|
| | from pydantic import BaseModel, ConfigDict, model_validator, validate_call |
| |
|
| | from docling.backend.abstract_backend import AbstractDocumentBackend |
| | from docling.backend.asciidoc_backend import AsciiDocBackend |
| | from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend |
| | from docling.backend.html_backend import HTMLDocumentBackend |
| | from docling.backend.json.docling_json_backend import DoclingJSONBackend |
| | from docling.backend.md_backend import MarkdownDocumentBackend |
| | from docling.backend.msexcel_backend import MsExcelDocumentBackend |
| | from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend |
| | from docling.backend.msword_backend import MsWordDocumentBackend |
| | from docling.backend.xml.pubmed_backend import PubMedDocumentBackend |
| | from docling.backend.xml.uspto_backend import PatentUsptoDocumentBackend |
| | from docling.datamodel.base_models import ( |
| | ConversionStatus, |
| | DoclingComponentType, |
| | DocumentStream, |
| | ErrorItem, |
| | InputFormat, |
| | ) |
| | from docling.datamodel.document import ( |
| | ConversionResult, |
| | InputDocument, |
| | _DocumentConversionInput, |
| | ) |
| | from docling.datamodel.pipeline_options import PipelineOptions |
| | from docling.datamodel.settings import ( |
| | DEFAULT_PAGE_RANGE, |
| | DocumentLimits, |
| | PageRange, |
| | settings, |
| | ) |
| | from docling.exceptions import ConversionError |
| | from docling.pipeline.base_pipeline import BasePipeline |
| | from docling.pipeline.simple_pipeline import SimplePipeline |
| | from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline |
| | from docling.utils.utils import chunkify |
| |
|
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
| |
|
| |
|
class FormatOption(BaseModel):
    """Bundle of pipeline class, pipeline options and backend for one format.

    When ``pipeline_options`` is omitted (``None``), it is populated after
    validation with ``pipeline_cls.get_default_options()``.
    """

    # Pipeline implementation associated with the format.
    pipeline_cls: Type[BasePipeline]
    # Options for the pipeline; replaced by the pipeline defaults when None.
    pipeline_options: Optional[PipelineOptions] = None
    # Document backend class associated with the format.
    backend: Type[AbstractDocumentBackend]

    # Fields hold arbitrary (non-pydantic) classes, so allow them explicitly.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def set_optional_field_default(self) -> "FormatOption":
        """Fall back to the pipeline's default options when none were given."""
        if self.pipeline_options is not None:
            return self
        self.pipeline_options = self.pipeline_cls.get_default_options()
        return self
| |
|
| |
|
class ExcelFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``MsExcelDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend
| |
|
| |
|
class WordFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``MsWordDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend
| |
|
| |
|
class PowerpointFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``MsPowerpointDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MsPowerpointDocumentBackend
| |
|
| |
|
class MarkdownFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``MarkdownDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = MarkdownDocumentBackend
| |
|
| |
|
class AsciiDocFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``AsciiDocBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = AsciiDocBackend
| |
|
| |
|
class HTMLFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``HTMLDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = HTMLDocumentBackend
| |
|
| |
|
class PatentUsptoFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``PatentUsptoDocumentBackend``."""

    pipeline_cls: Type = SimplePipeline
    # NOTE: unlike the sibling presets, the backend annotation here is narrowed
    # to PatentUsptoDocumentBackend (and its subclasses).
    backend: Type[PatentUsptoDocumentBackend] = PatentUsptoDocumentBackend
| |
|
| |
|
class XMLPubMedFormatOption(FormatOption):
    """Format option preset: ``SimplePipeline`` + ``PubMedDocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = SimplePipeline
    backend: Type[AbstractDocumentBackend] = PubMedDocumentBackend
| |
|
| |
|
class ImageFormatOption(FormatOption):
    """Format option preset: ``StandardPdfPipeline`` + ``DoclingParseV2DocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend
| |
|
| |
|
class PdfFormatOption(FormatOption):
    """Format option preset: ``StandardPdfPipeline`` + ``DoclingParseV2DocumentBackend``."""

    # Both fields are defaults and can still be overridden at construction.
    pipeline_cls: Type = StandardPdfPipeline
    backend: Type[AbstractDocumentBackend] = DoclingParseV2DocumentBackend
| |
|
| |
|
def _get_default_option(format: InputFormat) -> FormatOption:
    """Return a freshly built default ``FormatOption`` for ``format``.

    Only the option for the requested format is constructed. Building a
    ``FormatOption`` runs model validation and
    ``pipeline_cls.get_default_options()``, so constructing one for every
    known format on each lookup would be wasted work.

    Args:
        format: The input format to look up.

    Returns:
        A new ``FormatOption`` holding the default pipeline class and backend
        for ``format``.

    Raises:
        RuntimeError: If no default is configured for ``format``.
    """
    # format -> (pipeline class, backend class)
    format_to_default_spec: Dict[
        InputFormat, Tuple[Type[BasePipeline], Type[AbstractDocumentBackend]]
    ] = {
        InputFormat.XLSX: (SimplePipeline, MsExcelDocumentBackend),
        InputFormat.DOCX: (SimplePipeline, MsWordDocumentBackend),
        InputFormat.PPTX: (SimplePipeline, MsPowerpointDocumentBackend),
        InputFormat.MD: (SimplePipeline, MarkdownDocumentBackend),
        InputFormat.ASCIIDOC: (SimplePipeline, AsciiDocBackend),
        InputFormat.HTML: (SimplePipeline, HTMLDocumentBackend),
        InputFormat.XML_USPTO: (SimplePipeline, PatentUsptoDocumentBackend),
        InputFormat.XML_PUBMED: (SimplePipeline, PubMedDocumentBackend),
        InputFormat.IMAGE: (StandardPdfPipeline, DoclingParseV2DocumentBackend),
        InputFormat.PDF: (StandardPdfPipeline, DoclingParseV2DocumentBackend),
        InputFormat.JSON_DOCLING: (SimplePipeline, DoclingJSONBackend),
    }

    spec = format_to_default_spec.get(format)
    if spec is None:
        raise RuntimeError(f"No default options configured for {format}")

    pipeline_cls, backend = spec
    return FormatOption(pipeline_cls=pipeline_cls, backend=backend)
| |
|
| |
|
class DocumentConverter:
    """Convert input documents into ``ConversionResult`` objects.

    Every allowed input format is mapped to a ``FormatOption`` (pipeline
    class, pipeline options, backend). Pipelines are instantiated on first
    use and cached in ``initialized_pipelines``, keyed by pipeline class.
    """

    _default_download_filename = "file"

    def __init__(
        self,
        allowed_formats: Optional[List[InputFormat]] = None,
        format_options: Optional[Dict[InputFormat, FormatOption]] = None,
    ):
        """Set up the per-format options.

        Args:
            allowed_formats: Formats this converter accepts. ``None`` means
                every member of ``InputFormat``.
            format_options: Per-format overrides. Allowed formats without an
                entry fall back to ``_get_default_option``.
        """
        self.allowed_formats = (
            allowed_formats if allowed_formats is not None else list(InputFormat)
        )

        custom_options = format_options or {}
        self.format_to_options: Dict[InputFormat, FormatOption] = {}
        for allowed_format in self.allowed_formats:
            option = custom_options.get(allowed_format)
            if option is None:
                option = _get_default_option(format=allowed_format)
            self.format_to_options[allowed_format] = option

        self.initialized_pipelines: Dict[Type[BasePipeline], BasePipeline] = {}

    def initialize_pipeline(self, format: InputFormat):
        """Initialize the conversion pipeline for the selected format.

        Raises:
            ConversionError: If no pipeline is available for ``format``.
        """
        pipeline = self._get_pipeline(doc_format=format)
        if pipeline is None:
            raise ConversionError(
                f"No pipeline could be initialized for format {format}"
            )

    @validate_call(config=ConfigDict(strict=True))
    def convert(
        self,
        source: Union[Path, str, DocumentStream],
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> ConversionResult:
        """Convert a single source and return its result.

        This is a thin wrapper around ``convert_all`` with a one-element
        source list; all other arguments are forwarded unchanged.

        Raises:
            ConversionError: If conversion fails while ``raises_on_error`` is
                set, or if no result was produced at all. The latter case
                previously surfaced as a bare ``StopIteration``.
        """
        all_res = self.convert_all(
            source=[source],
            raises_on_error=raises_on_error,
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            headers=headers,
            page_range=page_range,
        )
        try:
            return next(all_res)
        except StopIteration:
            # Reachable only with raises_on_error=False and an empty result
            # stream. Do not leak StopIteration: inside a caller's generator
            # it would be turned into a RuntimeError.
            raise ConversionError(
                "Conversion produced no result because the provided file has no "
                "recognizable format or it wasn't in the list of allowed formats."
            ) from None

    @validate_call(config=ConfigDict(strict=True))
    def convert_all(
        self,
        source: Iterable[Union[Path, str, DocumentStream]],
        headers: Optional[Dict[str, str]] = None,
        raises_on_error: bool = True,
        max_num_pages: int = sys.maxsize,
        max_file_size: int = sys.maxsize,
        page_range: PageRange = DEFAULT_PAGE_RANGE,
    ) -> Iterator[ConversionResult]:
        """Lazily convert every source, yielding one result per document.

        Args:
            source: Paths, strings or document streams to convert.
            headers: Forwarded to the conversion input (presumably HTTP
                headers for sources given as URLs; confirm).
            raises_on_error: If true, raise instead of yielding a result whose
                status is neither ``SUCCESS`` nor ``PARTIAL_SUCCESS``, and
                raise when no result is produced at all.
            max_num_pages: Bundled into ``DocumentLimits``.
            max_file_size: Bundled into ``DocumentLimits``.
            page_range: Bundled into ``DocumentLimits``.

        Raises:
            ConversionError: See ``raises_on_error``.
        """
        limits = DocumentLimits(
            max_num_pages=max_num_pages,
            max_file_size=max_file_size,
            page_range=page_range,
        )
        conv_input = _DocumentConversionInput(
            path_or_stream_iterator=source, limits=limits, headers=headers
        )
        acceptable_statuses = {
            ConversionStatus.SUCCESS,
            ConversionStatus.PARTIAL_SUCCESS,
        }

        had_result = False
        for conv_res in self._convert(conv_input, raises_on_error=raises_on_error):
            had_result = True
            if raises_on_error and conv_res.status not in acceptable_statuses:
                raise ConversionError(
                    f"Conversion failed for: {conv_res.input.file} with status: {conv_res.status}"
                )
            yield conv_res

        if not had_result and raises_on_error:
            raise ConversionError(
                "Conversion failed because the provided file has no recognizable format or it wasn't in the list of allowed formats."
            )

    def _convert(
        self, conv_input: _DocumentConversionInput, raises_on_error: bool
    ) -> Iterator[ConversionResult]:
        """Process the input documents batch by batch, yielding each result.

        Batch size comes from ``settings.perf.doc_batch_size``. Documents
        within a batch are processed one after another.
        """
        start_time = time.monotonic()

        for input_batch in chunkify(
            conv_input.docs(self.format_to_options),
            settings.perf.doc_batch_size,
        ):
            _log.info("Going to convert document batch...")

            for item in map(
                partial(self._process_document, raises_on_error=raises_on_error),
                input_batch,
            ):
                # Time elapsed since the previous document finished (or since
                # the start of the run for the first document).
                now = time.monotonic()
                elapsed = now - start_time
                start_time = now
                _log.info(
                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
                )
                yield item

    def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
        """Return the (cached) pipeline instance for ``doc_format``.

        Returns:
            The pipeline, or ``None`` when the format has no configured
            option or its pipeline options are ``None``.
        """
        fopt = self.format_to_options.get(doc_format)
        if fopt is None:
            return None

        pipeline_class = fopt.pipeline_cls
        pipeline_options = fopt.pipeline_options
        if pipeline_options is None:
            return None

        # NOTE: the cache is keyed by pipeline class only. Two formats sharing
        # a pipeline class with different options replace each other's
        # instance whenever they alternate.
        if (
            pipeline_class not in self.initialized_pipelines
            or self.initialized_pipelines[pipeline_class].pipeline_options
            != pipeline_options
        ):
            self.initialized_pipelines[pipeline_class] = pipeline_class(
                pipeline_options=pipeline_options
            )
        return self.initialized_pipelines[pipeline_class]

    def _process_document(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Convert ``in_doc`` if its format is allowed, otherwise skip it.

        Returns:
            The pipeline result, or a ``SKIPPED`` result carrying an error
            item when the format is not allowed and ``raises_on_error`` is
            false.

        Raises:
            ConversionError: If the format is not allowed and
                ``raises_on_error`` is true.
        """
        format_allowed = (
            self.allowed_formats is not None and in_doc.format in self.allowed_formats
        )
        if format_allowed:
            return self._execute_pipeline(in_doc, raises_on_error=raises_on_error)

        error_message = f"File format not allowed: {in_doc.file}"
        if raises_on_error:
            raise ConversionError(error_message)

        error_item = ErrorItem(
            component_type=DoclingComponentType.USER_INPUT,
            module_name="",
            error_message=error_message,
        )
        return ConversionResult(
            input=in_doc, status=ConversionStatus.SKIPPED, errors=[error_item]
        )

    def _execute_pipeline(
        self, in_doc: InputDocument, raises_on_error: bool
    ) -> ConversionResult:
        """Run the pipeline matching ``in_doc.format`` on ``in_doc``.

        Returns:
            The pipeline's result, or a ``FAILURE`` result when the document
            is invalid or no pipeline is available and ``raises_on_error`` is
            false.

        Raises:
            ConversionError: If the document is invalid or no pipeline is
                available and ``raises_on_error`` is true.
        """
        if not in_doc.valid:
            if raises_on_error:
                raise ConversionError(f"Input document {in_doc.file} is not valid.")
            return ConversionResult(
                input=in_doc,
                status=ConversionStatus.FAILURE,
            )

        pipeline = self._get_pipeline(in_doc.format)
        if pipeline is None:
            if raises_on_error:
                raise ConversionError(
                    f"No pipeline could be initialized for {in_doc.file}."
                )
            return ConversionResult(
                input=in_doc,
                status=ConversionStatus.FAILURE,
            )

        return pipeline.execute(in_doc, raises_on_error=raises_on_error)
| |
|