Skip to content

Commit

Permalink
reduce the getBytes operations
Browse files Browse the repository at this point in the history
  • Loading branch information
hffariel committed Sep 23, 2021
1 parent e7d8791 commit fe43e20
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ public class StarRocksFlushTuple {

private String label;
private Long bytes;
private List<String> rows;
private List<byte[]> rows;

public StarRocksFlushTuple(String label, Long bytes, List<String> rows) {
public StarRocksFlushTuple(String label, Long bytes, List<byte[]> rows) {
this.label = label;
this.bytes = bytes;
this.rows = rows;
}

public String getLabel() { return label; }
public Long getBytes() { return bytes; }
public List<String> getRows() { return rows; }
public List<byte[]> getRows() { return rows; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ private boolean tryHttpConnection(String host) {
}
}

private byte[] joinRows(List<String> rows, int totalBytes) {
private byte[] joinRows(List<byte[]> rows, int totalBytes) {
if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
Map<String, Object> props = writerOptions.getLoadProps();
byte[] lineDelimiter = StarRocksDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (String row : rows) {
bos.put(row.getBytes(StandardCharsets.UTF_8));
for (byte[] row : rows) {
bos.put(row);
bos.put(lineDelimiter);
}
return bos.array();
Expand All @@ -110,11 +110,11 @@ private byte[] joinRows(List<String> rows, int totalBytes) {
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
boolean isFirstElement = true;
for (String row : rows) {
for (byte[] row : rows) {
if (!isFirstElement) {
bos.put(jsonDelimiter);
}
bos.put(row.getBytes(StandardCharsets.UTF_8));
bos.put(row);
isFirstElement = false;
}
bos.put("]".getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class StarRocksWriterManager {
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private final StarRocksWriterOptions writerOptions;

private final List<String> buffer = new ArrayList<>();
private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
Expand All @@ -37,9 +37,10 @@ public StarRocksWriterManager(StarRocksWriterOptions writerOptions) {
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
buffer.add(record);
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
buffer.add(bts);
batchCount++;
batchSize += record.getBytes(StandardCharsets.UTF_8).length;
batchSize += bts.length;
if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) {
String label = createBatchLabel();
LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
Expand Down

0 comments on commit fe43e20

Please sign in to comment.