Skip to content

Commit

Permalink
refactor: Upgraded spring-batch-redis
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 11, 2024
1 parent 45d6254 commit df530d4
Show file tree
Hide file tree
Showing 23 changed files with 100 additions and 107 deletions.
14 changes: 5 additions & 9 deletions core/riot-core/src/main/java/com/redis/riot/core/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,9 @@ public Collection<Class<? extends Throwable>> getSkip() {

@Override
public String toString() {
return "Step [name=" + name + ", reader=" + reader + ", writer=" + writer + ", taskName=" + taskName
+ ", statusMessageSupplier=" + statusMessageSupplier + ", maxItemCountSupplier=" + maxItemCountSupplier
+ ", processor=" + processor + ", executionListeners=" + executionListeners + ", readListeners="
+ readListeners + ", writeListeners=" + writeListeners + ", live=" + live + ", flushInterval="
+ flushInterval + ", idleTimeout=" + idleTimeout + ", skip=" + skip + ", noSkip=" + noSkip + ", retry="
+ retry + ", noRetry=" + noRetry + "]";
}


return "Step [name=" + name + ", taskName=" + taskName + ", live=" + live + ", flushInterval=" + flushInterval
+ ", idleTimeout=" + idleTimeout + ", skip=" + skip + ", noSkip=" + noSkip + ", retry=" + retry
+ ", noRetry=" + noRetry + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,23 @@ public abstract class AbstractCompareCommand extends AbstractReplicateCommand {
@ArgGroup(exclusive = false, heading = "Processor options%n")
private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs();

protected ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor() {
protected ItemProcessor<KeyValue<byte[]>, KeyValue<byte[]>> processor() {
return RiotUtils.processor(keyValueFilter(), keyValueProcessor());
}

private KeyValueFilter<byte[], KeyValue<byte[], Object>> keyValueFilter() {
private KeyValueFilter<byte[], KeyValue<byte[]>> keyValueFilter() {
return new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log);
}

protected abstract boolean isStruct();

private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() {
private ItemProcessor<KeyValue<byte[]>, KeyValue<byte[]>> keyValueProcessor() {
if (isIgnoreStreamMessageId()) {
Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct");
}
StandardEvaluationContext evaluationContext = evaluationContext();
log.info("Creating processor with {}", processorArgs);
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processorArgs
.processor(evaluationContext);
ItemProcessor<KeyValue<String>, KeyValue<String>> processor = processorArgs.processor(evaluationContext);
if (processor == null) {
return null;
}
Expand Down Expand Up @@ -103,7 +102,7 @@ protected Step<KeyComparison<byte[]>, KeyComparison<byte[]>> compareStep() {
return step;
}

private RedisItemReader<byte[], byte[], Object> compareRedisReader() {
private RedisItemReader<byte[], byte[]> compareRedisReader() {
if (isQuickCompare()) {
log.info("Creating Redis quick compare reader");
return RedisItemReader.type(ByteArrayCodec.INSTANCE);
Expand All @@ -115,8 +114,8 @@ private RedisItemReader<byte[], byte[], Object> compareRedisReader() {
protected abstract boolean isQuickCompare();

protected KeyComparisonItemReader<byte[], byte[]> compareReader() {
RedisItemReader<byte[], byte[], Object> source = compareSourceReader();
RedisItemReader<byte[], byte[], Object> target = compareTargetReader();
RedisItemReader<byte[], byte[]> source = compareSourceReader();
RedisItemReader<byte[], byte[]> target = compareTargetReader();
KeyComparisonItemReader<byte[], byte[]> reader = new KeyComparisonItemReader<>(source, target);
reader.setComparator(keyComparator());
reader.setProcessor(processor());
Expand All @@ -137,20 +136,20 @@ protected boolean isIgnoreStreamMessageId() {
return !processorArgs.isPropagateIds();
}

private RedisItemReader<byte[], byte[], Object> compareSourceReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
private RedisItemReader<byte[], byte[]> compareSourceReader() {
RedisItemReader<byte[], byte[]> reader = compareRedisReader();
configureSourceRedisReader(reader);
return reader;
}

private RedisItemReader<byte[], byte[], Object> compareTargetReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
private RedisItemReader<byte[], byte[]> compareTargetReader() {
RedisItemReader<byte[], byte[]> reader = compareRedisReader();
configureTargetRedisReader(reader);
return reader;
}

@Override
protected void configureTargetRedisReader(RedisItemReader<?, ?, ?> reader) {
protected void configureTargetRedisReader(RedisItemReader<?, ?> reader) {
super.configureTargetRedisReader(reader);
log.info("Configuring target Redis reader with read-from {}", targetReadFrom);
reader.setReadFrom(targetReadFrom.getReadFrom());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected void configure(StandardEvaluationContext context) {
context.setVariable(VAR_SOURCE, sourceRedisContext.getConnection().sync());
}

protected void configureSourceRedisReader(RedisItemReader<?, ?, ?> reader) {
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
configureAsyncReader(reader);
sourceRedisContext.configure(reader);
log.info("Configuring source Redis reader with {}", sourceRedisReaderArgs);
Expand All @@ -57,17 +57,16 @@ protected void configureSourceRedisReader(RedisItemReader<?, ?, ?> reader) {

protected abstract RedisContext sourceRedisContext();

protected <O> Step<KeyValue<String, Object>, O> step(ItemWriter<O> writer) {
RedisItemReader<String, String, Object> reader = RedisItemReader.struct();
protected <O> Step<KeyValue<String>, O> step(ItemWriter<O> writer) {
RedisItemReader<String, String> reader = RedisItemReader.struct();
configureSourceRedisReader(reader);
Step<KeyValue<String, Object>, O> step = step(STEP_NAME, reader, writer);
Step<KeyValue<String>, O> step = step(STEP_NAME, reader, writer);
step.taskName(TASK_NAME);
return step;
}

protected <K, V, T, O> Step<KeyValue<K, T>, O> step(String name, RedisItemReader<K, V, T> reader,
ItemWriter<O> writer) {
Step<KeyValue<K, T>, O> step = new Step<>(name, reader, writer);
protected <K, V, T, O> Step<KeyValue<K>, O> step(String name, RedisItemReader<K, V> reader, ItemWriter<O> writer) {
Step<KeyValue<K>, O> step = new Step<>(name, reader, writer);
if (reader.getMode() != ReaderMode.LIVEONLY) {
log.info("Configuring step with scan size estimator");
step.maxItemCountSupplier(reader.scanSizeEstimator());
Expand Down Expand Up @@ -97,8 +96,8 @@ private void checkNotifyConfig(AbstractRedisClient client) {
log.info("Retrieved config {}: {}", NOTIFY_CONFIG, actual);
Set<Character> expected = characterSet(NOTIFY_CONFIG_VALUE);
Assert.isTrue(characterSet(actual).containsAll(expected),
String.format("Keyspace notifications not property configured: expected '%s' but was '%s'.",
NOTIFY_CONFIG_VALUE, actual));
String.format("Keyspace notifications not property configured. Expected %s '%s' but was '%s'.",
NOTIFY_CONFIG, NOTIFY_CONFIG_VALUE, actual));
}

private Set<Character> characterSet(String string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ protected RedisContext sourceRedisContext() {
}

@Override
protected void configureSourceRedisReader(RedisItemReader<?, ?, ?> reader) {
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
super.configureSourceRedisReader(reader);
log.info("Configuring Redis reader with poolSize {}", poolSize);
reader.setPoolSize(poolSize);
}

protected ItemProcessor<KeyValue<String, Object>, Map<String, Object>> mapProcessor() {
protected ItemProcessor<KeyValue<String>, Map<String, Object>> mapProcessor() {
KeyValueMap mapFunction = new KeyValueMap();
if (keyRegex != null) {
mapFunction.setKey(new RegexNamedGroupFunction(keyRegex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ protected void configure(StandardEvaluationContext context) {
}

@Override
protected void configureSourceRedisReader(RedisItemReader<?, ?, ?> reader) {
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
super.configureSourceRedisReader(reader);
log.info("Configuring source Redis reader with poolSize {}", sourceRedisArgs.getPoolSize());
reader.setPoolSize(sourceRedisArgs.getPoolSize());
}

protected void configureTargetRedisReader(RedisItemReader<?, ?, ?> reader) {
protected void configureTargetRedisReader(RedisItemReader<?, ?> reader) {
configureAsyncReader(reader);
targetRedisContext.configure(reader);
log.info("Configuring target Redis reader with poolSize {}", targetRedisArgs.getPoolSize());
Expand Down
9 changes: 4 additions & 5 deletions plugins/riot/src/main/java/com/redis/riot/FileExport.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private ContentType contentType(FileType fileType) {
}
}

private ItemProcessor<KeyValue<String, Object>, ?> processor(FileType fileType) {
private ItemProcessor<KeyValue<String>, ?> processor(FileType fileType) {
if (contentType(fileType) == ContentType.MAP) {
return mapProcessor();
}
Expand All @@ -102,17 +102,16 @@ private ContentType contentType(FileType fileType) {

@SuppressWarnings("unchecked")
private Map<String, Object> headerRecord(FileType fileType) {
RedisItemReader<String, String, Object> reader = RedisItemReader.struct();
RedisItemReader<String, String> reader = RedisItemReader.struct();
configureSourceRedisReader(reader);
try {
reader.open(new ExecutionContext());
try {
KeyValue<String, Object> keyValue = reader.read();
KeyValue<String> keyValue = reader.read();
if (keyValue == null) {
return Collections.emptyMap();
}
return ((ItemProcessor<KeyValue<String, Object>, Map<String, Object>>) processor(fileType))
.process(keyValue);
return ((ItemProcessor<KeyValue<String>, Map<String, Object>>) processor(fileType)).process(keyValue);
} catch (Exception e) {
throw new ItemStreamException("Could not read header record", e);
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/riot/src/main/java/com/redis/riot/FileImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected Job job() throws IOException {
Assert.isTrue(type != FileType.CSV, "CSV file import requires a Redis command");
Assert.isTrue(type != FileType.FIXED, "Fixed-length file import requires a Redis command");
ItemReader<KeyValue> reader = createReader(resource, type, KeyValue.class);
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct();
RedisItemWriter<String, String, KeyValue<String>> writer = RedisItemWriter.struct();
configureTargetRedisWriter(writer);
return new Step<>(name, reader, writer);
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/riot/src/main/java/com/redis/riot/Generate.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ protected Job job() {
if (StringUtils.hasLength(generateArgs.getIndex())) {
commands().ftCreate(generateArgs.getIndex(), indexCreateOptions(), indexFields());
}
Step<KeyValue<String, Object>, KeyValue<String, Object>> step = new Step<>(STEP_NAME, reader(), writer());
Step<KeyValue<String>, KeyValue<String>> step = new Step<>(STEP_NAME, reader(), writer());
step.taskName(TASK_NAME);
step.maxItemCount(generateArgs.getCount());
return job(step);
}

private RedisItemWriter<String, String, KeyValue<String, Object>> writer() {
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct();
private RedisItemWriter<String, String, KeyValue<String>> writer() {
RedisItemWriter<String, String, KeyValue<String>> writer = RedisItemWriter.struct();
configure(writer);
log.info("Configuring Redis writer with {}", redisWriterArgs);
redisWriterArgs.configure(writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ public KeyValueDeserializer() {
this(null);
}

public KeyValueDeserializer(Class<KeyValue<String, Object>> t) {
public KeyValueDeserializer(Class<KeyValue<String>> t) {
super(t);
}

@Override
public KeyValue<String, Object> deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
public KeyValue<String> deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonNode node = p.getCodec().readTree(p);
if (!node.has(KEY)) {
throw new InvalidFormatException(p, "No key field found", node, _valueClass);
}
KeyValue<String, Object> keyValue = new KeyValue<>();
KeyValue<String> keyValue = new KeyValue<>();
keyValue.setKey(node.get(KEY).asText());
JsonNode typeNode = node.get(TYPE);
if (typeNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import io.lettuce.core.codec.RedisCodec;

public class KeyValueFilter<K, T extends KeyValue<K, ?>> implements ItemProcessor<T, T> {
public class KeyValueFilter<K, T extends KeyValue<K>> implements ItemProcessor<T, T> {

private final Function<K, String> keyToString;
private final Logger log;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public class KeyValueProcessorArgs {
@Option(names = "--stream-prune", description = "Drop empty streams.")
private boolean prune;

public ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor(EvaluationContext context) {
List<ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>>> processors = new ArrayList<>();
public ItemProcessor<KeyValue<String>, KeyValue<String>> processor(EvaluationContext context) {
List<ItemProcessor<KeyValue<String>, KeyValue<String>>> processors = new ArrayList<>();
if (keyExpression != null) {
processors.add(processor(t -> t.setKey(keyExpression.getValue(context, t))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public RedisContext(RedisURI uri, AbstractRedisClient client) {
this.connection = RedisModulesUtils.connection(client);
}

public void configure(RedisItemReader<?, ?, ?> reader) {
public void configure(RedisItemReader<?, ?> reader) {
reader.setClient(client);
reader.setDatabase(uri.getDatabase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.redis.spring.batch.item.AbstractPollableItemReader;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode;
import com.redis.spring.batch.item.redis.common.KeyEvent;
import com.redis.spring.batch.item.redis.common.KeyValue;
import com.redis.spring.batch.item.redis.reader.KeyValueRead;

import io.lettuce.core.codec.RedisCodec;
Expand Down Expand Up @@ -81,7 +81,7 @@ public class RedisReaderArgs {
@Option(names = "--read-poll", description = "Interval in millis between queue polls (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>", hidden = true)
private long pollTimeout = DEFAULT_POLL_TIMEOUT.toMillis();

public <K> void configure(RedisItemReader<K, ?, ?> reader) {
public <K> void configure(RedisItemReader<K, ?> reader) {
reader.setChunkSize(chunkSize);
reader.setFlushInterval(Duration.ofMillis(flushInterval));
if (idleTimeout > 0) {
Expand All @@ -107,8 +107,8 @@ public <K> void configure(RedisItemReader<K, ?, ?> reader) {
}
}

private <K> ItemProcessor<KeyEvent<K>, KeyEvent<K>> keyProcessor(RedisCodec<K, ?> codec, KeyFilterArgs args) {
return args.predicate(codec).map(p -> new FunctionPredicate<KeyEvent<K>, K>(KeyEvent::getKey, p))
private <K> ItemProcessor<KeyValue<K>, KeyValue<K>> keyProcessor(RedisCodec<K, ?> codec, KeyFilterArgs args) {
return args.predicate(codec).map(p -> new FunctionPredicate<KeyValue<K>, K>(KeyValue::getKey, p))
.map(PredicateOperator::new).map(FunctionItemProcessor::new).orElse(null);
}

Expand Down
22 changes: 11 additions & 11 deletions plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected boolean isQuickCompare() {
@Override
protected Job job() {
List<Step<?, ?>> steps = new ArrayList<>();
Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> replicateStep = step();
Step<KeyValue<byte[]>, KeyValue<byte[]>> replicateStep = step();
steps.add(replicateStep);
if (shouldCompare()) {
steps.add(compareStep());
Expand All @@ -68,21 +68,21 @@ protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
targetRedisWriterArgs.configure(writer);
}

private Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> step() {
RedisItemReader<byte[], byte[], Object> reader = reader();
private Step<KeyValue<byte[]>, KeyValue<byte[]>> step() {
RedisItemReader<byte[], byte[]> reader = reader();
configureSourceRedisReader(reader);
RedisItemWriter<byte[], byte[], KeyValue<byte[], Object>> writer = writer();
RedisItemWriter<byte[], byte[], KeyValue<byte[]>> writer = writer();
configureTargetRedisWriter(writer);
Step<KeyValue<byte[], Object>, KeyValue<byte[], Object>> step = step(STEP_NAME, reader, writer);
Step<KeyValue<byte[]>, KeyValue<byte[]>> step = step(STEP_NAME, reader, writer);
step.processor(processor());
step.taskName(taskName(reader));
step.writeListener(new ReplicateLagWriteListener());
if (reader.getMode() != ReaderMode.SCAN) {
step.statusMessageSupplier(() -> liveExtraMessage(reader));
}
if (logKeys) {
log.info("Adding key logger");
ReplicateWriteLogger<byte[], Object> writeLogger = new ReplicateWriteLogger<>(log, reader.getCodec());
step.writeListener(writeLogger);
step.writeListener(new ReplicateWriteLogger<>(log, reader.getCodec()));
ReplicateReadLogger<byte[]> readLogger = new ReplicateReadLogger<>(log, reader.getCodec());
reader.addItemReadListener(readLogger);
reader.addItemWriteListener(readLogger);
Expand All @@ -95,7 +95,7 @@ private boolean shouldCompare() {
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private RedisItemReader<byte[], byte[], Object> reader() {
private RedisItemReader<byte[], byte[]> reader() {
if (struct) {
log.info("Creating Redis data-structure reader");
return RedisItemReader.struct(ByteArrayCodec.INSTANCE);
Expand All @@ -105,7 +105,7 @@ private RedisItemReader<byte[], byte[], Object> reader() {
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private RedisItemWriter<byte[], byte[], KeyValue<byte[], Object>> writer() {
private RedisItemWriter<byte[], byte[], KeyValue<byte[]>> writer() {
if (struct) {
log.info("Creating Redis data-structure writer");
return RedisItemWriter.struct(ByteArrayCodec.INSTANCE);
Expand All @@ -114,7 +114,7 @@ private RedisItemWriter<byte[], byte[], KeyValue<byte[], Object>> writer() {
return (RedisItemWriter) RedisItemWriter.dump();
}

private String taskName(RedisItemReader<?, ?, ?> reader) {
private String taskName(RedisItemReader<?, ?> reader) {
switch (reader.getMode()) {
case SCAN:
return SCAN_TASK_NAME;
Expand All @@ -126,7 +126,7 @@ private String taskName(RedisItemReader<?, ?, ?> reader) {
}

@SuppressWarnings("rawtypes")
private String liveExtraMessage(RedisItemReader<?, ?, ?> reader) {
private String liveExtraMessage(RedisItemReader<?, ?> reader) {
KeyNotificationItemReader keyReader = (KeyNotificationItemReader) reader.getReader();
if (keyReader == null || keyReader.getQueue() == null) {
return "";
Expand Down
Loading

0 comments on commit df530d4

Please sign in to comment.