diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractCommand.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractCommand.java
index 9d7661000..28cc795d0 100644
--- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractCommand.java
+++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractCommand.java
@@ -13,7 +13,7 @@ import picocli.CommandLine.Option;
 @Command
-public abstract class AbstractCommand extends BaseCommand implements Callable {
+public abstract class AbstractCommand extends BaseCommand implements Callable {
 	@Option(names = "--help", usageHelp = true, description = "Show this help message and exit.")
 	private boolean helpRequested;
@@ -29,14 +29,11 @@ public Integer call() throws Exception {
 			setupLogging();
 			log = LoggerFactory.getLogger(getClass());
 		}
-		try (C context = executionContext()) {
-			context.afterPropertiesSet();
-			execute(context);
-		}
+		execute();
 		return 0;
 	}
-	protected abstract C executionContext();
+	protected abstract void execute() throws Exception;
 	private void setupLogging() {
 		Level level = logLevel();
@@ -73,8 +70,6 @@ private static void setBoolean(String property, boolean value) {
 		System.setProperty(property, String.valueOf(value));
 	}
-	protected abstract void execute(C context);
-
 	public LoggingArgs getLoggingArgs() {
 		return loggingArgs;
 	}
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java
index f034c6e7a..737dd04cc 100644
--- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java
+++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java
@@ -12,11 +12,16 @@ import org.springframework.batch.core.JobParameters;
 import org.springframework.batch.core.StepExecution;
 import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.core.job.builder.JobBuilder;
 import org.springframework.batch.core.job.builder.SimpleJobBuilder;
 import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
 import org.springframework.batch.core.step.builder.SimpleStepBuilder;
+import org.springframework.batch.core.step.builder.StepBuilder;
+import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
+import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
 import org.springframework.batch.core.step.tasklet.TaskletStep;
 import org.springframework.batch.item.ItemReader;
 import org.springframework.batch.item.ItemStreamReader;
@@ -26,11 +31,14 @@ import org.springframework.batch.item.support.SynchronizedItemStreamReader;
 import org.springframework.core.task.SyncTaskExecutor;
 import org.springframework.core.task.TaskExecutor;
+import org.springframework.retry.policy.AlwaysRetryPolicy;
+import org.springframework.retry.policy.NeverRetryPolicy;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.util.Assert;
 import com.redis.spring.batch.JobUtils;
+import com.redis.spring.batch.item.AbstractAsyncItemReader;
 import com.redis.spring.batch.item.AbstractPollableItemReader;
 import com.redis.spring.batch.step.FlushingStepBuilder;
@@ -39,7 +47,7 @@ import picocli.CommandLine.Option;
 @Command
-public abstract class AbstractJobCommand extends AbstractCommand {
+public abstract class AbstractJobCommand extends AbstractCommand {
 	public static final String DEFAULT_JOB_REPOSITORY_NAME = "riot";
@@ -54,36 +62,38 @@ public abstract class AbstractJobCommand extends
 	private PlatformTransactionManager transactionManager;
 	private JobLauncher jobLauncher;
-	@Override
-	protected C executionContext() {
-		C context = newExecutionContext();
-		context.setJobName(jobName());
-		context.setJobRepositoryName(jobRepositoryName);
-		context.setJobRepository(jobRepository);
-		context.setTransactionManager(transactionManager);
-		context.setJobLauncher(jobLauncher);
-		return context;
+	private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception {
+		TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
+		launcher.setJobRepository(jobRepository);
+		launcher.setTaskExecutor(new SyncTaskExecutor());
+		launcher.afterPropertiesSet();
+		return launcher;
 	}
-	protected abstract C newExecutionContext();
+	protected void configureAsyncReader(AbstractAsyncItemReader reader) {
+		reader.setJobRepository(jobRepository);
+	}
-	private String jobName() {
-		if (jobName == null) {
-			Assert.notNull(commandSpec, "Command spec not set");
-			return commandSpec.name();
-		}
-		return jobName;
+	private JobBuilder jobBuilder() {
+		return new JobBuilder(jobName, jobRepository);
 	}
 	@Override
-	protected void execute(C context) {
-		Job job = job(context);
-		JobExecution jobExecution;
-		try {
-			jobExecution = context.getJobLauncher().run(job, new JobParameters());
-		} catch (JobExecutionException e) {
-			throw new RiotException("Job failed", e);
+	protected void execute() throws Exception {
+		if (jobName == null) {
+			Assert.notNull(commandSpec, "Command spec not set");
+			jobName = commandSpec.name();
+		}
+		if (jobRepository == null) {
+			jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
 		}
+		if (transactionManager == null) {
+			transactionManager = JobUtils.resourcelessTransactionManager();
+		}
+		if (jobLauncher == null) {
+			jobLauncher = taskExecutorJobLauncher();
+		}
+		JobExecution jobExecution = jobLauncher.run(job(), new JobParameters());
 		if (JobUtils.isFailed(jobExecution.getExitStatus())) {
 			for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
 				if (JobUtils.isFailed(stepExecution.getExitStatus())) {
@@ -94,23 +104,23 @@ protected void execute(C context) {
 		}
 	}
-	private RiotException wrapException(List throwables) {
+	private JobExecutionException wrapException(List throwables) {
 		if (throwables.isEmpty()) {
-			return new RiotException("Job failed");
+			return new JobExecutionException("Job failed");
 		}
-		return new RiotException("Job failed", throwables.get(0));
+		return new JobExecutionException("Job failed", throwables.get(0));
 	}
-	protected Job job(C context, Step... steps) {
-		return job(context, Arrays.asList(steps));
+	protected Job job(Step... steps) {
+		return job(Arrays.asList(steps));
 	}
-	protected Job job(C context, Collection> steps) {
+	protected Job job(Collection> steps) {
 		Assert.notEmpty(steps, "At least one step must be specified");
 		Iterator> iterator = steps.iterator();
-		SimpleJobBuilder job = context.jobBuilder().start(step(context, iterator.next()));
+		SimpleJobBuilder job = jobBuilder().start(step(iterator.next()));
 		while (iterator.hasNext()) {
-			job.next(step(context, iterator.next()));
+			job.next(step(iterator.next()));
 		}
 		return job.build();
 	}
@@ -119,41 +129,58 @@ protected boolean shouldShowProgress() {
 		return stepArgs.getProgressArgs().getStyle() != ProgressStyle.NONE;
 	}
-	protected abstract Job job(C context);
+	protected abstract Job job() throws Exception;
-	private TaskletStep step(C context, Step step) {
-		SimpleStepBuilder builder = simpleStep(context, step);
+	private TaskletStep step(Step step) {
+		SimpleStepBuilder builder = simpleStep(step);
 		if (stepArgs.getRetryPolicy() == RetryPolicy.NEVER && stepArgs.getSkipPolicy() == SkipPolicy.NEVER) {
 			return builder.build();
 		}
 		FaultTolerantStepBuilder ftStep = JobUtils.faultTolerant(builder);
-		if (stepArgs.getSkipPolicy() == SkipPolicy.LIMIT) {
-			ftStep.skipLimit(stepArgs.getSkipLimit());
-			step.getSkip().forEach(ftStep::skip);
-			step.getNoSkip().forEach(ftStep::noSkip);
-		} else {
-			ftStep.skipPolicy(stepArgs.skipPolicy());
+		step.getSkip().forEach(ftStep::skip);
+		step.getNoSkip().forEach(ftStep::noSkip);
+		step.getRetry().forEach(ftStep::retry);
+		step.getNoRetry().forEach(ftStep::noRetry);
+		ftStep.retryLimit(stepArgs.getRetryLimit());
+		ftStep.retryPolicy(retryPolicy());
+		ftStep.skipLimit(stepArgs.getSkipLimit());
+		ftStep.skipPolicy(skipPolicy());
+		return ftStep.build();
+	}
+
+	private org.springframework.retry.RetryPolicy retryPolicy() {
+		switch (stepArgs.getRetryPolicy()) {
+		case ALWAYS:
+			return new AlwaysRetryPolicy();
+		case NEVER:
+			return new NeverRetryPolicy();
+		default:
+			return null;
 		}
-		if (stepArgs.getRetryPolicy() == RetryPolicy.LIMIT) {
-			ftStep.retryLimit(stepArgs.getRetryLimit());
-			step.getRetry().forEach(ftStep::retry);
-			step.getNoRetry().forEach(ftStep::noRetry);
-		} else {
-			ftStep.retryPolicy(stepArgs.retryPolicy());
+	}
+
+	private org.springframework.batch.core.step.skip.SkipPolicy skipPolicy() {
+		switch (stepArgs.getSkipPolicy()) {
+		case ALWAYS:
+			return new AlwaysSkipItemSkipPolicy();
+		case NEVER:
+			return new NeverSkipItemSkipPolicy();
+		default:
+			return null;
 		}
-		return ftStep.build();
 	}
 	@SuppressWarnings("removal")
-	private SimpleStepBuilder simpleStep(C context, Step step) {
-		String stepName = context.getJobName() + "-" + step.getName();
+	private SimpleStepBuilder simpleStep(Step step) {
+		String name = jobName + "-" + step.getName();
 		if (step.getReader() instanceof ItemStreamSupport) {
 			ItemStreamSupport support = (ItemStreamSupport) step.getReader();
-			Assert.notNull(support.getName(), "No name specified for reader in step " + stepName);
-			support.setName(stepName + "-" + support.getName());
+			Assert.notNull(support.getName(), "No name specified for reader in step " + name);
+			support.setName(name + "-" + support.getName());
 		}
-		log.info("Creating step {} with chunk size {}", stepName, stepArgs.getChunkSize());
-		SimpleStepBuilder builder = context.step(stepName, stepArgs.getChunkSize());
+		log.info("Creating step {} with chunk size {}", name, stepArgs.getChunkSize());
+		SimpleStepBuilder builder = new StepBuilder(name, jobRepository).chunk(stepArgs.getChunkSize(),
+				transactionManager);
 		builder.reader(reader(step));
 		builder.writer(writer(step));
 		builder.processor(step.getProcessor());
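
With the execution-context plumbing gone, a concrete command now implements job() directly and lets AbstractJobCommand.execute() lazily provision the job name, job repository, transaction manager and launcher before the job runs. As a rough orientation sketch only (not part of this patch; the class name, reader and writer below are made up), a subclass now looks roughly like this:

package com.redis.riot.example; // hypothetical package, illustration only

import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.support.ListItemReader;

import com.redis.riot.core.AbstractJobCommand;
import com.redis.riot.core.Step;

import picocli.CommandLine.Command;

@Command(name = "example")
public class ExampleCommand extends AbstractJobCommand {

	@Override
	protected Job job() throws Exception {
		// By the time job() is called, execute() has already resolved jobName, jobRepository,
		// transactionManager and jobLauncher; job(Step...) then wraps each Step in a
		// chunk-oriented TaskletStep using the skip/retry settings from StepArgs.
		Step<String, String> step = new Step<>("example", new ListItemReader<>(List.of("a", "b")),
				chunk -> chunk.forEach(System.out::println));
		return job(step);
	}

}
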
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/ExecutionContext.java b/core/riot-core/src/main/java/com/redis/riot/core/ExecutionContext.java
deleted file mode 100644
index e2d1d679d..000000000
--- a/core/riot-core/src/main/java/com/redis/riot/core/ExecutionContext.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.redis.riot.core;
-
-import org.springframework.beans.factory.InitializingBean;
-
-public interface ExecutionContext extends InitializingBean, AutoCloseable {
-
-}
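
The deleted ExecutionContext contract (InitializingBean plus AutoCloseable) has no direct replacement: initialization moves into AbstractJobCommand.execute() above, and closing resources becomes the job of whichever command owns them, as the AbstractRedisCommand hunk further down in this patch does with its RedisContext. A minimal sketch of that ownership pattern, with hypothetical class and factory-method names:

// Illustration only: a command opens what it needs, delegates to the base class,
// and releases the resource in finally. Mirrors AbstractRedisCommand.execute() below.
import com.redis.riot.core.AbstractJobCommand;

public abstract class ExampleResourceCommand extends AbstractJobCommand {

	private AutoCloseable resource;

	@Override
	protected void execute() throws Exception {
		resource = openResource(); // hypothetical factory, e.g. a Redis connection context
		try {
			super.execute();
		} finally {
			resource.close();
		}
	}

	protected abstract AutoCloseable openResource();

}
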
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/JobExecutionContext.java b/core/riot-core/src/main/java/com/redis/riot/core/JobExecutionContext.java
deleted file mode 100644
index 673ff6cf8..000000000
--- a/core/riot-core/src/main/java/com/redis/riot/core/JobExecutionContext.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.redis.riot.core;
-
-import org.springframework.batch.core.job.builder.JobBuilder;
-import org.springframework.batch.core.launch.JobLauncher;
-import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
-import org.springframework.batch.core.repository.JobRepository;
-import org.springframework.batch.core.step.builder.SimpleStepBuilder;
-import org.springframework.batch.core.step.builder.StepBuilder;
-import org.springframework.core.task.SyncTaskExecutor;
-import org.springframework.transaction.PlatformTransactionManager;
-
-import com.redis.spring.batch.JobUtils;
-import com.redis.spring.batch.item.AbstractAsyncItemReader;
-
-public class JobExecutionContext implements ExecutionContext {
-
-	private String jobName;
-	private String jobRepositoryName;
-	private JobRepository jobRepository;
-	private PlatformTransactionManager transactionManager;
-	private JobLauncher jobLauncher;
-
-	@Override
-	public void afterPropertiesSet() throws Exception {
-		if (jobRepository == null) {
-			jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
-		}
-		if (transactionManager == null) {
-			transactionManager = JobUtils.resourcelessTransactionManager();
-		}
-		if (jobLauncher == null) {
-			jobLauncher = taskExecutorJobLauncher();
-		}
-	}
-
-	private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception {
-		TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
-		launcher.setJobRepository(jobRepository);
-		launcher.setTaskExecutor(new SyncTaskExecutor());
-		launcher.afterPropertiesSet();
-		return launcher;
-	}
-
-	public void configure(AbstractAsyncItemReader reader) {
-		reader.setJobRepository(jobRepository);
-	}
-
-	public JobBuilder jobBuilder() {
-		return new JobBuilder(jobName, jobRepository);
-	}
-
-	public String getJobName() {
-		return jobName;
-	}
-
-	public void setJobName(String jobName) {
-		this.jobName = jobName;
-	}
-
-	public JobRepository getJobRepository() {
-		return jobRepository;
-	}
-
-	public void setJobRepository(JobRepository jobRepository) {
-		this.jobRepository = jobRepository;
-	}
-
-	public PlatformTransactionManager getTransactionManager() {
-		return transactionManager;
-	}
-
-	public void setTransactionManager(PlatformTransactionManager transactionManager) {
-		this.transactionManager = transactionManager;
-	}
-
-	public JobLauncher getJobLauncher() {
-		return jobLauncher;
-	}
-
-	public void setJobLauncher(JobLauncher jobLauncher) {
-		this.jobLauncher = jobLauncher;
-	}
-
-	public String getJobRepositoryName() {
-		return jobRepositoryName;
-	}
-
-	public void setJobRepositoryName(String name) {
-		this.jobRepositoryName = name;
-	}
-
-	@Override
-	public void close() throws Exception {
-		// do nothing
-	}
-
-	public SimpleStepBuilder step(String name, int chunkSize) {
-		return new StepBuilder(name, jobRepository).chunk(chunkSize, transactionManager);
-	}
-
-}
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/PrintExceptionMessageHandler.java b/core/riot-core/src/main/java/com/redis/riot/core/PrintExceptionMessageHandler.java
index 12ef12c39..09afb6319 100644
--- a/core/riot-core/src/main/java/com/redis/riot/core/PrintExceptionMessageHandler.java
+++ b/core/riot-core/src/main/java/com/redis/riot/core/PrintExceptionMessageHandler.java
@@ -9,7 +9,7 @@ public class PrintExceptionMessageHandler implements IExecutionExceptionHandler
 	public int handleExecutionException(Exception ex, CommandLine cmd, ParseResult parseResult) {
 		if (cmd.getCommand() instanceof AbstractCommand) {
-			if (((AbstractCommand) cmd.getCommand()).getLoggingArgs().isStacktrace()) {
+			if (((AbstractCommand) cmd.getCommand()).getLoggingArgs().isStacktrace()) {
 				ex.printStackTrace(cmd.getErr());
 			}
 		}
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotException.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotException.java
deleted file mode 100644
index 3f464a425..000000000
--- a/core/riot-core/src/main/java/com/redis/riot/core/RiotException.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.redis.riot.core;
-
-@SuppressWarnings("serial")
-public class RiotException extends RuntimeException {
-
-	public RiotException(String msg) {
-		super(msg);
-	}
-
-	public RiotException(String msg, Throwable cause) {
-		super(msg, cause);
-	}
-
-	public RiotException(Throwable cause) {
-		super(cause);
-	}
-
-}
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java
index 6884ee144..aec767d2a 100644
--- a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java
+++ b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java
@@ -15,6 +15,7 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -23,12 +24,31 @@ import org.springframework.batch.item.support.CompositeItemProcessor;
 import org.springframework.expression.spel.support.StandardEvaluationContext;
 import org.springframework.util.ClassUtils;
+import org.springframework.util.ObjectUtils;
 public abstract class RiotUtils {
 	private RiotUtils() {
 	}
+	public static String mask(char[] password) {
+		if (ObjectUtils.isEmpty(password)) {
+			return null;
+		}
+		return mask(password.length);
+	}
+
+	private static String mask(int length) {
+		return IntStream.range(0, length).mapToObj(i -> "*").collect(Collectors.joining());
+	}
+
+	public static String mask(String password) {
+		if (ObjectUtils.isEmpty(password)) {
+			return null;
+		}
+		return mask(password.length());
+	}
+
 	public static ItemProcessor processor(Collection> functions) {
 		return processor(functions.toArray(new Function[0]));
 	}
diff --git a/core/riot-core/src/main/java/com/redis/riot/core/Step.java b/core/riot-core/src/main/java/com/redis/riot/core/Step.java
index f5ce7e3fc..62b208746 100644
--- a/core/riot-core/src/main/java/com/redis/riot/core/Step.java
+++ b/core/riot-core/src/main/java/com/redis/riot/core/Step.java
@@ -22,7 +22,7 @@ public class Step {
 	private static final long NO_MAX_ITEM_COUNT = -1;
 	private static final String EMPTY_STRING =
""; - private final String name; + protected final String name; private final ItemReader reader; private final ItemWriter writer; private String taskName; diff --git a/core/riot-core/src/main/java/com/redis/riot/core/StepArgs.java b/core/riot-core/src/main/java/com/redis/riot/core/StepArgs.java index 52bdff657..455861800 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/StepArgs.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/StepArgs.java @@ -1,11 +1,6 @@ package com.redis.riot.core; -import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; -import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; -import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; -import org.springframework.retry.policy.AlwaysRetryPolicy; import org.springframework.retry.policy.MaxAttemptsRetryPolicy; -import org.springframework.retry.policy.NeverRetryPolicy; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; @@ -124,27 +119,4 @@ public String toString() { + ", retryLimit=" + retryLimit + ", progressArgs=" + progressArgs + "]"; } - public org.springframework.batch.core.step.skip.SkipPolicy skipPolicy() { - switch (skipPolicy) { - case ALWAYS: - return new AlwaysSkipItemSkipPolicy(); - case NEVER: - return new NeverSkipItemSkipPolicy(); - default: - return new LimitCheckingItemSkipPolicy(); - } - } - - public org.springframework.retry.RetryPolicy retryPolicy() { - switch (retryPolicy) { - case ALWAYS: - return new AlwaysRetryPolicy(); - case NEVER: - return new NeverRetryPolicy(); - default: - return new MaxAttemptsRetryPolicy(); - } - - } - } diff --git a/gradle.properties b/gradle.properties index 8e2d71568..ac4036c9f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -37,7 +37,7 @@ latencyUtilsVersion = 2.0.3 lettucemodVersion = 4.0.0 picocliVersion = 4.7.6 progressbarVersion = 0.10.1 -springBatchRedisVersion = 4.4.2 +springBatchRedisVersion = 4.4.3 testcontainersRedisVersion = 2.2.2 org.gradle.daemon = false diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java index 32dd1a6d4..c3d5a8044 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java @@ -15,7 +15,7 @@ import io.lettuce.core.codec.ByteArrayCodec; import picocli.CommandLine.Option; -public abstract class AbstractCompareCommand extends AbstractTargetCommand { +public abstract class AbstractCompareCommand extends AbstractRedisToRedisCommand { public static final Duration DEFAULT_TTL_TOLERANCE = DefaultKeyComparator.DEFAULT_TTL_TOLERANCE; public static final String COMPARE_STEP_NAME = "compare"; @@ -36,8 +36,8 @@ private String compareMessage(Collection counts) { return builder.toString(); } - protected Step, KeyComparison> compareStep(TargetRedisExecutionContext context) { - KeyComparisonItemReader reader = compareReader(context); + protected Step, KeyComparison> compareStep() { + KeyComparisonItemReader reader = compareReader(); CompareStatusItemWriter writer = new CompareStatusItemWriter<>(); Step, KeyComparison> step = new Step<>(COMPARE_STEP_NAME, reader, writer); step.taskName(COMPARE_TASK_NAME); @@ -53,16 +53,18 @@ protected Step, KeyComparison> compareStep(TargetR private RedisItemReader compareRedisReader() { if (isQuickCompare()) { + log.info("Creating Redis quick compare reader"); return RedisItemReader.type(ByteArrayCodec.INSTANCE); } + 
log.info("Creating Redis full compare reader"); return RedisItemReader.struct(ByteArrayCodec.INSTANCE); } protected abstract boolean isQuickCompare(); - protected KeyComparisonItemReader compareReader(TargetRedisExecutionContext context) { - RedisItemReader source = compareSourceReader(context); - RedisItemReader target = compareTargetReader(context); + protected KeyComparisonItemReader compareReader() { + RedisItemReader source = compareSourceReader(); + RedisItemReader target = compareTargetReader(); KeyComparisonItemReader reader = new KeyComparisonItemReader<>(source, target); reader.setComparator(keyComparator()); return reader; @@ -80,15 +82,15 @@ private KeyComparator keyComparator() { protected abstract boolean isIgnoreStreamMessageId(); - private RedisItemReader compareSourceReader(TargetRedisExecutionContext context) { + private RedisItemReader compareSourceReader() { RedisItemReader reader = compareRedisReader(); - configureSourceReader(context, reader); + configureSourceReader(reader); return reader; } - private RedisItemReader compareTargetReader(TargetRedisExecutionContext context) { + private RedisItemReader compareTargetReader() { RedisItemReader reader = compareRedisReader(); - configureTargetReader(context, reader); + configureTargetReader(reader); return reader; } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java index 3c8548411..ddd82da81 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java @@ -1,33 +1,23 @@ package com.redis.riot; import java.util.Map; -import java.util.Set; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.function.FunctionItemProcessor; -import org.springframework.util.Assert; -import com.redis.lettucemod.RedisModulesUtils; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; import com.redis.riot.core.Step; import com.redis.riot.core.processor.RegexNamedGroupFunction; import com.redis.riot.function.KeyValueMap; import com.redis.spring.batch.item.redis.RedisItemReader; -import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode; import com.redis.spring.batch.item.redis.common.KeyValue; -import io.lettuce.core.AbstractRedisClient; -import io.lettuce.core.RedisException; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; -public abstract class AbstractExportCommand extends AbstractRedisCommand { +public abstract class AbstractExportCommand extends AbstractRedisCommand { - private static final String NOTIFY_CONFIG = "notify-keyspace-events"; - private static final String NOTIFY_CONFIG_VALUE = "KEA"; private static final String TASK_NAME = "Exporting"; private static final String STEP_NAME = "step"; @@ -37,6 +27,12 @@ public abstract class AbstractExportCommand ext @Option(names = "--key-regex", description = "Regex for key-field extraction, e.g. 
'\\w+:(?.+)' extracts an id field from the key", paramLabel = "") private Pattern keyRegex; + @Override + protected void configure(RedisItemReader reader) { + super.configure(reader); + redisReaderArgs.configure(reader); + } + protected ItemProcessor, Map> mapProcessor() { KeyValueMap mapFunction = new KeyValueMap(); if (keyRegex != null) { @@ -45,56 +41,15 @@ protected ItemProcessor, Map> mapProces return new FunctionItemProcessor<>(mapFunction); } - protected Step, T> step(C context, ItemWriter writer) { + protected Step, T> step(ItemWriter writer) { RedisItemReader reader = RedisItemReader.struct(); - configure(context, reader); - Step, T> step = new Step<>(STEP_NAME, reader, writer); + configure(reader); + RedisExportStep step = new RedisExportStep<>(STEP_NAME, reader, writer); step.taskName(TASK_NAME); - configure(step); + step.afterPropertiesSet(); return step; } - protected void configure(C context, RedisItemReader reader) { - context.configure(reader); - log.info("Configuring Redis reader with {}", redisReaderArgs); - redisReaderArgs.configure(reader); - } - - public static void configure(Step step) { - Assert.isInstanceOf(RedisItemReader.class, step.getReader(), - "Step reader must be an instance of RedisItemReader"); - RedisItemReader reader = (RedisItemReader) step.getReader(); - if (reader.getMode() != ReaderMode.LIVEONLY) { - step.maxItemCountSupplier(reader.scanSizeEstimator()); - } - if (reader.getMode() != ReaderMode.SCAN) { - checkNotifyConfig(reader.getClient()); - step.live(true); - step.flushInterval(reader.getFlushInterval()); - step.idleTimeout(reader.getIdleTimeout()); - } - } - - private static void checkNotifyConfig(AbstractRedisClient client) { - Map valueMap; - try (StatefulRedisModulesConnection conn = RedisModulesUtils.connection(client)) { - try { - valueMap = conn.sync().configGet(NOTIFY_CONFIG); - } catch (RedisException e) { - return; - } - } - String actual = valueMap.getOrDefault(NOTIFY_CONFIG, ""); - Set expected = characterSet(NOTIFY_CONFIG_VALUE); - Assert.isTrue(characterSet(actual).containsAll(expected), - String.format("Keyspace notifications not property configured: expected '%s' but was '%s'.", - NOTIFY_CONFIG_VALUE, actual)); - } - - private static Set characterSet(String string) { - return string.codePoints().mapToObj(c -> (char) c).collect(Collectors.toSet()); - } - public RedisReaderArgs getRedisReaderArgs() { return redisReaderArgs; } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java index 0df6f30e8..f9ec4a168 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractImportCommand.java @@ -8,6 +8,7 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.function.FunctionItemProcessor; import org.springframework.expression.EvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.util.Assert; @@ -15,7 +16,9 @@ import com.redis.riot.core.Expression; import com.redis.riot.core.QuietMapAccessor; +import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; +import com.redis.riot.core.processor.PredicateOperator; import com.redis.riot.operation.DelCommand; import com.redis.riot.operation.ExpireCommand; import com.redis.riot.operation.GeoaddCommand; @@ -41,7 +44,7 @@ LpushCommand.class, RpushCommand.class, 
SaddCommand.class, SetCommand.class, XaddCommand.class, ZaddCommand.class, SugaddCommand.class, JsonSetCommand.class, TsAddCommand.class }, subcommandsRepeatable = true, synopsisSubcommandLabel = "[REDIS COMMAND...]", commandListHeading = "Redis commands:%n") -public abstract class AbstractImportCommand extends AbstractRedisCommand { +public abstract class AbstractImportCommand extends AbstractRedisCommand { private static final String TASK_NAME = "Importing"; private static final String STEP_NAME = "step"; @@ -69,29 +72,43 @@ protected boolean hasOperations() { return !CollectionUtils.isEmpty(importOperationCommands); } - protected Step, Map> step(C context, ItemReader> reader) { + protected Step, Map> step(ItemReader> reader) { Assert.isTrue(hasOperations(), "No Redis command specified"); RedisItemWriter> writer = operationWriter(); - configure(context, writer); + configure(writer); Step, Map> step = new Step<>(STEP_NAME, reader, writer); - step.processor(processor(context)); + step.processor(processor()); step.taskName(TASK_NAME); return step; } - protected ItemProcessor, Map> processor(C context) { + protected ItemProcessor, Map> processor() { + log.info("Creating SpEL evaluation context with {}", evaluationContextArgs); StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext(); - evaluationContext.setVariable(VAR_REDIS, context.getRedisContext().getConnection().sync()); + evaluationContext.setVariable(VAR_REDIS, redisContext.getConnection().sync()); evaluationContext.addPropertyAccessor(new QuietMapAccessor()); - return processorArgs.processor(evaluationContext); + return processor(evaluationContext, processorArgs); + } + + public static ItemProcessor, Map> processor(EvaluationContext evaluationContext, + ImportProcessorArgs args) { + List, Map>> processors = new ArrayList<>(); + if (args.getFilter() != null) { + processors.add(new FunctionItemProcessor<>( + new PredicateOperator<>(args.getFilter().predicate(evaluationContext)))); + } + if (!CollectionUtils.isEmpty(args.getExpressions())) { + processors.add(new ExpressionProcessor(evaluationContext, args.getExpressions())); + } + return RiotUtils.processor(processors); } protected RedisItemWriter> operationWriter() { return RedisItemWriter.operation(new MultiOperation<>(operations())); } - protected void configure(C context, RedisItemWriter writer) { - context.configure(writer); + protected void configure(RedisItemWriter writer) { + super.configure(writer); log.info("Configuring Redis writer with {}", redisWriterArgs); redisWriterArgs.configure(writer); } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java index e9ff85cb5..0e7ca121c 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java @@ -1,52 +1,43 @@ package com.redis.riot; import com.redis.riot.core.AbstractJobCommand; +import com.redis.spring.batch.item.redis.RedisItemReader; +import com.redis.spring.batch.item.redis.RedisItemWriter; -import io.lettuce.core.RedisURI; import picocli.CommandLine.ArgGroup; -public abstract class AbstractRedisCommand extends AbstractJobCommand { +public abstract class AbstractRedisCommand extends AbstractJobCommand { @ArgGroup(exclusive = false) - private RedisURIArgs redisURIArgs = new RedisURIArgs(); + private RedisArgs redisArgs = new RedisArgs(); - @ArgGroup(exclusive = false) - private RedisClientArgs redisClientArgs = 
new RedisClientArgs(); + protected RedisContext redisContext; @Override - protected C executionContext() { - C context = super.executionContext(); - context.setRedisContext(redisContext()); - return context; - } - - private RedisContext redisContext() { - RedisURI redisURI = redisURIArgs.redisURI(); - log.info("Creating Redis context with URI {} and {}", redisURI, redisClientArgs); - RedisContext context = new RedisContext(); - context.setAutoReconnect(redisClientArgs.isAutoReconnect()); - context.setCluster(redisClientArgs.isCluster()); - context.setPoolSize(redisClientArgs.getPoolSize()); - context.setProtocolVersion(redisClientArgs.getProtocolVersion()); - context.setSslOptions(redisClientArgs.getSslArgs().sslOptions()); - context.setUri(redisURI); - return context; + protected void execute() throws Exception { + redisContext = redisArgs.redisContext(); + try { + super.execute(); + } finally { + redisContext.close(); + } } - public RedisURIArgs getRedisURIArgs() { - return redisURIArgs; + protected void configure(RedisItemReader reader) { + configureAsyncReader(reader); + redisContext.configure(reader); } - public void setRedisURIArgs(RedisURIArgs argfs) { - this.redisURIArgs = argfs; + protected void configure(RedisItemWriter writer) { + redisContext.configure(writer); } - public RedisClientArgs getRedisClientArgs() { - return redisClientArgs; + public RedisArgs getRedisArgs() { + return redisArgs; } - public void setRedisClientArgs(RedisClientArgs clientArgs) { - this.redisClientArgs = clientArgs; + public void setRedisArgs(RedisArgs clientArgs) { + this.redisArgs = clientArgs; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/AbstractTargetCommand.java b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisToRedisCommand.java similarity index 58% rename from plugins/riot/src/main/java/com/redis/riot/AbstractTargetCommand.java rename to plugins/riot/src/main/java/com/redis/riot/AbstractRedisToRedisCommand.java index 0bcab995d..8a76222f9 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AbstractTargetCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/AbstractRedisToRedisCommand.java @@ -1,14 +1,16 @@ package com.redis.riot; +import com.redis.lettucemod.RedisURIBuilder; import com.redis.riot.core.AbstractJobCommand; import com.redis.spring.batch.item.redis.RedisItemReader; import io.lettuce.core.RedisURI; +import io.lettuce.core.SslVerifyMode; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; -public abstract class AbstractTargetCommand extends AbstractJobCommand { +public abstract class AbstractRedisToRedisCommand extends AbstractJobCommand { public static final int DEFAULT_TARGET_POOL_SIZE = RedisItemReader.DEFAULT_POOL_SIZE; @@ -39,58 +41,67 @@ public abstract class AbstractTargetCommand extends AbstractJobCommand reader) { + protected void configureSourceReader(RedisItemReader reader) { + configureAsyncReader(reader); + sourceRedisContext.configure(reader); + log.info("Configuring source Redis reader with {}", sourceRedisReaderArgs); sourceRedisReaderArgs.configure(reader); - context.configureSourceReader(reader); } - protected void configureTargetReader(TargetRedisExecutionContext context, RedisItemReader reader) { + protected void configureTargetReader(RedisItemReader reader) { + configureAsyncReader(reader); + targetRedisContext.configure(reader); if (targetReadFrom != null) { + log.info("Configuring target Redis reader with read-from {}", targetReadFrom); 
reader.setReadFrom(targetReadFrom.getReadFrom()); } - context.configureTargetReader(reader); } public SourceRedisURIArgs getSourceRedisURIArgs() { diff --git a/plugins/riot/src/main/java/com/redis/riot/AwsCredentialsArgs.java b/plugins/riot/src/main/java/com/redis/riot/AwsCredentialsArgs.java index 7f3eb89d5..004719e86 100644 --- a/plugins/riot/src/main/java/com/redis/riot/AwsCredentialsArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/AwsCredentialsArgs.java @@ -1,5 +1,7 @@ package com.redis.riot; +import com.redis.riot.core.RiotUtils; + import picocli.CommandLine.Option; public class AwsCredentialsArgs { @@ -28,7 +30,7 @@ public void setSecretKey(String secretKey) { @Override public String toString() { - return "AwsCredentialsArgs [accessKey=" + accessKey + ", secretKey=" + secretKey + "]"; + return "AwsCredentialsArgs [accessKey=" + accessKey + ", secretKey=" + RiotUtils.mask(secretKey) + "]"; } } \ No newline at end of file diff --git a/plugins/riot/src/main/java/com/redis/riot/Compare.java b/plugins/riot/src/main/java/com/redis/riot/Compare.java index dcfa3a7e6..9d92eae45 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Compare.java +++ b/plugins/riot/src/main/java/com/redis/riot/Compare.java @@ -25,8 +25,8 @@ protected boolean isIgnoreStreamMessageId() { } @Override - protected Job job(TargetRedisExecutionContext context) { - return job(context, compareStep(context)); + protected Job job() { + return job(compareStep()); } public boolean isCompareStreamMessageId() { diff --git a/plugins/riot/src/main/java/com/redis/riot/DataSourceArgs.java b/plugins/riot/src/main/java/com/redis/riot/DataSourceArgs.java index 29d2030d3..50c620a49 100644 --- a/plugins/riot/src/main/java/com/redis/riot/DataSourceArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/DataSourceArgs.java @@ -1,5 +1,11 @@ package com.redis.riot; +import javax.sql.DataSource; + +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; + +import com.redis.riot.core.RiotUtils; + import picocli.CommandLine.Option; public class DataSourceArgs { @@ -50,8 +56,17 @@ public void setPassword(String password) { @Override public String toString() { - return "DataSourceArgs [driver=" + driver + ", url=" + url + ", username=" + username + ", password=" + password - + "]"; + return "DataSourceArgs [driver=" + driver + ", url=" + url + ", username=" + username + ", password=" + + RiotUtils.mask(password) + "]"; + } + + public DataSource dataSource() { + DataSourceProperties properties = new DataSourceProperties(); + properties.setUrl(url); + properties.setDriverClassName(driver); + properties.setUsername(username); + properties.setPassword(password); + return properties.initializeDataSourceBuilder().build(); } } diff --git a/plugins/riot/src/main/java/com/redis/riot/DatabaseExport.java b/plugins/riot/src/main/java/com/redis/riot/DatabaseExport.java index e7e0a8d60..3282df249 100644 --- a/plugins/riot/src/main/java/com/redis/riot/DatabaseExport.java +++ b/plugins/riot/src/main/java/com/redis/riot/DatabaseExport.java @@ -7,6 +7,8 @@ import org.springframework.batch.core.Job; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import picocli.CommandLine.ArgGroup; @@ -15,7 +17,7 @@ import picocli.CommandLine.Parameters; @Command(name = "db-export", description = 
"Export Redis data to a relational database.") -public class DatabaseExport extends AbstractExportCommand { +public class DatabaseExport extends AbstractExportCommand { public static final boolean DEFAULT_ASSERT_UPDATES = true; @@ -29,19 +31,14 @@ public class DatabaseExport extends AbstractExportCommand private boolean assertUpdates = DEFAULT_ASSERT_UPDATES; @Override - protected RedisExecutionContext newExecutionContext() { - return new RedisExecutionContext(); - } - - @Override - protected Job job(RedisExecutionContext context) { - return job(context, step(context, writer()).processor(mapProcessor())); + protected Job job() { + return job(step(writer()).processor(mapProcessor())); } private JdbcBatchItemWriter> writer() { Assert.hasLength(sql, "No SQL statement specified"); log.info("Creating data source with {}", dataSourceArgs); - DataSource dataSource = DatabaseHelper.dataSource(dataSourceArgs); + DataSource dataSource = dataSourceArgs.dataSource(); log.info("Creating JDBC writer with sql=\"{}\" assertUpdates={}", sql, assertUpdates); JdbcBatchItemWriterBuilder> builder = new JdbcBatchItemWriterBuilder<>(); builder.itemSqlParameterSourceProvider(NullableSqlParameterSource::new); @@ -53,6 +50,23 @@ private JdbcBatchItemWriter> writer() { return writer; } + private static class NullableSqlParameterSource extends MapSqlParameterSource { + + public NullableSqlParameterSource(@Nullable Map values) { + super(values); + } + + @Override + @Nullable + public Object getValue(String paramName) { + if (!hasValue(paramName)) { + return null; + } + return super.getValue(paramName); + } + + } + public String getSql() { return sql; } diff --git a/plugins/riot/src/main/java/com/redis/riot/DatabaseHelper.java b/plugins/riot/src/main/java/com/redis/riot/DatabaseHelper.java deleted file mode 100644 index dec574873..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/DatabaseHelper.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.redis.riot; - -import javax.sql.DataSource; - -import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; - -public abstract class DatabaseHelper { - - private DatabaseHelper() { - } - - public static DataSource dataSource(DataSourceArgs args) { - DataSourceProperties properties = new DataSourceProperties(); - properties.setUrl(args.getUrl()); - properties.setDriverClassName(args.getDriver()); - properties.setUsername(args.getUsername()); - properties.setPassword(args.getPassword()); - return properties.initializeDataSourceBuilder().build(); - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java b/plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java index 6f1e0eb02..22ad62166 100644 --- a/plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java +++ b/plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java @@ -15,7 +15,7 @@ import picocli.CommandLine.Parameters; @Command(name = "db-import", description = "Import from a relational database.") -public class DatabaseImport extends AbstractImportCommand { +public class DatabaseImport extends AbstractImportCommand { @ArgGroup(exclusive = false) private DataSourceArgs dataSourceArgs = new DataSourceArgs(); @@ -27,19 +27,14 @@ public class DatabaseImport extends AbstractImportCommand private DatabaseReaderArgs readerArgs = new DatabaseReaderArgs(); @Override - protected RedisExecutionContext newExecutionContext() { - return new RedisExecutionContext(); - } - - @Override - protected Job job(RedisExecutionContext context) { - return job(context, step(context, reader())); 
+ protected Job job() { + return job(step(reader())); } private JdbcCursorItemReader> reader() { Assert.hasLength(sql, "No SQL statement specified"); log.info("Creating data source with {}", dataSourceArgs); - DataSource dataSource = DatabaseHelper.dataSource(dataSourceArgs); + DataSource dataSource = dataSourceArgs.dataSource(); log.info("Creating JDBC reader with sql=\"{}\" {}", sql, readerArgs); JdbcCursorItemReaderBuilder> reader = new JdbcCursorItemReaderBuilder<>(); reader.dataSource(dataSource); diff --git a/plugins/riot/src/main/java/com/redis/riot/EvaluationContextArgs.java b/plugins/riot/src/main/java/com/redis/riot/EvaluationContextArgs.java index 56e20f1b6..a7e837b92 100644 --- a/plugins/riot/src/main/java/com/redis/riot/EvaluationContextArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/EvaluationContextArgs.java @@ -72,6 +72,14 @@ public void setVars(Map variables) { this.vars = variables; } + public String getNumberFormat() { + return numberFormat; + } + + public void setNumberFormat(String numberFormat) { + this.numberFormat = numberFormat; + } + @Override public String toString() { return "EvaluationContextArgs [varExpressions=" + varExpressions + ", dateFormat=" + dateFormat diff --git a/plugins/riot/src/main/java/com/redis/riot/FakerImport.java b/plugins/riot/src/main/java/com/redis/riot/FakerImport.java index d212a25ce..7324ebfaf 100644 --- a/plugins/riot/src/main/java/com/redis/riot/FakerImport.java +++ b/plugins/riot/src/main/java/com/redis/riot/FakerImport.java @@ -13,7 +13,7 @@ import picocli.CommandLine.Parameters; @Command(name = "faker", description = "Import Faker data.") -public class FakerImport extends AbstractImportCommand { +public class FakerImport extends AbstractImportCommand { public static final int DEFAULT_COUNT = 1000; public static final Locale DEFAULT_LOCALE = Locale.ENGLISH; @@ -31,13 +31,8 @@ public class FakerImport extends AbstractImportCommand { private Locale locale = DEFAULT_LOCALE; @Override - protected RedisExecutionContext newExecutionContext() { - return new RedisExecutionContext(); - } - - @Override - protected Job job(RedisExecutionContext context) { - return job(context, step(context, reader()).maxItemCount(count)); + protected Job job() { + return job(step(reader()).maxItemCount(count)); } private FakerItemReader reader() { diff --git a/plugins/riot/src/main/java/com/redis/riot/FileExport.java b/plugins/riot/src/main/java/com/redis/riot/FileExport.java index 9ba32a9da..6840c89a0 100644 --- a/plugins/riot/src/main/java/com/redis/riot/FileExport.java +++ b/plugins/riot/src/main/java/com/redis/riot/FileExport.java @@ -2,19 +2,36 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import org.springframework.batch.core.Job; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.transform.LineAggregator; +import org.springframework.batch.item.file.transform.PassThroughFieldExtractor; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.core.io.WritableResource; +import org.springframework.util.CollectionUtils; -import com.redis.riot.core.RiotException; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.redis.riot.file.FileType; import com.redis.riot.file.FileUtils; +import com.redis.riot.file.JsonLineAggregator; +import com.redis.riot.file.xml.XmlResourceItemWriter; +import com.redis.riot.file.xml.XmlResourceItemWriterBuilder; +import com.redis.riot.resource.FlatFileItemWriter; +import com.redis.riot.resource.FlatFileItemWriterBuilder; +import com.redis.riot.resource.FlatFileItemWriterBuilder.DelimitedBuilder; +import com.redis.riot.resource.FlatFileItemWriterBuilder.FormattedBuilder; +import com.redis.riot.resource.JsonFileItemWriter; +import com.redis.riot.resource.JsonFileItemWriterBuilder; import com.redis.spring.batch.item.redis.RedisItemReader; import com.redis.spring.batch.item.redis.common.KeyValue; @@ -24,7 +41,7 @@ import picocli.CommandLine.Parameters; @Command(name = "file-export", description = "Export Redis data to files.") -public class FileExport extends AbstractExportCommand { +public class FileExport extends AbstractExportCommand { public static final FileType DEFAULT_FILE_TYPE = FileType.JSONL; @@ -40,33 +57,13 @@ public class FileExport extends AbstractExportCommand headerRecord(context, fileType)); - return job(context, step(context, writer).processor(processor(fileType))); + ItemWriter writer = create(resource, fileType, () -> headerRecord(fileType)); + return job(step(writer).processor(processor(fileType))); } private FileType fileType(WritableResource resource) { @@ -102,9 +99,9 @@ private ContentType contentType(FileType fileType) { } @SuppressWarnings("unchecked") - private Map headerRecord(FileExportExecutionContext context, FileType fileType) { + private Map headerRecord(FileType fileType) { RedisItemReader reader = RedisItemReader.struct(); - configure(context, reader); + configure(reader); try { reader.open(new ExecutionContext()); try { @@ -122,6 +119,117 @@ private Map headerRecord(FileExportExecutionContext context, Fil } } + @SuppressWarnings("unchecked") + public ItemWriter create(WritableResource resource, FileType fileType, + Supplier> headerSupplier) { + switch (fileType) { + case CSV: + return (ItemWriter) delimitedWriter(resource, headerSupplier); + case FIXED: + return (ItemWriter) fixedLengthWriter(resource, headerSupplier); + case JSON: + return jsonWriter(resource); + case JSONL: + return jsonlWriter(resource); + case XML: + return xmlWriter(resource); + default: + throw new UnsupportedOperationException("Unsupported file type: " + fileType); + } + } + + private FlatFileItemWriter jsonlWriter(WritableResource resource) { + FlatFileItemWriterBuilder builder = flatFileWriter(resource); + builder.lineAggregator(new JsonLineAggregator<>(objectMapper(new ObjectMapper()))); + return builder.build(); + } + + private T objectMapper(T objectMapper) { + objectMapper.setSerializationInclusion(Include.NON_NULL); + objectMapper.setSerializationInclusion(Include.NON_DEFAULT); + return objectMapper; + } + + private JsonFileItemWriter jsonWriter(WritableResource resource) { + JsonFileItemWriterBuilder writer = new JsonFileItemWriterBuilder<>(); + writer.name(resource.getFilename()); + writer.append(fileWriterArgs.isAppend()); + writer.encoding(fileWriterArgs.getFileArgs().getEncoding()); + writer.lineSeparator(fileWriterArgs.getLineSeparator()); + writer.resource(resource); + writer.saveState(false); + ObjectMapper mapper = objectMapper(new ObjectMapper()); + writer.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper)); + return 
writer.build(); + } + + private XmlResourceItemWriter xmlWriter(WritableResource resource) { + XmlResourceItemWriterBuilder writer = new XmlResourceItemWriterBuilder<>(); + writer.name(resource.getFilename()); + writer.append(fileWriterArgs.isAppend()); + writer.encoding(fileWriterArgs.getFileArgs().getEncoding()); + writer.lineSeparator(fileWriterArgs.getLineSeparator()); + writer.rootName(fileWriterArgs.getRootName()); + writer.resource(resource); + writer.saveState(false); + XmlMapper mapper = objectMapper(new XmlMapper()); + mapper.setConfig(mapper.getSerializationConfig().withRootName(fileWriterArgs.getElementName())); + writer.xmlObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper)); + return writer.build(); + } + + private ItemWriter> delimitedWriter(WritableResource resource, + Supplier> headerSupplier) { + FlatFileItemWriterBuilder> writer = flatFileWriter(resource); + DelimitedBuilder> delimitedBuilder = writer.delimited(); + delimitedBuilder.delimiter(fileWriterArgs.getFileArgs().getDelimiter()); + delimitedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); + delimitedBuilder.quoteCharacter(String.valueOf(fileWriterArgs.getFileArgs().getQuoteCharacter())); + return writer(writer, delimitedBuilder.build(), headerSupplier); + } + + private FlatFileItemWriter> writer(FlatFileItemWriterBuilder> writer, + LineAggregator> lineAggregator, Supplier> headerSupplier) { + writer.lineAggregator(lineAggregator); + if (fileWriterArgs.getFileArgs().isHeader()) { + Map headerRecord = headerSupplier.get(); + if (CollectionUtils.isEmpty(headerRecord)) { + log.warn("Could not determine header"); + } else { + Map headerFieldMap = new HashMap<>(); + headerRecord.forEach((k, v) -> headerFieldMap.put(k, k)); + String headerLine = lineAggregator.aggregate(headerFieldMap); + log.info("Found header: {}", headerLine); + writer.headerCallback(w -> w.write(headerLine)); + } + } + return writer.build(); + } + + private ItemWriter> fixedLengthWriter(WritableResource resource, + Supplier> headerSupplier) { + FlatFileItemWriterBuilder> writer = flatFileWriter(resource); + FormattedBuilder> formattedBuilder = writer.formatted(); + formattedBuilder.format(fileWriterArgs.getFormatterString()); + formattedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); + return writer(writer, formattedBuilder.build(), headerSupplier); + } + + private FlatFileItemWriterBuilder flatFileWriter(WritableResource resource) { + FlatFileItemWriterBuilder builder = new FlatFileItemWriterBuilder<>(); + builder.name(resource.getFilename()); + builder.resource(resource); + builder.append(fileWriterArgs.isAppend()); + builder.encoding(fileWriterArgs.getFileArgs().getEncoding()); + builder.forceSync(fileWriterArgs.isForceSync()); + builder.lineSeparator(fileWriterArgs.getLineSeparator()); + builder.saveState(false); + builder.shouldDeleteIfEmpty(fileWriterArgs.isShouldDeleteIfEmpty()); + builder.shouldDeleteIfExists(fileWriterArgs.isShouldDeleteIfExists()); + builder.transactional(fileWriterArgs.isTransactional()); + return builder; + } + public String getFile() { return file; } diff --git a/plugins/riot/src/main/java/com/redis/riot/FileExportExecutionContext.java b/plugins/riot/src/main/java/com/redis/riot/FileExportExecutionContext.java deleted file mode 100644 index 1aeaded61..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/FileExportExecutionContext.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.redis.riot; - -public class FileExportExecutionContext extends RedisExecutionContext { - - private 
FileWriterFactory fileWriterFactory; - - public FileWriterFactory getFileWriterFactory() { - return fileWriterFactory; - } - - public void setFileWriterFactory(FileWriterFactory factory) { - this.fileWriterFactory = factory; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/FileImport.java b/plugins/riot/src/main/java/com/redis/riot/FileImport.java index 212d840ed..d6c97fce3 100644 --- a/plugins/riot/src/main/java/com/redis/riot/FileImport.java +++ b/plugins/riot/src/main/java/com/redis/riot/FileImport.java @@ -1,6 +1,5 @@ package com.redis.riot; -import java.io.FileNotFoundException; import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; @@ -14,19 +13,42 @@ import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.JsonLineMapper; +import org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy; +import org.springframework.batch.item.file.separator.RecordSeparatorPolicy; +import org.springframework.batch.item.file.transform.AbstractLineTokenizer; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.batch.item.file.transform.FixedLengthTokenizer; +import org.springframework.batch.item.file.transform.Range; +import org.springframework.batch.item.file.transform.RangeArrayPropertyEditor; import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.batch.item.json.JacksonJsonObjectReader; +import org.springframework.batch.item.json.JsonItemReader; +import org.springframework.batch.item.json.builder.JsonItemReaderBuilder; import org.springframework.core.io.Resource; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; -import com.redis.riot.core.RiotException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; import com.redis.riot.core.processor.RegexNamedGroupFunction; import com.redis.riot.file.FileType; import com.redis.riot.file.FileUtils; +import com.redis.riot.file.HeaderCallbackHandler; +import com.redis.riot.file.MapFieldSetMapper; import com.redis.riot.file.MapToFieldFunction; +import com.redis.riot.file.ObjectMapperLineMapper; import com.redis.riot.file.ToMapFunction; +import com.redis.riot.file.xml.XmlItemReader; +import com.redis.riot.file.xml.XmlItemReaderBuilder; +import com.redis.riot.file.xml.XmlObjectReader; import com.redis.spring.batch.item.redis.RedisItemWriter; import com.redis.spring.batch.item.redis.common.KeyValue; @@ -36,7 +58,7 @@ import picocli.CommandLine.Parameters; @Command(name = "file-import", description = "Import data from files.") -public class FileImport extends AbstractImportCommand { +public class FileImport extends AbstractImportCommand { @Parameters(arity = "1..*", description = "Files or URLs to import. 
Use '-' to read from stdin.", paramLabel = "FILE") private List files; @@ -51,38 +73,17 @@ public class FileImport extends AbstractImportCommand regexes = new LinkedHashMap<>(); @Override - protected FileImportExecutionContext newExecutionContext() { - return new FileImportExecutionContext(); - } - - @Override - protected FileImportExecutionContext executionContext() { - FileImportExecutionContext context = super.executionContext(); - FileReaderFactory factory = new FileReaderFactory(); - factory.addDeserializer(KeyValue.class, new KeyValueDeserializer()); - factory.setArgs(fileReaderArgs); - context.setFileReaderFactory(factory); - return context; - } - - @Override - protected Job job(FileImportExecutionContext context) { + protected Job job() throws IOException { Assert.notEmpty(files, "No file specified"); List> steps = new ArrayList<>(); List resources = new ArrayList<>(); for (String file : files) { - try { - for (String expandedFile : FileUtils.expand(file)) { - resources.add(fileReaderArgs.resource(expandedFile)); - } - } catch (FileNotFoundException e) { - throw new RiotException("File not found: " + file); - } catch (IOException e) { - throw new RiotException("Could not read file " + file, e); + for (String expandedFile : FileUtils.expand(file)) { + resources.add(fileReaderArgs.resource(expandedFile)); } } for (Resource resource : resources) { - Step step = step(context, resource); + Step step = step(resource); step.skip(ParseException.class); step.skip(org.springframework.batch.item.ParseException.class); step.noRetry(ParseException.class); @@ -90,31 +91,30 @@ protected Job job(FileImportExecutionContext context) { step.taskName(taskName(resource)); steps.add(step); } - return job(context, steps); + return job(steps); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private Step step(FileImportExecutionContext context, Resource resource) { + private Step step(Resource resource) { String name = resource.getFilename(); FileType type = fileType(resource); if (hasOperations()) { - ItemReader> reader = (ItemReader) context.getFileReaderFactory().createReader(resource, - type, Map.class); + ItemReader> reader = (ItemReader) createReader(resource, type, Map.class); RedisItemWriter> writer = operationWriter(); - configure(context, writer); - return new Step<>(name, reader, writer).processor(processor(context)); + configure(writer); + return new Step<>(name, reader, writer).processor(processor()); } Assert.isTrue(type != FileType.CSV, "CSV file import requires a Redis command"); Assert.isTrue(type != FileType.FIXED, "Fixed-length file import requires a Redis command"); - ItemReader reader = context.getFileReaderFactory().createReader(resource, type, KeyValue.class); + ItemReader reader = createReader(resource, type, KeyValue.class); RedisItemWriter> writer = RedisItemWriter.struct(); - configure(context, writer); + configure(writer); return new Step<>(name, reader, writer); } @Override - protected ItemProcessor, Map> processor(FileImportExecutionContext context) { - return RiotUtils.processor(super.processor(context), regexProcessor()); + protected ItemProcessor, Map> processor() { + return RiotUtils.processor(super.processor(), regexProcessor()); } private FileType fileType(Resource resource) { @@ -145,6 +145,159 @@ private Function, Map> toFieldFunction(Strin return new MapToFieldFunction(key).andThen((Function) new RegexNamedGroupFunction(regex)); } + public static final String PIPE_DELIMITER = "|"; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public ItemReader 
createReader(Resource resource, FileType fileType, Class itemType) { + switch (fileType) { + case XML: + return xmlReader(resource, itemType); + case JSON: + return jsonReader(resource, itemType); + case JSONL: + return jsonlReader(resource, itemType); + case CSV: + return (FlatFileItemReader) flatFileReader(resource, delimitedLineTokenizer(delimiter(resource))); + case FIXED: + return (FlatFileItemReader) flatFileReader(resource, fixedLengthTokenizer()); + default: + throw new UnsupportedOperationException("Unsupported file type: " + fileType); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private FlatFileItemReader jsonlReader(Resource resource, Class itemType) { + if (Map.class.isAssignableFrom(itemType)) { + FlatFileItemReaderBuilder> reader = flatFileReader(resource); + return (FlatFileItemReader) reader.fieldSetMapper(new MapFieldSetMapper()).lineMapper(new JsonLineMapper()) + .build(); + } + FlatFileItemReaderBuilder reader = flatFileReader(resource); + reader.lineMapper(new ObjectMapperLineMapper<>(objectMapper(new ObjectMapper()), itemType)); + return reader.build(); + } + + private DelimitedLineTokenizer delimitedLineTokenizer(String delimiter) { + DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); + tokenizer.setDelimiter(delimiter); + tokenizer.setQuoteCharacter(fileReaderArgs.getFileArgs().getQuoteCharacter()); + if (!ObjectUtils.isEmpty(fileReaderArgs.getIncludedFields())) { + tokenizer.setIncludedFields( + fileReaderArgs.getIncludedFields().stream().mapToInt(Integer::intValue).toArray()); + } + return tokenizer; + } + + private FlatFileItemReader> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) { + if (ObjectUtils.isEmpty(fileReaderArgs.getFields())) { + Assert.isTrue(fileReaderArgs.getFileArgs().isHeader(), + String.format("Could not create reader for file '%s': no header or field names specified", + resource.getFilename())); + } else { + tokenizer.setNames(fileReaderArgs.getFields().toArray(new String[0])); + } + FlatFileItemReaderBuilder> builder = flatFileReader(resource); + builder.fieldSetMapper(new MapFieldSetMapper()); + builder.lineTokenizer(tokenizer); + builder.skippedLinesCallback(new HeaderCallbackHandler(tokenizer, headerIndex())); + return builder.build(); + } + + private FlatFileItemReaderBuilder flatFileReader(Resource resource) { + FlatFileItemReaderBuilder builder = new FlatFileItemReaderBuilder<>(); + builder.resource(resource); + builder.maxItemCount(fileReaderArgs.getMaxItemCount()); + if (fileReaderArgs.getFileArgs().getEncoding() != null) { + builder.encoding(fileReaderArgs.getFileArgs().getEncoding()); + } + builder.recordSeparatorPolicy(recordSeparatorPolicy()); + builder.linesToSkip(linesToSkip()); + builder.strict(true); + builder.saveState(false); + return builder; + } + + private FixedLengthTokenizer fixedLengthTokenizer() { + FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); + RangeArrayPropertyEditor editor = new RangeArrayPropertyEditor(); + Assert.notEmpty(fileReaderArgs.getColumnRanges(), "Column ranges are required"); + editor.setAsText(String.join(",", fileReaderArgs.getColumnRanges())); + Range[] ranges = (Range[]) editor.getValue(); + Assert.notEmpty(ranges, "Invalid ranges specified: " + fileReaderArgs.getColumnRanges()); + tokenizer.setColumns(ranges); + return tokenizer; + } + + private T objectMapper(T objectMapper) { + objectMapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true); + SimpleModule module = new SimpleModule(); + module.addDeserializer(KeyValue.class, 
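For the CSV branch above, the tokenizer plus MapFieldSetMapper is what turns each delimited line into a Map keyed by column name. A small self-contained illustration of the same Spring Batch pieces (column names and the sample line are made up):

import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;

class TokenizerSketch {

    public static void main(String[] args) {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter(",");               // "|" or "\t" for .psv/.tsv files
        tokenizer.setQuoteCharacter('"');
        tokenizer.setNames("id", "name", "email"); // stands in for --fields or the header row
        FieldSet fieldSet = tokenizer.tokenize("1,Alice,alice@example.com");
        System.out.println(fieldSet.readString("name")); // Alice
    }

}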
new KeyValueDeserializer()); + objectMapper.registerModule(module); + return objectMapper; + } + + private XmlItemReader xmlReader(Resource resource, Class itemType) { + XmlItemReaderBuilder builder = new XmlItemReaderBuilder<>(); + builder.name(resource.getFilename() + "-xml-file-reader"); + builder.resource(resource); + XmlObjectReader objectReader = new XmlObjectReader<>(itemType); + objectReader.setMapper(objectMapper(new XmlMapper())); + builder.xmlObjectReader(objectReader); + builder.maxItemCount(fileReaderArgs.getMaxItemCount()); + return builder.build(); + } + + private JsonItemReader jsonReader(Resource resource, Class itemType) { + JsonItemReaderBuilder builder = new JsonItemReaderBuilder<>(); + builder.name(resource.getFilename() + "-json-file-reader"); + builder.resource(resource); + JacksonJsonObjectReader objectReader = new JacksonJsonObjectReader<>(itemType); + objectReader.setMapper(objectMapper(new ObjectMapper())); + builder.jsonObjectReader(objectReader); + builder.maxItemCount(fileReaderArgs.getMaxItemCount()); + return builder.build(); + } + + private String delimiter(Resource resource) { + if (fileReaderArgs.getFileArgs().getDelimiter() != null) { + return fileReaderArgs.getFileArgs().getDelimiter(); + } + String extension = FileUtils.fileExtension(resource); + if (extension == null) { + return DelimitedLineTokenizer.DELIMITER_COMMA; + } + switch (extension) { + case FileUtils.PSV: + return PIPE_DELIMITER; + case FileUtils.TSV: + return DelimitedLineTokenizer.DELIMITER_TAB; + default: + return DelimitedLineTokenizer.DELIMITER_COMMA; + } + } + + private RecordSeparatorPolicy recordSeparatorPolicy() { + return new DefaultRecordSeparatorPolicy(String.valueOf(fileReaderArgs.getFileArgs().getQuoteCharacter()), + fileReaderArgs.getContinuationString()); + } + + private int headerIndex() { + if (fileReaderArgs.getHeaderLine() != null) { + return fileReaderArgs.getHeaderLine(); + } + return linesToSkip() - 1; + } + + private int linesToSkip() { + if (fileReaderArgs.getLinesToSkip() != null) { + return fileReaderArgs.getLinesToSkip(); + } + if (fileReaderArgs.getFileArgs().isHeader()) { + return 1; + } + return 0; + } + public List getFiles() { return files; } diff --git a/plugins/riot/src/main/java/com/redis/riot/FileImportExecutionContext.java b/plugins/riot/src/main/java/com/redis/riot/FileImportExecutionContext.java deleted file mode 100644 index 182a5e820..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/FileImportExecutionContext.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.redis.riot; - -public class FileImportExecutionContext extends RedisExecutionContext { - - private FileReaderFactory fileReaderFactory; - - public FileReaderFactory getFileReaderFactory() { - return fileReaderFactory; - } - - public void setFileReaderFactory(FileReaderFactory factory) { - this.fileReaderFactory = factory; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/FileReaderFactory.java b/plugins/riot/src/main/java/com/redis/riot/FileReaderFactory.java deleted file mode 100644 index ca8cf5fd8..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/FileReaderFactory.java +++ /dev/null @@ -1,222 +0,0 @@ -package com.redis.riot; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.file.FlatFileItemReader; -import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; -import org.springframework.batch.item.file.mapping.JsonLineMapper; 
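It is worth spelling out how the defaults above combine: with --header and no explicit skip count, linesToSkip() is 1 and headerIndex() is 0, so the first line is consumed as the header; the file extension only selects the delimiter when no delimiter option is given. A compact restatement of that decision logic (the "psv"/"tsv" extension strings are assumptions based on the FileUtils constants referenced above):

class FileImportDefaultsSketch {

    static String delimiter(String explicitDelimiter, String extension) {
        if (explicitDelimiter != null) {
            return explicitDelimiter;
        }
        if ("psv".equals(extension)) {
            return "|";  // PIPE_DELIMITER
        }
        if ("tsv".equals(extension)) {
            return "\t"; // DelimitedLineTokenizer.DELIMITER_TAB
        }
        return ",";      // DelimitedLineTokenizer.DELIMITER_COMMA
    }

    static int linesToSkip(Integer explicitLinesToSkip, boolean header) {
        if (explicitLinesToSkip != null) {
            return explicitLinesToSkip;
        }
        return header ? 1 : 0;
    }

}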
-import org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy; -import org.springframework.batch.item.file.separator.RecordSeparatorPolicy; -import org.springframework.batch.item.file.transform.AbstractLineTokenizer; -import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; -import org.springframework.batch.item.file.transform.FixedLengthTokenizer; -import org.springframework.batch.item.file.transform.Range; -import org.springframework.batch.item.file.transform.RangeArrayPropertyEditor; -import org.springframework.batch.item.json.JacksonJsonObjectReader; -import org.springframework.batch.item.json.JsonItemReader; -import org.springframework.batch.item.json.builder.JsonItemReaderBuilder; -import org.springframework.core.io.Resource; -import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; -import org.springframework.util.ObjectUtils; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.redis.riot.file.FileType; -import com.redis.riot.file.FileUtils; -import com.redis.riot.file.HeaderCallbackHandler; -import com.redis.riot.file.MapFieldSetMapper; -import com.redis.riot.file.ObjectMapperLineMapper; -import com.redis.riot.file.xml.XmlItemReader; -import com.redis.riot.file.xml.XmlItemReaderBuilder; -import com.redis.riot.file.xml.XmlObjectReader; - -public class FileReaderFactory { - - public static final String PIPE_DELIMITER = "|"; - - private FileReaderArgs args = new FileReaderArgs(); - private Map, JsonDeserializer> deserializers = new HashMap<>(); - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public ItemReader createReader(Resource resource, FileType fileType, Class itemType) { - switch (fileType) { - case XML: - return xmlReader(resource, itemType); - case JSON: - return jsonReader(resource, itemType); - case JSONL: - return jsonlReader(resource, itemType); - case CSV: - return (FlatFileItemReader) flatFileReader(resource, delimitedLineTokenizer(delimiter(resource))); - case FIXED: - return (FlatFileItemReader) flatFileReader(resource, fixedLengthTokenizer()); - default: - throw new UnsupportedOperationException("Unsupported file type: " + fileType); - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private FlatFileItemReader jsonlReader(Resource resource, Class itemType) { - if (Map.class.isAssignableFrom(itemType)) { - FlatFileItemReaderBuilder> reader = flatFileReader(resource); - return (FlatFileItemReader) reader.fieldSetMapper(new MapFieldSetMapper()).lineMapper(new JsonLineMapper()) - .build(); - } - FlatFileItemReaderBuilder reader = flatFileReader(resource); - reader.lineMapper(new ObjectMapperLineMapper<>(objectMapper(new ObjectMapper()), itemType)); - return reader.build(); - } - - private DelimitedLineTokenizer delimitedLineTokenizer(String delimiter) { - DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); - tokenizer.setDelimiter(delimiter); - tokenizer.setQuoteCharacter(args.getFileArgs().getQuoteCharacter()); - if (!ObjectUtils.isEmpty(args.getIncludedFields())) { - tokenizer.setIncludedFields(args.getIncludedFields().stream().mapToInt(Integer::intValue).toArray()); - } - return tokenizer; - } - - private FlatFileItemReader> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) { - if 
(ObjectUtils.isEmpty(args.getFields())) { - Assert.isTrue(args.getFileArgs().isHeader(), - String.format("Could not create reader for file '%s': no header or field names specified", - resource.getFilename())); - } else { - tokenizer.setNames(args.getFields().toArray(new String[0])); - } - FlatFileItemReaderBuilder> builder = flatFileReader(resource); - builder.fieldSetMapper(new MapFieldSetMapper()); - builder.lineTokenizer(tokenizer); - builder.skippedLinesCallback(new HeaderCallbackHandler(tokenizer, headerIndex())); - return builder.build(); - } - - private FlatFileItemReaderBuilder flatFileReader(Resource resource) { - FlatFileItemReaderBuilder builder = new FlatFileItemReaderBuilder<>(); - builder.resource(resource); - builder.maxItemCount(args.getMaxItemCount()); - if (args.getFileArgs().getEncoding() != null) { - builder.encoding(args.getFileArgs().getEncoding()); - } - builder.recordSeparatorPolicy(recordSeparatorPolicy()); - builder.linesToSkip(linesToSkip()); - builder.strict(true); - builder.saveState(false); - return builder; - } - - private FixedLengthTokenizer fixedLengthTokenizer() { - FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); - RangeArrayPropertyEditor editor = new RangeArrayPropertyEditor(); - Assert.notEmpty(args.getColumnRanges(), "Column ranges are required"); - editor.setAsText(String.join(",", args.getColumnRanges())); - Range[] ranges = (Range[]) editor.getValue(); - Assert.notEmpty(ranges, "Invalid ranges specified: " + args.getColumnRanges()); - tokenizer.setColumns(ranges); - return tokenizer; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private T objectMapper(T objectMapper) { - objectMapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true); - if (!CollectionUtils.isEmpty(deserializers)) { - SimpleModule module = new SimpleModule(); - for (Entry, JsonDeserializer> entry : deserializers.entrySet()) { - module.addDeserializer((Class) entry.getKey(), entry.getValue()); - } - objectMapper.registerModule(module); - } - return objectMapper; - } - - private XmlItemReader xmlReader(Resource resource, Class itemType) { - XmlItemReaderBuilder builder = new XmlItemReaderBuilder<>(); - builder.name(resource.getFilename() + "-xml-file-reader"); - builder.resource(resource); - XmlObjectReader objectReader = new XmlObjectReader<>(itemType); - objectReader.setMapper(objectMapper(new XmlMapper())); - builder.xmlObjectReader(objectReader); - builder.maxItemCount(args.getMaxItemCount()); - return builder.build(); - } - - private JsonItemReader jsonReader(Resource resource, Class itemType) { - JsonItemReaderBuilder builder = new JsonItemReaderBuilder<>(); - builder.name(resource.getFilename() + "-json-file-reader"); - builder.resource(resource); - JacksonJsonObjectReader objectReader = new JacksonJsonObjectReader<>(itemType); - objectReader.setMapper(objectMapper(new ObjectMapper())); - builder.jsonObjectReader(objectReader); - builder.maxItemCount(args.getMaxItemCount()); - return builder.build(); - } - - private String delimiter(Resource resource) { - if (args.getFileArgs().getDelimiter() != null) { - return args.getFileArgs().getDelimiter(); - } - String extension = FileUtils.fileExtension(resource); - if (extension == null) { - return DelimitedLineTokenizer.DELIMITER_COMMA; - } - switch (extension) { - case FileUtils.PSV: - return PIPE_DELIMITER; - case FileUtils.TSV: - return DelimitedLineTokenizer.DELIMITER_TAB; - default: - return DelimitedLineTokenizer.DELIMITER_COMMA; - } - } - - private RecordSeparatorPolicy recordSeparatorPolicy() { 
- return new DefaultRecordSeparatorPolicy(String.valueOf(args.getFileArgs().getQuoteCharacter()), - args.getContinuationString()); - } - - private int headerIndex() { - if (args.getHeaderLine() != null) { - return args.getHeaderLine(); - } - return linesToSkip() - 1; - } - - private int linesToSkip() { - if (args.getLinesToSkip() != null) { - return args.getLinesToSkip(); - } - if (args.getFileArgs().isHeader()) { - return 1; - } - return 0; - } - - public Map, JsonDeserializer> getDeserializers() { - return deserializers; - } - - public void setDeserializers(Map, JsonDeserializer> deserializers) { - this.deserializers = deserializers; - } - - public void addDeserializer(Class type, JsonDeserializer deserializer) { - deserializers.put(type, deserializer); - } - - public FileReaderArgs getArgs() { - return args; - } - - public void setArgs(FileReaderArgs options) { - this.args = options; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/FileWriterFactory.java b/plugins/riot/src/main/java/com/redis/riot/FileWriterFactory.java deleted file mode 100644 index add76f5b6..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/FileWriterFactory.java +++ /dev/null @@ -1,155 +0,0 @@ -package com.redis.riot; - -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.item.file.transform.LineAggregator; -import org.springframework.batch.item.file.transform.PassThroughFieldExtractor; -import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; -import org.springframework.core.io.WritableResource; -import org.springframework.util.CollectionUtils; - -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.redis.riot.file.FileType; -import com.redis.riot.file.JsonLineAggregator; -import com.redis.riot.file.xml.XmlResourceItemWriter; -import com.redis.riot.file.xml.XmlResourceItemWriterBuilder; -import com.redis.riot.resource.FlatFileItemWriter; -import com.redis.riot.resource.FlatFileItemWriterBuilder; -import com.redis.riot.resource.FlatFileItemWriterBuilder.DelimitedBuilder; -import com.redis.riot.resource.FlatFileItemWriterBuilder.FormattedBuilder; -import com.redis.riot.resource.JsonFileItemWriter; -import com.redis.riot.resource.JsonFileItemWriterBuilder; - -public class FileWriterFactory { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private FileWriterArgs args = new FileWriterArgs(); - - @SuppressWarnings("unchecked") - public ItemWriter create(WritableResource resource, FileType fileType, - Supplier> headerSupplier) { - switch (fileType) { - case CSV: - return (ItemWriter) delimitedWriter(resource, headerSupplier); - case FIXED: - return (ItemWriter) fixedLengthWriter(resource, headerSupplier); - case JSON: - return jsonWriter(resource); - case JSONL: - return jsonlWriter(resource); - case XML: - return xmlWriter(resource); - default: - throw new UnsupportedOperationException("Unsupported file type: " + fileType); - } - } - - private FlatFileItemWriter jsonlWriter(WritableResource resource) { - FlatFileItemWriterBuilder builder = flatFileWriter(resource); - builder.lineAggregator(new JsonLineAggregator<>(objectMapper(new ObjectMapper()))); - return builder.build(); - } - - private T objectMapper(T objectMapper) { - 
objectMapper.setSerializationInclusion(Include.NON_NULL); - objectMapper.setSerializationInclusion(Include.NON_DEFAULT); - return objectMapper; - } - - private JsonFileItemWriter jsonWriter(WritableResource resource) { - JsonFileItemWriterBuilder writer = new JsonFileItemWriterBuilder<>(); - writer.name(resource.getFilename()); - writer.append(args.isAppend()); - writer.encoding(args.getFileArgs().getEncoding()); - writer.lineSeparator(args.getLineSeparator()); - writer.resource(resource); - writer.saveState(false); - ObjectMapper mapper = objectMapper(new ObjectMapper()); - writer.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper)); - return writer.build(); - } - - private XmlResourceItemWriter xmlWriter(WritableResource resource) { - XmlResourceItemWriterBuilder writer = new XmlResourceItemWriterBuilder<>(); - writer.name(resource.getFilename()); - writer.append(args.isAppend()); - writer.encoding(args.getFileArgs().getEncoding()); - writer.lineSeparator(args.getLineSeparator()); - writer.rootName(args.getRootName()); - writer.resource(resource); - writer.saveState(false); - XmlMapper mapper = objectMapper(new XmlMapper()); - mapper.setConfig(mapper.getSerializationConfig().withRootName(args.getElementName())); - writer.xmlObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper)); - return writer.build(); - } - - private ItemWriter> delimitedWriter(WritableResource resource, - Supplier> headerSupplier) { - FlatFileItemWriterBuilder> writer = flatFileWriter(resource); - DelimitedBuilder> delimitedBuilder = writer.delimited(); - delimitedBuilder.delimiter(args.getFileArgs().getDelimiter()); - delimitedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); - delimitedBuilder.quoteCharacter(String.valueOf(args.getFileArgs().getQuoteCharacter())); - return writer(writer, delimitedBuilder.build(), headerSupplier); - } - - private FlatFileItemWriter> writer(FlatFileItemWriterBuilder> writer, - LineAggregator> lineAggregator, Supplier> headerSupplier) { - writer.lineAggregator(lineAggregator); - if (args.getFileArgs().isHeader()) { - Map headerRecord = headerSupplier.get(); - if (CollectionUtils.isEmpty(headerRecord)) { - log.warn("Could not determine header"); - } else { - Map headerFieldMap = new HashMap<>(); - headerRecord.forEach((k, v) -> headerFieldMap.put(k, k)); - String headerLine = lineAggregator.aggregate(headerFieldMap); - log.info("Found header: {}", headerLine); - writer.headerCallback(w -> w.write(headerLine)); - } - } - return writer.build(); - } - - private ItemWriter> fixedLengthWriter(WritableResource resource, - Supplier> headerSupplier) { - FlatFileItemWriterBuilder> writer = flatFileWriter(resource); - FormattedBuilder> formattedBuilder = writer.formatted(); - formattedBuilder.format(args.getFormatterString()); - formattedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); - return writer(writer, formattedBuilder.build(), headerSupplier); - } - - private FlatFileItemWriterBuilder flatFileWriter(WritableResource resource) { - FlatFileItemWriterBuilder builder = new FlatFileItemWriterBuilder<>(); - builder.name(resource.getFilename()); - builder.resource(resource); - builder.append(args.isAppend()); - builder.encoding(args.getFileArgs().getEncoding()); - builder.forceSync(args.isForceSync()); - builder.lineSeparator(args.getLineSeparator()); - builder.saveState(false); - builder.shouldDeleteIfEmpty(args.isShouldDeleteIfEmpty()); - builder.shouldDeleteIfExists(args.isShouldDeleteIfExists()); - builder.transactional(args.isTransactional()); - return 
builder; - } - - public FileWriterArgs getArgs() { - return args; - } - - public void setArgs(FileWriterArgs options) { - this.args = options; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/Generate.java b/plugins/riot/src/main/java/com/redis/riot/Generate.java index 0b9747ce7..a2f2bdf39 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Generate.java +++ b/plugins/riot/src/main/java/com/redis/riot/Generate.java @@ -19,7 +19,7 @@ import picocli.CommandLine.Command; @Command(name = "generate", description = "Generate Redis data structures.") -public class Generate extends AbstractRedisCommand { +public class Generate extends AbstractRedisCommand { private static final String TASK_NAME = "Generating"; private static final String STEP_NAME = "step"; @@ -31,24 +31,24 @@ public class Generate extends AbstractRedisCommand { private RedisWriterArgs redisWriterArgs = new RedisWriterArgs(); @Override - protected RedisExecutionContext newExecutionContext() { - return new RedisExecutionContext(); - } - - @Override - protected Job job(RedisExecutionContext context) { + protected Job job() { if (StringUtils.hasLength(generateArgs.getIndex())) { - StatefulRedisModulesConnection connection = context.getRedisContext().getConnection(); + StatefulRedisModulesConnection connection = redisContext.getConnection(); connection.sync().ftCreate(generateArgs.getIndex(), indexCreateOptions(), indexFields()); } - RedisItemWriter> writer = RedisItemWriter.struct(); - log.info("Configuring Redis writer with {}", redisWriterArgs); - redisWriterArgs.configure(writer); - context.configure(writer); + RedisItemWriter> writer = writer(); Step, KeyValue> step = new Step<>(STEP_NAME, reader(), writer); step.taskName(TASK_NAME); step.maxItemCount(generateArgs.getCount()); - return job(context, step); + return job(step); + } + + private RedisItemWriter> writer() { + RedisItemWriter> writer = RedisItemWriter.struct(); + configure(writer); + log.info("Configuring Redis writer with {}", redisWriterArgs); + redisWriterArgs.configure(writer); + return writer; } private CreateOptions indexCreateOptions() { diff --git a/plugins/riot/src/main/java/com/redis/riot/GoogleStorageArgs.java b/plugins/riot/src/main/java/com/redis/riot/GoogleStorageArgs.java index 8ea36d901..a47c3f476 100644 --- a/plugins/riot/src/main/java/com/redis/riot/GoogleStorageArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/GoogleStorageArgs.java @@ -16,6 +16,7 @@ import com.google.cloud.spring.core.UserAgentHeaderProvider; import com.google.cloud.spring.storage.GoogleStorageResource; import com.google.cloud.storage.StorageOptions; +import com.redis.riot.core.RiotUtils; import picocli.CommandLine.Option; @@ -86,8 +87,8 @@ public Resource resource(String location) throws IOException { @Override public String toString() { - return "GoogleStorageArgs [keyFile=" + keyFile + ", projectId=" + projectId + ", encodedKey=" + encodedKey - + ", scope=" + scope + "]"; + return "GoogleStorageArgs [keyFile=" + keyFile + ", projectId=" + projectId + ", encodedKey=" + + RiotUtils.mask(encodedKey) + ", scope=" + scope + "]"; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/ImportProcessorArgs.java b/plugins/riot/src/main/java/com/redis/riot/ImportProcessorArgs.java index 720123192..40c44eb62 100644 --- a/plugins/riot/src/main/java/com/redis/riot/ImportProcessorArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/ImportProcessorArgs.java @@ -1,18 +1,8 @@ package com.redis.riot; -import java.util.ArrayList; -import java.util.List; import 
java.util.Map; -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.function.FunctionItemProcessor; -import org.springframework.expression.EvaluationContext; -import org.springframework.util.CollectionUtils; - -import com.redis.riot.AbstractImportCommand.ExpressionProcessor; import com.redis.riot.core.Expression; -import com.redis.riot.core.RiotUtils; -import com.redis.riot.core.processor.PredicateOperator; import picocli.CommandLine.Option; @@ -24,17 +14,6 @@ public class ImportProcessorArgs { @Option(names = "--filter", description = "Discard records using a SpEL expression.", paramLabel = "") private Expression filter; - public ItemProcessor, Map> processor(EvaluationContext context) { - List, Map>> processors = new ArrayList<>(); - if (filter != null) { - processors.add(new FunctionItemProcessor<>(new PredicateOperator<>(filter.predicate(context)))); - } - if (!CollectionUtils.isEmpty(expressions)) { - processors.add(new ExpressionProcessor(context, expressions)); - } - return RiotUtils.processor(processors); - } - public Map getExpressions() { return expressions; } diff --git a/plugins/riot/src/main/java/com/redis/riot/Main.java b/plugins/riot/src/main/java/com/redis/riot/Main.java index 0ee2d777a..a5e24b053 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Main.java +++ b/plugins/riot/src/main/java/com/redis/riot/Main.java @@ -84,7 +84,7 @@ public static int executionStrategy(ParseResult parseResult) { for (ParseResult subcommand : parseResult.subcommands()) { Object command = subcommand.commandSpec().userObject(); if (AbstractImportCommand.class.isAssignableFrom(command.getClass())) { - AbstractImportCommand importCommand = (AbstractImportCommand) command; + AbstractImportCommand importCommand = (AbstractImportCommand) command; for (ParseResult redisCommand : subcommand.subcommands()) { if (redisCommand.isUsageHelpRequested()) { return new RunLast().execute(redisCommand); diff --git a/plugins/riot/src/main/java/com/redis/riot/NullableSqlParameterSource.java b/plugins/riot/src/main/java/com/redis/riot/NullableSqlParameterSource.java deleted file mode 100644 index 5b4ce1d61..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/NullableSqlParameterSource.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.redis.riot; - -import java.util.Map; - -import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; -import org.springframework.lang.Nullable; - -public class NullableSqlParameterSource extends MapSqlParameterSource { - - public NullableSqlParameterSource(@Nullable Map values) { - super(values); - } - - @Override - @Nullable - public Object getValue(String paramName) { - if (!hasValue(paramName)) { - return null; - } - return super.getValue(paramName); - } - -} \ No newline at end of file diff --git a/plugins/riot/src/main/java/com/redis/riot/Ping.java b/plugins/riot/src/main/java/com/redis/riot/Ping.java index 8c60ac537..728c2e9d5 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Ping.java +++ b/plugins/riot/src/main/java/com/redis/riot/Ping.java @@ -25,7 +25,7 @@ import picocli.CommandLine.ParentCommand; @Command(name = "ping", description = "Test connectivity to a Redis server.") -public class Ping extends AbstractRedisCommand { +public class Ping extends AbstractRedisCommand { private static final String TASK_NAME = "Pinging"; private static final String STEP_NAME = "step"; @@ -48,19 +48,14 @@ public class Ping extends AbstractRedisCommand { private int count = DEFAULT_COUNT; @Override - protected RedisExecutionContext 
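The --filter option kept above discards records whose SpEL expression evaluates to false. A minimal, self-contained illustration of evaluating such a predicate against a record with Spring's SpEL API (the expression and field names are invented for the example):

import java.util.Map;

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

class FilterSketch {

    public static void main(String[] args) {
        Expression filter = new SpelExpressionParser().parseExpression("['age'] >= 18");
        StandardEvaluationContext context = new StandardEvaluationContext();
        Map<String, Object> record = Map.of("name", "Alice", "age", 21);
        Boolean keep = filter.getValue(context, record, Boolean.class);
        System.out.println(keep); // true, so the record would pass the filter
    }

}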
newExecutionContext() { - return new RedisExecutionContext(); - } - - @Override - protected Job job(RedisExecutionContext context) { - PingExecutionItemReader reader = new PingExecutionItemReader(context.getRedisContext().getConnection().sync()); + protected Job job() { + PingExecutionItemReader reader = new PingExecutionItemReader(redisContext.getConnection().sync()); reader.setMaxItemCount(count); PingLatencyItemWriter writer = new PingLatencyItemWriter(); Step step = new Step<>(STEP_NAME, reader, writer); step.taskName(TASK_NAME); step.maxItemCount(count); - return job(context, step); + return job(step); } public static Set defaultPercentiles() { diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisURIArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java similarity index 59% rename from plugins/riot/src/main/java/com/redis/riot/RedisURIArgs.java rename to plugins/riot/src/main/java/com/redis/riot/RedisArgs.java index 662d9caef..dd661590a 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisURIArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisArgs.java @@ -1,16 +1,18 @@ package com.redis.riot; import java.time.Duration; -import java.util.Arrays; import com.redis.lettucemod.RedisURIBuilder; +import com.redis.riot.core.RiotUtils; import com.redis.riot.core.RiotVersion; import io.lettuce.core.RedisURI; import io.lettuce.core.SslVerifyMode; +import io.lettuce.core.protocol.ProtocolVersion; +import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; -public class RedisURIArgs { +public class RedisArgs { @Option(names = { "-u", "--uri" }, description = "Server URI.", paramLabel = "") private RedisURI uri; @@ -48,7 +50,29 @@ public class RedisURIArgs { @Option(names = "--client", description = "Client name used to connect to Redis (default: ${DEFAULT-VALUE}).", paramLabel = "") private String clientName = RiotVersion.riotVersion(); - public RedisURI redisURI() { + @Option(names = { "-c", "--cluster" }, description = "Enable cluster mode.") + private boolean cluster; + + @Option(names = "--auto-reconnect", description = "Automatically reconnect on connection loss. 
True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true) + private boolean autoReconnect = RedisContext.DEFAULT_AUTO_RECONNECT; + + @Option(names = "--resp", description = "Redis protocol version used to connect to Redis: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") + private ProtocolVersion protocolVersion = RedisContext.DEFAULT_PROTOCOL_VERSION; + + @Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "") + private int poolSize = RedisContext.DEFAULT_POOL_SIZE; + + @ArgGroup(exclusive = false, heading = "TLS options%n") + private SslArgs sslArgs = new SslArgs(); + + public RedisContext redisContext() { + RedisContext context = RedisContext.create(redisURI(), cluster, autoReconnect, protocolVersion, + sslArgs.sslOptions()); + context.setPoolSize(poolSize); + return context; + } + + private RedisURI redisURI() { RedisURIBuilder builder = new RedisURIBuilder(); builder.clientName(clientName); builder.database(database); @@ -66,6 +90,46 @@ public RedisURI redisURI() { return builder.build(); } + public boolean isCluster() { + return cluster; + } + + public void setCluster(boolean cluster) { + this.cluster = cluster; + } + + public boolean isAutoReconnect() { + return autoReconnect; + } + + public void setAutoReconnect(boolean autoReconnect) { + this.autoReconnect = autoReconnect; + } + + public ProtocolVersion getProtocolVersion() { + return protocolVersion; + } + + public void setProtocolVersion(ProtocolVersion version) { + this.protocolVersion = version; + } + + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + public SslArgs getSslArgs() { + return sslArgs; + } + + public void setSslArgs(SslArgs sslArgs) { + this.sslArgs = sslArgs; + } + public RedisURI getUri() { return uri; } @@ -156,9 +220,11 @@ public void setClientName(String clientName) { @Override public String toString() { - return "RedisURIArgs [uri=" + uri + ", host=" + host + ", port=" + port + ", socket=" + socket + ", username=" - + username + ", password=" + Arrays.toString(password) + ", timeout=" + timeout + ", database=" - + database + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName + "]"; + return "RedisClientArgs [uri=" + uri + ", host=" + host + ", port=" + port + ", socket=" + socket + + ", username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout + + ", database=" + database + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName + + ", cluster=" + cluster + ", autoReconnect=" + autoReconnect + ", protocolVersion=" + protocolVersion + + ", poolSize=" + poolSize + ", sslArgs=" + sslArgs + "]"; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisClientArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisClientArgs.java deleted file mode 100644 index 7acda14f9..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/RedisClientArgs.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.redis.riot; - -import io.lettuce.core.protocol.ProtocolVersion; -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Option; - -public class RedisClientArgs { - - @ArgGroup(exclusive = false, heading = "TLS options%n") - private SslArgs sslArgs = new SslArgs(); - - @Option(names = { "-c", "--cluster" }, description = "Enable cluster mode.") - private boolean cluster; - - @Option(names = "--auto-reconnect", description = 
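Several toString() methods in this change stop printing raw credentials and route them through RiotUtils.mask instead. The helper itself is not part of this diff; a purely hypothetical stand-in would only reveal whether a value is set:

final class MaskSketch {

    // Hypothetical equivalent of RiotUtils.mask: hide the value, keep the fact that one exists.
    static String mask(char[] secret) {
        return (secret == null || secret.length == 0) ? "" : "*****";
    }

    static String mask(String secret) {
        return (secret == null || secret.isEmpty()) ? "" : "*****";
    }

}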
"Automatically reconnect on connection loss. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true) - private boolean autoReconnect = RedisContext.DEFAULT_AUTO_RECONNECT; - - @Option(names = "--resp", description = "Redis protocol version used to connect to Redis: ${COMPLETION-CANDIDATES}.", paramLabel = "") - private ProtocolVersion protocolVersion = RedisContext.DEFAULT_PROTOCOL_VERSION; - - @Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "") - private int poolSize = RedisContext.DEFAULT_POOL_SIZE; - - public boolean isCluster() { - return cluster; - } - - public void setCluster(boolean cluster) { - this.cluster = cluster; - } - - public boolean isAutoReconnect() { - return autoReconnect; - } - - public void setAutoReconnect(boolean autoReconnect) { - this.autoReconnect = autoReconnect; - } - - public ProtocolVersion getProtocolVersion() { - return protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion version) { - this.protocolVersion = version; - } - - public int getPoolSize() { - return poolSize; - } - - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - public SslArgs getSslArgs() { - return sslArgs; - } - - public void setSslArgs(SslArgs sslArgs) { - this.sslArgs = sslArgs; - } - - @Override - public String toString() { - return "RedisClientArgs [sslArgs=" + sslArgs + ", cluster=" + cluster + ", autoReconnect=" + autoReconnect - + ", protocolVersion=" + protocolVersion + ", poolSize=" + poolSize + "]"; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisContext.java b/plugins/riot/src/main/java/com/redis/riot/RedisContext.java index 7a1e335c7..f223eaba0 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisContext.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisContext.java @@ -1,7 +1,5 @@ package com.redis.riot; -import org.springframework.beans.factory.InitializingBean; - import com.redis.lettucemod.RedisModulesClientBuilder; import com.redis.lettucemod.RedisModulesUtils; import com.redis.lettucemod.api.StatefulRedisModulesConnection; @@ -16,80 +14,59 @@ import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.protocol.ProtocolVersion; -public class RedisContext implements InitializingBean, AutoCloseable { +public class RedisContext implements AutoCloseable { public static final boolean DEFAULT_AUTO_RECONNECT = ClientOptions.DEFAULT_AUTO_RECONNECT; - public static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ClientOptions.DEFAULT_PROTOCOL_VERSION; + public static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.RESP2; public static final int DEFAULT_POOL_SIZE = RedisItemReader.DEFAULT_POOL_SIZE; - private RedisURI uri; - private boolean cluster; - private boolean autoReconnect = DEFAULT_AUTO_RECONNECT; - private ProtocolVersion protocolVersion = DEFAULT_PROTOCOL_VERSION; + private final RedisURI uri; + private final AbstractRedisClient client; + private final StatefulRedisModulesConnection connection; private int poolSize = DEFAULT_POOL_SIZE; - private SslOptions sslOptions; - private AbstractRedisClient client; - private StatefulRedisModulesConnection connection; + private RedisContext(RedisURI uri, AbstractRedisClient client) { + this.uri = uri; + this.client = client; + this.connection = RedisModulesUtils.connection(client); + } - @Override - public void afterPropertiesSet() { + public static RedisContext create(RedisURI uri, boolean cluster, 
boolean autoReconnect, + ProtocolVersion protocolVersion, SslOptions sslOptions) { RedisModulesClientBuilder clientBuilder = new RedisModulesClientBuilder(); clientBuilder.cluster(cluster); - clientBuilder.clientOptions(clientOptions()); - clientBuilder.uri(uri); - client = clientBuilder.build(); - connection = RedisModulesUtils.connection(client); - } - - private ClientOptions clientOptions() { - Builder options = clientOptionsBuilder().autoReconnect(autoReconnect).protocolVersion(protocolVersion); + Builder options = cluster ? ClusterClientOptions.builder() : ClientOptions.builder(); + options.autoReconnect(autoReconnect); + options.protocolVersion(protocolVersion); if (sslOptions != null) { options.sslOptions(sslOptions); } - return options.build(); + clientBuilder.clientOptions(options.build()); + clientBuilder.uri(uri); + return new RedisContext(uri, clientBuilder.build()); } - private ClientOptions.Builder clientOptionsBuilder() { - if (cluster) { - return ClusterClientOptions.builder(); - } - return ClientOptions.builder(); + public int getPoolSize() { + return poolSize; } - public void configure(RedisItemWriter writer) { - writer.setClient(client); - writer.setPoolSize(poolSize); + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; } public void configure(RedisItemReader reader) { reader.setClient(client); - reader.setPoolSize(poolSize); reader.setDatabase(uri.getDatabase()); + reader.setPoolSize(poolSize); } - public RedisURI getUri() { - return uri; - } - - public void setUri(RedisURI uri) { - this.uri = uri; - } - - public boolean isCluster() { - return cluster; - } - - public void setCluster(boolean cluster) { - this.cluster = cluster; - } - - public int getPoolSize() { - return poolSize; + public void configure(RedisItemWriter writer) { + writer.setClient(client); + writer.setPoolSize(poolSize); } - public void setPoolSize(int poolSize) { - this.poolSize = poolSize; + public RedisURI getUri() { + return uri; } public AbstractRedisClient getClient() { @@ -100,40 +77,14 @@ public StatefulRedisModulesConnection getConnection() { return connection; } - public boolean isAutoReconnect() { - return autoReconnect; - } - - public void setAutoReconnect(boolean autoReconnect) { - this.autoReconnect = autoReconnect; - } - - public ProtocolVersion getProtocolVersion() { - return protocolVersion; - } - - public void setProtocolVersion(ProtocolVersion protocolVersion) { - this.protocolVersion = protocolVersion; - } - - public SslOptions getSslOptions() { - return sslOptions; - } - - public void setSslOptions(SslOptions sslOptions) { - this.sslOptions = sslOptions; - } - @Override - public void close() throws Exception { + public void close() { if (connection != null) { connection.close(); - connection = null; } if (client != null) { client.shutdown(); client.getResources().shutdown(); - client = null; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisExecutionContext.java b/plugins/riot/src/main/java/com/redis/riot/RedisExecutionContext.java deleted file mode 100644 index 518574245..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/RedisExecutionContext.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.redis.riot; - -import org.springframework.util.Assert; - -import com.redis.riot.core.JobExecutionContext; -import com.redis.spring.batch.item.redis.RedisItemReader; -import com.redis.spring.batch.item.redis.RedisItemWriter; - -public class RedisExecutionContext extends JobExecutionContext { - - private RedisContext redisContext; - - @Override - public void 
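Since RedisContext no longer implements InitializingBean, the static factory shown above both builds the client and opens the connection, and close() releases both. A usage sketch limited to methods visible in this diff (URI, pool size, and the struct writer are placeholder choices; the class is assumed to sit in the same package as RedisContext):

import com.redis.spring.batch.item.redis.RedisItemWriter;

import io.lettuce.core.RedisURI;
import io.lettuce.core.protocol.ProtocolVersion;

class RedisContextUsageSketch {

    void run() {
        RedisURI uri = RedisURI.create("redis://localhost:6379");
        // create(...) builds the client and opens the connection; close() shuts both down.
        try (RedisContext context = RedisContext.create(uri, false, true, ProtocolVersion.RESP2, null)) {
            context.setPoolSize(8);
            var writer = RedisItemWriter.struct();
            context.configure(writer); // injects the client and pool size into the writer
        }
    }

}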
afterPropertiesSet() throws Exception { - Assert.notNull(redisContext, "Redis context must not be null"); - redisContext.afterPropertiesSet(); - super.afterPropertiesSet(); - } - - @Override - public void close() throws Exception { - if (redisContext != null) { - redisContext.close(); - redisContext = null; - } - super.close(); - } - - public void configure(RedisItemWriter writer) { - redisContext.configure(writer); - } - - public void configure(RedisItemReader reader) { - super.configure(reader); - redisContext.configure(reader); - } - - public RedisContext getRedisContext() { - return redisContext; - } - - public void setRedisContext(RedisContext context) { - this.redisContext = context; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisExportStep.java b/plugins/riot/src/main/java/com/redis/riot/RedisExportStep.java new file mode 100644 index 000000000..d547bd9b4 --- /dev/null +++ b/plugins/riot/src/main/java/com/redis/riot/RedisExportStep.java @@ -0,0 +1,65 @@ +package com.redis.riot; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; + +import com.redis.lettucemod.RedisModulesUtils; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.riot.core.Step; +import com.redis.spring.batch.item.redis.RedisItemReader; +import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode; +import com.redis.spring.batch.item.redis.common.KeyValue; + +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisException; + +public class RedisExportStep extends Step, O> implements InitializingBean { + + public static final String NOTIFY_CONFIG = "notify-keyspace-events"; + public static final String NOTIFY_CONFIG_VALUE = "KEA"; + + public RedisExportStep(String name, RedisItemReader reader, ItemWriter writer) { + super(name, reader, writer); + } + + @Override + public void afterPropertiesSet() { + @SuppressWarnings("unchecked") + RedisItemReader reader = (RedisItemReader) getReader(); + if (reader.getMode() != ReaderMode.LIVEONLY) { + maxItemCountSupplier(reader.scanSizeEstimator()); + } + if (reader.getMode() != ReaderMode.SCAN) { + checkNotifyConfig(reader.getClient()); + live(true); + flushInterval(reader.getFlushInterval()); + idleTimeout(reader.getIdleTimeout()); + } + } + + private void checkNotifyConfig(AbstractRedisClient client) { + Map valueMap; + try (StatefulRedisModulesConnection conn = RedisModulesUtils.connection(client)) { + try { + valueMap = conn.sync().configGet(NOTIFY_CONFIG); + } catch (RedisException e) { + return; + } + } + String actual = valueMap.getOrDefault(NOTIFY_CONFIG, ""); + Set expected = characterSet(NOTIFY_CONFIG_VALUE); + Assert.isTrue(characterSet(actual).containsAll(expected), + String.format("Keyspace notifications not properly configured: expected '%s' but was '%s'.", + NOTIFY_CONFIG_VALUE, actual)); + } + + private Set characterSet(String string) { + return string.codePoints().mapToObj(c -> (char) c).collect(Collectors.toSet()); + } + +} diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java index d0e73e924..baec30b4f 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java @@ -25,7 +25,6 @@ public class RedisReaderArgs { public static final 
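The new RedisExportStep above refuses to start a live export unless the server's notify-keyspace-events setting covers "KEA" (keyspace events, keyevent events, all command classes). If that check fails, the setting can be enabled at runtime; a hedged example using the same Lettuce sync API the step relies on:

import java.util.Map;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;

class NotifyConfigSketch {

    // Enables keyspace notifications so the live-export check above passes.
    static void enable(StatefulRedisModulesConnection<String, String> connection) {
        connection.sync().configSet("notify-keyspace-events", "KEA");
        Map<String, String> current = connection.sync().configGet("notify-keyspace-events");
        System.out.println(current); // {notify-keyspace-events=KEA}
    }

}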
Duration DEFAULT_POLL_TIMEOUT = AbstractPollableItemReader.DEFAULT_POLL_TIMEOUT; public static final int DEFAULT_THREADS = AbstractAsyncItemReader.DEFAULT_THREADS; public static final int DEFAULT_CHUNK_SIZE = AbstractAsyncItemReader.DEFAULT_CHUNK_SIZE; - public static final DataSize DEFAULT_MEMORY_USAGE_LIMIT = DataSize.ofBytes(-1); public static final int DEFAULT_MEMORY_USAGE_SAMPLES = KeyValueRead.DEFAULT_MEM_USAGE_SAMPLES; public static final long DEFAULT_SCAN_COUNT = 1000; public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL; @@ -55,8 +54,8 @@ public class RedisReaderArgs { @Option(names = "--read-from", description = "Which Redis cluster nodes to read from: ${COMPLETION-CANDIDATES}.", paramLabel = "") private RedisReadFrom readFrom; - @Option(names = "--mem-limit", description = "Max mem usage for a key to be read. Use 0 for no limit, -1 to disable (default). Examples: 12KB, 5MB", paramLabel = "") - private DataSize memUsageLimit = DEFAULT_MEMORY_USAGE_LIMIT; + @Option(names = "--mem-limit", description = "Max mem usage for a key to be read, for example 12KB 5MB. Use 0 for no limit but still read mem usage.", paramLabel = "") + private DataSize memUsageLimit; @Option(names = "--mem-samples", description = "Number of memory usage samples for a key (default: ${DEFAULT-VALUE}).", paramLabel = "") private int memUsageSamples = DEFAULT_MEMORY_USAGE_SAMPLES; @@ -82,7 +81,7 @@ public class RedisReaderArgs { @Option(names = "--read-poll", description = "Interval in millis between queue polls (default: ${DEFAULT-VALUE}).", paramLabel = "", hidden = true) private long pollTimeout = DEFAULT_POLL_TIMEOUT.toMillis(); - public void configure(RedisItemReader reader) { + public void configure(RedisItemReader reader) { reader.setChunkSize(chunkSize); reader.setFlushInterval(Duration.ofMillis(flushInterval)); if (idleTimeout > 0) { @@ -93,7 +92,7 @@ public void configure(RedisItemReader reader) { reader.setMode(mode); reader.setNotificationQueueCapacity(notificationQueueCapacity); reader.setPollTimeout(Duration.ofMillis(pollTimeout)); - reader.setProcessor(keyProcessor(reader.getCodec())); + reader.setProcessor(keyProcessor(reader.getCodec(), keyFilterArgs)); reader.setQueueCapacity(queueCapacity); if (readFrom != null) { reader.setReadFrom(readFrom.getReadFrom()); @@ -102,16 +101,16 @@ public void configure(RedisItemReader reader) { reader.setScanCount(scanCount); reader.setSkipLimit(skipLimit); reader.setThreads(threads); - if (reader.getOperation() instanceof KeyValueRead) { + if (memUsageLimit != null && reader.getOperation() instanceof KeyValueRead) { @SuppressWarnings("rawtypes") KeyValueRead operation = (KeyValueRead) reader.getOperation(); - operation.setMemUsageLimit(memUsageLimit); + operation.setMemUsageLimit(memUsageLimit.toBytes()); operation.setMemUsageSamples(memUsageSamples); } } - private ItemProcessor, KeyEvent> keyProcessor(RedisCodec codec) { - return keyFilterArgs.predicate(codec).map(p -> new FunctionPredicate, K>(KeyEvent::getKey, p)) + private ItemProcessor, KeyEvent> keyProcessor(RedisCodec codec, KeyFilterArgs args) { + return args.predicate(codec).map(p -> new FunctionPredicate, K>(KeyEvent::getKey, p)) .map(PredicateOperator::new).map(FunctionItemProcessor::new).orElse(null); } @@ -227,6 +226,30 @@ public void setMode(ReaderMode mode) { this.mode = mode; } + public long getPollTimeout() { + return pollTimeout; + } + + public void setPollTimeout(long pollTimeout) { + this.pollTimeout = pollTimeout; + } + + public int getRetryLimit() { + 
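The --mem-limit change above makes memory sampling opt-in: when the option is absent, the KeyValueRead operation is left untouched; when present, the limit is handed over in bytes. A small illustration of the Spring DataSize values behind the option (sample values only):

import org.springframework.util.unit.DataSize;

class MemLimitSketch {

    public static void main(String[] args) {
        DataSize limit = DataSize.parse("12KB");
        System.out.println(limit.toBytes()); // 12288, DataSize units are binary (1KB = 1024 bytes)
        // Per the option description above, 0 means no limit while still reading memory usage.
        System.out.println(DataSize.parse("0").toBytes()); // 0
    }

}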
return retryLimit; + } + + public void setRetryLimit(int retryLimit) { + this.retryLimit = retryLimit; + } + + public int getSkipLimit() { + return skipLimit; + } + + public void setSkipLimit(int skipLimit) { + this.skipLimit = skipLimit; + } + @Override public String toString() { return "RedisReaderArgs [mode=" + mode + ", keyPattern=" + keyPattern + ", keyType=" + keyType + ", scanCount=" diff --git a/plugins/riot/src/main/java/com/redis/riot/RedisWriterArgs.java b/plugins/riot/src/main/java/com/redis/riot/RedisWriterArgs.java index 2166db202..335c74e61 100644 --- a/plugins/riot/src/main/java/com/redis/riot/RedisWriterArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/RedisWriterArgs.java @@ -25,22 +25,15 @@ public class RedisWriterArgs { @Option(names = "--merge", description = "Merge properties from collection data structures (`hash`, `set`, ...) instead of overwriting them.") private boolean merge; - public void configure(RedisItemWriter writer) { + protected void configure(RedisItemWriter writer) { writer.setMultiExec(multiExec); writer.setWaitReplicas(waitReplicas); writer.setWaitTimeout(waitTimeout); if (writer.getOperation() instanceof KeyValueWrite) { - ((KeyValueWrite) writer.getOperation()).setMode(writeMode()); + ((KeyValueWrite) writer.getOperation()).setMode(merge ? WriteMode.MERGE : WriteMode.OVERWRITE); } } - private WriteMode writeMode() { - if (merge) { - return WriteMode.MERGE; - } - return WriteMode.OVERWRITE; - } - public boolean isMultiExec() { return multiExec; } diff --git a/plugins/riot/src/main/java/com/redis/riot/Replicate.java b/plugins/riot/src/main/java/com/redis/riot/Replicate.java index 761382ea7..8d8ccb058 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Replicate.java +++ b/plugins/riot/src/main/java/com/redis/riot/Replicate.java @@ -69,14 +69,14 @@ protected boolean isQuickCompare() { } @Override - protected Job job(TargetRedisExecutionContext context) { + protected Job job() { List> steps = new ArrayList<>(); - Step, KeyValue> replicateStep = step(context); + Step, KeyValue> replicateStep = step(); steps.add(replicateStep); if (shouldCompare()) { - steps.add(compareStep(context)); + steps.add(compareStep()); } - return job(context, steps); + return job(steps); } @Override @@ -84,18 +84,18 @@ protected boolean isIgnoreStreamMessageId() { return !processorArgs.isPropagateIds(); } - private ItemProcessor, KeyValue> processor( - TargetRedisExecutionContext context) { - return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor(context)); + private ItemProcessor, KeyValue> processor() { + return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor()); } - private ItemProcessor, KeyValue> keyValueProcessor( - TargetRedisExecutionContext context) { + private ItemProcessor, KeyValue> keyValueProcessor() { if (isIgnoreStreamMessageId()) { - Assert.isTrue(isStruct(), "'--no-stream-ids' can only be used with '--struct'"); + Assert.isTrue(isStruct(), "--no-stream-ids can only be used with --struct"); } + StandardEvaluationContext evaluationContext = evaluationContext(); + log.info("Creating processor with {}", processorArgs); ItemProcessor, KeyValue> processor = processorArgs - .processor(evaluationContext(context)); + .processor(evaluationContext); if (processor == null) { return null; } @@ -104,28 +104,29 @@ private ItemProcessor, KeyValue> keyVal return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode)); } - private 
StandardEvaluationContext evaluationContext(TargetRedisExecutionContext context) { + private StandardEvaluationContext evaluationContext() { + log.info("Creating SpEL evaluation context with {}", evaluationContextArgs); StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext(); - evaluationContext.setVariable(VAR_SOURCE, context.getSourceRedisContext().getConnection().sync()); - evaluationContext.setVariable(VAR_TARGET, context.getTargetRedisContext().getConnection().sync()); + evaluationContext.setVariable(VAR_SOURCE, sourceRedisContext.getConnection().sync()); + evaluationContext.setVariable(VAR_TARGET, targetRedisContext.getConnection().sync()); return evaluationContext; } - protected void configureTargetWriter(TargetRedisExecutionContext context, RedisItemWriter writer) { + protected void configureTargetWriter(RedisItemWriter writer) { + targetRedisContext.configure(writer); log.info("Configuring target Redis writer with {}", targetRedisWriterArgs); targetRedisWriterArgs.configure(writer); - context.configureTargetWriter(writer); } - private Step, KeyValue> step(TargetRedisExecutionContext context) { + private Step, KeyValue> step() { RedisItemReader reader = reader(); - configureSourceReader(context, reader); + configureSourceReader(reader); RedisItemWriter> writer = writer(); - configureTargetWriter(context, writer); - Step, KeyValue> step = new Step<>(STEP_NAME, reader, writer); - step.processor(processor(context)); + configureTargetWriter(writer); + RedisExportStep> step = new RedisExportStep<>(STEP_NAME, + reader, writer); + step.processor(processor()); step.taskName(taskName(reader)); - AbstractExportCommand.configure(step); if (reader.getMode() != ReaderMode.SCAN) { step.statusMessageSupplier(() -> liveExtraMessage(reader)); } @@ -138,6 +139,7 @@ private Step, KeyValue> step(TargetRedi reader.addItemReadListener(readLogger); reader.addItemWriteListener(readLogger); } + step.afterPropertiesSet(); return step; } @@ -148,18 +150,20 @@ private boolean shouldCompare() { @SuppressWarnings({ "unchecked", "rawtypes" }) private RedisItemReader reader() { if (struct) { - log.info("Creating data-structure type Redis reader"); + log.info("Creating Redis data-structure reader"); return RedisItemReader.struct(ByteArrayCodec.INSTANCE); } - log.info("Creating dump Redis reader"); + log.info("Creating Redis dump reader"); return (RedisItemReader) RedisItemReader.dump(); } @SuppressWarnings({ "unchecked", "rawtypes" }) private RedisItemWriter> writer() { if (struct) { + log.info("Creating Redis data-structure writer"); return RedisItemWriter.struct(ByteArrayCodec.INSTANCE); } + log.info("Creating Redis dump writer"); return (RedisItemWriter) RedisItemWriter.dump(); } @@ -185,9 +189,9 @@ private String liveExtraMessage(RedisItemReader reader) { } @Override - protected KeyComparisonItemReader compareReader(TargetRedisExecutionContext context) { - KeyComparisonItemReader reader = super.compareReader(context); - reader.setProcessor(processor(context)); + protected KeyComparisonItemReader compareReader() { + KeyComparisonItemReader reader = super.compareReader(); + reader.setProcessor(processor()); return reader; } diff --git a/plugins/riot/src/main/java/com/redis/riot/SourceRedisClientArgs.java b/plugins/riot/src/main/java/com/redis/riot/SourceRedisClientArgs.java index a34a52cf7..a383fb1e2 100644 --- a/plugins/riot/src/main/java/com/redis/riot/SourceRedisClientArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/SourceRedisClientArgs.java @@ -11,7 +11,7 @@ public 
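The reader and writer factories above are the core of replicate: with --struct, typed data structures are read and written field by field and can be transformed in flight, while the default dump mode moves opaque DUMP/RESTORE payloads. A sketch of the same selection using only factory methods that appear in this diff:

import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.RedisItemWriter;

import io.lettuce.core.codec.ByteArrayCodec;

class ReplicateModeSketch {

    static void create(boolean struct) {
        if (struct) {
            // Typed key/value structures, open to processors such as key expressions.
            var reader = RedisItemReader.struct(ByteArrayCodec.INSTANCE);
            var writer = RedisItemWriter.struct(ByteArrayCodec.INSTANCE);
        } else {
            // Opaque dump payloads, cheap to move but not inspectable.
            var reader = RedisItemReader.dump();
            var writer = RedisItemWriter.dump();
        }
    }

}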
class SourceRedisClientArgs { @Option(names = "--source-auto-reconnect", description = "Automatically reconnect on connection loss. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true) private boolean autoReconnect = RedisContext.DEFAULT_AUTO_RECONNECT; - @Option(names = "--source-resp", description = "Redis protocol version used to connect to Redis: ${COMPLETION-CANDIDATES}.", paramLabel = "") + @Option(names = "--source-resp", description = "Redis protocol version used to connect to Redis: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") private ProtocolVersion protocolVersion = RedisContext.DEFAULT_PROTOCOL_VERSION; @Option(names = "--source-pool", description = "Max pool connections used for source Redis (default: ${DEFAULT-VALUE}).", paramLabel = "") diff --git a/plugins/riot/src/main/java/com/redis/riot/SourceRedisURIArgs.java b/plugins/riot/src/main/java/com/redis/riot/SourceRedisURIArgs.java index 0f80ca9d6..1918a5db5 100644 --- a/plugins/riot/src/main/java/com/redis/riot/SourceRedisURIArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/SourceRedisURIArgs.java @@ -1,11 +1,7 @@ package com.redis.riot; -import java.util.Arrays; +import com.redis.riot.core.RiotUtils; -import com.redis.lettucemod.RedisURIBuilder; - -import io.lettuce.core.RedisURI; -import io.lettuce.core.SslVerifyMode; import picocli.CommandLine.Option; public class SourceRedisURIArgs { @@ -19,17 +15,6 @@ public class SourceRedisURIArgs { @Option(names = "--source-insecure", description = "Allow insecure TLS connection by skipping cert validation.") private boolean insecure; - public RedisURI redisURI(RedisURI uri) { - RedisURIBuilder builder = new RedisURIBuilder(); - builder.password(password); - builder.uri(uri); - builder.username(username); - if (insecure) { - builder.verifyMode(SslVerifyMode.NONE); - } - return builder.build(); - } - public String getUsername() { return username; } @@ -56,7 +41,7 @@ public void setInsecure(boolean insecure) { @Override public String toString() { - return "SourceRedisURIArgs [username=" + username + ", password=" + Arrays.toString(password) + ", insecure=" + return "SourceRedisURIArgs [username=" + username + ", password=" + RiotUtils.mask(password) + ", insecure=" + insecure + "]"; } diff --git a/plugins/riot/src/main/java/com/redis/riot/SslArgs.java b/plugins/riot/src/main/java/com/redis/riot/SslArgs.java index b78806e49..e038e6116 100644 --- a/plugins/riot/src/main/java/com/redis/riot/SslArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/SslArgs.java @@ -1,7 +1,8 @@ package com.redis.riot; import java.io.File; -import java.util.Arrays; + +import com.redis.riot.core.RiotUtils; import io.lettuce.core.SslOptions; import io.lettuce.core.SslOptions.Builder; @@ -117,9 +118,9 @@ public void setTrustedCerts(File trustedCerts) { @Override public String toString() { - return "SslArgs [keystore=" + keystore + ", keystorePassword=" + Arrays.toString(keystorePassword) - + ", truststore=" + truststore + ", truststorePassword=" + Arrays.toString(truststorePassword) - + ", keyCert=" + keyCert + ", key=" + key + ", keyPassword=" + Arrays.toString(keyPassword) + return "SslArgs [keystore=" + keystore + ", keystorePassword=" + RiotUtils.mask(keystorePassword) + + ", truststore=" + truststore + ", truststorePassword=" + RiotUtils.mask(truststorePassword) + + ", keyCert=" + keyCert + ", key=" + key + ", keyPassword=" + RiotUtils.mask(keyPassword) + ", trustedCerts=" + trustedCerts + "]"; } diff --git 
a/plugins/riot/src/main/java/com/redis/riot/TargetRedisClientArgs.java b/plugins/riot/src/main/java/com/redis/riot/TargetRedisClientArgs.java index 34450662a..ef48d0e74 100644 --- a/plugins/riot/src/main/java/com/redis/riot/TargetRedisClientArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/TargetRedisClientArgs.java @@ -11,7 +11,7 @@ public class TargetRedisClientArgs { @Option(names = "--target-auto-reconnect", description = "Automatically reconnect to target on connection loss. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true) private boolean autoReconnect = RedisContext.DEFAULT_AUTO_RECONNECT; - @Option(names = "--target-resp", description = "Redis protocol version used to connect to target: ${COMPLETION-CANDIDATES}.", paramLabel = "") + @Option(names = "--target-resp", description = "Redis protocol version used to connect to target: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "") private ProtocolVersion protocolVersion = RedisContext.DEFAULT_PROTOCOL_VERSION; @Option(names = "--target-pool", description = "Max pool connections used for target Redis (default: ${DEFAULT-VALUE}).", paramLabel = "") diff --git a/plugins/riot/src/main/java/com/redis/riot/TargetRedisExecutionContext.java b/plugins/riot/src/main/java/com/redis/riot/TargetRedisExecutionContext.java deleted file mode 100644 index 03ef47f0f..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/TargetRedisExecutionContext.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.redis.riot; - -import org.springframework.util.Assert; - -import com.redis.riot.core.JobExecutionContext; -import com.redis.spring.batch.item.redis.RedisItemReader; -import com.redis.spring.batch.item.redis.RedisItemWriter; - -public class TargetRedisExecutionContext extends JobExecutionContext { - - private RedisContext sourceRedisContext; - private RedisContext targetRedisContext; - - @Override - public void afterPropertiesSet() throws Exception { - Assert.notNull(sourceRedisContext, "Source Redis context not set"); - Assert.notNull(targetRedisContext, "Target Redis context not set"); - sourceRedisContext.afterPropertiesSet(); - targetRedisContext.afterPropertiesSet(); - super.afterPropertiesSet(); - } - - @Override - public void close() throws Exception { - if (targetRedisContext != null) { - targetRedisContext.close(); - targetRedisContext = null; - } - if (sourceRedisContext != null) { - sourceRedisContext.close(); - sourceRedisContext = null; - } - super.close(); - } - - public void configureTargetWriter(RedisItemWriter writer) { - targetRedisContext.configure(writer); - } - - public void configureSourceReader(RedisItemReader reader) { - configure(reader); - sourceRedisContext.configure(reader); - } - - public void configureTargetReader(RedisItemReader reader) { - configure(reader); - targetRedisContext.configure(reader); - } - - public RedisContext getSourceRedisContext() { - return sourceRedisContext; - } - - public void setSourceRedisContext(RedisContext sourceRedisContext) { - this.sourceRedisContext = sourceRedisContext; - } - - public RedisContext getTargetRedisContext() { - return targetRedisContext; - } - - public void setTargetRedisContext(RedisContext targetRedisContext) { - this.targetRedisContext = targetRedisContext; - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/TargetRedisURIArgs.java b/plugins/riot/src/main/java/com/redis/riot/TargetRedisURIArgs.java index ded56d28e..9a0bd94b7 100644 --- 
a/plugins/riot/src/main/java/com/redis/riot/TargetRedisURIArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/TargetRedisURIArgs.java @@ -1,11 +1,7 @@ package com.redis.riot; -import java.util.Arrays; +import com.redis.riot.core.RiotUtils; -import com.redis.lettucemod.RedisURIBuilder; - -import io.lettuce.core.RedisURI; -import io.lettuce.core.SslVerifyMode; import picocli.CommandLine.Option; public class TargetRedisURIArgs { @@ -19,17 +15,6 @@ public class TargetRedisURIArgs { @Option(names = "--target-insecure", description = "Allow insecure TLS connection to target by skipping cert validation.") private boolean insecure; - public RedisURI redisURI(RedisURI uri) { - RedisURIBuilder builder = new RedisURIBuilder(); - builder.password(password); - builder.uri(uri); - builder.username(username); - if (insecure) { - builder.verifyMode(SslVerifyMode.NONE); - } - return builder.build(); - } - public String getUsername() { return username; } @@ -56,7 +41,7 @@ public void setInsecure(boolean insecure) { @Override public String toString() { - return "TargetRedisURIArgs [username=" + username + ", password=" + Arrays.toString(password) + ", insecure=" + return "TargetRedisURIArgs [username=" + username + ", password=" + RiotUtils.mask(password) + ", insecure=" + insecure + "]"; } diff --git a/plugins/riot/src/test/java/com/redis/riot/AbstractRiotTestBase.java b/plugins/riot/src/test/java/com/redis/riot/AbstractRiotTestBase.java index 4efa49bf6..af82b9964 100644 --- a/plugins/riot/src/test/java/com/redis/riot/AbstractRiotTestBase.java +++ b/plugins/riot/src/test/java/com/redis/riot/AbstractRiotTestBase.java @@ -102,21 +102,21 @@ private void configure(TestInfo info, ParseResult parseResult) { command = subParseResult.commandSpec().parent().commandLine().getCommand(); } if (command instanceof AbstractJobCommand) { - AbstractJobCommand jobCommand = ((AbstractJobCommand) command); + AbstractJobCommand jobCommand = ((AbstractJobCommand) command); jobCommand.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE); jobCommand.setJobName(name(info)); } if (command instanceof AbstractRedisCommand) { - AbstractRedisCommand redisCommand = (AbstractRedisCommand) command; - redisCommand.getRedisURIArgs().setUri(redisURI); - redisCommand.getRedisClientArgs().setCluster(getRedisServer().isRedisCluster()); + AbstractRedisCommand redisCommand = (AbstractRedisCommand) command; + redisCommand.getRedisArgs().setUri(redisURI); + redisCommand.getRedisArgs().setCluster(getRedisServer().isRedisCluster()); } if (command instanceof AbstractExportCommand) { - AbstractExportCommand exportCommand = (AbstractExportCommand) command; + AbstractExportCommand exportCommand = (AbstractExportCommand) command; configure(exportCommand.getRedisReaderArgs()); } - if (command instanceof AbstractTargetCommand) { - AbstractTargetCommand targetCommand = (AbstractTargetCommand) command; + if (command instanceof AbstractRedisToRedisCommand) { + AbstractRedisToRedisCommand targetCommand = (AbstractRedisToRedisCommand) command; configure(targetCommand.getSourceRedisReaderArgs()); targetCommand.setSourceRedisURI(redisURI); targetCommand.getSourceRedisClientArgs().setCluster(getRedisServer().isRedisCluster()); diff --git a/plugins/riot/src/test/java/com/redis/riot/FileTests.java b/plugins/riot/src/test/java/com/redis/riot/FileTests.java index 3a4685dac..29dbab41e 100644 --- a/plugins/riot/src/test/java/com/redis/riot/FileTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/FileTests.java @@ -76,9 +76,9 @@ void 
fileApiImportCSV(TestInfo info) throws Exception { } } - private void configure(TestInfo info, AbstractRedisCommand callable) { - callable.getRedisURIArgs().setUri(RedisURI.create(getRedisServer().getRedisURI())); - callable.getRedisClientArgs().setCluster(getRedisServer().isRedisCluster()); + private void configure(TestInfo info, AbstractRedisCommand callable) { + callable.getRedisArgs().setUri(RedisURI.create(getRedisServer().getRedisURI())); + callable.getRedisArgs().setCluster(getRedisServer().isRedisCluster()); callable.setJobName(name(info)); callable.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE); } @@ -181,8 +181,8 @@ private void fileExportImport(TestInfo info, String filename) throws Exception { FileImport fileImport = new FileImport(); configure(info, fileImport); fileImport.setFiles(file); - fileImport.getRedisURIArgs().setUri(RedisURI.create(getTargetRedisServer().getRedisURI())); - fileImport.getRedisClientArgs().setCluster(getTargetRedisServer().isRedisCluster()); + fileImport.getRedisArgs().setUri(RedisURI.create(getTargetRedisServer().getRedisURI())); + fileImport.getRedisArgs().setCluster(getTargetRedisServer().isRedisCluster()); fileImport.call(); KeyspaceComparison comparison = compare(info); Assertions.assertFalse(comparison.getAll().isEmpty()); diff --git a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java index f658cb1eb..8f1ae8f5f 100644 --- a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java @@ -762,10 +762,10 @@ void keyProcessorWithDate(TestInfo info) throws Throwable { void testMapProcessor() throws Exception { Map expressions = new LinkedHashMap<>(); expressions.put("field1", Expression.parse("'test:1'")); - ImportProcessorArgs processorArgs = new ImportProcessorArgs(); - processorArgs.setExpressions(expressions); - ItemProcessor, Map> processor = processorArgs - .processor(evaluationContext()); + ImportProcessorArgs args = new ImportProcessorArgs(); + args.setExpressions(expressions); + ItemProcessor, Map> processor = AbstractImportCommand + .processor(evaluationContext(), args); Map map = processor.process(new HashMap<>()); Assertions.assertEquals("test:1", map.get("field1")); // Assertions.assertEquals("1", map.get("id")); @@ -779,9 +779,10 @@ void processor() throws Exception { expressions.put("field3", Expression.parse("1")); expressions.put("field4", Expression.parse("2")); expressions.put("field5", Expression.parse("field3+field4")); - ImportProcessorArgs options = new ImportProcessorArgs(); - options.setExpressions(expressions); - ItemProcessor, Map> processor = options.processor(evaluationContext()); + ImportProcessorArgs args = new ImportProcessorArgs(); + args.setExpressions(expressions); + ItemProcessor, Map> processor = AbstractImportCommand + .processor(evaluationContext(), args); for (int index = 0; index < 10; index++) { Map result = processor.process(new HashMap<>()); assertEquals(5, result.size()); @@ -799,9 +800,10 @@ private EvaluationContext evaluationContext() { @Test void processorFilter() throws Exception { - ImportProcessorArgs options = new ImportProcessorArgs(); - options.setFilter(Expression.parse("index<10")); - ItemProcessor, Map> processor = options.processor(evaluationContext()); + ImportProcessorArgs args = new ImportProcessorArgs(); + args.setFilter(Expression.parse("index<10")); + ItemProcessor, Map> processor = AbstractImportCommand + 
.processor(evaluationContext(), args); for (int index = 0; index < 100; index++) { Map map = new HashMap<>(); map.put("index", index);