diff --git a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java index 243c185..ba0a256 100644 --- a/src/main/java/com/sforce/dataset/loader/DatasetLoader.java +++ b/src/main/java/com/sforce/dataset/loader/DatasetLoader.java @@ -49,6 +49,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -63,6 +65,9 @@ import org.supercsv.prefs.CsvPreference; + + + //import com.csvreader.CsvReader; import com.sforce.async.AsyncApiException; import com.sforce.async.BatchInfo; @@ -136,7 +141,10 @@ public static boolean uploadDataset(String inputFileString, boolean status = true; long digestTime = 0L; long uploadTime = 0L; - boolean isRecovery = false; + boolean isRecovery = false; + //we only want a small capacity otherwise the reader thread will runaway and the writer thread will become slower + BlockingQueue q = new LinkedBlockingQueue(3); + if(uploadFormat==null||uploadFormat.trim().isEmpty()) uploadFormat = "binary"; @@ -276,8 +284,12 @@ public static boolean uploadDataset(String inputFileString, // System.out.println("CodingErrorAction: "+codingErrorAction); // CsvReader reader = new CsvReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction, inputFileCharset))); - CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE); - + CsvListReader reader = new CsvListReader(new InputStreamReader(new BOMInputStream(new FileInputStream(inputFile), false), DatasetUtils.utf8Decoder(codingErrorAction , inputFileCharset )), CsvPreference.STANDARD_PREFERENCE); + WriterThread 
writer = new WriterThread(q, w, ew); + Thread th = new Thread(writer); + th.setDaemon(true); + th.start(); + try { // if(reader.readHeaders()) @@ -296,23 +308,26 @@ public static boolean uploadDataset(String inputFileString, { try { -// hasmore = reader.readRecord(); row = reader.read(); - if(row!=null) + if(row!=null && !writer.isDone()) { totalRowCount++; - w.addrow(row.toArray(new String[row.size()])); - successRowCount = w.getNrows(); + if(row.size()!=0 ) + { + q.put(row.toArray(new String[row.size()])); +// w.addrow(row.toArray(new String[row.size()])); +// successRowCount = w.getNrows(); + } }else { hasmore = false; } }catch(Throwable t) { - if(errorRowCount==0) - { - System.err.println(); - } +// if(errorRowCount==0) +// { +// System.err.println(); +// } System.err.println("Row {"+totalRowCount+"} has error {"+t+"}"); if(t instanceof MalformedInputException) { @@ -321,18 +336,31 @@ public static boolean uploadDataset(String inputFileString, System.err.println("*******************************************************************************\n"); status = false; hasmore = false; - }else - { - if(row!=null) - { - ew.addError(row.toArray(new String[row.size()]), t.getMessage()); - errorRowCount++; - } +// }else +// { +// +// System.err.println("\n*******************************************************************************"); +// System.err.println("t"); +// System.err.println("*******************************************************************************\n"); +// status = false; +// hasmore = false; +// if(row!=null) +// { +// ew.addError(row.toArray(new String[row.size()]), t.getMessage()); +// errorRowCount++; +// } } //t.printStackTrace(); } }//end while + while(!writer.isDone()) + { + q.put(new String[0]); + Thread.sleep(1000); + } // } + successRowCount = w.getSuccessRowCount(); + errorRowCount = writer.getErrorRowCount(); }finally { reader.close(); diff --git a/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java 
b/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java index f5a6d08..440090b 100644 --- a/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java +++ b/src/main/java/com/sforce/dataset/loader/EbinFormatWriter.java @@ -56,8 +56,13 @@ public class EbinFormatWriter { private int numColumns = 0; private OutputStream out; - protected long nrows; int interval = 10; + + + + private volatile int successRowCount = 0; + private volatile int totalRowCount = 0; + public static final NumberFormat nf = NumberFormat.getIntegerInstance(); long startTime = 0L; @@ -95,7 +100,7 @@ public EbinFormatWriter(OutputStream out, FieldType[] dataTypes) protected EbinFormatWriter(OutputStream out) throws IOException { this.out = out; - nrows = 0; + totalRowCount = 0; out.write(magic, 0, 3); out.write(version_high); out.write(version_low); @@ -113,23 +118,23 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P int count = 0; int key_value_count = 0; - nrows++; + totalRowCount++; if (values.length != this.numColumns) { - String message = "Row " + nrows + " contains an invalid number of Values, expected " + + String message = "Row " + totalRowCount + " contains an invalid number of Values, expected " + this.numColumns + " Value(s), got " + values.length + ".\n"; throw new IOException(message); } - if(nrows%interval==0||nrows==1) + if(totalRowCount%interval==0||totalRowCount==1) { long newStartTime = System.currentTimeMillis(); if(startTime==0) startTime = newStartTime; - System.out.println("Processing row {"+nf.format(nrows) +"} time {"+nf.format(newStartTime-startTime)+"}"); + System.out.println("Processing row {"+nf.format(totalRowCount) +"} time {"+nf.format(newStartTime-startTime)+"}"); startTime = newStartTime; } - if(nrows/interval>=10) + if(totalRowCount/interval>=10) { interval = interval*10; } @@ -285,6 +290,7 @@ public void addrow(String[] values) throws IOException,NumberFormatException, P } arr(measure_values); dict(dim_keys, dim_values); 
+ successRowCount++; } @@ -375,9 +381,9 @@ protected void arr(LinkedList measure_values) throws IOException /* public void addrow(String[] keys, String[] values, long[] measure_values) throws IOException { - nrows++; + totalRowCount++; if (measure_values.length != measures.length) { - String message = "Row " + nrows + " contains an invalid number of measures, expected " + + String message = "Row " + totalRowCount + " contains an invalid number of measures, expected " + measures.length + " measure(s), got " + measure_values.length + ".\n"; throw new IOException(message); } @@ -393,7 +399,7 @@ public void finish() throws IOException out.close(); } out = null; - if(nrows==0) + if(totalRowCount==0) { throw new IOException("Atleast one row must be written"); } @@ -432,9 +438,13 @@ public static boolean isValidBin(byte[] startingFiveBytes) return true; } - public long getNrows() { - return nrows; + public int getSuccessRowCount() { + return successRowCount; + } + + + public int getTotalRowCount() { + return totalRowCount; } - } diff --git a/src/main/java/com/sforce/dataset/loader/WriterThread.java b/src/main/java/com/sforce/dataset/loader/WriterThread.java new file mode 100644 index 0000000..ce8b014 --- /dev/null +++ b/src/main/java/com/sforce/dataset/loader/WriterThread.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2014, salesforce.com, inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, are permitted provided + * that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this list of conditions and the + * following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or other materials provided with the distribution. + * + * Neither the name of salesforce.com, inc. 
nor the names of its contributors may be used to endorse or
 * promote products derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
package com.sforce.dataset.loader;

import java.util.concurrent.BlockingQueue;

/**
 * Consumer side of a producer/consumer pair: takes rows from a
 * {@link BlockingQueue} and hands each one to an {@link EbinFormatWriter}.
 * Rows the writer rejects are forwarded to an {@link ErrorWriter} and counted.
 *
 * <p>Processing stops when an empty array is taken from the queue (the
 * end-of-input marker), when the number of rejected rows reaches
 * {@link #MAX_ERROR_THRESHOLD}, or when an unrecoverable failure occurs.
 * In every case {@link #isDone()} becomes {@code true} and the queue is
 * emptied so that a producer blocked in {@code put()} on a full queue is
 * released and can observe the flag.
 *
 * <p>The counters are {@code volatile} and are incremented only by the thread
 * executing {@link #run()}; other threads are expected to read them only.
 */
public class WriterThread implements Runnable {

    /** Number of rejected rows after which processing is aborted. */
    private static final int MAX_ERROR_THRESHOLD = 10000;

    private final BlockingQueue<String[]> queue;
    private final EbinFormatWriter w;
    private final ErrorWriter ew;

    private volatile boolean isDone = false;
    private volatile int errorRowCount = 0;
    private volatile int totalRowCount = 0;

    /**
     * Creates a writer task.
     *
     * @param q  queue the rows are taken from; an empty array marks end of input
     * @param w  writer that receives every row
     * @param ew sink for rows that {@code w} rejected
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    WriterThread(BlockingQueue<String[]> q, EbinFormatWriter w, ErrorWriter ew) {
        if (q == null || w == null || ew == null) {
            throw new IllegalArgumentException("Constructor input cannot be null");
        }
        this.queue = q;
        this.w = w;
        this.ew = ew;
    }

    /**
     * Drains the queue until the end-of-input marker, the error threshold or a
     * fatal failure, then marks this task as done.
     */
    @Override
    public void run() {
        final String threadName = Thread.currentThread().getName();
        try {
            System.out.println("Start: " + threadName);

            String[] row = queue.take();
            while (row != null && row.length != 0) {
                try {
                    totalRowCount++;
                    w.addrow(row);
                } catch (Throwable t) {
                    if (errorRowCount == 0) {
                        // Blank line before the first error report.
                        System.err.println();
                    }
                    System.err.println("Row {" + totalRowCount + "} has error {" + t + "}");
                    ew.addError(row, t.getMessage());
                    errorRowCount++;
                    if (errorRowCount >= MAX_ERROR_THRESHOLD) {
                        System.err.println("Max error threshold reached. Aborting processing");
                        break;
                    }
                }
                row = queue.take();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            System.out.println(threadName + " " + e.getMessage());
        } catch (Throwable t) {
            System.out.println(threadName + " " + t.getMessage());
        } finally {
            // Publish completion first, then free the queue: a producer stuck in
            // put() on a full queue wakes up and sees isDone() == true on its
            // next check instead of blocking forever.
            isDone = true;
            queue.clear();
            System.out.println("END: " + threadName);
        }
    }

    /** @return {@code true} once {@link #run()} has finished, normally or not */
    public boolean isDone() {
        return isDone;
    }

    /** @param isDone new value of the completion flag */
    public void setDone(boolean isDone) {
        this.isDone = isDone;
    }

    /** @return number of rows rejected by the writer so far */
    public int getErrorRowCount() {
        return errorRowCount;
    }

    /** @param errorRowCount new value of the rejected-row counter */
    public void setErrorRowCount(int errorRowCount) {
        this.errorRowCount = errorRowCount;
    }

    /** @return number of rows taken from the queue and passed to the writer so far */
    public int getTotalRowCount() {
        return totalRowCount;
    }

    /** @param totalRowCount new value of the processed-row counter */
    public void setTotalRowCount(int totalRowCount) {
        this.totalRowCount = totalRowCount;
    }
}