Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added excel backend #334

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions docling/backend/msexcel_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
import logging
from io import BytesIO
from pathlib import Path
from typing import Set, Tuple, Union

from docling_core.types.doc import (
DocItemLabel,
DoclingDocument,
DocumentOrigin,
GroupLabel,
TableCell,
TableData,
)
from lxml import etree
from openpyxl import Workbook, load_workbook
from openpyxl.cell.cell import Cell
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image

from docling.backend.abstract_backend import DeclarativeDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.document import InputDocument

# Module-level logger, named after this module so that its records can be
# filtered/configured per backend.
_log = logging.getLogger(__name__)


class MsExcelDocumentBackend(DeclarativeDocumentBackend):
    """Declarative backend that converts an XLSX workbook into a DoclingDocument.

    Every worksheet becomes a section group in the document. Each compact
    rectangular region of non-empty cells found on a sheet is added to that
    group as a table.
    """

    def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]):
        """Load the workbook with openpyxl from a path or an in-memory stream.

        Args:
            in_doc: Input document descriptor, forwarded to the base class.
            path_or_stream: Path of the XLSX file, or its content as a stream.

        Raises:
            RuntimeError: If openpyxl fails to load the workbook.
        """
        super().__init__(in_doc, path_or_stream)

        # Initialise the parents for the hierarchy (level -1 .. max_levels - 1).
        self.max_levels = 10
        self.parents = {level: None for level in range(-1, self.max_levels)}  # type: ignore

        self.workbook = None
        try:
            if isinstance(self.path_or_stream, BytesIO):
                self.workbook = load_workbook(filename=self.path_or_stream)
            elif isinstance(self.path_or_stream, Path):
                self.workbook = load_workbook(filename=str(self.path_or_stream))

            # An input of any other type leaves the workbook unloaded; such a
            # backend must not report itself as valid.
            self.valid = self.workbook is not None
        except Exception as e:
            self.valid = False

            raise RuntimeError(
                f"MsExcelDocumentBackend could not load document with hash {self.document_hash}"
            ) from e

    def is_valid(self) -> bool:
        """Return whether a workbook was successfully loaded at construction."""
        _log.info("valid: %s", self.valid)
        return self.valid

    @classmethod
    def supports_pagination(cls) -> bool:
        """Report that this backend supports pagination."""
        return True

    def unload(self):
        """Close the input stream (if it is one) and drop the reference to it."""
        if isinstance(self.path_or_stream, BytesIO):
            self.path_or_stream.close()

        self.path_or_stream = None

    @classmethod
    def supported_formats(cls) -> Set[InputFormat]:
        """Return the set of input formats handled by this backend (XLSX only)."""
        return {InputFormat.XLSX}

    def convert(self) -> DoclingDocument:
        """Parse the XLSX workbook into a structured document model.

        Returns:
            A DoclingDocument holding one section group per sheet and the
            tables found on each sheet.

        Raises:
            RuntimeError: If the backend did not initialise successfully.
        """
        origin = DocumentOrigin(
            filename=self.file.name or "file",
            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            binary_hash=self.document_hash,
        )

        doc = DoclingDocument(name=self.file.stem or "file", origin=origin)

        if not self.is_valid():
            raise RuntimeError(
                f"Cannot convert doc with {self.document_hash} because the backend failed to init."
            )

        return self.convert_workbook(doc)

    def get_level(self) -> int:
        """Return the first non-negative hierarchy level without a parent.

        Falls back to 0 when every level already has a parent assigned.
        """
        for level, parent in self.parents.items():
            if level >= 0 and parent is None:
                return level
        return 0

    def convert_workbook(self, doc: DoclingDocument) -> DoclingDocument:
        """Add every sheet of the loaded workbook to ``doc``.

        A section group named ``sheet: <name>`` is created per sheet and stored
        as ``self.parents[0]`` so that the sheet's tables are attached to it.
        If no workbook is loaded, an error is logged and ``doc`` is returned
        unchanged.
        """
        if self.workbook is None:
            _log.error("Workbook is not initialized.")
            return doc

        for sheet_name in self.workbook.sheetnames:
            _log.info(f"Processing sheet: {sheet_name}")

            sheet = self.workbook[sheet_name]  # Access the sheet by name

            # Sheets are always top-level groups; deeper levels are not used yet.
            self.parents[0] = doc.add_group(
                parent=None,
                label=GroupLabel.SECTION,
                name=f"sheet: {sheet_name}",
            )

            doc = self.convert_sheet(doc, sheet)

        return doc

    def convert_sheet(self, doc: DoclingDocument, sheet: Worksheet) -> DoclingDocument:
        """Add the tables and then the images of a single sheet to ``doc``."""
        doc = self.find_tables_in_sheet(doc, sheet)
        doc = self.find_images_in_sheet(doc, sheet)
        return doc

    def find_tables_in_sheet(
        self, doc: DoclingDocument, sheet: Worksheet
    ) -> DoclingDocument:
        """Detect the data tables on ``sheet`` and add each one to ``doc``.

        Tables are attached to the current sheet group (``self.parents[0]``).
        Cell offsets are relative to the top-left corner of their table.
        """
        for excel_table in self.find_data_tables(sheet):
            table_data = TableData(
                num_rows=excel_table["num_rows"],
                num_cols=excel_table["num_cols"],
                table_cells=[],
            )

            for excel_cell in excel_table["data"]:
                row = excel_cell["row"]
                col = excel_cell["col"]
                row_span = excel_cell["row_span"]
                col_span = excel_cell["col_span"]

                # NOTE(review): an empty cell inside the table bounds has the
                # value None and is therefore rendered as the text "None".
                table_data.table_cells.append(
                    TableCell(
                        text=str(excel_cell["cell"].value),
                        row_span=row_span,
                        col_span=col_span,
                        start_row_offset_idx=row,
                        end_row_offset_idx=row + row_span,
                        start_col_offset_idx=col,
                        end_col_offset_idx=col + col_span,
                        # Header detection is not implemented; every cell is
                        # emitted as a plain body cell.
                        col_header=False,
                        row_header=False,
                    )
                )

            doc.add_table(data=table_data, parent=self.parents[0])

        return doc

    def find_data_tables(self, sheet: Worksheet) -> list:
        """Find all compact rectangular data tables in a sheet.

        The sheet is scanned row by row; every non-empty cell that is not yet
        part of a table starts a new one.

        Returns:
            A list of table dictionaries as produced by ``find_table_bounds``.
        """
        tables = []  # List to store found tables
        visited: Set[Tuple[int, int]] = set()  # 0-based (row, col) already used

        for ri, row in enumerate(sheet.iter_rows(values_only=False)):
            for rj, cell in enumerate(row):
                # Skip empty or already visited cells
                if cell.value is None or (ri, rj) in visited:
                    continue

                # The cell starts a new table: find its bounds
                table_bounds, visited_cells = self.find_table_bounds(
                    sheet, ri, rj, visited
                )

                visited.update(visited_cells)  # Mark these cells as visited
                tables.append(table_bounds)

        return tables

    def find_table_bounds(
        self,
        sheet: Worksheet,
        start_row: int,
        start_col: int,
        visited: Set[Tuple[int, int]],
    ) -> Tuple[dict, Set[Tuple[int, int]]]:
        """Determine the bounds of a compact rectangular table.

        The table grows downward while the first column is non-empty and
        rightward while the first row is non-empty.

        Args:
            sheet: Worksheet being analysed.
            start_row: 0-based row index of the table's top-left cell.
            start_col: 0-based column index of the table's top-left cell.
            visited: Cells already assigned to earlier tables (currently not
                consulted here; kept for interface compatibility).

        Returns:
            - A dictionary with the bounds (``beg_row``, ``beg_col``,
              ``end_row``, ``end_col``), the size (``num_rows``, ``num_cols``)
              and the cell records (``data``).
            - A set of visited 0-based ``(row, col)`` cell coordinates.
        """
        _log.info("find_table_bounds")

        max_row = start_row
        max_col = start_col

        # Expand downward to find the table's bottom boundary.
        # (+2: next 0-based row converted to openpyxl's 1-based indexing.)
        while (
            max_row < sheet.max_row - 1
            and sheet.cell(row=max_row + 2, column=start_col + 1).value is not None
        ):
            max_row += 1

        # Expand rightward to find the table's right boundary
        while (
            max_col < sheet.max_column - 1
            and sheet.cell(row=start_row + 1, column=max_col + 2).value is not None
        ):
            max_col += 1

        # The merged ranges do not change while collecting; fetch them once.
        merged_ranges = list(sheet.merged_cells.ranges)

        # Collect the data within the bounds
        data = []
        visited_cells: Set[Tuple[int, int]] = set()
        for ri in range(start_row, max_row + 1):
            for rj in range(start_col, max_col + 1):
                # A cell covered by an already emitted merged range must not be
                # emitted a second time.
                if (ri, rj) in visited_cells:
                    continue

                cell = sheet.cell(row=ri + 1, column=rj + 1)  # 1-based indexing

                # Check if the cell belongs to a merged range. The bounds are
                # compared explicitly because CellRange membership expects a
                # coordinate string such as "A1", not a (row, col) tuple.
                row_span = 1
                col_span = 1
                for merged_range in merged_ranges:
                    if (
                        merged_range.min_row <= ri + 1 <= merged_range.max_row
                        and merged_range.min_col <= rj + 1 <= merged_range.max_col
                    ):
                        row_span = merged_range.max_row - merged_range.min_row + 1
                        col_span = merged_range.max_col - merged_range.min_col + 1
                        break

                data.append(
                    {
                        "row": ri - start_row,
                        "col": rj - start_col,
                        "cell": cell,
                        "row_span": row_span,
                        "col_span": col_span,
                    }
                )

                # Mark all cells in the span as visited
                for span_row in range(ri, ri + row_span):
                    for span_col in range(rj, rj + col_span):
                        visited_cells.add((span_row, span_col))

        return {
            "beg_row": start_row,
            "beg_col": start_col,
            "end_row": max_row,
            "end_col": max_col,
            "num_rows": max_row + 1 - start_row,
            "num_cols": max_col + 1 - start_col,
            "data": data,
        }, visited_cells

    def find_images_in_sheet(
        self, doc: DoclingDocument, sheet: Worksheet
    ) -> DoclingDocument:
        """Placeholder for image extraction; returns ``doc`` unchanged."""
        # FIXME: embedded images are not extracted yet. The intended approach
        # is to iterate over the sheet's embedded images (``sheet._images``)
        # and add each one to the document.
        return doc
6 changes: 6 additions & 0 deletions docling/datamodel/base_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class InputFormat(str, Enum):
PDF = "pdf"
ASCIIDOC = "asciidoc"
MD = "md"
XLSX = "xlsx"


class OutputFormat(str, Enum):
Expand All @@ -49,6 +50,7 @@ class OutputFormat(str, Enum):
InputFormat.HTML: ["html", "htm", "xhtml"],
InputFormat.IMAGE: ["jpg", "jpeg", "png", "tif", "tiff", "bmp"],
InputFormat.ASCIIDOC: ["adoc", "asciidoc", "asc"],
InputFormat.XLSX: ["xlsx"],
}

FormatToMimeType: Dict[InputFormat, List[str]] = {
Expand All @@ -72,7 +74,11 @@ class OutputFormat(str, Enum):
InputFormat.PDF: ["application/pdf"],
InputFormat.ASCIIDOC: ["text/asciidoc"],
InputFormat.MD: ["text/markdown", "text/x-markdown"],
InputFormat.XLSX: [
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
],
}

# Reverse lookup built from FormatToMimeType: every MIME type listed for a
# format maps back to that format. If the same MIME type were listed under
# several formats, the entry iterated last would win.
MimeTypeToFormat = {
mime: fmt for fmt, mimes in FormatToMimeType.items() for mime in mimes
}
Expand Down
9 changes: 9 additions & 0 deletions docling/document_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from docling.backend.docling_parse_backend import DoclingParseDocumentBackend
from docling.backend.html_backend import HTMLDocumentBackend
from docling.backend.md_backend import MarkdownDocumentBackend
from docling.backend.msexcel_backend import MsExcelDocumentBackend
from docling.backend.mspowerpoint_backend import MsPowerpointDocumentBackend
from docling.backend.msword_backend import MsWordDocumentBackend
from docling.datamodel.base_models import ConversionStatus, DocumentStream, InputFormat
Expand Down Expand Up @@ -44,6 +45,11 @@ def set_optional_field_default(self) -> "FormatOption":
return self


# Format option preset for Excel input: defaults the pipeline class to
# SimplePipeline and the backend to MsExcelDocumentBackend.
class ExcelFormatOption(FormatOption):
pipeline_cls: Type = SimplePipeline
backend: Type[AbstractDocumentBackend] = MsExcelDocumentBackend


class WordFormatOption(FormatOption):
pipeline_cls: Type = SimplePipeline
backend: Type[AbstractDocumentBackend] = MsWordDocumentBackend
Expand Down Expand Up @@ -80,6 +86,9 @@ class ImageFormatOption(FormatOption):


_format_to_default_options = {
InputFormat.XLSX: FormatOption(
pipeline_cls=SimplePipeline, backend=MsExcelDocumentBackend
),
InputFormat.DOCX: FormatOption(
pipeline_cls=SimplePipeline, backend=MsWordDocumentBackend
),
Expand Down
Loading
Loading