Skip to content

Commit

Permalink
reduce memory use by using TableRow instead of Row
Browse files Browse the repository at this point in the history
More extensively use TableRow by replacing Row usage. This leads to about 100% reduction in memory use for large loads.
  • Loading branch information
ashitsalesforce committed Dec 2, 2024
1 parent d2414e1 commit 2743590
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import com.salesforce.dataloader.action.progress.ILoaderProgress;
import com.salesforce.dataloader.action.visitor.DAOLoadVisitor;
import com.salesforce.dataloader.client.PartnerClient;
import com.salesforce.dataloader.config.AppConfig;
import com.salesforce.dataloader.controller.Controller;
import com.salesforce.dataloader.dao.DataAccessObject;
Expand All @@ -40,17 +39,11 @@
import com.salesforce.dataloader.exception.OperationException;
import com.salesforce.dataloader.exception.ParameterLoadException;
import com.salesforce.dataloader.mapping.LoadMapper;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableRow;
import com.salesforce.dataloader.util.DAORowUtil;
import com.sforce.soap.partner.DescribeGlobalSObjectResult;
import com.sforce.soap.partner.DescribeSObjectResult;
import com.sforce.soap.partner.Field;
import com.sforce.soap.partner.FieldType;
import com.sforce.ws.ConnectionException;

import java.util.List;
import java.util.Map;

/**
* @author Lexi Viripaeff
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.salesforce.dataloader.exception.ParameterLoadException;
import com.salesforce.dataloader.mapping.SOQLMapper;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableHeader;
import com.sforce.async.AsyncApiException;
import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.ws.ConnectionException;
Expand Down Expand Up @@ -247,14 +248,35 @@ private void writeBatch() throws DataAccessObjectException {
private void writeSuccesses() throws DataAccessObjectException {
final String msg = Messages.getMessage(getClass(), "statusItemQueried");
final Iterator<String> ids = this.batchIds.iterator();
if (this.batchRows == null || this.batchRows.isEmpty()) {
return;
}
ArrayList<String> headerColumnList = new ArrayList<String>();
headerColumnList.add(AppConfig.ID_COLUMN_NAME);
Row firstRow = this.batchRows.get(0);
for (String fieldName : firstRow.keySet()) {
headerColumnList.add(fieldName);
}
headerColumnList.add(AppConfig.STATUS_COLUMN_NAME);
TableHeader header = new TableHeader(headerColumnList);
for (final Row row : this.batchRows) {
writeSuccess(row, ids.next(), msg);
writeSuccess(row.convertToTableRow(header), ids.next(), msg);
}
}

private void writeErrors(String errorMessage) throws DataAccessObjectException {
if (this.batchRows == null || this.batchRows.isEmpty()) {
return;
}
ArrayList<String> headerColumnList = new ArrayList<String>();
Row firstRow = this.batchRows.get(0);
for (String fieldName : firstRow.keySet()) {
headerColumnList.add(fieldName);
}
headerColumnList.add(AppConfig.ERROR_COLUMN_NAME);
TableHeader header = new TableHeader(headerColumnList);
for (final Row row : this.batchRows) {
writeError(row, errorMessage);
writeError(row.convertToTableRow(header), errorMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.salesforce.dataloader.dao.DataWriter;
import com.salesforce.dataloader.exception.DataAccessObjectException;
import com.salesforce.dataloader.mapping.Mapper;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableRow;
import com.salesforce.dataloader.util.LoadRateCalculator;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -131,27 +131,27 @@ protected DataWriter getSuccessWriter() {
return this.successWriter;
}

protected void writeSuccess(Row row, String id, String message) throws DataAccessObjectException {
protected void writeSuccess(TableRow row, String id, String message) throws DataAccessObjectException {
if (writeStatus()) {
if (id != null && id.length() > 0) {
row.put(AppConfig.ID_COLUMN_NAME, id);
}
if (message != null && message.length() > 0) {
row.put(AppConfig.STATUS_COLUMN_NAME, message);
}
this.successWriter.writeRow(row);
this.successWriter.writeTableRow(row);
}
addSuccess();
}

protected void writeError(Row row, String errorMessage) throws DataAccessObjectException {
protected void writeError(TableRow row, String errorMessage) throws DataAccessObjectException {
if (writeStatus()) {
if (row == null) {
row = Row.singleEntryImmutableRow(AppConfig.ERROR_COLUMN_NAME, errorMessage);
row = TableRow.singleEntryImmutableRow(AppConfig.ERROR_COLUMN_NAME, errorMessage);
} else {
row.put(AppConfig.ERROR_COLUMN_NAME, errorMessage);
}
this.errorWriter.writeRow(row);
this.errorWriter.writeTableRow(row);
}
addErrors();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
this.daoRowList.add(row);
}
// the result are sforce fields mapped to data
Row sforceDataRow = getMapper().mapData(row);

TableRow sforceDataRow = getMapper().mapData(row, processedDAORowCounter == 0);
if (this.getConfig().getBoolean(AppConfig.PROP_TRUNCATE_FIELDS)
&& this.getConfig().isRESTAPIEnabled()
&& "update".equalsIgnoreCase(this.getConfig().getString(AppConfig.PROP_OPERATION))) {
Expand All @@ -163,13 +162,13 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
cachedFieldAttributesForOperation = partnerClient.getSObjectFieldAttributesForRow(
this.getConfig().getString(AppConfig.PROP_ENTITY), sforceDataRow);
}
for (Map.Entry<String, Object> field : sforceDataRow.entrySet()) {
for (String fieldName : sforceDataRow.getHeader().getColumns()) {
for (Field fieldDescribe : cachedFieldAttributesForOperation) {
// Field truncation is applicable to certain field types only.
// See https://developer.salesforce.com/docs/atlas.en-us.api_tooling.meta/api_tooling/sforce_api_header_allowfieldtruncation.htm
// for the list of field types that field truncation is applicable to.
FieldType type = fieldDescribe.getType();
if (fieldDescribe.getName().equalsIgnoreCase(field.getKey())
if (fieldDescribe.getName().equalsIgnoreCase(fieldName)
&& (type == FieldType.email
|| type == FieldType.string
|| type == FieldType.picklist
Expand All @@ -178,17 +177,17 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
|| type == FieldType.multipicklist)
) {
int fieldLength = fieldDescribe.getLength();
if (field.getValue().toString().length() > fieldLength) {
if (row.get(fieldName).toString().length() > fieldLength) {
if (type == FieldType.email) {
String[] emailParts = field.getValue().toString().split("@");
String[] emailParts = row.get(fieldName).toString().split("@");
if (emailParts.length == 2) {
String firstPart = emailParts[0].substring(0,
fieldLength - emailParts[1].length() - 1);
field.setValue(firstPart + "@" + emailParts[1]);
row.put(fieldName, firstPart + "@" + emailParts[1]);
continue;
}
}
field.setValue(field.getValue().toString().substring(0, fieldLength));
row.put(fieldName, row.get(fieldName).toString().substring(0, fieldLength));
}
}
}
Expand All @@ -206,7 +205,7 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
}
try {
convertBulkAPINulls(sforceDataRow);
DynaBean dynaBean = SforceDynaBean.convertToDynaBean(dynaClass, sforceDataRow);
DynaBean dynaBean = SforceDynaBean.convertToDynaBean(dynaClass, sforceDataRow.convertToRow());
Map<String, String> fieldMap = BeanUtils.describe(dynaBean);
for (String fName : fieldMap.keySet()) {
if (fieldMap.get(fName) != null) {
Expand All @@ -222,7 +221,7 @@ public boolean visit(TableRow row) throws OperationException, DataAccessObjectEx
String errMsg = Messages.getMessage("Visitor", "conversionErrorMsg", conve.getMessage());
getLogger().error(errMsg, conve);

conversionFailed(row.convertToRow(), errMsg);
conversionFailed(row, errMsg);
// this row cannot be added since conversion has failed
return false;
} catch (InvocationTargetException e) {
Expand Down Expand Up @@ -252,12 +251,12 @@ protected boolean maxBatchBytesReached(List<DynaBean> dynaArray) {
* @throws DataAccessObjectException
* @throws OperationException
*/
protected void conversionFailed(Row row, String errMsg) throws DataAccessObjectException,
protected void conversionFailed(TableRow row, String errMsg) throws DataAccessObjectException,
OperationException {
writeError(row, errMsg);
}

protected void convertBulkAPINulls(Row row) {}
protected void convertBulkAPINulls(TableRow row) {}

public void flushRemaining() throws OperationException, DataAccessObjectException {
// check if there are any entities left
Expand Down Expand Up @@ -441,7 +440,7 @@ private Object getPhoneFieldValue(String fieldName, Object fieldValue) {
}


protected void processResult(Row dataRow, boolean isSuccess, String id, Error[] errors)
protected void processResult(TableRow dataRow, boolean isSuccess, String id, Error[] errors)
throws DataAccessObjectException {
// process success vs. error
// extract error message from error result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private void processResults(final DataReader dataReader, final BatchInfo batch,
}
} else {
for (final TableRow row : rows) {
writeError(row.convertToRow(), errorMessage);
writeError(row, errorMessage);
}
}
// update to process the next batch
Expand Down Expand Up @@ -540,16 +540,16 @@ private void processBatchResults(final BatchInfo batch, final String errorMessag
if (state == BatchStateEnum.Failed || errorMessage != null) {
getLogger().warn(
Messages.getMessage(getClass(), "logBatchInfoWithMessage", batch.getId(), state, errorMessage));
writeError(row.convertToRow(), errorMessage);
writeError(row, errorMessage);
} else if (res == null || res.isEmpty()) {
String msg = Messages.getMessage(getClass(), "noResultForRow", row.toString(), batch.getId());
writeError(row.convertToRow(), msg);
writeError(row, msg);
getLogger().warn(msg);
} else {
// convert the row into a RowResults so its easy to inspect
final RowResult rowResult = new RowResult(Boolean.valueOf(res.get(successIdx)), isDelete ? false
: Boolean.valueOf(res.get(createdIdx)), res.get(idIdx), res.get(errIdx));
writeRowResult(row.convertToRow(), rowResult);
writeRowResult(row, rowResult);
}
}
}
Expand All @@ -573,7 +573,7 @@ private DataReader resetDAO() throws DataAccessObjectInitializationException, Lo
return dataReader;
}

private void writeRowResult(Row row, RowResult resultRow) throws DataAccessObjectException {
private void writeRowResult(TableRow row, RowResult resultRow) throws DataAccessObjectException {
if (resultRow.success) {
String successMessage;
switch (getConfig().getOperationInfo()) {
Expand Down Expand Up @@ -632,16 +632,16 @@ private String parseAsyncApiError(final String errString) {
}

@Override
protected void convertBulkAPINulls(Row row) {
for (final Map.Entry<String, Object> entry : row.entrySet()) {
if (NATextValue.isNA(entry.getValue())) {
entry.setValue(NATextValue.getInstance());
protected void convertBulkAPINulls(TableRow row) {
for (String columnName : row.getHeader().getColumns()) {
if (NATextValue.isNA(row.get(columnName))) {
row.put(columnName, NATextValue.getInstance());
}
}
}

@Override
protected void conversionFailed(Row row, String errMsg) throws DataAccessObjectException,
protected void conversionFailed(TableRow row, String errMsg) throws DataAccessObjectException,
OperationException {
super.conversionFailed(row, errMsg);
getLogger().warn("Skipping results for row " + row + " which failed before upload to Saleforce.com");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

import java.util.List;

import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableRow;

import org.apache.commons.beanutils.DynaBean;

import com.salesforce.dataloader.action.OperationInfo;
Expand Down Expand Up @@ -115,7 +116,7 @@ private void writeOutputToWriter(Object[] results)
// are a) not the same class yet b) not subclassed
int batchRowCounter = 0;
for (int i = 0; i < this.daoRowList.size(); i++) {
Row daoRow = this.daoRowList.get(i).convertToRow();
TableRow daoRow = this.daoRowList.get(i);
if (!isRowConversionSuccessful()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.salesforce.dataloader.exception.LoadException;
import com.salesforce.dataloader.exception.OperationException;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableRow;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.ws.ConnectionException;
Expand Down Expand Up @@ -82,7 +83,7 @@ private void writeOutputToWriter(Object[] results)
// are a) not the same class yet b) not subclassed
int batchRowCounter = 0;
for (int i = 0; i < this.daoRowList.size(); i++) {
Row daoRow = this.daoRowList.get(i).convertToRow();
TableRow daoRow = this.daoRowList.get(i);
if (!isRowConversionSuccessful()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import com.salesforce.dataloader.exception.PasswordExpiredException;
import com.salesforce.dataloader.exception.RelationshipFormatException;
import com.salesforce.dataloader.mapping.LoadMapper;
import com.salesforce.dataloader.model.Row;
import com.salesforce.dataloader.model.TableRow;
import com.salesforce.dataloader.util.AppUtil;
import com.sforce.soap.partner.Connector;
import com.sforce.soap.partner.DeleteResult;
Expand Down Expand Up @@ -942,13 +942,13 @@ public DescribeSObjectResult describeSObject(String entity) throws ConnectionExc
return result;
}

public Field[] getSObjectFieldAttributesForRow(String sObjectName, Row dataRow) throws ConnectionException {
public Field[] getSObjectFieldAttributesForRow(String sObjectName, TableRow dataRow) throws ConnectionException {
ArrayList<Field> attributesForRow = new ArrayList<Field>();
DescribeSObjectResult entityDescribe = describeSObject(sObjectName);
for (Map.Entry<String, Object> dataRowField : dataRow.entrySet()) {
for (String headerColumnName : dataRow.getHeader().getColumns()) {
Field[] fieldAttributesArray = entityDescribe.getFields();
for (Field fieldAttributes : fieldAttributesArray) {
if (fieldAttributes.getName().equalsIgnoreCase(dataRowField.getKey())) {
if (fieldAttributes.getName().equalsIgnoreCase(headerColumnName)) {
attributesForRow.add(fieldAttributes);
}
}
Expand Down
Loading

0 comments on commit 2743590

Please sign in to comment.