From 7b7a06ee6e2c28cfd76aa929d60debe131900a7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Richard=20Boldi=C5=A1?= Date: Sat, 14 Sep 2024 21:11:50 +0200 Subject: [PATCH] feat: Add KeyValue and EvaluationContext args to compare --- .../src/main/java/com/redis/riot/Compare.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/plugins/riot/src/main/java/com/redis/riot/Compare.java b/plugins/riot/src/main/java/com/redis/riot/Compare.java index 9d92eae45..86f7f3b03 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Compare.java +++ b/plugins/riot/src/main/java/com/redis/riot/Compare.java @@ -1,7 +1,19 @@ package com.redis.riot; +import com.redis.riot.core.RiotUtils; +import com.redis.riot.function.StringKeyValue; +import com.redis.riot.function.ToStringKeyValue; +import com.redis.spring.batch.item.redis.common.KeyValue; +import com.redis.spring.batch.item.redis.reader.KeyComparisonItemReader; + +import io.lettuce.core.codec.ByteArrayCodec; + import org.springframework.batch.core.Job; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -14,6 +26,12 @@ public class Compare extends AbstractCompareCommand { @Option(names = "--quick", description = "Skip value comparison.") private boolean quick; + @ArgGroup(exclusive = false) + private final EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs(); + + @ArgGroup(exclusive = false, heading = "Processor options%n") + private final KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); + @Override protected boolean isQuickCompare() { return quick; @@ -29,6 +47,37 @@ protected Job job() { return job(compareStep()); } + private StandardEvaluationContext evaluationContext() { + log.info("Creating SpEL evaluation context with {}", evaluationContextArgs); + StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext(); + configure(evaluationContext); + return evaluationContext; + } + + private ItemProcessor, KeyValue> keyValueProcessor() { + StandardEvaluationContext evaluationContext = evaluationContext(); + log.info("Creating processor with {}", processorArgs); + ItemProcessor, KeyValue> processor = processorArgs + .processor(evaluationContext); + if (processor == null) { + return null; + } + ToStringKeyValue code = new ToStringKeyValue<>(ByteArrayCodec.INSTANCE); + StringKeyValue decode = new StringKeyValue<>(ByteArrayCodec.INSTANCE); + return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode)); + } + + private ItemProcessor, KeyValue> processor() { + return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor()); + } + + @Override + protected KeyComparisonItemReader compareReader() { + KeyComparisonItemReader reader = super.compareReader(); + reader.setProcessor(processor()); + return reader; + } + public boolean isCompareStreamMessageId() { return compareStreamMessageId; }