diff --git a/build.gradle b/build.gradle index e77cb7a7c..e29baf00e 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,7 @@ repositories { } dependencies { - api 'com.redislabs:picocli-redis:1.14.1' + api 'com.redislabs:picocli-redis:1.14.2' implementation 'org.slf4j:slf4j-jdk14:1.7.28' implementation 'org.springframework.batch:spring-batch-core:4.2.1.RELEASE' implementation 'org.springframework.boot:spring-boot-autoconfigure:2.2.5.RELEASE' @@ -148,8 +148,7 @@ asciidoctor { outputOptions { separateOutputDirs = false } - attributes \ - 'commandsdir': 'src/test/resources/commands' + attributes 'commandsdir': 'src/test/resources/commands' } task printVersion { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index f3d88b1c2..490fda857 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a2bf1313b..a4b442974 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/com/redislabs/riot/cli/GeneratorCommand.java b/src/main/java/com/redislabs/riot/cli/GeneratorCommand.java index 963826adb..423b75908 100644 --- a/src/main/java/com/redislabs/riot/cli/GeneratorCommand.java +++ b/src/main/java/com/redislabs/riot/cli/GeneratorCommand.java @@ -81,6 +81,11 @@ private Map fakerFields() { return fields; } + @Override + protected String getMainFlowName() { + return "generator"; + } + @Override protected String taskName() { return "Generating"; diff --git a/src/main/java/com/redislabs/riot/cli/MapExportCommand.java b/src/main/java/com/redislabs/riot/cli/MapExportCommand.java index e823c5726..00c833bac 100644 --- a/src/main/java/com/redislabs/riot/cli/MapExportCommand.java +++ b/src/main/java/com/redislabs/riot/cli/MapExportCommand.java @@ -55,6 +55,11 @@ protected ItemWriter> writer() throws Exception { return file.writer(); } + @Override + protected String getMainFlowName() { + return "export"; + } + @Override protected String taskName() { return "Exporting"; diff --git a/src/main/java/com/redislabs/riot/cli/MapImportCommand.java b/src/main/java/com/redislabs/riot/cli/MapImportCommand.java index 3dd02ddf0..ab3b77496 100644 --- a/src/main/java/com/redislabs/riot/cli/MapImportCommand.java +++ b/src/main/java/com/redislabs/riot/cli/MapImportCommand.java @@ -45,7 +45,11 @@ protected ItemProcessor, Map> processor() th return composite; } return processor; + } + @Override + protected String getMainFlowName() { + return "import"; } @Override diff --git a/src/main/java/com/redislabs/riot/cli/NoopProgressReporter.java b/src/main/java/com/redislabs/riot/cli/NoopProgressReporter.java deleted file mode 100644 index a0c4c6178..000000000 --- a/src/main/java/com/redislabs/riot/cli/NoopProgressReporter.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.redislabs.riot.cli; - -import com.redislabs.riot.transfer.Metrics; - -public class NoopProgressReporter implements ProgressReporter { - - @Override - public void start() { - } - - @Override - public void onUpdate(Metrics update) { - } - - @Override - public void stop() { - } - -} diff --git a/src/main/java/com/redislabs/riot/cli/ProgressBarReporter.java b/src/main/java/com/redislabs/riot/cli/ProgressBarReporter.java index 0437847a9..6746d035a 100644 --- a/src/main/java/com/redislabs/riot/cli/ProgressBarReporter.java +++ b/src/main/java/com/redislabs/riot/cli/ProgressBarReporter.java @@ -1,46 +1,74 @@ package com.redislabs.riot.cli; import com.redislabs.riot.transfer.Metrics; - +import com.redislabs.riot.transfer.MetricsProvider; +import com.redislabs.riot.transfer.Transfer; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; import me.tongfei.progressbar.ProgressBar; import me.tongfei.progressbar.ProgressBarBuilder; -public class ProgressBarReporter implements ProgressReporter { - - private ProgressBarBuilder builder = new ProgressBarBuilder(); - private ProgressBar bar; - - public ProgressBarReporter initialMax(long initialMax) { - builder.setInitialMax(initialMax); - return this; - } +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; - public ProgressBarReporter taskName(String taskName) { - builder.setTaskName(taskName); - return this; - } +@Slf4j +public class ProgressBarReporter implements Runnable, Transfer.Listener { - public ProgressBarReporter unitName(String unitName) { - builder.setUnit(" " + unitName + "s", 1); - return this; - } + private ScheduledExecutorService executor; + private MetricsProvider metricsProvider; + private Long period = null; + private TimeUnit timeUnit; + private ScheduledFuture scheduledFuture; + private ProgressBarBuilder builder = new ProgressBarBuilder(); + private ProgressBar bar; - @Override - public void start() { - this.bar = builder.build(); - } + @Builder + private ProgressBarReporter(Long initialMax, String taskName, String unitName, MetricsProvider metricsProvider, Long period, TimeUnit timeUnit) { + if (initialMax != null) { + builder.setInitialMax(initialMax); + } + builder.setTaskName(taskName); + if (unitName != null) { + builder.setUnit(" " + unitName + "s", 1); + } + if (metricsProvider != null && period != null && timeUnit != null) { + this.metricsProvider = metricsProvider; + this.period = period; + this.timeUnit = timeUnit; + this.executor = Executors.newSingleThreadScheduledExecutor(); + this.scheduledFuture = executor.scheduleAtFixedRate(this, 0, period, timeUnit); + } + } - @Override - public void onUpdate(Metrics update) { - bar.stepTo(update.getWrites()); - if (update.getRunningThreads() > 1) { - bar.setExtraMessage(" (" + update.getRunningThreads() + " threads)"); - } - } + @Override + public void onOpen() { + this.bar = builder.build(); + } - @Override - public void stop() { - this.bar.close(); - } + @Override + public void onClose() { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + if (executor != null) { + executor.shutdown(); + } + run(); + this.bar.close(); + } + @Override + public void run() { + if (bar == null) { + return; + } + Metrics metrics = metricsProvider.getMetrics(); + bar.stepTo(metrics.getWrites()); + int runningThreads = metrics.getRunningThreads(); + if (runningThreads > 1) { + bar.setExtraMessage(" (" + runningThreads + " threads)"); + } + } } diff --git a/src/main/java/com/redislabs/riot/cli/ProgressReporter.java b/src/main/java/com/redislabs/riot/cli/ProgressReporter.java deleted file mode 100644 index 41e3ed071..000000000 --- a/src/main/java/com/redislabs/riot/cli/ProgressReporter.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.redislabs.riot.cli; - -import com.redislabs.riot.transfer.Metrics; - -public interface ProgressReporter { - - void start(); - - void onUpdate(Metrics update); - - void stop(); - -} diff --git a/src/main/java/com/redislabs/riot/cli/ReplicateCommand.java b/src/main/java/com/redislabs/riot/cli/ReplicateCommand.java index c20217c7d..4669ff978 100644 --- a/src/main/java/com/redislabs/riot/cli/ReplicateCommand.java +++ b/src/main/java/com/redislabs/riot/cli/ReplicateCommand.java @@ -47,6 +47,11 @@ public String getChannel(RedisOptions redisOptions) { return channel.replace(DATABASE_TOKEN, String.valueOf(redisOptions.getDatabase())); } + @Override + protected String getMainFlowName() { + return "replication"; + } + @Override protected RedisItemReader reader() throws Exception { return reader(KeyDumpReader.builder().timeout(getTimeout()).build()); @@ -54,9 +59,9 @@ protected RedisItemReader reader() throws Exception { @SuppressWarnings("unchecked") @Override - protected Transfer transfer(ItemReader reader, ItemProcessor processor, + protected Transfer transfer(String mainFlowName, ItemReader reader, ItemProcessor processor, ItemWriter writer) { - Transfer transfer = super.transfer(reader, processor, writer); + Transfer transfer = super.transfer(mainFlowName, reader, processor, writer); if (listen) { RedisOptions source = redisOptions(); KeyspaceNotificationsIterator iterator = KeyspaceNotificationsIterator.builder() @@ -75,7 +80,7 @@ protected Transfer transfer(ItemReader reader, ItemPr @Override protected String taskName() { - return "Replicating " + redisOptions().getServers().get(0) + " to " + target.getServers().get(0); + return "Replicating"; } @Override diff --git a/src/main/java/com/redislabs/riot/cli/TransferCommand.java b/src/main/java/com/redislabs/riot/cli/TransferCommand.java index 472305727..3e87eb055 100644 --- a/src/main/java/com/redislabs/riot/cli/TransferCommand.java +++ b/src/main/java/com/redislabs/riot/cli/TransferCommand.java @@ -1,200 +1,163 @@ package com.redislabs.riot.cli; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - +import com.redislabs.picocliredis.RedisOptions; +import com.redislabs.riot.redis.*; +import com.redislabs.riot.redis.writer.*; +import com.redislabs.riot.transfer.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; - -import com.redislabs.picocliredis.RedisOptions; -import com.redislabs.riot.redis.JedisClusterCommands; -import com.redislabs.riot.redis.JedisPipelineCommands; -import com.redislabs.riot.redis.LettuceAsyncCommands; -import com.redislabs.riot.redis.LettuceReactiveCommands; -import com.redislabs.riot.redis.LettuceSyncCommands; -import com.redislabs.riot.redis.RedisCommands; -import com.redislabs.riot.redis.writer.AbstractCommandWriter; -import com.redislabs.riot.redis.writer.AbstractLettuceItemWriter; -import com.redislabs.riot.redis.writer.AbstractRedisItemWriter; -import com.redislabs.riot.redis.writer.CommandWriter; -import com.redislabs.riot.redis.writer.JedisClusterWriter; -import com.redislabs.riot.redis.writer.JedisPipelineWriter; -import com.redislabs.riot.redis.writer.LettuceAsyncItemWriter; -import com.redislabs.riot.redis.writer.LettuceReactiveItemWriter; -import com.redislabs.riot.redis.writer.LettuceSyncItemWriter; -import com.redislabs.riot.redis.writer.RediSearchCommandWriter; -import com.redislabs.riot.transfer.CappedReader; -import com.redislabs.riot.transfer.ErrorHandler; -import com.redislabs.riot.transfer.Flow; -import com.redislabs.riot.transfer.ThrottledReader; -import com.redislabs.riot.transfer.Transfer; -import com.redislabs.riot.transfer.TransferExecution; - -import lombok.extern.slf4j.Slf4j; import picocli.CommandLine.Command; import picocli.CommandLine.Option; +import java.util.concurrent.TimeUnit; + @Slf4j @Command public abstract class TransferCommand extends RiotCommand { - @Option(names = "--threads", description = "Thread count (default: ${DEFAULT-VALUE})", paramLabel = "") - private int threads = 1; - @Option(names = { "-b", - "--batch" }, description = "Number of items in each batch (default: ${DEFAULT-VALUE})", paramLabel = "") - private int batchSize = 50; - @Option(names = { "-m", "--max" }, description = "Max number of items to read", paramLabel = "") - private Long maxItemCount; - @Option(names = "--sleep", description = "Sleep duration in millis between reads", paramLabel = "") - private Long sleep; - @Option(names = "--progress", description = "Progress reporting interval (default: ${DEFAULT-VALUE} ms)", paramLabel = "") - private long progressRate = 300; - @Option(names = "--max-wait", description = "Max duration to wait for transfer to complete", paramLabel = "") - private Long maxWait; - - @Override - public void run() { - ItemReader reader; - try { - reader = reader(); - } catch (Exception e) { - log.error("Could not initialize reader", e); - return; - } - ItemProcessor processor; - try { - processor = processor(); - } catch (Exception e) { - log.error("Could not initialize processor", e); - return; - } - ItemWriter writer; - try { - writer = writer(); - } catch (Exception e) { - log.error("Could not initialize writer", e); - return; - } - execute(transfer(reader, processor, writer)); - } - - protected abstract ItemReader reader() throws Exception; - - protected abstract ItemProcessor processor() throws Exception; - - protected abstract ItemWriter writer() throws Exception; - - protected Transfer transfer(ItemReader reader, ItemProcessor processor, ItemWriter writer) { - return Transfer.builder().flow(flow("main", reader, processor, writer)).build(); - } - - protected ErrorHandler errorHandler() { - return e -> log.error("Could not read item", e); - } - - protected Flow flow(String name, ItemReader reader, ItemProcessor processor, ItemWriter writer) { - return Flow.builder().name(name).batchSize(batchSize).nThreads(threads).reader(throttle(cap(reader))) - .processor(processor).writer(writer).errorHandler(errorHandler()).build(); - } - - protected void execute(Transfer transfer) { - ProgressReporter reporter = progressReporter(); - reporter.start(); - TransferExecution execution = transfer.execute(); - ScheduledExecutorService progressReportExecutor = Executors.newSingleThreadScheduledExecutor(); - progressReportExecutor.scheduleAtFixedRate(() -> reporter.onUpdate(execution.progress()), 0, progressRate, - TimeUnit.MILLISECONDS); - execution.awaitTermination(maxWait(), TimeUnit.MILLISECONDS); - progressReportExecutor.shutdown(); - reporter.onUpdate(execution.progress()); - reporter.stop(); - } - - private long maxWait() { - if (maxWait == null) { - return Long.MAX_VALUE; - } - return maxWait; - } - - private ItemReader throttle(ItemReader reader) { - if (sleep == null) { - return reader; - } - return ThrottledReader.builder().reader(reader).sleep(sleep).build(); - } - - private ItemReader cap(ItemReader reader) { - if (maxItemCount == null) { - return reader; - } - return new CappedReader(reader, maxItemCount); - } - - private ProgressReporter progressReporter() { - if (isQuiet()) { - return new NoopProgressReporter(); - } - ProgressBarReporter progressBarReporter = new ProgressBarReporter().taskName(taskName()); - if (maxItemCount != null) { - progressBarReporter.initialMax(maxItemCount); - } - return progressBarReporter; - } - - protected abstract String taskName(); - - protected AbstractRedisItemWriter writer(RedisOptions redisOptions, CommandWriter writer) { - if (writer instanceof AbstractCommandWriter) { - ((AbstractCommandWriter) writer).setCommands(redisCommands(redisOptions)); - } - if (redisOptions.isJedis()) { - if (redisOptions.isCluster()) { - return JedisClusterWriter.builder().cluster(redisOptions.jedisCluster()).writer(writer).build(); - } - return JedisPipelineWriter.builder().pool(redisOptions.jedisPool()).writer(writer).build(); - } - AbstractLettuceItemWriter lettuceWriter = lettuceWriter(redisOptions); - lettuceWriter.setWriter(writer); - if (writer instanceof RediSearchCommandWriter) { - lettuceWriter.setApi(redisOptions.lettuSearchApi()); - lettuceWriter.setPool(redisOptions.pool(redisOptions.rediSearchClient())); - } else { - lettuceWriter.setApi(redisOptions.lettuceApi()); - lettuceWriter.setPool(redisOptions.pool(redisOptions.lettuceClient())); - } - return lettuceWriter; - } - - private AbstractLettuceItemWriter lettuceWriter(RedisOptions redisOptions) { - switch (redisOptions.getApi()) { - case REACTIVE: - return LettuceReactiveItemWriter.builder().build(); - case SYNC: - return LettuceSyncItemWriter.builder().build(); - default: - return LettuceAsyncItemWriter.builder().timeout(redisOptions.getCommandTimeout()) - .fireAndForget(redisOptions.isFireAndForget()).build(); - } - } - - private RedisCommands redisCommands(RedisOptions redis) { - if (redis.isJedis()) { - if (redis.isCluster()) { - return new JedisClusterCommands(); - } - return new JedisPipelineCommands(); - } - switch (redis.getApi()) { - case REACTIVE: - return new LettuceReactiveCommands(); - case SYNC: - return new LettuceSyncCommands(); - default: - return new LettuceAsyncCommands(); - } - } + @Option(names = "--threads", description = "Thread count (default: ${DEFAULT-VALUE})", paramLabel = "") + private int threads = 1; + @Option(names = {"-b", + "--batch"}, description = "Number of items in each batch (default: ${DEFAULT-VALUE})", paramLabel = "") + private int batchSize = 50; + @Option(names = {"-m", "--max"}, description = "Max number of items to read", paramLabel = "") + private Long maxItemCount; + @Option(names = "--sleep", description = "Sleep duration in millis between reads", paramLabel = "") + private Long sleep; + @Option(names = "--progress", description = "Progress reporting interval (default: ${DEFAULT-VALUE} ms)", paramLabel = "") + private long progressRate = 300; + @Option(names = "--max-wait", description = "Max duration to wait for transfer to complete", paramLabel = "") + private Long maxWait; + + @Override + public void run() { + ItemReader reader; + try { + reader = reader(); + } catch (Exception e) { + log.error("Could not initialize reader", e); + return; + } + ItemProcessor processor; + try { + processor = processor(); + } catch (Exception e) { + log.error("Could not initialize processor", e); + return; + } + ItemWriter writer; + try { + writer = writer(); + } catch (Exception e) { + log.error("Could not initialize writer", e); + return; + } + execute(transfer(getMainFlowName(), reader, processor, writer)); + } + + protected abstract String getMainFlowName(); + + protected abstract ItemReader reader() throws Exception; + + protected abstract ItemProcessor processor() throws Exception; + + protected abstract ItemWriter writer() throws Exception; + + protected Transfer transfer(String name, ItemReader reader, ItemProcessor processor, ItemWriter writer) { + return Transfer.builder().flow(flow(name, reader, processor, writer)).build(); + } + + protected ErrorHandler errorHandler() { + return e -> log.error("Could not read item", e); + } + + protected Flow flow(String name, ItemReader reader, ItemProcessor processor, ItemWriter writer) { + return Flow.builder().name(name).batchSize(batchSize).nThreads(threads).reader(throttle(cap(reader))) + .processor(processor).writer(writer).errorHandler(errorHandler()).build(); + } + + protected void execute(Transfer transfer) { + TransferExecution execution = transfer.execute(); + if (!isQuiet()) { + transfer.addListener(ProgressBarReporter.builder().taskName(taskName()).initialMax(maxItemCount).period(progressRate).timeUnit(TimeUnit.MILLISECONDS).metricsProvider(execution).build()); + } + execution.awaitTermination(maxWait(), TimeUnit.MILLISECONDS); + } + + private long maxWait() { + if (maxWait == null) { + return Long.MAX_VALUE; + } + return maxWait; + } + + private ItemReader throttle(ItemReader reader) { + if (sleep == null) { + return reader; + } + return ThrottledReader.builder().reader(reader).sleep(sleep).build(); + } + + private ItemReader cap(ItemReader reader) { + if (maxItemCount == null) { + return reader; + } + return new CappedReader<>(reader, maxItemCount); + } + + protected abstract String taskName(); + + protected AbstractRedisItemWriter writer(RedisOptions redisOptions, CommandWriter writer) { + if (writer instanceof AbstractCommandWriter) { + ((AbstractCommandWriter) writer).setCommands(redisCommands(redisOptions)); + } + if (redisOptions.isJedis()) { + if (redisOptions.isCluster()) { + return JedisClusterWriter.builder().cluster(redisOptions.jedisCluster()).writer(writer).build(); + } + return JedisPipelineWriter.builder().pool(redisOptions.jedisPool()).writer(writer).build(); + } + AbstractLettuceItemWriter lettuceWriter = lettuceWriter(redisOptions); + lettuceWriter.setWriter(writer); + if (writer instanceof RediSearchCommandWriter) { + lettuceWriter.setApi(redisOptions.lettuSearchApi()); + lettuceWriter.setPool(redisOptions.pool(redisOptions.rediSearchClient())); + } else { + lettuceWriter.setApi(redisOptions.lettuceApi()); + lettuceWriter.setPool(redisOptions.pool(redisOptions.lettuceClient())); + } + return lettuceWriter; + } + + private AbstractLettuceItemWriter lettuceWriter(RedisOptions redisOptions) { + switch (redisOptions.getApi()) { + case REACTIVE: + return LettuceReactiveItemWriter.builder().build(); + case SYNC: + return LettuceSyncItemWriter.builder().build(); + default: + return LettuceAsyncItemWriter.builder().timeout(redisOptions.getCommandTimeout()) + .fireAndForget(redisOptions.isFireAndForget()).build(); + } + } + + private RedisCommands redisCommands(RedisOptions redis) { + if (redis.isJedis()) { + if (redis.isCluster()) { + return new JedisClusterCommands(); + } + return new JedisPipelineCommands(); + } + switch (redis.getApi()) { + case REACTIVE: + return new LettuceReactiveCommands(); + case SYNC: + return new LettuceSyncCommands(); + default: + return new LettuceAsyncCommands(); + } + } } diff --git a/src/main/java/com/redislabs/riot/file/StandardInputResource.java b/src/main/java/com/redislabs/riot/file/StandardInputResource.java index 25b8acc35..96445ac3a 100644 --- a/src/main/java/com/redislabs/riot/file/StandardInputResource.java +++ b/src/main/java/com/redislabs/riot/file/StandardInputResource.java @@ -11,7 +11,7 @@ public StandardInputResource() { } @Override - public InputStream getInputStream() throws IOException, IllegalStateException { + public InputStream getInputStream() { return System.in; } } \ No newline at end of file diff --git a/src/main/java/com/redislabs/riot/transfer/Flow.java b/src/main/java/com/redislabs/riot/transfer/Flow.java index 8fb0f13f2..378120c25 100644 --- a/src/main/java/com/redislabs/riot/transfer/Flow.java +++ b/src/main/java/com/redislabs/riot/transfer/Flow.java @@ -1,44 +1,77 @@ package com.redislabs.riot.transfer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; - import lombok.Builder; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.*; + +import java.util.ArrayList; +import java.util.List; @Builder +@Slf4j public class Flow { + @Getter + private final String name; + @Getter + private final ItemReader reader; + private final ItemProcessor processor; + @Getter + private final ItemWriter writer; + private final int nThreads; + private final int batchSize; + @Setter + private Long flushRate; + private final ErrorHandler errorHandler; + private final List listeners = new ArrayList<>(); + @Getter + private boolean open; + + public void addListener(Listener listener) { + listeners.add(listener); + } + + public FlowExecution execute() { + List> flowThreads = new ArrayList<>(nThreads); + for (int index = 0; index < nThreads; index++) { + Batcher batcher = Batcher.builder().reader(reader).processor(processor).batchSize(batchSize) + .errorHandler(errorHandler).build(); + flowThreads.add(FlowThread.builder().flow(this).threadId(index).threads(nThreads).batcher(batcher) + .flushRate(flushRate).build()); + } + return FlowExecution.builder().threads(flowThreads).build().execute(); + } + + public void close() throws ItemStreamException { + if (reader instanceof ItemStream) { + ((ItemStream) reader).close(); + } + log.debug("Closing writer"); + if (writer instanceof ItemStream) { + ((ItemStream) writer).close(); + } + this.open = false; + listeners.forEach(l -> l.onClose(this)); + } + + public void open(ExecutionContext executionContext) throws ItemStreamException { + if (reader instanceof ItemStream) { + log.debug("Opening reader"); + ((ItemStream) reader).open(executionContext); + } + if (writer instanceof ItemStream) { + log.debug("Opening writer"); + ((ItemStream) writer).open(executionContext); + } + this.open = true; + listeners.forEach(l -> l.onOpen(this)); + } + + public interface Listener { - private @Getter String name; - private @Getter ItemReader reader; - private ItemProcessor processor; - private @Getter ItemWriter writer; - private int nThreads; - private int batchSize; - private @Setter Long flushRate; - private ErrorHandler errorHandler; - - public FlowExecution execute() { - List> flowThreads = new ArrayList<>(nThreads); - for (int index = 0; index < nThreads; index++) { - Batcher batcher = Batcher.builder().reader(reader).processor(processor).batchSize(batchSize) - .errorHandler(errorHandler).build(); - flowThreads.add(FlowThread.builder().flow(this).threadId(index).threads(nThreads).batcher(batcher) - .flushRate(flushRate).build()); - } - ExecutorService executor = Executors.newFixedThreadPool(flowThreads.size()); - for (FlowThread thread : flowThreads) { - executor.submit(thread); - } - executor.shutdown(); - return FlowExecution.builder().threads(flowThreads).executor(executor).build(); - } + void onOpen(Flow flow); + void onClose(Flow flow); + } } diff --git a/src/main/java/com/redislabs/riot/transfer/FlowExecution.java b/src/main/java/com/redislabs/riot/transfer/FlowExecution.java index 8912b94b9..246493698 100644 --- a/src/main/java/com/redislabs/riot/transfer/FlowExecution.java +++ b/src/main/java/com/redislabs/riot/transfer/FlowExecution.java @@ -1,17 +1,27 @@ package com.redislabs.riot.transfer; +import lombok.Builder; +import lombok.Getter; +import lombok.Singular; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import lombok.Builder; - @Builder +@Slf4j public class FlowExecution { + @Getter + @Singular private List> threads; private ExecutorService executor; + private List> futures; public void stop() { threads.forEach(t -> t.stop()); @@ -35,4 +45,14 @@ public void awaitTermination(long timeout, TimeUnit unit) { } } + public FlowExecution execute() { + this.executor = Executors.newFixedThreadPool(threads.size()); + this.futures = new ArrayList<>(threads.size()); + for (FlowThread thread : threads) { + this.futures.add(executor.submit(thread)); + } + executor.shutdown(); + return this; + } + } \ No newline at end of file diff --git a/src/main/java/com/redislabs/riot/transfer/FlowThread.java b/src/main/java/com/redislabs/riot/transfer/FlowThread.java index c2e19fd73..7990faa6d 100644 --- a/src/main/java/com/redislabs/riot/transfer/FlowThread.java +++ b/src/main/java/com/redislabs/riot/transfer/FlowThread.java @@ -1,110 +1,110 @@ package com.redislabs.riot.transfer; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.item.ExecutionContext; + import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.springframework.batch.item.ExecutionContext; -import org.springframework.batch.item.ItemStream; - -import lombok.Builder; -import lombok.extern.slf4j.Slf4j; - @Slf4j -@SuppressWarnings({ "rawtypes", "unchecked" }) public class FlowThread implements Runnable { - public final static String CONTEXT_PARTITION = "partition"; - public final static String CONTEXT_PARTITIONS = "partitions"; + public final static String CONTEXT_PARTITION = "partition"; + public final static String CONTEXT_PARTITIONS = "partitions"; + + private final int threadId; + private final int threads; + @Getter + private final Flow flow; + private final Batcher batcher; + private final Long flushRate; + private long readCount; + private long writeCount; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private ScheduledFuture flushFuture; + private boolean running; + private boolean stopped; + + @Builder + public FlowThread(int threadId, int threads, Flow flow, Batcher batcher, Long flushRate) { + super(); + this.threadId = threadId; + this.threads = threads; + this.flow = flow; + this.batcher = batcher; + this.flushRate = flushRate; + } - private int threadId; - private int threads; - private Flow flow; - private Batcher batcher; - private Long flushRate; - private long readCount; - private long writeCount; - private boolean running; - private boolean stopped; + @Override + public void run() { + ExecutionContext executionContext = new ExecutionContext(); + executionContext.putInt(CONTEXT_PARTITION, threadId); + executionContext.putInt(CONTEXT_PARTITIONS, threads); + try { + flow.open(executionContext); + } catch (Exception e) { + log.error("Could not open {}", flow.getName(), e); + return; + } + if (flushRate != null) { + flushFuture = scheduler.scheduleAtFixedRate(new Flusher(), flushRate, flushRate, TimeUnit.MILLISECONDS); + } + running = true; + try { + List items; + while ((items = batcher.next()) != null && !stopped) { + write(items); + } + } catch (Exception e) { + log.error("Error during {}", flow.getName(), e); + } finally { + if (flushFuture != null) { + flushFuture.cancel(true); + } + scheduler.shutdown(); + try { + flow.close(); + } catch (Exception e) { + log.error("Could not close {}", flow.getName(), e); + } + this.running = false; + log.debug("Flow {} thread {} finished", flow.getName(), threadId); + } - @Builder - public FlowThread(int threadId, int threads, Flow flow, Batcher batcher, Long flushRate) { - super(); - this.threadId = threadId; - this.threads = threads; - this.flow = flow; - this.batcher = batcher; - this.flushRate = flushRate; - } + } - @Override - public void run() { - try { - ExecutionContext executionContext = new ExecutionContext(); - executionContext.putInt(CONTEXT_PARTITION, threadId); - executionContext.putInt(CONTEXT_PARTITIONS, threads); - if (flow.getReader() instanceof ItemStream) { - ((ItemStream) flow.getReader()).open(executionContext); - } - if (flow.getWriter() instanceof ItemStream) { - ((ItemStream) flow.getWriter()).open(executionContext); - } - ScheduledExecutorService scheduler = null; - ScheduledFuture flushFuture = null; - if (flushRate != null) { - scheduler = Executors.newSingleThreadScheduledExecutor(); - flushFuture = scheduler.scheduleAtFixedRate(this::flush, flushRate, flushRate, TimeUnit.MILLISECONDS); - } - this.running = true; - List items; - while ((items = batcher.next()) != null && !stopped) { - write(items); - } - if (scheduler != null) { - scheduler.shutdown(); - } - if (flushFuture != null) { - flushFuture.cancel(true); - } - log.debug("Closing reader"); - if (flow.getReader() instanceof ItemStream) { - ((ItemStream) flow.getReader()).close(); - } - if (flow.getWriter() instanceof ItemStream) { - ((ItemStream) flow.getWriter()).close(); - } - this.running = false; - log.debug("Flow {} thread {} finished", flow.getName(), threadId); - } catch (Throwable e) { - log.error("Flow {} execution failed", flow.getName(), e); - } - } + public Metrics progress() { + return Metrics.builder().reads(readCount).writes(writeCount).runningThreads(running ? 1 : 0).build(); + } - public Metrics progress() { - return Metrics.builder().reads(readCount).writes(writeCount).runningThreads(running ? 1 : 0).build(); - } + public void stop() { + stopped = true; + } - public void stop() { - stopped = true; - } + private void write(List items) throws Exception { + readCount += items.size(); + flow.getWriter().write(items); + writeCount += items.size(); + } - private void write(List items) { - readCount += items.size(); - try { - flow.getWriter().write(items); - writeCount += items.size(); - } catch (Exception e) { - log.error("Could not write items", e); - } - } + private class Flusher implements Runnable { - private void flush() { - List items = batcher.flush(); - if (!items.isEmpty()) { - write(items); - } - } + @Override + public void run() { + List items = batcher.flush(); + if (!items.isEmpty()) { + try { + FlowThread.this.write(items); + } catch (Exception e) { + log.error("Could not flush {}", flow.getName(), e); + } + } + } + } } diff --git a/src/main/java/com/redislabs/riot/transfer/MetricsProvider.java b/src/main/java/com/redislabs/riot/transfer/MetricsProvider.java new file mode 100644 index 000000000..5128a9af5 --- /dev/null +++ b/src/main/java/com/redislabs/riot/transfer/MetricsProvider.java @@ -0,0 +1,9 @@ +package com.redislabs.riot.transfer; + +import java.util.stream.Collectors; + +public interface MetricsProvider { + + Metrics getMetrics(); + +} diff --git a/src/main/java/com/redislabs/riot/transfer/Transfer.java b/src/main/java/com/redislabs/riot/transfer/Transfer.java index a5b9d7cc0..98256a100 100644 --- a/src/main/java/com/redislabs/riot/transfer/Transfer.java +++ b/src/main/java/com/redislabs/riot/transfer/Transfer.java @@ -1,31 +1,67 @@ package com.redislabs.riot.transfer; +import com.redislabs.riot.transfer.TransferExecution.TransferExecutionBuilder; +import lombok.Builder; +import lombok.Getter; +import lombok.Singular; + import java.util.ArrayList; import java.util.List; -import com.redislabs.riot.transfer.TransferExecution.TransferExecutionBuilder; +public class Transfer implements Flow.Listener { -import lombok.Builder; + @Getter + private final List> flows; + private final List listeners = new ArrayList<>(); + + @Builder + private Transfer(Flow flow) { + this.flows = new ArrayList<>(); + this.flows.add(flow); + } + + public void addListener(Listener listener) { + listeners.add(listener); + } + + public Transfer add(Flow flow) { + flows.add(flow); + return this; + } -public class Transfer { + public TransferExecution execute() { + flows.forEach(f -> f.addListener(this)); + TransferExecutionBuilder builder = TransferExecution.builder(); + flows.forEach(f -> builder.flow(f.execute())); + return builder.build(); + } - private List> flows; + @Override + public void onOpen(Flow flow) { + boolean allOpen = true; + for (Flow f : flows) { + allOpen &= f.isOpen(); + } + if (allOpen) { + listeners.forEach(Listener::onOpen); + } + } - @Builder - private Transfer(Flow flow) { - this.flows = new ArrayList<>(); - this.flows.add(flow); - } + @Override + public void onClose(Flow flow) { + boolean allClosed = true; + for (Flow f : flows) { + allClosed &= !f.isOpen(); + } + if (allClosed) { + listeners.forEach(Listener::onClose); + } + } - public Transfer add(Flow flow) { - this.flows.add(flow); - return this; - } + public interface Listener { + void onOpen(); - public TransferExecution execute() { - TransferExecutionBuilder builder = TransferExecution.builder(); - flows.forEach(f -> builder.flow(f.execute())); - return builder.build(); - } + void onClose(); + } } \ No newline at end of file diff --git a/src/main/java/com/redislabs/riot/transfer/TransferExecution.java b/src/main/java/com/redislabs/riot/transfer/TransferExecution.java index 7c1bbb09e..9c688ebb5 100644 --- a/src/main/java/com/redislabs/riot/transfer/TransferExecution.java +++ b/src/main/java/com/redislabs/riot/transfer/TransferExecution.java @@ -8,25 +8,25 @@ import lombok.Singular; @Builder -public class TransferExecution { - - @Singular - private List> flows; - - public Metrics progress() { - return Metrics.builder().metrics(flows.stream().map(f -> f.progress()).collect(Collectors.toList())).build(); - } - - public boolean isTerminated() { - boolean finished = true; - for (FlowExecution flowExecution : flows) { - finished &= flowExecution.isTerminated(); - } - return finished; - } - - public void awaitTermination(long timeout, TimeUnit unit) { - flows.forEach(f -> f.awaitTermination(timeout, unit)); - } - +public class TransferExecution implements MetricsProvider { + + @Singular + private List> flows; + + public boolean isTerminated() { + boolean finished = true; + for (FlowExecution flowExecution : flows) { + finished &= flowExecution.isTerminated(); + } + return finished; + } + + public void awaitTermination(long timeout, TimeUnit unit) { + flows.forEach(f -> f.awaitTermination(timeout, unit)); + } + + @Override + public Metrics getMetrics() { + return Metrics.builder().metrics(flows.stream().map(f -> f.progress()).collect(Collectors.toList())).build(); + } }