reduce memory footprint of large upload jobs #1386

Merged: 1 commit, Nov 26, 2024
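This PR swaps the per-row `Row` model for a `TableRow` backed by a single shared `TableHeader` built once from the CSV header (see the `CSVFileReader` changes below). The `TableRow` and `TableHeader` classes themselves are not part of this diff, so the sketch below is only an illustration of the idea; the field names, helper methods, and the `Map`-based return type of `convertToRow()` are assumptions, not the project's actual implementation.

```java
// Illustrative sketch only: the real com.salesforce.dataloader.model.TableHeader and
// TableRow classes are not shown in this diff; all internals here are assumptions.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TableHeader {
    // One copy of the column names, shared by every row read for the upload job.
    private final List<String> columns;
    private final Map<String, Integer> positions = new HashMap<>();

    TableHeader(List<String> columns) {
        this.columns = columns;
        for (int i = 0; i < columns.size(); i++) {
            positions.put(columns.get(i), i);
        }
    }

    int position(String column) { return positions.get(column); }
    List<String> columns() { return columns; }
}

class TableRow {
    // A row holds only its cell values; the old Row model kept a full HashMap per row,
    // with an entry (key reference, hash, node) for every column.
    private final TableHeader header;
    private final Object[] cells;

    TableRow(TableHeader header) {
        this.header = header;
        this.cells = new Object[header.columns().size()];
    }

    void put(String column, Object value) { cells[header.position(column)] = value; }

    // Bridge back to a map keyed by column name for code paths that still expect the
    // old Row shape, mirroring the convertToRow() calls that appear throughout the diff.
    Map<String, Object> convertToRow() {
        Map<String, Object> row = new HashMap<>();
        List<String> columns = header.columns();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), cells[i]);
        }
        return row;
    }
}
```

Under that assumption, the saving comes from replacing one HashMap per CSV row with a plain value array, while the column-name bookkeeping is paid once per job in the shared header; the exact numbers depend on the real `TableRow` implementation.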
@@ -41,6 +41,7 @@
 import com.salesforce.dataloader.exception.ParameterLoadException;
 import com.salesforce.dataloader.mapping.LoadMapper;
 import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 import com.salesforce.dataloader.util.DAORowUtil;
 import com.sforce.soap.partner.DescribeGlobalSObjectResult;
 import com.sforce.soap.partner.DescribeSObjectResult;
@@ -85,12 +86,12 @@ protected boolean visit() throws DataAccessObjectException, ParameterLoadException

         final int loadBatchSize = this.getConfig().getImportBatchSize();
         final int daoRowNumBase = getDao().getCurrentRowNumber();
-        final List<Row> daoRowList = getDao().readRowList(loadBatchSize);
+        final List<TableRow> daoRowList = getDao().readTableRowList(loadBatchSize);
         if (daoRowList == null || daoRowList.size() == 0) return false;
         int daoRowCount = 0;
 
-        for (final Row daoRow : daoRowList) {
-            if (!DAORowUtil.isValidRow(daoRow)) {
+        for (final TableRow daoRow : daoRowList) {
+            if (!DAORowUtil.isValidTableRow(daoRow)) {
                 getVisitor().setRowConversionStatus(daoRowNumBase + daoRowCount++,
                         false);
                 return false;
@@ -31,6 +31,7 @@
 import java.util.*;
 
 import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 import com.salesforce.dataloader.util.DAORowUtil;
 
 import org.apache.commons.beanutils.*;
@@ -80,7 +81,7 @@ public abstract class DAOLoadVisitor extends AbstractVisitor implements DAORowVisitor
     protected DynaProperty[] dynaProps = null;
 
     private final int batchSize;
-    protected List<Row> daoRowList = new ArrayList<Row>();
+    protected List<TableRow> daoRowList = new ArrayList<TableRow>();
     protected ArrayList<Integer> batchRowToDAORowList = new ArrayList<Integer>();
     private int processedDAORowCounter = 0;
     private static final Logger logger = DLLogManager.getLogger(DAOLoadVisitor.class);
@@ -143,7 +144,7 @@ protected boolean isRowConversionSuccessful() {
     }
 
     @Override
-    public boolean visit(Row row) throws OperationException, DataAccessObjectException,
+    public boolean visit(TableRow row) throws OperationException, DataAccessObjectException,
             ConnectionException {
         AppConfig appConfig = controller.getAppConfig();
         if (appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
@@ -221,7 +222,7 @@ public boolean visit(Row row) throws OperationException, DataAccessObjectException
             String errMsg = Messages.getMessage("Visitor", "conversionErrorMsg", conve.getMessage());
             getLogger().error(errMsg, conve);
 
-            conversionFailed(row, errMsg);
+            conversionFailed(row.convertToRow(), errMsg);
             // this row cannot be added since conversion has failed
             return false;
         } catch (InvocationTargetException e) {
@@ -28,7 +28,7 @@
 
 import com.salesforce.dataloader.exception.DataAccessObjectException;
 import com.salesforce.dataloader.exception.OperationException;
-import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 import com.sforce.ws.ConnectionException;
 
 /**
@@ -38,6 +38,6 @@
  */
 public interface DAORowVisitor {
 
-    public boolean visit(Row row) throws OperationException, DataAccessObjectException, ConnectionException;
+    public boolean visit(TableRow row) throws OperationException, DataAccessObjectException, ConnectionException;
 
 }
@@ -26,7 +26,7 @@
 
 package com.salesforce.dataloader.action.visitor;
 
-import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 
 
 /**
@@ -40,7 +40,7 @@ public class DAOSizeVisitor implements DAORowVisitor {
     private int numberOfRows = 0;
 
     @Override
-    public boolean visit(Row row) {
+    public boolean visit(TableRow row) {
         numberOfRows++;
         return true;
     }
@@ -71,6 +71,7 @@
 import com.salesforce.dataloader.model.NADateOnlyCalendarValue;
 import com.salesforce.dataloader.model.NATextValue;
 import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 import com.salesforce.dataloader.util.AppUtil;
 import com.salesforce.dataloader.util.DAORowUtil;
 import com.sforce.async.AsyncApiException;
@@ -489,14 +490,14 @@ private void processResults(final DataReader dataReader, final BatchInfo batch,
         int lastDAORowForCurrentBatch = this.batchRowToDAORowList.get(lastRowInCurrentBatch);
 
         final int totalRowsInDAOInCurrentBatch = lastDAORowForCurrentBatch - this.firstDAORowForCurrentBatch + 1;
-        List<Row> rows;
+        List<TableRow> rows;
         if (controller.getAppConfig().getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)) {
-            rows = new ArrayList<Row>();
+            rows = new ArrayList<TableRow>();
             for (int i=0; i<totalRowsInDAOInCurrentBatch; i++) {
                 rows.add(i, this.daoRowList.get(i + this.firstDAORowForCurrentBatch));
             }
         } else {
-            rows = dataReader.readRowList(totalRowsInDAOInCurrentBatch);
+            rows = dataReader.readTableRowList(totalRowsInDAOInCurrentBatch);
         }
         if (batch.getState() == BatchStateEnum.Completed || batch.getNumberRecordsProcessed() > 0) {
             try {
@@ -505,16 +506,16 @@ private void processResults(final DataReader dataReader, final BatchInfo batch,
                 throw new LoadException("IOException while reading batch results", e);
             }
         } else {
-            for (final Row row : rows) {
-                writeError(row, errorMessage);
+            for (final TableRow row : rows) {
+                writeError(row.convertToRow(), errorMessage);
             }
         }
         // update to process the next batch
         this.firstDAORowForCurrentBatch = lastDAORowForCurrentBatch + 1;
     }
 
     private void processBatchResults(final BatchInfo batch, final String errorMessage,
-            final BatchStateEnum state, final List<Row> rows, final int firstDataReaderRowInBatch) throws DataAccessObjectException, IOException, AsyncApiException {
+            final BatchStateEnum state, final List<TableRow> rows, final int firstDataReaderRowInBatch) throws DataAccessObjectException, IOException, AsyncApiException {
 
         // get the batch csv result stream from sfdc
         final CSVReader resultRdr = this.jobUtil.getBatchResults(batch.getId());
@@ -531,7 +532,7 @@ private void processBatchResults(final BatchInfo batch, final String errorMessage,
         final int errIdx = hdrIndices.get(ERROR_RESULT_COL);
         hdrIndices = null;
 
-        for (final Row row : rows) {
+        for (final TableRow row : rows) {
             boolean conversionSuccessOfRow = isRowConversionSuccessful();
             if (!conversionSuccessOfRow) {
                 continue; // this DAO row failed to convert and was not part of the batch sent to the server. Go to the next one
@@ -542,16 +543,16 @@ private void processBatchResults(final BatchInfo batch, final String errorMessage,
            if (state == BatchStateEnum.Failed || errorMessage != null) {
                getLogger().warn(
                        Messages.getMessage(getClass(), "logBatchInfoWithMessage", batch.getId(), state, errorMessage));
-                writeError(row, errorMessage);
+                writeError(row.convertToRow(), errorMessage);
            } else if (res == null || res.isEmpty()) {
                String msg = Messages.getMessage(getClass(), "noResultForRow", row.toString(), batch.getId());
-                writeError(row, msg);
+                writeError(row.convertToRow(), msg);
                getLogger().warn(msg);
            } else {
                // convert the row into a RowResults so its easy to inspect
                final RowResult rowResult = new RowResult(Boolean.valueOf(res.get(successIdx)), isDelete ? false
                        : Boolean.valueOf(res.get(createdIdx)), res.get(idIdx), res.get(errIdx));
-                writeRowResult(row, rowResult);
+                writeRowResult(row.convertToRow(), rowResult);
            }
        }
    }
@@ -115,7 +115,7 @@ private void writeOutputToWriter(Object[] results)
         // are a) not the same class yet b) not subclassed
         int batchRowCounter = 0;
         for (int i = 0; i < this.daoRowList.size(); i++) {
-            Row daoRow = this.daoRowList.get(i);
+            Row daoRow = this.daoRowList.get(i).convertToRow();
             if (!isRowConversionSuccessful()) {
                 continue;
             }
@@ -82,7 +82,7 @@ private void writeOutputToWriter(Object[] results)
         // are a) not the same class yet b) not subclassed
         int batchRowCounter = 0;
         for (int i = 0; i < this.daoRowList.size(); i++) {
-            Row daoRow = this.daoRowList.get(i);
+            Row daoRow = this.daoRowList.get(i).convertToRow();
             if (!isRowConversionSuccessful()) {
                 continue;
             }
8 changes: 4 additions & 4 deletions src/main/java/com/salesforce/dataloader/dao/DAORowCache.java
@@ -28,10 +28,10 @@
 import java.util.ArrayList;
 
 import com.salesforce.dataloader.config.AppConfig;
-import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 
 public class DAORowCache {
-    private ArrayList<Row> rowList = new ArrayList<Row>();
+    private ArrayList<TableRow> rowList = new ArrayList<TableRow>();
     private int currentRowIndex = 0;
     private int totalRows = 0;
 
@@ -42,7 +42,7 @@ public void resetCurrentRowIndex() {
         currentRowIndex = 0;
     }
 
-    public Row getCurrentRow() {
+    public TableRow getCurrentRow() {
         AppConfig appConfig = AppConfig.getCurrentConfig();
         if (currentRowIndex >= totalRows
                 || !appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)) {
@@ -51,7 +51,7 @@ public Row getCurrentRow() {
         return rowList.get(currentRowIndex++);
     }
 
-    public void addRow(Row row) {
+    public void addRow(TableRow row) {
         rowList.add(row);
         currentRowIndex++;
         totalRows++;
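A rough sketch of the caller-side contract visible in this file, assuming a reader that populates the cache on the first pass and replays it on later passes when `PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO` is enabled; `getCurrentRow()` returns null once the cache is exhausted or when caching is disabled, and the loop structure below is illustrative rather than taken from the data loader code:

```java
import java.util.Iterator;

import com.salesforce.dataloader.dao.DAORowCache;
import com.salesforce.dataloader.model.TableRow;

class RowCacheSketch {
    // First pass: the cache has no unread rows, so getCurrentRow() returns null and
    // rows are pulled from the underlying source and cached for later passes.
    static TableRow readThroughCache(DAORowCache cache, Iterator<TableRow> source) {
        TableRow row = cache.getCurrentRow();
        if (row == null && source.hasNext()) {
            row = source.next();
            cache.addRow(row);
        }
        return row;
    }

    // Later pass (for example while writing results): rewind and replay the cached rows
    // instead of re-reading the file. This only yields rows when
    // AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO is enabled.
    static int replay(DAORowCache cache) {
        cache.resetCurrentRowIndex();
        int count = 0;
        while (cache.getCurrentRow() != null) {
            count++; // process the replayed row here
        }
        return count;
    }
}
```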
18 changes: 18 additions & 0 deletions src/main/java/com/salesforce/dataloader/dao/DataReader.java
@@ -30,6 +30,7 @@
 
 import com.salesforce.dataloader.exception.DataAccessObjectException;
 import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableRow;
 
 /**
  * Interface to be implemented for data readers -- data access objects that are used for reading rows of data.
@@ -47,6 +48,14 @@ public interface DataReader extends DataAccessObject {
      */
     Row readRow() throws DataAccessObjectException;
 
+    /**
+     * Get a row of data from a data source
+     *
+     * @return a {@link TableRow} containing all the keys and values of a row
+     * @throws DataAccessObjectException
+     */
+    TableRow readTableRow() throws DataAccessObjectException;
+
     /**
      * Get a list of rows of data from a data source
      *
@@ -56,6 +65,15 @@ public interface DataReader extends DataAccessObject {
      */
     List<Row> readRowList(int maxRows) throws DataAccessObjectException;
 
+    /**
+     * Get a list of rows of data from a data source
+     *
+     * @param maxRows Maximum number of rows to read in one call
+     * @return a list of up to maxRows {@link TableRow} objects, each of them containing all the keys and values of a row
+     * @throws DataAccessObjectException
+     */
+    List<TableRow> readTableRowList(int maxRows) throws DataAccessObjectException;
+
     /**
      * @return Total number of rows that will be read by the current Data Access Object
      * @throws DataAccessObjectException
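A minimal sketch of how a caller might drive the new batch-oriented reads, modeled on the `visit()` loop earlier in this diff, which treats an empty list from `readTableRowList()` as end of input; the class name and batch size below are made up for illustration:

```java
import java.util.List;

import com.salesforce.dataloader.dao.DataReader;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.model.TableRow;

class TableRowReadLoopSketch {
    // Reads the DAO in fixed-size slices until readTableRowList() returns no rows.
    static int countRows(DataReader dao, int batchSize) throws DataAccessObjectException {
        int total = 0;
        while (true) {
            List<TableRow> batch = dao.readTableRowList(batchSize);
            if (batch == null || batch.isEmpty()) {
                break;
            }
            total += batch.size();
        }
        return total;
    }
}
```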
45 changes: 37 additions & 8 deletions src/main/java/com/salesforce/dataloader/dao/csv/CSVFileReader.java
@@ -49,6 +49,8 @@
 import com.salesforce.dataloader.exception.DataAccessObjectInitializationException;
 import com.salesforce.dataloader.exception.DataAccessRowException;
 import com.salesforce.dataloader.model.Row;
+import com.salesforce.dataloader.model.TableHeader;
+import com.salesforce.dataloader.model.TableRow;
 import com.salesforce.dataloader.util.AppUtil;
 import com.salesforce.dataloader.util.DAORowUtil;
 import com.sforce.async.CSVReader;
@@ -68,6 +70,7 @@ public class CSVFileReader implements DataReader {
     private CSVReader csvReader;
     private int currentRowNumber;
     private List<String> headerRow;
+    private TableHeader tableHeader;
     private boolean isOpen;
     private char[] csvDelimiters;
     private AppConfig appConfig;
@@ -161,21 +164,37 @@ public List<Row> readRowList(int maxRows) throws DataAccessObjectException {
         }
         return outputRows;
     }
+
+    @Override
+    public List<TableRow> readTableRowList(int maxRows) throws DataAccessObjectException {
+        List<TableRow> outputRows = new ArrayList<TableRow>();
+        for (int i = 0; i < maxRows; i++) {
+            TableRow outputRow = readTableRow();
+            if (outputRow != null) {
+                // if row has been returned, add it to the output
+                outputRows.add(outputRow);
+            } else {
+                // if encountered null, the reading is over
+                break;
+            }
+        }
+        return outputRows;
+    }

     /**
      * Gets the next row from the current data access object data source. <i>Side effect:</i>
      * Updates the current record number
      */
     @Override
-    public Row readRow() throws DataAccessObjectException {
+    public TableRow readTableRow() throws DataAccessObjectException {
         if (!isOpen) {
             open();
         }
 
-        Row row = rowCache.getCurrentRow();
-        if (row != null) {
+        TableRow trow = rowCache.getCurrentRow();
+        if (trow != null) {
             currentRowNumber++;
-            return row;
+            return trow;
         }
 
         if (appConfig.getBoolean(AppConfig.PROP_PROCESS_BULK_CACHE_DATA_FROM_DAO)
@@ -207,18 +226,27 @@ record = csvReader.nextRecord();
             throw new DataAccessRowException(errMsg);
         }
 
-        row = new Row(record.size());
+        trow = new TableRow(this.tableHeader);
 
         for (int i = 0; i < headerRow.size(); i++) {
             String value = record.get(i);
             if (value == null) {
                 value = "";
             }
-            row.put(headerRow.get(i), value);
+            trow.put(headerRow.get(i), value);
         }
         currentRowNumber++;
-        rowCache.addRow(row);
-        return row;
+        rowCache.addRow(trow);
+        return trow;
     }
+
+    @Override
+    public Row readRow() throws DataAccessObjectException {
+        TableRow tableRow = readTableRow();
+        if (tableRow == null) {
+            return null;
+        }
+        return tableRow.convertToRow();
+    }

     /**
@@ -260,6 +288,7 @@ private void readHeaderRow() throws DataAccessObjectInitializationException {
             LOGGER.error(Messages.getString("CSVFileDAO.errorHeaderRow"));
             throw new DataAccessObjectInitializationException(Messages.getString("CSVFileDAO.errorHeaderRow"));
         }
+        tableHeader = new TableHeader(headerRow);
         LOGGER.debug(Messages.getFormattedString(
                 "CSVFileDAO.debugMessageHeaderRowSize", headerRow.size()));
 