Skip to content

Commit

Permalink
created separate thread for writer to speed up processing
Browse files Browse the repository at this point in the history
  • Loading branch information
datasetutil committed Nov 21, 2014
1 parent 4d98b7c commit 2161683
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 31 deletions.
64 changes: 46 additions & 18 deletions src/main/java/com/sforce/dataset/loader/DatasetLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
Expand All @@ -63,6 +65,9 @@
import org.supercsv.prefs.CsvPreference;





//import com.csvreader.CsvReader;
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
Expand Down Expand Up @@ -136,7 +141,10 @@ public static boolean uploadDataset(String inputFileString,
boolean status = true;
long digestTime = 0L;
long uploadTime = 0L;
boolean isRecovery = false;
boolean isRecovery = false;
//we only want a small capacity otherwise the reader thread will runaway and the writer thread will become slower
BlockingQueue<String[]> q = new LinkedBlockingQueue<String[]>(3);


if(uploadFormat==null||uploadFormat.trim().isEmpty())
uploadFormat = "binary";
Expand Down Expand Up @@ -276,8 +284,12 @@ public static boolean uploadDataset(String inputFileString,
// System.out.println("CodingErrorAction: "+codingErrorAction);

// CsvReader reader = new CsvReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction, inputFileCharset)));
CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE);

CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE);
WriterThread writer = new WriterThread(q, w, ew);
Thread th = new Thread(writer);
th.setDaemon(true);
th.start();

try
{
// if(reader.readHeaders())
Expand All @@ -296,23 +308,26 @@ public static boolean uploadDataset(String inputFileString,
{
try
{
// hasmore = reader.readRecord();
row = reader.read();
if(row!=null)
if(row!=null && !writer.isDone())
{
totalRowCount++;
w.addrow(row.toArray(new String[row.size()]));
successRowCount = w.getNrows();
if(row.size()!=0 )
{
q.put(row.toArray(new String[row.size()]));
// w.addrow(row.toArray(new String[row.size()]));
// successRowCount = w.getNrows();
}
}else
{
hasmore = false;
}
}catch(Throwable t)
{
if(errorRowCount==0)
{
System.err.println();
}
// if(errorRowCount==0)
// {
// System.err.println();
// }
System.err.println("Row {"+totalRowCount+"} has error {"+t+"}");
if(t instanceof MalformedInputException)
{
Expand All @@ -321,18 +336,31 @@ public static boolean uploadDataset(String inputFileString,
System.err.println("*******************************************************************************\n");
status = false;
hasmore = false;
}else
{
if(row!=null)
{
ew.addError(row.toArray(new String[row.size()]), t.getMessage());
errorRowCount++;
}
// }else
// {
//
// System.err.println("\n*******************************************************************************");
// System.err.println("t");
// System.err.println("*******************************************************************************\n");
// status = false;
// hasmore = false;
// if(row!=null)
// {
// ew.addError(row.toArray(new String[row.size()]), t.getMessage());
// errorRowCount++;
// }
}
//t.printStackTrace();
}
}//end while
while(!writer.isDone())
{
q.put(new String[0]);
Thread.sleep(1000);
}
// }
successRowCount = w.getSuccessRowCount();
errorRowCount = writer.getErrorRowCount();
}finally
{
reader.close();
Expand Down
36 changes: 23 additions & 13 deletions src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ public class EbinFormatWriter {

private int numColumns = 0;
private OutputStream out;
protected long nrows;
int interval = 10;



private volatile int successRowCount = 0;
private volatile int totalRowCount = 0;


public static final NumberFormat nf = NumberFormat.getIntegerInstance();
long startTime = 0L;
Expand Down Expand Up @@ -95,7 +100,7 @@ public EbinFormatWriter(OutputStream out, FieldType[] dataTypes)
protected EbinFormatWriter(OutputStream out) throws IOException
{
this.out = out;
nrows = 0;
totalRowCount = 0;
out.write(magic, 0, 3);
out.write(version_high);
out.write(version_low);
Expand All @@ -113,23 +118,23 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P
int count = 0;
int key_value_count = 0;

nrows++;
totalRowCount++;
if (values.length != this.numColumns) {
String message = "Row " + nrows + " contains an invalid number of Values, expected " +
String message = "Row " + totalRowCount + " contains an invalid number of Values, expected " +
this.numColumns + " Value(s), got " + values.length + ".\n";
throw new IOException(message);
}

if(nrows%interval==0||nrows==1)
if(totalRowCount%interval==0||totalRowCount==1)
{
long newStartTime = System.currentTimeMillis();
if(startTime==0)
startTime = newStartTime;
System.out.println("Processing row {"+nf.format(nrows) +"} time {"+nf.format(newStartTime-startTime)+"}");
System.out.println("Processing row {"+nf.format(totalRowCount) +"} time {"+nf.format(newStartTime-startTime)+"}");
startTime = newStartTime;
}

if(nrows/interval>=10)
if(totalRowCount/interval>=10)
{
interval = interval*10;
}
Expand Down Expand Up @@ -285,6 +290,7 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P
}
arr(measure_values);
dict(dim_keys, dim_values);
successRowCount++;
}


Expand Down Expand Up @@ -375,9 +381,9 @@ protected void arr(LinkedList<Long> measure_values) throws IOException
/*
public void addrow(String[] keys, String[] values, long[] measure_values) throws IOException
{
nrows++;
totalRowCount++;
if (measure_values.length != measures.length) {
String message = "Row " + nrows + " contains an invalid number of measures, expected " +
String message = "Row " + totalRowCount + " contains an invalid number of measures, expected " +
measures.length + " measure(s), got " + measure_values.length + ".\n";
throw new IOException(message);
}
Expand All @@ -393,7 +399,7 @@ public void finish() throws IOException
out.close();
}
out = null;
if(nrows==0)
if(totalRowCount==0)
{
throw new IOException("Atleast one row must be written");
}
Expand Down Expand Up @@ -432,9 +438,13 @@ public static boolean isValidBin(byte[] startingFiveBytes)
return true;
}

public long getNrows() {
return nrows;
public int getSuccessRowCount() {
return successRowCount;
}


public int getTotalRowCount() {
return totalRowCount;
}


}
118 changes: 118 additions & 0 deletions src/main/java/com/sforce/dataset/loader/WriterThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2014, salesforce.com, inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions and the
* following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
* the following disclaimer in the documentation and/or other materials provided with the distribution.
*
* Neither the name of salesforce.com, inc. nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.sforce.dataset.loader;

import java.util.concurrent.BlockingQueue;

public class WriterThread implements Runnable {

private static final int max_error_threshhold = 10000;

private final BlockingQueue<String[]> queue;
private final EbinFormatWriter w;
private final ErrorWriter ew;

private volatile boolean isDone = false;
private volatile int errorRowCount = 0;
private volatile int totalRowCount = 0;


WriterThread(BlockingQueue<String[]> q,EbinFormatWriter w,ErrorWriter ew)
{
if(q==null || w == null || ew == null)
{
throw new IllegalArgumentException("Constrictor input cannot be null");
}
queue = q;
this.w = w;
this.ew = ew;
}

public void run() {
try {

System.out.println("Start: " + Thread.currentThread().getName());

String[] row = queue.take();
while (row != null && row.length!=0) {
try
{
totalRowCount++;
w.addrow(row);
}catch(Throwable t)
{
if(errorRowCount==0)
{
System.err.println();
}
System.err.println("Row {"+totalRowCount+"} has error {"+t+"}");
if(row!=null)
{
ew.addError(row, t.getMessage());
errorRowCount++;
if(errorRowCount>=max_error_threshhold)
{
System.err.println("Max error threshold reached. Aborting processing");
break;
}
}
//t.printStackTrace();
}
row = queue.take();
}
}catch (Throwable t) {
System.out.println (Thread.currentThread().getName() + " " + t.getMessage());
}
isDone = true;
System.out.println("END: " + Thread.currentThread().getName());
}

public boolean isDone() {
return isDone;
}

public void setDone(boolean isDone) {
this.isDone = isDone;
}

public int getErrorRowCount() {
return errorRowCount;
}

public void setErrorRowCount(int errorRowCount) {
this.errorRowCount = errorRowCount;
}

public int getTotalRowCount() {
return totalRowCount;
}

public void setTotalRowCount(int totalRowCount) {
this.totalRowCount = totalRowCount;
}


}

0 comments on commit 2161683

Please sign in to comment.