Skip to content

Commit

Permalink
Merge pull request #1390 from ashitsalesforce/master
Browse files Browse the repository at this point in the history
refactor common code in CSV and Database readers
  • Loading branch information
ashitsalesforce authored Nov 29, 2024
2 parents 1fb10bc + c989127 commit 0640fa3
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2015, salesforce.com, inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the
* following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
* the following disclaimer in the documentation and/or other materials provided with the distribution.
*
* Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.dataloader.dao;

import java.util.List;

import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.model.TableRow;
import com.salesforce.dataloader.util.DAORowUtil;

/**
 * Base class for {@link DataReader} implementations (CSV, database) that
 * factors out the row-caching and open/close bookkeeping shared by all
 * readers. Subclasses only implement the DAO-specific hooks declared
 * abstract at the bottom of this class.
 */
public abstract class AbstractDataReaderImpl implements DataReader {
    // Application configuration supplied at construction time.
    private final AppConfig config;
    // Rows already fetched from the DAO, replayed on subsequent passes
    // when PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO is enabled.
    private final DAORowCache cache = new DAORowCache();
    // Count of rows handed to the caller since the last open().
    private int rowsReadSoFar;
    // Total row count; stays 0 until computed or end-of-data is reached.
    private int knownTotalRows = 0;

    public AbstractDataReaderImpl(AppConfig appConfig) {
        this.config = appConfig;
    }

    /**
     * Reads up to {@code maxRows} rows, serving them entirely from the cache
     * when the cache already covers the requested range, otherwise delegating
     * to the DAO-backed implementation.
     *
     * @param maxRows maximum number of rows to return
     * @return the rows read; possibly fewer than {@code maxRows} at end of data
     * @throws DataAccessObjectException if the underlying DAO read fails
     */
    public List<TableRow> readTableRowList(int maxRows) throws DataAccessObjectException {
        boolean fullyCached = this.cache.size() >= this.rowsReadSoFar + maxRows;
        if (!fullyCached) {
            return readTableRowListFromDAO(maxRows);
        }
        List<TableRow> cachedBatch = this.cache.getRows(this.rowsReadSoFar, maxRows);
        this.rowsReadSoFar += cachedBatch.size();
        return cachedBatch;
    }

    /**
     * (Re)opens the reader. The DAO itself is only reopened when caching is
     * disabled or nothing has been cached yet; otherwise subsequent reads are
     * replayed from the cache.
     *
     * @throws DataAccessObjectInitializationException if the DAO cannot be opened
     */
    public void open() throws DataAccessObjectInitializationException {
        if (isOpenFlag()) {
            close();
        }
        boolean cachingEnabled = config.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO);
        if (!cachingEnabled || cache.size() == 0) {
            openDAO();
        }
        this.rowsReadSoFar = 0;
        this.cache.resetCurrentRowIndex();
        setOpenFlag(true);
    }

    /**
     * Reads the next row, preferring the cache and falling back to the DAO.
     * Newly read rows are added to the cache when caching is enabled.
     *
     * @return the next row, or {@code null} at end of data
     * @throws DataAccessObjectException if the underlying DAO read fails
     */
    public TableRow readTableRow() throws DataAccessObjectException {
        if (!isOpenFlag()) {
            open();
        }

        // Cache hit: replay without touching the DAO.
        TableRow cachedRow = cache.getCurrentRow();
        if (cachedRow != null) {
            this.rowsReadSoFar++;
            return cachedRow;
        }
        // Cache miss: pull the next row from the DAO.
        TableRow daoRow = readTableRowFromDAO();
        if (daoRow == null) {
            // End of data: the rows read so far is the authoritative total.
            this.knownTotalRows = this.rowsReadSoFar;
            return null;
        }
        this.rowsReadSoFar++;
        if (config.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)) {
            cache.addRow(daoRow);
        }
        return daoRow;
    }

    /** @return number of rows read since the last {@link #open()} */
    public int getCurrentRowNumber() {
        return this.rowsReadSoFar;
    }

    /**
     * @return total number of rows in the data source, computing it on first use
     * @throws DataAccessObjectException if counting requires a DAO read that fails
     */
    public int getTotalRows() throws DataAccessObjectException {
        if (this.knownTotalRows == 0) {
            if (!isOpenFlag()) {
                open();
            }
            this.knownTotalRows = DAORowUtil.calculateTotalRows(this);
        }
        return this.knownTotalRows;
    }

    /** @return the application configuration this reader was constructed with */
    protected AppConfig getAppConfig() {
        return this.config;
    }

    // Subclass hooks: open-state flag accessors, DAO open, and raw DAO reads.
    abstract protected void setOpenFlag(boolean open);
    abstract protected boolean isOpenFlag();
    abstract protected void openDAO() throws DataAccessObjectInitializationException;
    abstract protected List<TableRow> readTableRowListFromDAO(int maxRows) throws DataAccessObjectException;
    abstract protected TableRow readTableRowFromDAO() throws DataAccessObjectException;
}
22 changes: 16 additions & 6 deletions src/main/java/com/salesforce/dataloader/dao/DAORowCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
package com.salesforce.dataloader.dao;

import java.util.ArrayList;
import java.util.List;

import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.model.TableRow;

public class DAORowCache {
private ArrayList<TableRow> rowList = new ArrayList<TableRow>();
private int currentRowIndex = 0;
private int cachedRows = 0;

public DAORowCache() {
}
Expand All @@ -44,7 +44,7 @@ public void resetCurrentRowIndex() {

public TableRow getCurrentRow() {
AppConfig appConfig = AppConfig.getCurrentConfig();
if (currentRowIndex >= cachedRows
if (currentRowIndex >= rowList.size()
|| !appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)) {
return null;
}
Expand All @@ -53,14 +53,24 @@ public TableRow getCurrentRow() {

public void addRow(TableRow row) {
// add a row to the cache only if it is not cached already
if (currentRowIndex >= cachedRows) {
if (currentRowIndex >= rowList.size()) {
rowList.add(row);
cachedRows++;
}
currentRowIndex++;
}

public int getCachedRows() {
return cachedRows;
/** @return number of rows currently held in the cache */
public int size() {
return rowList.size();
}

/**
 * Returns up to {@code numRows} cached rows starting at {@code startRow},
 * advancing the cache's current row index past the returned range.
 *
 * @param startRow zero-based index of the first requested row
 * @param numRows  maximum number of rows to return; the batch is truncated
 *                 at the end of the cache
 * @return the requested rows, or {@code null} when {@code startRow} is at
 *         or beyond the end of the cached rows
 */
public List<TableRow> getRows(int startRow, int numRows) {
    if (rowList.size() <= startRow) {
        return null;
    }
    if (rowList.size() < startRow + numRows) {
        numRows = rowList.size() - startRow;
    }
    currentRowIndex = startRow + numRows;
    // Return a copy rather than subList()'s live view: a view is backed by
    // rowList, so a later addRow() would structurally modify the backing list
    // and make the returned view throw ConcurrentModificationException, and a
    // view also lets callers mutate the cache's internal state.
    return new ArrayList<>(rowList.subList(startRow, startRow + numRows));
}
}
84 changes: 21 additions & 63 deletions src/main/java/com/salesforce/dataloader/dao/csv/CSVFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@

import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.config.Messages;
import com.salesforce.dataloader.dao.DAORowCache;
import com.salesforce.dataloader.dao.DataReader;
import com.salesforce.dataloader.dao.AbstractDataReaderImpl;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
import com.salesforce.dataloader.exception.DataAccessRowException;
Expand All @@ -60,21 +59,17 @@
*
* @author Federico Recio
*/
public class CSVFileReader implements DataReader {
public class CSVFileReader extends AbstractDataReaderImpl {

private static final Logger LOGGER = DLLogManager.getLogger(CSVFileReader.class);
private final Object lock = new Object();
private File file;
private FileInputStream input;
private int totalRows;
private CSVReader csvReader;
private int currentRowNumber;
private List<String> headerRow;
private TableHeader tableHeader;
private boolean isOpen;
private char[] csvDelimiters;
private AppConfig appConfig;
private DAORowCache rowCache = new DAORowCache();
private boolean endOfFileReached = false;

// Handles 3 types of CSV files:
Expand All @@ -83,8 +78,8 @@ public class CSVFileReader implements DataReader {
// 3. CSV files that capture successes/failures when performing an upload operation: ignoreDelimiterConfig = true, isQueryOperationResult = <value ignored>
// isQueryOperationsResult value is ignored if ignoreDelimiterConfig is 'true'.
public CSVFileReader(File file, AppConfig appConfig, boolean ignoreDelimiterConfig, boolean isQueryOperationResult) {
super(appConfig);
this.file = file;
this.appConfig = appConfig;
StringBuilder separator = new StringBuilder();
if (ignoreDelimiterConfig) {
separator.append(AppUtil.COMMA);
Expand Down Expand Up @@ -123,18 +118,9 @@ public void checkConnection() throws DataAccessObjectInitializationException {
}

@Override
public void open() throws DataAccessObjectInitializationException {
if (isOpen) {
close();
}
if (!appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
|| rowCache.getCachedRows() == 0) {
initalizeInput(csvDelimiters);
readHeaderRow();
}
currentRowNumber = 0;
rowCache.resetCurrentRowIndex();
isOpen = true;
/**
 * Opens the underlying CSV input stream with the configured delimiters and
 * consumes the header row, so subsequent reads return data rows only.
 * Invoked by the base class {@code open()} when the DAO must be (re)opened.
 *
 * @throws DataAccessObjectInitializationException if the file cannot be
 *         opened or the header row cannot be read
 */
protected void openDAO() throws DataAccessObjectInitializationException {
initalizeInput(csvDelimiters);
readHeaderRow();
}

/**
Expand All @@ -151,6 +137,14 @@ public void close() {
}
}

/** Base-class hook: records whether this reader's input is currently open. */
protected void setOpenFlag(boolean openFlag) {
this.isOpen = openFlag;
}

/** Base-class hook: @return whether this reader's input is currently open. */
protected boolean isOpenFlag() {
return this.isOpen;
}

@Override
public List<Row> readRowList(int maxRows) throws DataAccessObjectException {
List<Row> outputRows = new ArrayList<Row>();
Expand All @@ -168,7 +162,7 @@ public List<Row> readRowList(int maxRows) throws DataAccessObjectException {
}

@Override
public List<TableRow> readTableRowList(int maxRows) throws DataAccessObjectException {
protected List<TableRow> readTableRowListFromDAO(int maxRows) throws DataAccessObjectException {
List<TableRow> outputRows = new ArrayList<TableRow>();
for (int i = 0; i < maxRows; i++) {
TableRow outputRow = readTableRow();
Expand All @@ -188,22 +182,10 @@ public List<TableRow> readTableRowList(int maxRows) throws DataAccessObjectExcep
* Updates the current record number
*/
@Override
public TableRow readTableRow() throws DataAccessObjectException {
if (!isOpen) {
open();
}

TableRow trow = rowCache.getCurrentRow();
if (trow != null) {
currentRowNumber++;
return trow;
}

if (appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
&& endOfFileReached) {
protected TableRow readTableRowFromDAO() throws DataAccessObjectException {
if (endOfFileReached) {
return null;
}

List<String> record;
synchronized (lock) {
try {
Expand All @@ -220,15 +202,15 @@ record = csvReader.nextRecord();

if (record.size() > headerRow.size()) {
String errMsg = Messages.getFormattedString("CSVFileDAO.errorRowTooLarge", new String[]{
String.valueOf(currentRowNumber), String.valueOf(record.size()), String.valueOf(headerRow.size())});
String.valueOf(getCurrentRowNumber()), String.valueOf(record.size()), String.valueOf(headerRow.size())});
throw new DataAccessRowException(errMsg);
} else if (record.size() < headerRow.size()) {
String errMsg = Messages.getFormattedString("CSVFileDAO.errorRowTooSmall", new String[]{
String.valueOf(currentRowNumber), String.valueOf(record.size()), String.valueOf(headerRow.size())});
String.valueOf(getCurrentRowNumber()), String.valueOf(record.size()), String.valueOf(headerRow.size())});
throw new DataAccessRowException(errMsg);
}

trow = new TableRow(this.tableHeader);
TableRow trow = new TableRow(this.tableHeader);

for (int i = 0; i < headerRow.size(); i++) {
String value = record.get(i);
Expand All @@ -237,8 +219,6 @@ record = csvReader.nextRecord();
}
trow.put(headerRow.get(i), value);
}
currentRowNumber++;
rowCache.addRow(trow);
return trow;
}

Expand All @@ -259,28 +239,6 @@ public List<String> getColumnNames() {
return headerRow;
}

/*
* Returns the number of rows in the file. <i>Side effect:</i> Moves the row pointer to the first row
*/
@Override
public int getTotalRows() throws DataAccessObjectException {
if (totalRows == 0) {
if (!isOpen) {
open();
}
totalRows = DAORowUtil.calculateTotalRows(this);
}
return totalRows;
}

/**
* @return Current record number that has been read
*/
@Override
public int getCurrentRowNumber() {
return currentRowNumber;
}

private void readHeaderRow() throws DataAccessObjectInitializationException {
try {
synchronized (lock) {
Expand Down Expand Up @@ -311,7 +269,7 @@ private void initalizeInput(char[] csvDelimiters) throws DataAccessObjectInitial

try {
input = new FileInputStream(file);
String encoding = this.appConfig.getCsvEncoding(false);
String encoding = this.getAppConfig().getCsvEncoding(false);
if (StandardCharsets.UTF_8.name().equals(encoding)
|| StandardCharsets.UTF_16BE.name().equals(encoding)
|| StandardCharsets.UTF_16LE.name().equals(encoding)
Expand Down
Loading

0 comments on commit 0640fa3

Please sign in to comment.