diff --git a/starrockswriter/doc/starrockswriter.md b/starrockswriter/doc/starrockswriter.md
index f471b75212..1080d072cb 100644
--- a/starrockswriter/doc/starrockswriter.md
+++ b/starrockswriter/doc/starrockswriter.md
@@ -173,6 +173,14 @@ StarRocksWriter 插件实现了写入数据到 StarRocks 主库的目的表的
* 默认值:104857600 (100M)
+* **flushInterval**
+
+	* Description: the interval, in milliseconds, between the end of one StreamLoad and the start of the next.
+
+	* Required: no
+
+	* Default: 300000 (ms)
+
* **loadProps**
* 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java
index 9e4abd1248..be3fa3ecb8 100644
--- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java
+++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java
@@ -18,6 +18,7 @@ public class StarRocksWriterOptions implements Serializable {
private static final int MAX_RETRIES = 1;
private static final int BATCH_ROWS = 500000;
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
+ private static final long FLUSH_INTERVAL = 300000;
private static final String KEY_LOAD_PROPS_FORMAT = "format";
public enum StreamLoadFormat {
@@ -34,6 +35,7 @@ public enum StreamLoadFormat {
private static final String KEY_JDBC_URL = "jdbcUrl";
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
+ private static final String KEY_FLUSH_INTERVAL = "flushInterval";
private static final String KEY_LOAD_URL = "loadUrl";
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String KEY_LOAD_PROPS = "loadProps";
@@ -120,6 +122,11 @@ public long getBatchSize() {
Long size = options.getLong(KEY_MAX_BATCH_SIZE);
return null == size ? BATCH_BYTES : size;
}
+
+ public long getFlushInterval() {
+ Long interval = options.getLong(KEY_FLUSH_INTERVAL);
+ return null == interval ? FLUSH_INTERVAL : interval;
+ }
public int getFlushQueueLength() {
Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
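
For reference, the new `flushInterval` key is resolved exactly like the other batch options: the getter above null-coalesces the job-level value onto the 300000 ms default. Below is a minimal, illustrative sketch of that lookup using DataX's `Configuration.from(String)` and `getLong` helpers; the `FlushIntervalDemo` class and its `getFlushInterval` method are hypothetical names for this sketch, not part of the plugin.

```java
import com.alibaba.datax.common.util.Configuration;

public class FlushIntervalDemo {

    // Same default as StarRocksWriterOptions.FLUSH_INTERVAL (300000 ms).
    private static final long FLUSH_INTERVAL = 300000;

    public static void main(String[] args) {
        // A trimmed writer "parameter" block; only the key relevant here is shown.
        Configuration conf = Configuration.from("{\"flushInterval\": 60000}");
        System.out.println(getFlushInterval(conf));     // 60000

        // When the key is absent, the default applies.
        Configuration defaults = Configuration.from("{}");
        System.out.println(getFlushInterval(defaults)); // 300000
    }

    // Mirrors StarRocksWriterOptions#getFlushInterval(): null-coalesce onto the default.
    static long getFlushInterval(Configuration options) {
        Long interval = options.getLong("flushInterval");
        return null == interval ? FLUSH_INTERVAL : interval;
    }
}
```
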
diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java
index 0e89005f1d..f32eccdefe 100644
--- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java
+++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/manager/StarRocksWriterManager.java
@@ -1,5 +1,6 @@
package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -8,7 +9,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Strings;
import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions;
@@ -26,14 +31,45 @@ public class StarRocksWriterManager {
private volatile boolean closed = false;
private volatile Exception flushException;
    private final LinkedBlockingDeque<StarRocksFlushTuple> flushQueue;
+ private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
public StarRocksWriterManager(StarRocksWriterOptions writerOptions) {
this.writerOptions = writerOptions;
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions);
flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength());
+ this.startScheduler();
this.startAsyncFlushing();
}
+ public void startScheduler() {
+ stopScheduler();
+ this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("starrocks-interval-flush").daemon(true).build());
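+        // Single-shot timer: fires once after flushInterval ms of inactivity and is re-armed after each successful stream load.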
+ this.scheduledFuture = this.scheduler.schedule(() -> {
+ synchronized (StarRocksWriterManager.this) {
+ if (!closed) {
+ try {
+ String label = createBatchLabel();
+                        LOG.info(String.format("StarRocks interval flush triggered: label[%s].", label));
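+                        // Nothing buffered yet: re-arm the timer so the next idle interval is still covered.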
+ if (batchCount == 0) {
+ startScheduler();
+ }
+ flush(label, false);
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ }, writerOptions.getFlushInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public void stopScheduler() {
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ }
+
public final synchronized void writeRecord(String record) throws IOException {
checkFlushException();
try {
@@ -109,6 +145,7 @@ private void waitAsyncFlushingDone() throws InterruptedException {
for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) {
flushQueue.put(new StarRocksFlushTuple("", 0l, null));
}
+ checkFlushException();
}
private void asyncFlush() throws Exception {
@@ -116,12 +153,14 @@ private void asyncFlush() throws Exception {
if (Strings.isNullOrEmpty(flushData.getLabel())) {
return;
}
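+        // A queued batch is about to be stream-loaded; disarm the pending idle-flush timer first.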
+ stopScheduler();
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
for (int i = 0; i <= writerOptions.getMaxRetries(); i++) {
try {
// flush to StarRocks with stream load
starrocksStreamLoadVisitor.doStreamLoad(flushData);
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
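+                // Stream load succeeded: re-arm the idle-flush timer for the next interval.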
+ startScheduler();
break;
} catch (Exception e) {
LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e);