
Commit

add interval sinking
hffariel committed Sep 24, 2021
1 parent fe43e20 commit cbc93b9
Showing 3 changed files with 54 additions and 0 deletions.
8 changes: 8 additions & 0 deletions starrockswriter/doc/starrockswriter.md
@@ -173,6 +173,14 @@ The StarRocksWriter plugin writes data into the destination table of the StarRocks primary database

* Default: 104857600 (100M)

* **flushInterval**

* Description: the time interval from the end of the previous StreamLoad to the start of the next one (unit: ms). <br />

* Required: No <br />

* Default: 300000 (ms)

* **loadProps**

* Description: request parameters for StreamLoad; see the StreamLoad documentation page for details. <br />
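For orientation, here is a minimal sketch of the default rule documented above: if a job omits flushInterval, the writer falls back to 300000 ms (the getter that actually implements this appears in StarRocksWriterOptions below). The class and method names in this sketch are illustrative only, not part of the plugin.

```java
// Illustrative only: the "configured value or built-in default" rule for flushInterval.
public class FlushIntervalDefaultDemo {
    private static final long DEFAULT_FLUSH_INTERVAL_MS = 300_000L; // documented default: 300000 ms

    static long resolveFlushInterval(Long configured) {
        // Fall back to the default when the job config omits flushInterval.
        return configured == null ? DEFAULT_FLUSH_INTERVAL_MS : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveFlushInterval(null));     // 300000 (default)
        System.out.println(resolveFlushInterval(60_000L));  // 60000 (user-configured)
    }
}
```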
@@ -18,6 +18,7 @@ public class StarRocksWriterOptions implements Serializable {
    private static final int MAX_RETRIES = 1;
    private static final int BATCH_ROWS = 500000;
    private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
    private static final long FLUSH_INTERVAL = 300000;

    private static final String KEY_LOAD_PROPS_FORMAT = "format";
    public enum StreamLoadFormat {
@@ -34,6 +35,7 @@ public enum StreamLoadFormat {
    private static final String KEY_JDBC_URL = "jdbcUrl";
    private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
    private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
    private static final String KEY_FLUSH_INTERVAL = "flushInterval";
    private static final String KEY_LOAD_URL = "loadUrl";
    private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
    private static final String KEY_LOAD_PROPS = "loadProps";
@@ -120,6 +122,11 @@ public long getBatchSize() {
        Long size = options.getLong(KEY_MAX_BATCH_SIZE);
        return null == size ? BATCH_BYTES : size;
    }

    public long getFlushInterval() {
        Long interval = options.getLong(KEY_FLUSH_INTERVAL);
        return null == interval ? FLUSH_INTERVAL : interval;
    }

    public int getFlushQueueLength() {
        Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
@@ -1,5 +1,6 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -8,7 +9,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Strings;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
@@ -26,14 +31,45 @@ public class StarRocksWriterManager {
    private volatile boolean closed = false;
    private volatile Exception flushException;
    private final LinkedBlockingDeque<StarRocksFlushTuple> flushQueue;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;

    public StarRocksWriterManager(StarRocksWriterOptions writerOptions) {
        this.writerOptions = writerOptions;
        this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions);
        flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
        this.startScheduler();
        this.startAsyncFlushing();
    }

    public void startScheduler() {
        stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("starrocks-interval-flush").daemon(true).build());
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (StarRocksWriterManager.this) {
                if (!closed) {
                    try {
                        String label = createBatchLabel();
                        LOG.info(String.format("StarRocks interval Sinking triggered: label[%s].", label));
                        if (batchCount == 0) {
                            startScheduler();
                        }
                        flush(label, false);
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        }, writerOptions.getFlushInterval(), TimeUnit.MILLISECONDS);
    }

    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    public final synchronized void writeRecord(String record) throws IOException {
        checkFlushException();
        try {
@@ -109,19 +145,22 @@ private void waitAsyncFlushingDone() throws InterruptedException {
        for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) {
            flushQueue.put(new StarRocksFlushTuple("", 0l, null));
        }
        checkFlushException();
    }

    private void asyncFlush() throws Exception {
        StarRocksFlushTuple flushData = flushQueue.take();
        if (Strings.isNullOrEmpty(flushData.getLabel())) {
            return;
        }
        stopScheduler();
        LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
        for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
            try {
                // flush to StarRocks with stream load
                starrocksStreamLoadVisitor.doStreamLoad(flushData);
                LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
                startScheduler();
                break;
            } catch (Exception e) {
                LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);
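As a reading aid for the changes above, here is a self-contained sketch of the one-shot schedule/cancel/re-arm pattern this commit introduces: the timer is cancelled when a flush begins (stopScheduler() in asyncFlush()) and re-armed once the stream load finishes (startScheduler()), so flushInterval measures the gap between the end of one StreamLoad and the start of the next rather than enforcing a fixed rate. The sketch is illustrative only and simplified from the commit: it reuses a single scheduler instead of rebuilding one on each re-arm, and prints instead of performing a stream load.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the plugin's code.
public class IntervalFlushSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "interval-flush-sketch");
                t.setDaemon(true); // daemon, like the "starrocks-interval-flush" thread above
                return t;
            });
    private ScheduledFuture<?> pending;
    private final long intervalMs;

    public IntervalFlushSketch(long intervalMs) {
        this.intervalMs = intervalMs;
        arm(); // corresponds to startScheduler() in the constructor
    }

    // One-shot schedule (not scheduleAtFixedRate): the next countdown only starts when we re-arm.
    private synchronized void arm() {
        disarm();
        pending = scheduler.schedule(this::onIntervalElapsed, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Cancel the pending timer, as stopScheduler() does when an async flush begins.
    private synchronized void disarm() {
        if (pending != null) {
            pending.cancel(false);
        }
    }

    private synchronized void onIntervalElapsed() {
        flush();
        arm(); // re-arm after the flush, so the interval is measured between loads
    }

    private void flush() {
        System.out.println("interval flush at " + System.currentTimeMillis());
    }

    public static void main(String[] args) throws InterruptedException {
        new IntervalFlushSketch(1000L);
        Thread.sleep(3500L); // expect roughly three interval-triggered flushes
    }
}
```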
