Skip to content

Commit

Permalink
fix: Removed duplicated key in stream dumps
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Jun 14, 2024
1 parent d7421b8 commit b058a10
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 105 deletions.
111 changes: 111 additions & 0 deletions plugins/riot/src/main/java/com/redis/riot/AbstractCompareCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.redis.riot;

import java.time.Duration;
import java.util.Collection;

import com.redis.riot.CompareStatusItemWriter.StatusCount;
import com.redis.riot.core.Step;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.reader.DefaultKeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparison;
import com.redis.spring.batch.item.redis.reader.KeyComparisonItemReader;

import io.lettuce.core.codec.ByteArrayCodec;
import picocli.CommandLine.Option;

public abstract class AbstractCompareCommand extends AbstractTargetCommand {

public static final Duration DEFAULT_TTL_TOLERANCE = DefaultKeyComparator.DEFAULT_TTL_TOLERANCE;
public static final String COMPARE_STEP_NAME = "compare";
public static final boolean DEFAULT_COMPARE_STREAM_MESSAGE_ID = true;

private static final String COMPARE_TASK_NAME = "Comparing";
private static final String STATUS_DELIMITER = " | ";

@Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.")
private boolean showDiffs;

@Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private long ttlToleranceMillis = DEFAULT_TTL_TOLERANCE.toMillis();

private String compareMessage(Collection<StatusCount> counts) {
StringBuilder builder = new StringBuilder();
counts.stream().map(CompareStepListener::toString).forEach(s -> builder.append(STATUS_DELIMITER).append(s));
return builder.toString();
}

protected Step<KeyComparison<byte[]>, KeyComparison<byte[]>> compareStep() {
KeyComparisonItemReader<byte[], byte[]> reader = compareReader();
CompareStatusItemWriter<byte[]> writer = new CompareStatusItemWriter<>();
Step<KeyComparison<byte[]>, KeyComparison<byte[]>> step = new Step<>(COMPARE_STEP_NAME, reader, writer);
step.taskName(COMPARE_TASK_NAME);
step.statusMessageSupplier(() -> compareMessage(writer.getMismatches()));
step.maxItemCountSupplier(RedisScanSizeEstimator.from(reader.getSourceReader()));
if (showDiffs) {
log.info("Adding key diff logger");
step.writeListener(new CompareLoggingWriteListener<>(ByteArrayCodec.INSTANCE));
}
step.executionListener(new CompareStepListener(writer));
return step;
}

private RedisItemReader<byte[], byte[], Object> compareRedisReader() {
if (isQuickCompare()) {
return RedisItemReader.type(ByteArrayCodec.INSTANCE);
}
return RedisItemReader.struct(ByteArrayCodec.INSTANCE);
}

protected abstract boolean isQuickCompare();

protected KeyComparisonItemReader<byte[], byte[]> compareReader() {
RedisItemReader<byte[], byte[], Object> source = compareSourceReader();
RedisItemReader<byte[], byte[], Object> target = compareTargetReader();
KeyComparisonItemReader<byte[], byte[]> reader = new KeyComparisonItemReader<>(source, target);
reader.setComparator(keyComparator());
return reader;
}

private KeyComparator<byte[]> keyComparator() {
boolean ignoreStreamId = isIgnoreStreamMessageId();
Duration ttlTolerance = Duration.ofMillis(ttlToleranceMillis);
log.info("Creating KeyComparator with ttlTolerance={} ignoreStreamMessageId={}", ttlTolerance, ignoreStreamId);
DefaultKeyComparator<byte[], byte[]> comparator = new DefaultKeyComparator<>(ByteArrayCodec.INSTANCE);
comparator.setIgnoreStreamMessageId(ignoreStreamId);
comparator.setTtlTolerance(ttlTolerance);
return comparator;
}

protected abstract boolean isIgnoreStreamMessageId();

private RedisItemReader<byte[], byte[], Object> compareSourceReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
configure(reader);
return reader;
}

private RedisItemReader<byte[], byte[], Object> compareTargetReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
reader.setClient(targetRedisURIClient.getClient());
reader.setDatabase(targetRedisURIClient.getUri().getDatabase());
return reader;
}

public boolean isShowDiffs() {
return showDiffs;
}

public void setShowDiffs(boolean showDiffs) {
this.showDiffs = showDiffs;
}

public long getTtlToleranceMillis() {
return ttlToleranceMillis;
}

public void setTtlToleranceMillis(long tolerance) {
this.ttlToleranceMillis = tolerance;
}

}
103 changes: 1 addition & 102 deletions plugins/riot/src/main/java/com/redis/riot/AbstractTargetCommand.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
package com.redis.riot;

import java.time.Duration;
import java.util.Collection;

import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.redis.riot.CompareStatusItemWriter.StatusCount;
import com.redis.riot.RedisClientBuilder.RedisURIClient;
import com.redis.riot.core.Step;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.RedisItemWriter;
import com.redis.spring.batch.item.redis.reader.DefaultKeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparison;
import com.redis.spring.batch.item.redis.reader.KeyComparisonItemReader;

import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.ByteArrayCodec;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

public abstract class AbstractTargetCommand extends AbstractRedisCommand {

public static final Duration DEFAULT_TTL_TOLERANCE = DefaultKeyComparator.DEFAULT_TTL_TOLERANCE;
public static final int DEFAULT_TARGET_POOL_SIZE = RedisItemReader.DEFAULT_POOL_SIZE;
public static final String COMPARE_STEP_NAME = "compare";
public static final boolean DEFAULT_COMPARE_STREAM_MESSAGE_ID = true;

private static final String SOURCE_VAR = "source";
private static final String TARGET_VAR = "target";
private static final String COMPARE_TASK_NAME = "Comparing";
private static final String STATUS_DELIMITER = " | ";

@Parameters(arity = "1", index = "0", description = "Source server URI.", paramLabel = "SOURCE")
private RedisURI sourceRedisURI;
Expand All @@ -48,13 +32,7 @@ public abstract class AbstractTargetCommand extends AbstractRedisCommand {
@ArgGroup(exclusive = false)
private RedisReaderArgs redisReaderArgs = new RedisReaderArgs();

@Option(names = "--show-diffs", description = "Print details of key mismatches during dataset verification. Disables progress reporting.")
private boolean showDiffs;

@Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private long ttlToleranceMillis = DEFAULT_TTL_TOLERANCE.toMillis();

private RedisURIClient targetRedisURIClient;
protected RedisURIClient targetRedisURIClient;

protected <T extends RedisItemWriter<?, ?, ?>> T configure(T writer) {
writer.setClient(targetRedisURIClient.getClient());
Expand All @@ -69,12 +47,6 @@ protected RedisURIClient redisURIClient() {
return builder.build();
}

private String compareMessage(Collection<StatusCount> counts) {
StringBuilder builder = new StringBuilder();
counts.stream().map(CompareStepListener::toString).forEach(s -> builder.append(STATUS_DELIMITER).append(s));
return builder.toString();
}

@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Expand Down Expand Up @@ -106,70 +78,13 @@ protected StandardEvaluationContext evaluationContext(ProcessorArgs args) {
return context;
}

protected Step<KeyComparison<byte[]>, KeyComparison<byte[]>> compareStep() {
KeyComparisonItemReader<byte[], byte[]> reader = compareReader();
CompareStatusItemWriter<byte[]> writer = new CompareStatusItemWriter<>();
Step<KeyComparison<byte[]>, KeyComparison<byte[]>> step = new Step<>(COMPARE_STEP_NAME, reader, writer);
step.taskName(COMPARE_TASK_NAME);
step.statusMessageSupplier(() -> compareMessage(writer.getMismatches()));
step.maxItemCountSupplier(RedisScanSizeEstimator.from(reader.getSourceReader()));
if (showDiffs) {
log.info("Adding key diff logger");
step.writeListener(new CompareLoggingWriteListener<>(ByteArrayCodec.INSTANCE));
}
step.executionListener(new CompareStepListener(writer));
return step;
}

private RedisItemReader<byte[], byte[], Object> compareRedisReader() {
if (isQuickCompare()) {
return RedisItemReader.type(ByteArrayCodec.INSTANCE);
}
return RedisItemReader.struct(ByteArrayCodec.INSTANCE);
}

protected abstract boolean isQuickCompare();

@Override
protected <K, V, T> RedisItemReader<K, V, T> configure(RedisItemReader<K, V, T> reader) {
log.info("Configuring Redis reader with {}", redisReaderArgs);
redisReaderArgs.configure(reader);
return super.configure(reader);
}

protected KeyComparisonItemReader<byte[], byte[]> compareReader() {
RedisItemReader<byte[], byte[], Object> source = compareSourceReader();
RedisItemReader<byte[], byte[], Object> target = compareTargetReader();
KeyComparisonItemReader<byte[], byte[]> reader = new KeyComparisonItemReader<>(source, target);
reader.setComparator(keyComparator());
return reader;
}

private KeyComparator<byte[]> keyComparator() {
boolean ignoreStreamId = isIgnoreStreamMessageId();
Duration ttlTolerance = Duration.ofMillis(ttlToleranceMillis);
log.info("Creating KeyComparator with ttlTolerance={} ignoreStreamMessageId={}", ttlTolerance, ignoreStreamId);
DefaultKeyComparator<byte[], byte[]> comparator = new DefaultKeyComparator<>(ByteArrayCodec.INSTANCE);
comparator.setIgnoreStreamMessageId(ignoreStreamId);
comparator.setTtlTolerance(ttlTolerance);
return comparator;
}

protected abstract boolean isIgnoreStreamMessageId();

private RedisItemReader<byte[], byte[], Object> compareSourceReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
configure(reader);
return reader;
}

private RedisItemReader<byte[], byte[], Object> compareTargetReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
reader.setClient(targetRedisURIClient.getClient());
reader.setDatabase(targetRedisURIClient.getUri().getDatabase());
return reader;
}

public RedisReaderArgs getRedisReaderArgs() {
return redisReaderArgs;
}
Expand Down Expand Up @@ -210,20 +125,4 @@ public void setTargetRedisURI(RedisURI targetRedisURI) {
this.targetRedisURI = targetRedisURI;
}

public boolean isShowDiffs() {
return showDiffs;
}

public void setShowDiffs(boolean showDiffs) {
this.showDiffs = showDiffs;
}

public long getTtlToleranceMillis() {
return ttlToleranceMillis;
}

public void setTtlToleranceMillis(long tolerance) {
this.ttlToleranceMillis = tolerance;
}

}
2 changes: 1 addition & 1 deletion plugins/riot/src/main/java/com/redis/riot/Compare.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import picocli.CommandLine.Option;

@Command(name = "compare", description = "Compare two Redis databases.")
public class Compare extends AbstractTargetCommand {
public class Compare extends AbstractCompareCommand {

@Option(names = "--stream-msg-id", description = "Compare stream message ids. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
private boolean compareStreamMessageId = DEFAULT_COMPARE_STREAM_MESSAGE_ID;
Expand Down
2 changes: 1 addition & 1 deletion plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import picocli.CommandLine.Option;

@Command(name = "replicate", description = "Replicate a Redis database into another Redis database.")
public class Replicate extends AbstractTargetCommand {
public class Replicate extends AbstractCompareCommand {

public enum CompareMode {
FULL, QUICK, NONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private XAddArgs xAddArgs() {

@Override
public Xadd<String, String, Map<String, Object>> operation() {
Xadd<String, String, Map<String, Object>> operation = new Xadd<>(messageFunction());
Xadd<String, String, Map<String, Object>> operation = new Xadd<>(keyFunction(), messageFunction());
operation.setArgs(xAddArgs());
return operation;
}
Expand Down

0 comments on commit b058a10

Please sign in to comment.