From fef09cf481bdcb9c8c43ac4964f6ba72631be342 Mon Sep 17 00:00:00 2001 From: psainics Date: Mon, 4 Mar 2024 05:18:26 +0530 Subject: [PATCH] MAGIC ! --- core-plugins/pom.xml | 12 ++++++ .../plugin/batch/source/ExcelInputFormat.java | 38 +++++++++++++++---- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/core-plugins/pom.xml b/core-plugins/pom.xml index ffa628271..74f445c69 100644 --- a/core-plugins/pom.xml +++ b/core-plugins/pom.xml @@ -193,6 +193,18 @@ poi-ooxml 5.2.4 + + com.github.pjfanning + excel-streaming-reader + compile + 4.2.1 + + + com.github.pjfanning + poi-shared-strings + compile + 2.8.0 + test org.apache.sshd diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java index 1b755539d..f8e011881 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/source/ExcelInputFormat.java @@ -31,6 +31,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.poi.EmptyFileException; +import org.apache.poi.poifs.filesystem.FileMagic; import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.DateUtil; import org.apache.poi.ss.usermodel.Row; @@ -40,6 +42,7 @@ import org.apache.poi.ss.util.CellReference; import java.io.IOException; +import java.io.InputStream; import java.util.Iterator; @@ -146,11 +149,31 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro String sheet = job.get(SHEET); String sheetValue = job.get(SHEET_VALUE); - Sheet workSheet; // sheet can be used as common for XSSF and HSSF workbook - // match regex pattern *.xls or *.xlsx + Sheet workSheet; + Workbook workbook; + boolean isStreaming = false; try { - // Workbook workbook = WorkbookFactory.create(fileIn); 
-        Workbook workbook = StreamingReader.builder().rowCacheSize(10).open(fileIn);
+        // Sniff the magic bytes first; 'is' may be a buffering wrapper, so every later read must go through it.
+        InputStream is = FileMagic.prepareToCheckMagic(fileIn);
+        byte[] emptyFileCheck = new byte[1];
+        is.mark(emptyFileCheck.length);
+        if (is.read(emptyFileCheck) < emptyFileCheck.length) {
+          throw new EmptyFileException();
+        }
+        is.reset();
+
+        final FileMagic fm = FileMagic.valueOf(is);
+        switch (fm) {
+          case OOXML:
+            workbook = StreamingReader.builder().rowCacheSize(10).open(is);
+            isStreaming = true;
+            break;
+          case OLE2:
+            workbook = WorkbookFactory.create(is);
+            break;
+          default:
+            throw new IOException("Can't open workbook - unsupported file type: " + fm);
+        }
         if (sheet.equalsIgnoreCase(SHEET_NAME)) {
           workSheet = workbook.getSheet(sheetValue);
         } else {
@@ -160,8 +183,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
         throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
       }
 
-//      rowCount = job.getInt(ROWS_LIMIT, workSheet.getPhysicalNumberOfRows());
-      rowCount = job.getInt(ROWS_LIMIT, 10000);
+      // As we cannot get the number of rows in a sheet while streaming.
+      // -1 is used as rowCount to indicate that all rows should be read.
+      rowCount = job.getInt(ROWS_LIMIT, isStreaming ? -1 : workSheet.getPhysicalNumberOfRows());
       rows = workSheet.iterator();
       lastRowNum = workSheet.getLastRowNum();
       rowIdx = 0;
@@ -175,7 +199,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
     }
 
     @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
+    public boolean nextKeyValue() {
       if (!rows.hasNext() || rowCount == 0) {
         return false;
       }