From 5535c507e0b507d345dc59cab6660b6313d7ae1f Mon Sep 17 00:00:00 2001 From: jruaux Date: Wed, 24 Apr 2024 10:00:50 -0700 Subject: [PATCH] deps: Upgraded spring batch redis --- .../com/redis/riot/db/DatabaseImport.java | 4 +- .../com/redis/riot/faker/FakerImport.java | 16 +- .../com/redis/riot/file/FileDumpExport.java | 3 +- .../com/redis/riot/file/FileDumpImport.java | 9 +- .../java/com/redis/riot/file/FileImport.java | 4 +- .../redis/riot/file/KeyValueDeserializer.java | 19 +- .../redis/riot/file/AbstractFileTests.java | 108 ++--- .../com/redis/riot/file/JsonSerdeTests.java | 4 +- .../redis/riot/file/XmlItemWriterTests.java | 6 +- .../com/redis/riot/redis/GeneratorImport.java | 16 +- .../riot/redis/KeyComparisonDiffLogger.java | 20 +- .../main/java/com/redis/riot/redis/Ping.java | 69 ++- .../com/redis/riot/redis/Replication.java | 164 +++++--- .../redis/riot/redis/ReplicationTests.java | 20 +- .../com/redis/riot/core/AbstractExport.java | 137 +++--- .../com/redis/riot/core/AbstractImport.java | 69 +-- .../redis/riot/core/AbstractMapExport.java | 3 +- .../redis/riot/core/AbstractMapImport.java | 70 ++++ .../riot/core/AbstractRedisCallable.java | 92 ++++ .../riot/core/AbstractRedisRunnable.java | 41 -- ...cutable.java => AbstractRiotCallable.java} | 30 +- .../redis/riot/core/AbstractStructImport.java | 21 - .../com/redis/riot/core/KeyFilterOptions.java | 6 + .../redis/riot/core/LoggingWriteListener.java | 21 - .../redis/riot/core/RedisReaderOptions.java | 64 +-- .../redis/riot/core/RedisWriterOptions.java | 20 + .../java/com/redis/riot/core/RiotUtils.java | 54 --- .../riot/core/function/CompositeOperator.java | 27 ++ .../core/function/DropStreamMessageId.java | 29 ++ .../function/DropStreamMessageIdFunction.java | 34 -- .../core/function/ExpressionFunction.java | 47 +-- .../riot/core/function/KeyValueOperator.java | 42 -- .../core/function/LongExpressionFunction.java | 22 +- .../core/function/StringKeyValueFunction.java | 6 +- .../core/function/StructToMapFunction.java | 32 +- .../function/ToStringKeyValueFunction.java | 6 +- .../riot/core/operation/ExpireAtBuilder.java | 2 +- .../com/redis/riot/core/FunctionTests.java | 6 +- .../com/redis/riot/core/ProcessorTests.java | 28 +- gradle.properties | 2 +- .../redis/riot/cli/AbstractExportCommand.java | 46 +- .../redis/riot/cli/AbstractImportCommand.java | 14 +- .../redis/riot/cli/AbstractMainCommand.java | 12 +- .../redis/riot/cli/AbstractRiotCommand.java | 44 +- .../riot/cli/AbstractStructImportCommand.java | 10 +- .../java/com/redis/riot/cli/BaseCommand.java | 2 +- .../main/java/com/redis/riot/cli/DbArgs.java | 72 +++- .../java/com/redis/riot/cli/DbExportArgs.java | 18 - .../com/redis/riot/cli/DbExportCommand.java | 52 ++- .../java/com/redis/riot/cli/DbImportArgs.java | 27 -- .../com/redis/riot/cli/DbImportCommand.java | 99 ++++- .../java/com/redis/riot/cli/FileArgs.java | 124 ++++-- .../redis/riot/cli/FileDumpExportArgs.java | 29 -- .../redis/riot/cli/FileDumpExportCommand.java | 93 ++++- .../redis/riot/cli/FileDumpImportArgs.java | 18 - .../redis/riot/cli/FileDumpImportCommand.java | 35 +- .../com/redis/riot/cli/FileImportArgs.java | 55 --- .../com/redis/riot/cli/FileImportCommand.java | 205 ++++++++- .../redis/riot/cli/KeyValueProcessorArgs.java | 88 ++-- .../java/com/redis/riot/cli/LoggingMixin.java | 2 +- .../java/com/redis/riot/cli/RedisArgs.java | 392 +++++++++++++----- .../com/redis/riot/cli/RedisReaderArgs.java | 157 ++++--- .../com/redis/riot/cli/RedisWriterArgs.java | 88 ++-- .../com/redis/riot/cli/ReplicateCommand.java | 129 +++++- .../com/redis/riot/cli/AbstractDbTests.java | 10 +- .../riot/cli/AbstractReplicationTests.java | 21 +- .../redis/riot/cli/AbstractRiotTestBase.java | 20 +- .../cli/StackToStackIntegrationTests.java | 20 +- .../test/resources/replicate-live-only-struct | 1 + 69 files changed, 2019 insertions(+), 1237 deletions(-) create mode 100644 core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java create mode 100644 core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisCallable.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisRunnable.java rename core/riot-core/src/main/java/com/redis/riot/core/{AbstractExecutable.java => AbstractRiotCallable.java} (86%) delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/AbstractStructImport.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/LoggingWriteListener.java create mode 100644 core/riot-core/src/main/java/com/redis/riot/core/function/CompositeOperator.java create mode 100644 core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageId.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java delete mode 100644 core/riot-core/src/main/java/com/redis/riot/core/function/KeyValueOperator.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/cli/DbExportArgs.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/cli/DbImportArgs.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/cli/FileDumpExportArgs.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/cli/FileDumpImportArgs.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/cli/FileImportArgs.java create mode 100644 plugins/riot/src/test/resources/replicate-live-only-struct diff --git a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java index 493aa21f7..8c5283c3f 100644 --- a/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java +++ b/connectors/riot-db/src/main/java/com/redis/riot/db/DatabaseImport.java @@ -8,9 +8,9 @@ import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; import org.springframework.jdbc.core.ColumnMapRowMapper; -import com.redis.riot.core.AbstractImport; +import com.redis.riot.core.AbstractMapImport; -public class DatabaseImport extends AbstractImport { +public class DatabaseImport extends AbstractMapImport { public static final int DEFAULT_FETCH_SIZE = AbstractCursorItemReader.VALUE_NOT_SET; public static final int DEFAULT_MAX_RESULT_SET_ROWS = AbstractCursorItemReader.VALUE_NOT_SET; diff --git a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java index 3e8cc2bf2..94b5bbf46 100644 --- a/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java +++ b/connectors/riot-faker/src/main/java/com/redis/riot/faker/FakerImport.java @@ -8,16 +8,14 @@ import org.springframework.expression.Expression; import org.springframework.util.Assert; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.api.sync.RediSearchCommands; import com.redis.lettucemod.search.Field; import com.redis.lettucemod.search.IndexInfo; import com.redis.lettucemod.util.RedisModulesUtils; -import com.redis.riot.core.AbstractImport; +import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.RiotUtils; import com.redis.spring.batch.gen.Range; -public class FakerImport extends AbstractImport { +public class FakerImport extends AbstractMapImport { public static final int DEFAULT_COUNT = 1000; public static final Locale DEFAULT_LOCALE = Locale.ENGLISH; @@ -84,13 +82,9 @@ private Map fields() { private Map searchIndexFields() { Map searchFields = new LinkedHashMap<>(); - try (StatefulRedisModulesConnection connection = RedisModulesUtils - .connection(getRedisClient())) { - RediSearchCommands commands = connection.sync(); - IndexInfo info = RedisModulesUtils.indexInfo(commands.ftInfo(searchIndex)); - for (Field field : info.getFields()) { - searchFields.put(field.getName(), RiotUtils.parse(expression(field))); - } + IndexInfo info = RedisModulesUtils.indexInfo(redisCommands.ftInfo(searchIndex)); + for (Field field : info.getFields()) { + searchFields.put(field.getName(), RiotUtils.parse(expression(field))); } return searchFields; } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java index fc20a3c74..474e4fac5 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpExport.java @@ -125,8 +125,7 @@ private JsonObjectMarshaller> xmlMarshaller() { @Override protected Job job() { RedisItemReader> reader = RedisItemReader.struct(); - reader.setClient(getRedisClient()); - configureReader(reader); + configure(reader); ItemProcessor, KeyValue> processor = processor(StringCodec.UTF8); return jobBuilder().start(step(getName(), reader, writer()).processor(processor).build()).build(); } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java index 74730617c..982ba114d 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileDumpImport.java @@ -11,10 +11,11 @@ import org.springframework.batch.item.ItemReader; import org.springframework.core.io.Resource; -import com.redis.riot.core.AbstractStructImport; +import com.redis.riot.core.AbstractImport; import com.redis.spring.batch.KeyValue; +import com.redis.spring.batch.RedisItemWriter; -public class FileDumpImport extends AbstractStructImport { +public class FileDumpImport extends AbstractImport { private List files; @@ -46,7 +47,9 @@ protected Job job() { } List steps = new ArrayList<>(); for (Resource resource : resources) { - steps.add(step(resource.getFilename(), reader(resource), writer()).build()); + RedisItemWriter> writer = RedisItemWriter.struct(); + configure(writer); + steps.add(step(resource.getFilename(), reader(resource), writer).build()); } Iterator iterator = steps.iterator(); SimpleJobBuilder job = jobBuilder().start(iterator.next()); diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java index 74f761cd5..698f261c4 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileImport.java @@ -35,11 +35,11 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; -import com.redis.riot.core.AbstractImport; +import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.RiotUtils; import com.redis.riot.core.function.RegexNamedGroupFunction; -public class FileImport extends AbstractImport { +public class FileImport extends AbstractMapImport { public static final String DEFAULT_CONTINUATION_STRING = "\\"; public static final Character DEFAULT_QUOTE_CHARACTER = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER; diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/KeyValueDeserializer.java b/connectors/riot-file/src/main/java/com/redis/riot/file/KeyValueDeserializer.java index c7faafd3a..91ad2f9e4 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/KeyValueDeserializer.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/KeyValueDeserializer.java @@ -6,8 +6,6 @@ import java.util.Map; import java.util.Set; -import org.springframework.util.StringUtils; - import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonNode; @@ -17,7 +15,7 @@ import com.fasterxml.jackson.databind.node.LongNode; import com.redis.lettucemod.timeseries.Sample; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; +import com.redis.spring.batch.KeyValue.DataType; import io.lettuce.core.ScoredValue; import io.lettuce.core.StreamMessage; @@ -55,10 +53,7 @@ public KeyValue deserialize(JsonParser p, DeserializationContext } JsonNode typeNode = node.get(TYPE); if (typeNode != null) { - String typeString = typeNode.asText(); - if (StringUtils.hasLength(typeString)) { - keyValue.setType(Type.valueOf(typeString.toUpperCase())); - } + keyValue.setType(typeNode.asText()); } LongNode ttlNode = (LongNode) node.get(TTL); if (ttlNode != null) { @@ -68,11 +63,17 @@ public KeyValue deserialize(JsonParser p, DeserializationContext if (memUsageNode != null) { keyValue.setMem(memUsageNode.asLong()); } - keyValue.setValue(value(keyValue.getType(), node.get(VALUE), ctxt)); + JsonNode valueNode = node.get(VALUE); + if (valueNode != null) { + DataType type = KeyValue.type(keyValue); + if (type != null) { + keyValue.setValue(value(type, valueNode, ctxt)); + } + } return keyValue; } - private Object value(Type type, JsonNode node, DeserializationContext ctxt) throws IOException { + private Object value(DataType type, JsonNode node, DeserializationContext ctxt) throws IOException { switch (type) { case STREAM: return streamMessages((ArrayNode) node, ctxt); diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/AbstractFileTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/AbstractFileTests.java index b6a944ef6..d84eb4507 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/AbstractFileTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/AbstractFileTests.java @@ -30,15 +30,17 @@ abstract class AbstractFileTests extends AbstractTestBase { @SuppressWarnings("unchecked") @Test void fileImportJSON(TestInfo info) throws Exception { - FileImport executable = new FileImport(); - executable.setRedisClientOptions(redisClientOptions()); - executable.setFiles(BEERS_JSON_URL); - HsetBuilder hsetBuilder = new HsetBuilder(); - hsetBuilder.setKeyspace(KEYSPACE); - hsetBuilder.setKeyFields(ID); - executable.setOperations(hsetBuilder.build()); - executable.setName(name(info)); - executable.execute(); + try (FileImport executable = new FileImport()) { + executable.setRedisClientOptions(redisClientOptions()); + executable.setFiles(BEERS_JSON_URL); + HsetBuilder hsetBuilder = new HsetBuilder(); + hsetBuilder.setKeyspace(KEYSPACE); + hsetBuilder.setKeyFields(ID); + executable.setOperations(hsetBuilder.build()); + executable.setName(name(info)); + executable.afterPropertiesSet(); + executable.call(); + } List keys = redisCommands.keys("*"); assertEquals(216, keys.size()); for (String key : keys) { @@ -60,16 +62,18 @@ private RedisClientOptions redisClientOptions() { @SuppressWarnings("unchecked") @Test void fileApiImportCSV(TestInfo info) throws Exception { - FileImport executable = new FileImport(); - executable.setRedisClientOptions(redisClientOptions()); - executable.setFiles("https://storage.googleapis.com/jrx/beers.csv"); - executable.setHeader(true); - executable.setName(name(info)); - HsetBuilder hsetBuilder = new HsetBuilder(); - hsetBuilder.setKeyspace(KEYSPACE); - hsetBuilder.setKeyFields(ID); - executable.setOperations(hsetBuilder.build()); - executable.execute(); + try (FileImport executable = new FileImport()) { + executable.setRedisClientOptions(redisClientOptions()); + executable.setFiles("https://storage.googleapis.com/jrx/beers.csv"); + executable.setHeader(true); + executable.setName(name(info)); + HsetBuilder hsetBuilder = new HsetBuilder(); + hsetBuilder.setKeyspace(KEYSPACE); + hsetBuilder.setKeyFields(ID); + executable.setOperations(hsetBuilder.build()); + executable.afterPropertiesSet(); + executable.call(); + } List keys = redisCommands.keys("*"); assertEquals(2410, keys.size()); for (String key : keys) { @@ -87,16 +91,18 @@ void fileApiFileExpansion(TestInfo info) throws Exception { IOUtils.copy(getClass().getClassLoader().getResourceAsStream("beers1.csv"), new FileOutputStream(file1)); File file2 = temp.resolve("beers2.csv").toFile(); IOUtils.copy(getClass().getClassLoader().getResourceAsStream("beers2.csv"), new FileOutputStream(file2)); - FileImport executable = new FileImport(); - executable.setRedisClientOptions(redisClientOptions()); - executable.setFiles(temp.resolve("*.csv").toFile().getPath()); - executable.setHeader(true); - executable.setName(name(info)); - HsetBuilder hsetBuilder = new HsetBuilder(); - hsetBuilder.setKeyspace(KEYSPACE); - hsetBuilder.setKeyFields(ID); - executable.setOperations(hsetBuilder.build()); - executable.execute(); + try (FileImport executable = new FileImport()) { + executable.setRedisClientOptions(redisClientOptions()); + executable.setFiles(temp.resolve("*.csv").toFile().getPath()); + executable.setHeader(true); + executable.setName(name(info)); + HsetBuilder hsetBuilder = new HsetBuilder(); + hsetBuilder.setKeyspace(KEYSPACE); + hsetBuilder.setKeyFields(ID); + executable.setOperations(hsetBuilder.build()); + executable.afterPropertiesSet(); + executable.call(); + } List keys = redisCommands.keys("*"); assertEquals(2410, keys.size()); for (String key : keys) { @@ -109,17 +115,19 @@ void fileApiFileExpansion(TestInfo info) throws Exception { @SuppressWarnings("unchecked") @Test void fileImportCSVMultiThreaded(TestInfo info) throws Exception { - FileImport executable = new FileImport(); - executable.setRedisClientOptions(redisClientOptions()); - executable.setFiles("https://storage.googleapis.com/jrx/beers.csv"); - executable.setHeader(true); - executable.setThreads(3); - executable.setName(name(info)); - HsetBuilder hset = new HsetBuilder(); - hset.setKeyspace(KEYSPACE); - hset.setKeyFields(ID); - executable.setOperations(hset.build()); - executable.execute(); + try (FileImport executable = new FileImport()) { + executable.setRedisClientOptions(redisClientOptions()); + executable.setFiles("https://storage.googleapis.com/jrx/beers.csv"); + executable.setHeader(true); + executable.setThreads(3); + executable.setName(name(info)); + HsetBuilder hset = new HsetBuilder(); + hset.setKeyspace(KEYSPACE); + hset.setKeyFields(ID); + executable.setOperations(hset.build()); + executable.afterPropertiesSet(); + executable.call(); + } List keys = redisCommands.keys("*"); assertEquals(2410, keys.size()); for (String key : keys) { @@ -132,15 +140,17 @@ void fileImportCSVMultiThreaded(TestInfo info) throws Exception { @SuppressWarnings("unchecked") @Test void fileImportJSONL(TestInfo info) throws Exception { - FileImport executable = new FileImport(); - executable.setRedisClientOptions(redisClientOptions()); - executable.setFiles(BEERS_JSONL_URL); - HsetBuilder hsetBuilder = new HsetBuilder(); - hsetBuilder.setKeyspace(KEYSPACE); - hsetBuilder.setKeyFields(ID); - executable.setOperations(hsetBuilder.build()); - executable.setName(name(info)); - executable.execute(); + try (FileImport executable = new FileImport()) { + executable.setRedisClientOptions(redisClientOptions()); + executable.setFiles(BEERS_JSONL_URL); + HsetBuilder hsetBuilder = new HsetBuilder(); + hsetBuilder.setKeyspace(KEYSPACE); + hsetBuilder.setKeyFields(ID); + executable.setOperations(hsetBuilder.build()); + executable.setName(name(info)); + executable.afterPropertiesSet(); + executable.call(); + } List keys = redisCommands.keys("*"); assertEquals(6, keys.size()); for (String key : keys) { diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/JsonSerdeTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/JsonSerdeTests.java index d4e3b999a..381451671 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/JsonSerdeTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/JsonSerdeTests.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.node.DoubleNode; import com.redis.lettucemod.timeseries.Sample; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; +import com.redis.spring.batch.KeyValue.DataType; import com.redis.spring.batch.gen.GeneratorItemReader; import com.redis.spring.batch.gen.ItemToKeyValueFunction; import com.redis.spring.batch.test.AbstractTestBase; @@ -59,7 +59,7 @@ void serialize() throws JsonProcessingException { ts.setKey(key); ts.setMem(memoryUsage); ts.setTtl(ttl); - ts.setType(Type.TIMESERIES); + ts.setType(DataType.TIMESERIES.getString()); Sample sample1 = Sample.of(Instant.now().toEpochMilli(), 123.456); Sample sample2 = Sample.of(Instant.now().toEpochMilli() + 1000, 456.123); ts.setValue(Arrays.asList(sample1, sample2)); diff --git a/connectors/riot-file/src/test/java/com/redis/riot/file/XmlItemWriterTests.java b/connectors/riot-file/src/test/java/com/redis/riot/file/XmlItemWriterTests.java index aba16d888..a752bb2aa 100644 --- a/connectors/riot-file/src/test/java/com/redis/riot/file/XmlItemWriterTests.java +++ b/connectors/riot-file/src/test/java/com/redis/riot/file/XmlItemWriterTests.java @@ -17,7 +17,7 @@ import com.redis.riot.file.resource.XmlResourceItemWriter; import com.redis.riot.file.resource.XmlResourceItemWriterBuilder; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; +import com.redis.spring.batch.KeyValue.DataType; class XmlItemWriterTests { @@ -37,13 +37,13 @@ void test() throws Exception { KeyValue item1 = new KeyValue<>(); item1.setKey("key1"); item1.setTtl(123l); - item1.setType(Type.HASH); + item1.setType(DataType.HASH.getString()); Map hash1 = Map.of("field1", "value1", "field2", "value2"); item1.setValue(hash1); KeyValue item2 = new KeyValue<>(); item2.setKey("key2"); item2.setTtl(456l); - item2.setType(Type.STREAM); + item2.setType(DataType.STREAM.getString()); Map hash2 = Map.of("field1", "value1", "field2", "value2"); item2.setValue(hash2); writer.write(Chunk.of(item1, item2)); diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java index 9eb3ffd2e..9c94cbc4f 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/GeneratorImport.java @@ -6,8 +6,9 @@ import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.function.FunctionItemProcessor; -import com.redis.riot.core.AbstractStructImport; +import com.redis.riot.core.AbstractImport; import com.redis.spring.batch.KeyValue; +import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.gen.CollectionOptions; import com.redis.spring.batch.gen.GeneratorItemReader; import com.redis.spring.batch.gen.Item; @@ -20,13 +21,10 @@ import com.redis.spring.batch.gen.TimeSeriesOptions; import com.redis.spring.batch.gen.ZsetOptions; -public class GeneratorImport extends AbstractStructImport { +public class GeneratorImport extends AbstractImport { public static final int DEFAULT_COUNT = 1000; - private static final ItemProcessor> PROCESSOR = new FunctionItemProcessor<>( - new ItemToKeyValueFunction()); - private int count = DEFAULT_COUNT; private String keyspace = GeneratorItemReader.DEFAULT_KEYSPACE; private Range keyRange = GeneratorItemReader.DEFAULT_KEY_RANGE; @@ -43,7 +41,13 @@ public class GeneratorImport extends AbstractStructImport { @Override protected Job job() { - return jobBuilder().start(step(getName(), reader(), writer()).processor(PROCESSOR).build()).build(); + RedisItemWriter> writer = RedisItemWriter.struct(); + configure(writer); + return jobBuilder().start(step(getName(), reader(), writer).processor(processor()).build()).build(); + } + + private ItemProcessor> processor() { + return new FunctionItemProcessor<>(new ItemToKeyValueFunction()); } private GeneratorItemReader reader() { diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java index a8877aa51..4c8cdd4cd 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/KeyComparisonDiffLogger.java @@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory; import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.item.Chunk; +import org.springframework.util.ObjectUtils; import com.redis.spring.batch.reader.KeyComparison; import com.redis.spring.batch.reader.KeyComparison.Status; @@ -32,24 +33,31 @@ public void afterWrite(Chunk> items) { public void log(KeyComparison comparison) { switch (comparison.getStatus()) { case MISSING: - log.error("Missing key {}", key(comparison)); + log("Missing key {}", comparison); break; case TYPE: - log.error("Type mismatch on key {}. Expected {} but was {}", key(comparison), - comparison.getSource().getType(), comparison.getTarget().getType()); + log("Type mismatch on key {}. Expected {} but was {}", comparison, comparison.getSource().getType(), + comparison.getTarget().getType()); break; case VALUE: - log.error("Value mismatch on key {}", key(comparison)); + log("Value mismatch on key {}", comparison); break; case TTL: - log.error("TTL mismatch on key {}. Expected {} but was {}", key(comparison), - comparison.getSource().getTtl(), comparison.getTarget().getTtl()); + log("TTL mismatch on key {}. Expected {} but was {}", comparison, comparison.getSource().getTtl(), + comparison.getTarget().getTtl()); break; default: break; } } + private void log(String msg, KeyComparison comparison, Object... args) { + if (log.isErrorEnabled()) { + String key = key(comparison); + log.error(msg, ObjectUtils.addObjectToArray(args, key, 0)); + } + } + private String key(KeyComparison comparison) { return toStringKeyFunction.apply(comparison.getSource().getKey()); } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java index bb7eaf879..fad5a8d19 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Ping.java @@ -13,15 +13,12 @@ import org.springframework.batch.repeat.RepeatStatus; import org.springframework.util.Assert; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.api.sync.RedisModulesCommands; -import com.redis.lettucemod.util.RedisModulesUtils; -import com.redis.riot.core.AbstractRedisRunnable; +import com.redis.riot.core.AbstractRedisCallable; import io.lettuce.core.metrics.CommandMetrics.CommandLatency; import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions; -public class Ping extends AbstractRedisRunnable { +public class Ping extends AbstractRedisCallable { public static final int DEFAULT_ITERATIONS = 1; public static final int DEFAULT_COUNT = 10; @@ -83,7 +80,7 @@ public void setPercentiles(double[] percentiles) { protected Job job() { TaskletStep step = new TaskletStep(); CallableTaskletAdapter tasklet = new CallableTaskletAdapter(); - tasklet.setCallable(this::call); + tasklet.setCallable(this::execute); step.setName(getName()); step.setTransactionManager(getJobFactory().getPlatformTransactionManager()); step.setJobRepository(getJobFactory().getJobRepository()); @@ -91,38 +88,34 @@ protected Job job() { return jobBuilder().start(step).build(); } - private RepeatStatus call() { - try (StatefulRedisModulesConnection connection = RedisModulesUtils - .connection(getRedisClient())) { - RedisModulesCommands commands = connection.sync(); - for (int iteration = 0; iteration < iterations; iteration++) { - LatencyStats stats = new LatencyStats(); - for (int index = 0; index < count; index++) { - long startTime = System.nanoTime(); - String reply = commands.ping(); - Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply); - stats.recordLatency(System.nanoTime() - startTime); - } - Histogram histogram = stats.getIntervalHistogram(); - if (latencyDistribution) { - histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1)); - } - Map percentileMap = new TreeMap<>(); - for (double targetPercentile : percentiles) { - long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile)); - percentileMap.put(targetPercentile, percentile); - } - long min = toTimeUnit(histogram.getMinValue()); - long max = toTimeUnit(histogram.getMaxValue()); - CommandLatency latency = new CommandLatency(min, max, percentileMap); - out.println(latency.toString()); - if (getSleep() != null) { - try { - Thread.sleep(getSleep().toMillis()); - } catch (InterruptedException e) { - // Restore interrupted state... - Thread.currentThread().interrupt(); - } + private RepeatStatus execute() { + for (int iteration = 0; iteration < iterations; iteration++) { + LatencyStats stats = new LatencyStats(); + for (int index = 0; index < count; index++) { + long startTime = System.nanoTime(); + String reply = redisCommands.ping(); + Assert.isTrue("pong".equalsIgnoreCase(reply), "Invalid PING reply received: " + reply); + stats.recordLatency(System.nanoTime() - startTime); + } + Histogram histogram = stats.getIntervalHistogram(); + if (latencyDistribution) { + histogram.outputPercentileDistribution(System.out, (double) timeUnit.toNanos(1)); + } + Map percentileMap = new TreeMap<>(); + for (double targetPercentile : percentiles) { + long percentile = toTimeUnit(histogram.getValueAtPercentile(targetPercentile)); + percentileMap.put(targetPercentile, percentile); + } + long min = toTimeUnit(histogram.getMinValue()); + long max = toTimeUnit(histogram.getMaxValue()); + CommandLatency latency = new CommandLatency(min, max, percentileMap); + out.println(latency.toString()); + if (getSleep() != null) { + try { + Thread.sleep(getSleep().toMillis()); + } catch (InterruptedException e) { + // Restore interrupted state... + Thread.currentThread().interrupt(); } } } diff --git a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java index a16bc0def..b16662d62 100644 --- a/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java +++ b/connectors/riot-redis/src/main/java/com/redis/riot/redis/Replication.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.core.Job; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.builder.JobFlowBuilder; @@ -20,10 +21,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.expression.spel.support.StandardEvaluationContext; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.util.RedisModulesUtils; import com.redis.riot.core.AbstractExport; -import com.redis.riot.core.LoggingWriteListener; import com.redis.riot.core.RedisClientOptions; import com.redis.riot.core.RedisWriterOptions; import com.redis.spring.batch.KeyValue; @@ -44,19 +42,24 @@ public class Replication extends AbstractExport { + public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL; + public static final Duration DEFAULT_IDLE_TIMEOUT = RedisItemReader.DEFAULT_IDLE_TIMEOUT; + public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = RedisItemReader.DEFAULT_NOTIFICATION_QUEUE_CAPACITY; public static final ReplicationType DEFAULT_TYPE = ReplicationType.DUMP; public static final ReplicationMode DEFAULT_MODE = ReplicationMode.SNAPSHOT; public static final CompareMode DEFAULT_COMPARE_MODE = CompareMode.QUICK; public static final String CONFIG_NOTIFY_KEYSPACE_EVENTS = "notify-keyspace-events"; - public static final String STEP_LIVE = "live"; - public static final String STEP_SCAN = "scan"; - public static final String STEP_COMPARE = "compare"; + public static final String FLOW_LIVE = "live-flow"; + public static final String FLOW_SCAN = "scan-flow"; + public static final String FLOW_REPLICATE = "replicate-flow"; + public static final String STEP_LIVE = "live-step"; + public static final String STEP_SCAN = "scan-step"; + public static final String STEP_COMPARE = "compare-step"; private static final String SOURCE_VAR = "source"; private static final String TARGET_VAR = "target"; - private final Logger log = LoggerFactory.getLogger(Replication.class); - private final Function toString = BatchUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE); + private final Logger log = LoggerFactory.getLogger(getClass()); private ReplicationMode mode = DEFAULT_MODE; private ReplicationType type = DEFAULT_TYPE; @@ -67,6 +70,10 @@ public class Replication extends AbstractExport { private ReadFrom targetReadFrom; private RedisWriterOptions writerOptions = new RedisWriterOptions(); + private Duration flushInterval = DEFAULT_FLUSH_INTERVAL; + private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT; + private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY; + private RedisURI targetRedisURI; private AbstractRedisClient targetRedisClient; @@ -76,39 +83,50 @@ protected boolean isStruct() { } @Override - public void execute() throws Exception { - try { - targetRedisURI = targetRedisClientOptions.redisURI(); - targetRedisClient = targetRedisClientOptions.client(targetRedisURI); - super.execute(); - } finally { - targetRedisClient.close(); - targetRedisClient.getResources().shutdown(); - } + public void afterPropertiesSet() throws Exception { + targetRedisURI = targetRedisClientOptions.redisURI(); + targetRedisClient = targetRedisClientOptions.client(targetRedisURI); + super.afterPropertiesSet(); } @Override protected StandardEvaluationContext evaluationContext() { - StandardEvaluationContext evaluationContext = super.evaluationContext(); - evaluationContext.setVariable(SOURCE_VAR, getRedisURI()); - evaluationContext.setVariable(TARGET_VAR, targetRedisURI); - return evaluationContext; + StandardEvaluationContext context = super.evaluationContext(); + context.setVariable(SOURCE_VAR, redisURI); + context.setVariable(TARGET_VAR, targetRedisURI); + return context; + } + + @Override + public void close() { + if (targetRedisClient != null) { + targetRedisClient.close(); + targetRedisClient.getResources().shutdown(); + } + super.close(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override protected Job job() { - ItemProcessor, KeyValue> processor = processor( - ByteArrayCodec.INSTANCE); - SimpleStepBuilder, KeyValue> scanStep = step(STEP_SCAN, reader(), - writer()).processor(processor); - RedisItemReader> liveReader = reader(); + ItemProcessor processor = processor(ByteArrayCodec.INSTANCE); + RedisItemReader reader = reader(); + configure(reader); + RedisItemWriter writer = writer(); + configure(writer); + SimpleStepBuilder scanStep = step(STEP_SCAN, reader, writer).processor(processor); + RedisItemReader liveReader = reader(); + configure(liveReader); liveReader.setMode(ReaderMode.LIVE); - FlushingStepBuilder, KeyValue> liveStep = flushingStep( - step(STEP_LIVE, liveReader, writer()).processor(processor)); - if (log.isInfoEnabled()) { - addLoggingWriteListener(scanStep); - addLoggingWriteListener(liveStep); - } + RedisItemWriter liveWriter = writer(); + configure(liveWriter); + FlushingStepBuilder liveStep = new FlushingStepBuilder<>(step(STEP_LIVE, liveReader, liveWriter)) + .processor(processor); + liveStep.flushInterval(flushInterval); + liveStep.idleTimeout(idleTimeout); + LoggingWriteListener listener = new LoggingWriteListener(); + scanStep.listener(listener); + liveStep.listener(listener); KeyComparisonStatusCountItemWriter compareWriter = new KeyComparisonStatusCountItemWriter(); TaskletStep compareStep = step(STEP_COMPARE, comparisonReader(), compareWriter).build(); switch (mode) { @@ -116,9 +134,9 @@ protected Job job() { return jobBuilder().start(compareStep).build(); case LIVE: checkKeyspaceNotificationEnabled(); - SimpleFlow scanFlow = flow("scan").start(scanStep.build()).build(); - SimpleFlow liveFlow = flow("live").start(liveStep.build()).build(); - SimpleFlow replicateFlow = flow("replicate").split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow) + SimpleFlow scanFlow = flow(FLOW_SCAN).start(scanStep.build()).build(); + SimpleFlow liveFlow = flow(FLOW_LIVE).start(liveStep.build()).build(); + SimpleFlow replicateFlow = flow(FLOW_REPLICATE).split(new SimpleAsyncTaskExecutor()).add(liveFlow, scanFlow) .build(); JobFlowBuilder live = jobBuilder().start(replicateFlow); if (shouldCompare()) { @@ -139,16 +157,16 @@ protected Job job() { } } - private void addLoggingWriteListener(SimpleStepBuilder, KeyValue> step) { - step.listener(new LoggingWriteListener<>(this::log)); - } + public static class LoggingWriteListener implements ItemWriteListener> { - private void log(Chunk> chunk) { - chunk.getItems().stream().forEach(this::log); - } + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Function toString = BatchUtils.toStringKeyFunction(ByteArrayCodec.INSTANCE); + + @Override + public void afterWrite(Chunk> chunk) { + chunk.forEach(t -> log.info("Wrote {}", toString.apply(t.getKey()))); + } - private void log(KeyValue keyValue) { - log.info("Wrote {}", toString.apply(keyValue.getKey())); } private FlowBuilder flow(String name) { @@ -172,11 +190,10 @@ protected FaultTolerantStepBuilder step(String name, ItemReader } private void checkKeyspaceNotificationEnabled() { - try (StatefulRedisModulesConnection connection = RedisModulesUtils - .connection(getRedisClient())) { - String config = connection.sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS) + try { + String config = redisCommands.configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS) .getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, ""); - if (!config.contains("K")) { + if (!config.contains("K") || !config.contains("E")) { log.error("Keyspace notifications not property configured ({}={}). Use the string KEA to enable them.", CONFIG_NOTIFY_KEYSPACE_EVENTS, config); } @@ -185,15 +202,16 @@ private void checkKeyspaceNotificationEnabled() { } } - private RedisItemReader> reader() { - RedisItemReader> reader = createReader(); - reader.setClient(getRedisClient()); - configureReader(reader); - return reader; + @Override + protected void configure(RedisItemReader reader) { + super.configure(reader); + reader.setFlushInterval(flushInterval); + reader.setIdleTimeout(idleTimeout); + reader.setNotificationQueueCapacity(notificationQueueCapacity); } @SuppressWarnings({ "unchecked", "rawtypes" }) - private RedisItemReader> createReader() { + private RedisItemReader> reader() { if (isStruct()) { return RedisItemReader.struct(ByteArrayCodec.INSTANCE); } @@ -204,8 +222,7 @@ private KeyComparisonItemReader comparisonReader() { KeyComparisonItemReader reader = compareMode == CompareMode.FULL ? RedisItemReader.compare(ByteArrayCodec.INSTANCE) : RedisItemReader.compareQuick(ByteArrayCodec.INSTANCE); - configureReader(reader); - reader.setClient(getRedisClient()); + configure(reader); reader.setTargetClient(targetRedisClient); reader.setTargetPoolSize(writerOptions.getPoolSize()); reader.setTargetReadFrom(targetReadFrom); @@ -227,19 +244,18 @@ private StreamMessageIdPolicy streamMessageIdPolicy() { return StreamMessageIdPolicy.COMPARE; } - private RedisItemWriter> writer() { - RedisItemWriter> writer = createWriter(); + @Override + protected void configure(RedisItemWriter writer) { + super.configure(writer); writer.setClient(targetRedisClient); - writer(writer, writerOptions); - return writer; + writerOptions.configure(writer); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private RedisItemWriter> createWriter() { + private RedisItemWriter> writer() { if (isStruct()) { return RedisItemWriter.struct(ByteArrayCodec.INSTANCE); } - return (RedisItemWriter) RedisItemWriter.dump(); + return RedisItemWriter.dump(); } public CompareMode getCompareMode() { @@ -306,4 +322,28 @@ public void setType(ReplicationType type) { this.type = type; } + public Duration getFlushInterval() { + return flushInterval; + } + + public void setFlushInterval(Duration interval) { + this.flushInterval = interval; + } + + public Duration getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(Duration timeout) { + this.idleTimeout = timeout; + } + + public int getNotificationQueueCapacity() { + return notificationQueueCapacity; + } + + public void setNotificationQueueCapacity(int capacity) { + this.notificationQueueCapacity = capacity; + } + } diff --git a/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java b/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java index 89d8b856c..661cac4ce 100644 --- a/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java +++ b/connectors/riot-redis/src/test/java/com/redis/riot/redis/ReplicationTests.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.slf4j.simple.SimpleLogger; import org.springframework.batch.item.support.ListItemWriter; import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex; @@ -17,6 +18,7 @@ import com.redis.riot.core.PredicateItemProcessor; import com.redis.riot.core.RedisClientOptions; import com.redis.riot.core.RiotUtils; +import com.redis.riot.redis.Replication.LoggingWriteListener; import com.redis.spring.batch.KeyValue; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.common.FlushingStepBuilder; @@ -46,12 +48,16 @@ protected static double abv(Map beer) { } protected void execute(Replication replication, TestInfo info) throws Exception { - replication.setName(name(info)); - replication.setJobFactory(jobFactory); - replication.setRedisClientOptions(redisOptions(getRedisServer())); - replication.setTargetRedisClientOptions(redisOptions(getTargetRedisServer())); - replication.getReaderOptions().setIdleTimeout(getIdleTimeout()); - replication.execute(); + try (replication) { + System.setProperty(SimpleLogger.LOG_KEY_PREFIX + LoggingWriteListener.class.getName(), "error"); + replication.setName(name(info)); + replication.setJobFactory(jobFactory); + replication.setRedisClientOptions(redisOptions(getRedisServer())); + replication.setTargetRedisClientOptions(redisOptions(getTargetRedisServer())); + replication.setIdleTimeout(getIdleTimeout()); + replication.afterPropertiesSet(); + replication.call(); + } } private RedisClientOptions redisOptions(RedisServer redis) { @@ -77,7 +83,7 @@ void keyProcessor(TestInfo info) throws Throwable { redisCommands.set(key1, value1); Replication replication = new Replication(); replication.setType(ReplicationType.STRUCT); - replication.setProcessorOptions(processorOptions("#{type.getCode()}:#{key}")); + replication.setProcessorOptions(processorOptions("#{type}:#{key}")); execute(replication, info); Assertions.assertEquals(value1, targetRedisCommands.get("string:" + key1)); } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java index 6644667cf..0fa3960b5 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractExport.java @@ -1,127 +1,114 @@ package com.redis.riot.core; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Stream; -import org.springframework.batch.core.step.builder.SimpleStepBuilder; -import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.function.FunctionItemProcessor; -import org.springframework.batch.item.support.PassThroughItemProcessor; -import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.util.CollectionUtils; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.util.RedisModulesUtils; -import com.redis.riot.core.function.DropStreamMessageIdFunction; +import com.redis.riot.core.function.CompositeOperator; +import com.redis.riot.core.function.DropStreamMessageId; import com.redis.riot.core.function.ExpressionFunction; -import com.redis.riot.core.function.KeyValueOperator; import com.redis.riot.core.function.LongExpressionFunction; import com.redis.riot.core.function.StringKeyValueFunction; import com.redis.riot.core.function.ToStringKeyValueFunction; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; import com.redis.spring.batch.RedisItemReader; -import com.redis.spring.batch.common.FlushingStepBuilder; -import com.redis.spring.batch.operation.KeyValueRead; +import com.redis.spring.batch.util.BatchUtils; +import com.redis.spring.batch.util.Predicates; import io.lettuce.core.codec.RedisCodec; -public abstract class AbstractExport extends AbstractRedisRunnable { +public abstract class AbstractExport extends AbstractRedisCallable { - private static final String REDIS_VAR = "redis"; - - private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions(); private RedisReaderOptions readerOptions = new RedisReaderOptions(); + private KeyFilterOptions keyFilterOptions = new KeyFilterOptions(); private ExportProcessorOptions processorOptions = new ExportProcessorOptions(); - protected ItemProcessor, KeyValue> processor(RedisCodec codec) { + protected FunctionItemProcessor, KeyValue> processor(RedisCodec codec) { if (processorOptions.isEmpty()) { return null; } - ToStringKeyValueFunction code = new ToStringKeyValueFunction<>(codec); - StringKeyValueFunction decode = new StringKeyValueFunction<>(codec); - KeyValueOperator operator = keyValueOperator(); + Function, KeyValue> code = new ToStringKeyValueFunction<>(codec); + Function, KeyValue> decode = new StringKeyValueFunction<>(codec); + CompositeOperator> operator = new CompositeOperator<>(processorConsumers()); return new FunctionItemProcessor<>(code.andThen(operator).andThen(decode)); } - protected StandardEvaluationContext evaluationContext() { - StandardEvaluationContext evaluationContext = evaluationContextOptions.evaluationContext(); - StatefulRedisModulesConnection connection = RedisModulesUtils.connection(getRedisClient()); - evaluationContext.setVariable(REDIS_VAR, connection.sync()); - return evaluationContext; - } - - private KeyValueOperator keyValueOperator() { - KeyValueOperator operator = new KeyValueOperator(); - StandardEvaluationContext evaluationContext = evaluationContext(); + private List>> processorConsumers() { + List>> consumers = new ArrayList<>(); if (processorOptions.getKeyExpression() != null) { - operator.setKeyFunction(ExpressionFunction.of(evaluationContext, processorOptions.getKeyExpression())); + ExpressionFunction function = expressionFunction( + processorOptions.getKeyExpression().getExpression()); + consumers.add(t -> t.setKey(function.apply(t))); } if (processorOptions.isDropTtl()) { - operator.setTtlFunction(t -> 0); - } else { - if (processorOptions.getTtlExpression() != null) { - operator.setTtlFunction( - new LongExpressionFunction<>(evaluationContext, processorOptions.getTtlExpression())); - } + consumers.add(t -> t.setTtl(0)); + } + if (processorOptions.getTtlExpression() != null) { + LongExpressionFunction function = longExpressionFunction(processorOptions.getTtlExpression()); + consumers.add(t -> t.setTtl(function.applyAsLong(t))); } if (processorOptions.isDropStreamMessageId() && isStruct()) { - operator.setValueFunction(new DropStreamMessageIdFunction()); + consumers.add(new DropStreamMessageId()); } if (processorOptions.getTypeExpression() != null) { - Function, String> function = ExpressionFunction.of(evaluationContext, + ExpressionFunction, String> function = expressionFunction( processorOptions.getTypeExpression()); - operator.setTypeFunction(function.andThen(Type::of)); + consumers.add(t -> t.setType(function.apply(t))); } - return operator; + return consumers; } protected abstract boolean isStruct(); - protected void configureReader(RedisItemReader reader) { + @Override + protected void configure(RedisItemReader reader) { reader.setJobFactory(getJobFactory()); - reader.setChunkSize(readerOptions.getChunkSize()); - reader.setDatabase(getRedisURI().getDatabase()); - reader.setKeyProcessor(keyFilteringProcessor(reader.getCodec())); - reader.setKeyPattern(readerOptions.getKeyPattern()); - if (readerOptions.getKeyType() != null) { - reader.setKeyType(readerOptions.getKeyType().name()); - } - reader.setFlushInterval(readerOptions.getFlushInterval()); - reader.setIdleTimeout(readerOptions.getIdleTimeout()); - reader.setNotificationQueueCapacity(readerOptions.getNotificationQueueCapacity()); - reader.setPollTimeout(readerOptions.getPollTimeout()); - reader.setQueueCapacity(readerOptions.getQueueCapacity()); - reader.setReadFrom(readerOptions.getReadFrom()); - reader.setScanCount(readerOptions.getScanCount()); - reader.setThreads(readerOptions.getThreads()); - if (reader.getOperation() instanceof KeyValueRead) { - KeyValueRead operation = (KeyValueRead) reader.getOperation(); - operation.setMemUsageLimit(readerOptions.getMemoryUsageLimit()); - operation.setMemUsageSamples(readerOptions.getMemoryUsageSamples()); + reader.setDatabase(redisURI.getDatabase()); + if (!keyFilterOptions.isEmpty()) { + Predicate predicate = keyFilterPredicate(reader.getCodec()); + reader.setKeyProcessor(new PredicateItemProcessor<>(predicate)); } - reader.setPoolSize(readerOptions.getPoolSize()); + readerOptions.configure(reader); + super.configure(reader); + } + public Predicate keyFilterPredicate(RedisCodec codec) { + return slotsPredicate(codec).and(globPredicate(codec)); } - public ItemProcessor keyFilteringProcessor(RedisCodec codec) { - Predicate predicate = RiotUtils.keyFilterPredicate(codec, readerOptions.getKeyFilterOptions()); - if (predicate == null) { - return new PassThroughItemProcessor<>(); + private Predicate slotsPredicate(RedisCodec codec) { + if (CollectionUtils.isEmpty(keyFilterOptions.getSlots())) { + return Predicates.isTrue(); } - return new PredicateItemProcessor<>(predicate); + Stream> predicates = keyFilterOptions.getSlots().stream() + .map(r -> Predicates.slotRange(codec, r.getStart(), r.getEnd())); + return Predicates.or(predicates); } - protected FlushingStepBuilder flushingStep(SimpleStepBuilder step) { - return new FlushingStepBuilder<>(step).flushInterval(readerOptions.getFlushInterval()) - .idleTimeout(readerOptions.getIdleTimeout()); + private Predicate globPredicate(RedisCodec codec) { + return Predicates.map(BatchUtils.toStringKeyFunction(codec), globPredicate()); + } + + private Predicate globPredicate() { + Predicate include = RiotUtils.globPredicate(keyFilterOptions.getIncludes()); + if (CollectionUtils.isEmpty(keyFilterOptions.getExcludes())) { + return include; + } + return include.and(RiotUtils.globPredicate(keyFilterOptions.getExcludes()).negate()); } public RedisReaderOptions getReaderOptions() { return readerOptions; } - public void setReaderOptions(RedisReaderOptions readerOptions) { - this.readerOptions = readerOptions; + public void setReaderOptions(RedisReaderOptions options) { + this.readerOptions = options; } public ExportProcessorOptions getProcessorOptions() { @@ -132,8 +119,12 @@ public void setProcessorOptions(ExportProcessorOptions options) { this.processorOptions = options; } - public void setEvaluationContextOptions(EvaluationContextOptions spelProcessorOptions) { - this.evaluationContextOptions = spelProcessorOptions; + public KeyFilterOptions getKeyFilterOptions() { + return keyFilterOptions; + } + + public void setKeyFilterOptions(KeyFilterOptions options) { + this.keyFilterOptions = options; } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractImport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractImport.java index 1194086dd..f8db98144 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractImport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractImport.java @@ -1,47 +1,10 @@ package com.redis.riot.core; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.ItemWriter; -import org.springframework.expression.spel.support.StandardEvaluationContext; -import org.springframework.util.Assert; - import com.redis.spring.batch.RedisItemWriter; -import com.redis.spring.batch.operation.Operation; - -import io.lettuce.core.AbstractRedisClient; -public abstract class AbstractImport extends AbstractRedisRunnable { +public abstract class AbstractImport extends AbstractRedisCallable { private RedisWriterOptions writerOptions = new RedisWriterOptions(); - private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions(); - private ImportProcessorOptions processorOptions = new ImportProcessorOptions(); - private List, Object>> operations; - - @SuppressWarnings("unchecked") - public void setOperations(Operation, Object>... operations) { - setOperations(Arrays.asList(operations)); - } - - public List, Object>> getOperations() { - return operations; - } - - public void setOperations(List, Object>> operations) { - this.operations = operations; - } - - public EvaluationContextOptions getEvaluationContextOptions() { - return evaluationContextOptions; - } - - public void setEvaluationContextOptions(EvaluationContextOptions evaluationContextOptions) { - this.evaluationContextOptions = evaluationContextOptions; - } public RedisWriterOptions getWriterOptions() { return writerOptions; @@ -51,32 +14,10 @@ public void setWriterOptions(RedisWriterOptions options) { this.writerOptions = options; } - public ImportProcessorOptions getProcessorOptions() { - return processorOptions; - } - - public void setProcessorOptions(ImportProcessorOptions options) { - this.processorOptions = options; - } - - protected ItemWriter> writer() { - Assert.notEmpty(operations, "No operation specified"); - return RiotUtils.writer(operations.stream().map(o -> writer(getRedisClient(), o)).collect(Collectors.toList())); - } - - private ItemWriter writer(AbstractRedisClient client, Operation operation) { - RedisItemWriter writer = RedisItemWriter.operation(operation); - writer.setClient(client); - writer(writer, writerOptions); - return writer; - } - - protected ItemProcessor, Map> processor() { - return processorOptions.processor(evaluationContext()); - } - - protected StandardEvaluationContext evaluationContext() { - return evaluationContextOptions.evaluationContext(); + @Override + protected void configure(RedisItemWriter writer) { + writerOptions.configure(writer); + super.configure(writer); } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java index d387cdbde..6894c1dd4 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapExport.java @@ -32,8 +32,7 @@ protected Job job() { protected RedisItemReader> reader() { RedisItemReader> reader = RedisItemReader.struct(); - reader.setClient(getRedisClient()); - configureReader(reader); + configure(reader); return reader; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java new file mode 100644 index 000000000..61315951f --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractMapImport.java @@ -0,0 +1,70 @@ +package com.redis.riot.core; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.util.Assert; + +import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.operation.Operation; + +public abstract class AbstractMapImport extends AbstractImport { + + private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions(); + private ImportProcessorOptions processorOptions = new ImportProcessorOptions(); + private List, Object>> operations; + + @SuppressWarnings("unchecked") + public void setOperations(Operation, Object>... operations) { + setOperations(Arrays.asList(operations)); + } + + public List, Object>> getOperations() { + return operations; + } + + public void setOperations(List, Object>> operations) { + this.operations = operations; + } + + public EvaluationContextOptions getEvaluationContextOptions() { + return evaluationContextOptions; + } + + public void setEvaluationContextOptions(EvaluationContextOptions evaluationContextOptions) { + this.evaluationContextOptions = evaluationContextOptions; + } + + public ImportProcessorOptions getProcessorOptions() { + return processorOptions; + } + + public void setProcessorOptions(ImportProcessorOptions options) { + this.processorOptions = options; + } + + protected ItemWriter> writer() { + Assert.notEmpty(operations, "No operation specified"); + return RiotUtils.writer(operations.stream().map(this::writer).collect(Collectors.toList())); + } + + private ItemWriter writer(Operation operation) { + RedisItemWriter writer = RedisItemWriter.operation(operation); + configure(writer); + return writer; + } + + protected ItemProcessor, Map> processor() { + return processorOptions.processor(evaluationContext()); + } + + protected StandardEvaluationContext evaluationContext() { + return evaluationContextOptions.evaluationContext(); + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisCallable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisCallable.java new file mode 100644 index 000000000..0db6a47d3 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisCallable.java @@ -0,0 +1,92 @@ +package com.redis.riot.core; + +import org.springframework.expression.Expression; +import org.springframework.expression.spel.support.StandardEvaluationContext; + +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.api.sync.RedisModulesCommands; +import com.redis.lettucemod.util.RedisModulesUtils; +import com.redis.riot.core.function.ExpressionFunction; +import com.redis.riot.core.function.LongExpressionFunction; +import com.redis.spring.batch.RedisItemReader; +import com.redis.spring.batch.RedisItemWriter; + +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisURI; + +public abstract class AbstractRedisCallable extends AbstractRiotCallable { + + private static final String CONTEXT_VAR_REDIS = "redis"; + + private RedisClientOptions redisClientOptions = new RedisClientOptions(); + private EvaluationContextOptions evaluationContextOptions = new EvaluationContextOptions(); + + protected RedisURI redisURI; + private AbstractRedisClient redisClient; + private StatefulRedisModulesConnection redisConnection; + protected RedisModulesCommands redisCommands; + private StandardEvaluationContext evaluationContext; + + @Override + public void afterPropertiesSet() throws Exception { + redisURI = redisClientOptions.redisURI(); + redisClient = redisClientOptions.client(redisURI); + redisConnection = RedisModulesUtils.connection(redisClient); + redisCommands = redisConnection.sync(); + evaluationContext = evaluationContext(); + super.afterPropertiesSet(); + } + + protected StandardEvaluationContext evaluationContext() { + StandardEvaluationContext context = evaluationContextOptions.evaluationContext(); + context.setVariable(CONTEXT_VAR_REDIS, redisCommands); + return context; + } + + @Override + public void close() { + evaluationContext = null; + redisCommands = null; + if (redisConnection != null) { + redisConnection.close(); + redisConnection = null; + } + if (redisClient != null) { + redisClient.close(); + redisClient.getResources().shutdown(); + } + } + + public RedisClientOptions getRedisClientOptions() { + return redisClientOptions; + } + + public void setRedisClientOptions(RedisClientOptions options) { + this.redisClientOptions = options; + } + + public StandardEvaluationContext getEvaluationContext() { + return evaluationContext; + } + + public void setEvaluationContextOptions(EvaluationContextOptions spelProcessorOptions) { + this.evaluationContextOptions = spelProcessorOptions; + } + + protected void configure(RedisItemReader reader) { + reader.setClient(redisClient); + } + + protected void configure(RedisItemWriter writer) { + writer.setClient(redisClient); + } + + protected ExpressionFunction expressionFunction(Expression expression) { + return new ExpressionFunction<>(evaluationContext, expression, String.class); + } + + protected LongExpressionFunction longExpressionFunction(Expression expression) { + return new LongExpressionFunction<>(evaluationContext, expression); + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisRunnable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisRunnable.java deleted file mode 100644 index bf5d5c5fb..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractRedisRunnable.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.redis.riot.core; - -import io.lettuce.core.AbstractRedisClient; -import io.lettuce.core.RedisURI; - -public abstract class AbstractRedisRunnable extends AbstractExecutable { - - private RedisClientOptions redisClientOptions = new RedisClientOptions(); - - private RedisURI redisURI; - private AbstractRedisClient redisClient; - - @Override - public void execute() throws Exception { - redisURI = redisClientOptions.redisURI(); - try { - redisClient = redisClientOptions.client(redisURI); - super.execute(); - } finally { - redisClient.close(); - redisClient.getResources().shutdown(); - } - } - - public RedisClientOptions getRedisClientOptions() { - return redisClientOptions; - } - - public void setRedisClientOptions(RedisClientOptions options) { - this.redisClientOptions = options; - } - - protected RedisURI getRedisURI() { - return redisURI; - } - - protected AbstractRedisClient getRedisClient() { - return redisClient; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExecutable.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotCallable.java similarity index 86% rename from core/riot-core/src/main/java/com/redis/riot/core/AbstractExecutable.java rename to core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotCallable.java index b0ee3eee8..480f05aa8 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractExecutable.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractRiotCallable.java @@ -5,8 +5,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Callable; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; import org.springframework.batch.core.step.builder.SimpleStepBuilder; @@ -18,19 +20,17 @@ import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.SynchronizedItemReader; import org.springframework.batch.item.support.SynchronizedItemStreamReader; +import org.springframework.beans.factory.InitializingBean; import org.springframework.retry.policy.MaxAttemptsRetryPolicy; import org.springframework.util.ClassUtils; import com.redis.spring.batch.RedisItemReader; -import com.redis.spring.batch.RedisItemWriter; import com.redis.spring.batch.common.JobFactory; -import com.redis.spring.batch.operation.KeyValueWrite; -import com.redis.spring.batch.operation.KeyValueWrite.WriteMode; import io.lettuce.core.RedisCommandExecutionException; import io.lettuce.core.RedisCommandTimeoutException; -public abstract class AbstractExecutable { +public abstract class AbstractRiotCallable implements InitializingBean, Callable, AutoCloseable { public static final SkipPolicy DEFAULT_SKIP_POLICY = new NeverSkipItemSkipPolicy(); public static final int DEFAULT_SKIP_LIMIT = 0; @@ -49,7 +49,7 @@ public abstract class AbstractExecutable { private int retryLimit = DEFAULT_RETRY_LIMIT; private JobFactory jobFactory; - protected AbstractExecutable() { + protected AbstractRiotCallable() { setName(ClassUtils.getShortName(getClass())); } @@ -72,12 +72,17 @@ public void setJobFactory(JobFactory jobFactory) { this.jobFactory = jobFactory; } - public void execute() throws Exception { + @Override + public void afterPropertiesSet() throws Exception { if (jobFactory == null) { jobFactory = new JobFactory(); jobFactory.afterPropertiesSet(); } - JobFactory.checkJobExecution(jobFactory.run(job())); + } + + @Override + public JobExecution call() throws Exception { + return JobFactory.checkJobExecution(jobFactory.run(job())); } protected JobBuilder jobBuilder() { @@ -86,17 +91,6 @@ protected JobBuilder jobBuilder() { protected abstract Job job(); - protected void writer(RedisItemWriter writer, RedisWriterOptions options) { - writer.setMultiExec(options.isMultiExec()); - writer.setPoolSize(options.getPoolSize()); - writer.setWaitReplicas(options.getWaitReplicas()); - writer.setWaitTimeout(options.getWaitTimeout()); - if (writer.getOperation() instanceof KeyValueWrite) { - KeyValueWrite operation = (KeyValueWrite) writer.getOperation(); - operation.setMode(options.isMerge() ? WriteMode.MERGE : WriteMode.OVERWRITE); - } - } - protected FaultTolerantStepBuilder step(String name, ItemReader reader, ItemWriter writer) { SimpleStepBuilder builder = jobFactory.step(name, chunkSize); if (reader instanceof ItemStreamSupport) { diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractStructImport.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractStructImport.java deleted file mode 100644 index 3d23327eb..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractStructImport.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.redis.riot.core; - -import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.RedisItemWriter; - -public abstract class AbstractStructImport extends AbstractRedisRunnable { - - private RedisWriterOptions writerOptions = new RedisWriterOptions(); - - public void setWriterOptions(RedisWriterOptions options) { - this.writerOptions = options; - } - - protected RedisItemWriter> writer() { - RedisItemWriter> writer = RedisItemWriter.struct(); - writer.setClient(getRedisClient()); - writer(writer, writerOptions); - return writer; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/KeyFilterOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/KeyFilterOptions.java index b51bc5af9..87cebff80 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/KeyFilterOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/KeyFilterOptions.java @@ -2,6 +2,8 @@ import java.util.List; +import org.springframework.util.CollectionUtils; + public class KeyFilterOptions { private List includes; @@ -32,4 +34,8 @@ public void setSlots(List ranges) { this.slots = ranges; } + public boolean isEmpty() { + return CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes) && CollectionUtils.isEmpty(slots); + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/LoggingWriteListener.java b/core/riot-core/src/main/java/com/redis/riot/core/LoggingWriteListener.java deleted file mode 100644 index 1a993f8d2..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/LoggingWriteListener.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.redis.riot.core; - -import java.util.function.Consumer; - -import org.springframework.batch.core.ItemWriteListener; -import org.springframework.batch.item.Chunk; - -public class LoggingWriteListener implements ItemWriteListener { - - private final Consumer> consumer; - - public LoggingWriteListener(Consumer> log) { - this.consumer = log; - } - - @Override - public void afterWrite(Chunk items) { - consumer.accept(items); - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java index a1f507da0..45afd8574 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RedisReaderOptions.java @@ -4,7 +4,6 @@ import org.springframework.util.unit.DataSize; -import com.redis.spring.batch.KeyValue.Type; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.operation.KeyValueRead; import com.redis.spring.batch.reader.AbstractPollableItemReader; @@ -20,13 +19,10 @@ public class RedisReaderOptions { public static final int DEFAULT_POOL_SIZE = RedisItemReader.DEFAULT_POOL_SIZE; public static final DataSize DEFAULT_MEMORY_USAGE_LIMIT = KeyValueRead.DEFAULT_MEM_USAGE_LIMIT; public static final int DEFAULT_MEMORY_USAGE_SAMPLES = KeyValueRead.DEFAULT_MEM_USAGE_SAMPLES; - public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = RedisItemReader.DEFAULT_NOTIFICATION_QUEUE_CAPACITY; public static final long DEFAULT_SCAN_COUNT = 1000; - public static final Duration DEFAULT_FLUSH_INTERVAL = RedisItemReader.DEFAULT_FLUSH_INTERVAL; - public static final Duration DEFAULT_IDLE_TIMEOUT = RedisItemReader.DEFAULT_IDLE_TIMEOUT; private String keyPattern; - private Type keyType; + private String keyType; private long scanCount = DEFAULT_SCAN_COUNT; private int queueCapacity = DEFAULT_QUEUE_CAPACITY; private Duration pollTimeout = DEFAULT_POLL_TIMEOUT; @@ -36,42 +32,6 @@ public class RedisReaderOptions { private ReadFrom readFrom; private DataSize memoryUsageLimit = DEFAULT_MEMORY_USAGE_LIMIT; private int memoryUsageSamples = DEFAULT_MEMORY_USAGE_SAMPLES; - private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY; - private Duration flushInterval = DEFAULT_FLUSH_INTERVAL; - private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT; - private KeyFilterOptions keyFilterOptions = new KeyFilterOptions(); - - public KeyFilterOptions getKeyFilterOptions() { - return keyFilterOptions; - } - - public void setKeyFilterOptions(KeyFilterOptions options) { - this.keyFilterOptions = options; - } - - public Duration getFlushInterval() { - return flushInterval; - } - - public void setFlushInterval(Duration interval) { - this.flushInterval = interval; - } - - public Duration getIdleTimeout() { - return idleTimeout; - } - - public void setIdleTimeout(Duration timeout) { - this.idleTimeout = timeout; - } - - public int getNotificationQueueCapacity() { - return notificationQueueCapacity; - } - - public void setNotificationQueueCapacity(int capacity) { - this.notificationQueueCapacity = capacity; - } public String getKeyPattern() { return keyPattern; @@ -89,11 +49,11 @@ public void setScanCount(long count) { this.scanCount = count; } - public Type getKeyType() { + public String getKeyType() { return keyType; } - public void setKeyType(Type type) { + public void setKeyType(String type) { this.keyType = type; } @@ -160,4 +120,22 @@ public int getMemoryUsageSamples() { public void setMemoryUsageSamples(int memoryUsageSamples) { this.memoryUsageSamples = memoryUsageSamples; } + + public void configure(RedisItemReader reader) { + reader.setChunkSize(chunkSize); + reader.setKeyPattern(keyPattern); + reader.setKeyType(keyType); + reader.setPollTimeout(pollTimeout); + reader.setQueueCapacity(queueCapacity); + reader.setReadFrom(readFrom); + reader.setScanCount(scanCount); + reader.setThreads(threads); + if (reader.getOperation() instanceof KeyValueRead) { + KeyValueRead operation = (KeyValueRead) reader.getOperation(); + operation.setMemUsageLimit(memoryUsageLimit); + operation.setMemUsageSamples(memoryUsageSamples); + } + reader.setPoolSize(poolSize); + + } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RedisWriterOptions.java b/core/riot-core/src/main/java/com/redis/riot/core/RedisWriterOptions.java index 9be626200..cabbaa3ef 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RedisWriterOptions.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RedisWriterOptions.java @@ -3,6 +3,8 @@ import java.time.Duration; import com.redis.spring.batch.RedisItemWriter; +import com.redis.spring.batch.operation.KeyValueWrite; +import com.redis.spring.batch.operation.KeyValueWrite.WriteMode; public class RedisWriterOptions { @@ -55,4 +57,22 @@ public void setWaitTimeout(Duration timeout) { this.waitTimeout = timeout; } + public void configure(RedisItemWriter writer) { + writer.setMultiExec(multiExec); + writer.setPoolSize(poolSize); + writer.setWaitReplicas(waitReplicas); + writer.setWaitTimeout(waitTimeout); + if (writer.getOperation() instanceof KeyValueWrite) { + ((KeyValueWrite) writer.getOperation()).setMode(writeMode()); + } + + } + + private WriteMode writeMode() { + if (merge) { + return WriteMode.MERGE; + } + return WriteMode.OVERWRITE; + } + } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java index 73901d54b..8fea70e24 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/RiotUtils.java @@ -7,7 +7,6 @@ import java.util.List; import java.util.Objects; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; @@ -19,12 +18,8 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.util.CollectionUtils; -import com.redis.spring.batch.util.BatchUtils; import com.redis.spring.batch.util.Predicates; -import io.lettuce.core.codec.RedisCodec; -import io.lettuce.core.codec.StringCodec; - public abstract class RiotUtils { private static final SpelExpressionParser parser = new SpelExpressionParser(); @@ -46,55 +41,6 @@ public static Predicate predicate(EvaluationContext context, Expression e return t -> expression.getValue(context, t, Boolean.class); } - public static Predicate keyFilterPredicate(KeyFilterOptions options) { - return keyFilterPredicate(StringCodec.UTF8, options); - } - - public static Predicate keyFilterPredicate(RedisCodec codec, KeyFilterOptions options) { - Predicate globPredicate = globPredicate(codec, options); - Predicate slotsPredicate = slotsPredicate(codec, options); - if (globPredicate == null) { - if (slotsPredicate == null) { - return null; - } - return slotsPredicate; - } - if (slotsPredicate == null) { - return globPredicate; - } - return slotsPredicate.and(globPredicate); - } - - public static Predicate slotsPredicate(RedisCodec codec, KeyFilterOptions options) { - if (CollectionUtils.isEmpty(options.getSlots())) { - return null; - } - List> predicates = options.getSlots().stream() - .map(r -> Predicates.slotRange(codec, r.getStart(), r.getEnd())).collect(Collectors.toList()); - return Predicates.or(predicates); - } - - public static Predicate globPredicate(RedisCodec codec, KeyFilterOptions options) { - Predicate predicate = globPredicate(options); - if (predicate == null) { - return null; - } - return Predicates.map(BatchUtils.toStringKeyFunction(codec), predicate); - } - - public static Predicate globPredicate(KeyFilterOptions options) { - if (CollectionUtils.isEmpty(options.getIncludes())) { - if (CollectionUtils.isEmpty(options.getExcludes())) { - return null; - } - return globPredicate(options.getExcludes()).negate(); - } - if (CollectionUtils.isEmpty(options.getExcludes())) { - return globPredicate(options.getIncludes()); - } - return globPredicate(options.getIncludes()).and(globPredicate(options.getExcludes()).negate()); - } - public static Predicate globPredicate(List patterns) { if (CollectionUtils.isEmpty(patterns)) { return Predicates.isTrue(); diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/CompositeOperator.java b/core/riot-core/src/main/java/com/redis/riot/core/function/CompositeOperator.java new file mode 100644 index 000000000..f7f76fc50 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/CompositeOperator.java @@ -0,0 +1,27 @@ +package com.redis.riot.core.function; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; + +public class CompositeOperator implements UnaryOperator { + + private final List> consumers; + + @SuppressWarnings("unchecked") + public CompositeOperator(Consumer... consumers) { + this(Arrays.asList(consumers)); + } + + public CompositeOperator(List> consumers) { + this.consumers = consumers; + } + + @Override + public T apply(T t) { + consumers.forEach(c -> c.accept(t)); + return t; + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageId.java b/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageId.java new file mode 100644 index 000000000..ba134d416 --- /dev/null +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageId.java @@ -0,0 +1,29 @@ +package com.redis.riot.core.function; + +import java.util.Collection; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.redis.spring.batch.KeyValue; +import com.redis.spring.batch.KeyValue.DataType; + +import io.lettuce.core.StreamMessage; + +@SuppressWarnings("unchecked") +public class DropStreamMessageId implements Consumer> { + + @Override + public void accept(KeyValue t) { + if (t.getValue() == null || KeyValue.type(t) != DataType.STREAM) { + return; + } + Collection> messages = (Collection>) t.getValue(); + t.setValue(messages.stream().map(this::message).collect(Collectors.toList())); + } + + @SuppressWarnings("rawtypes") + private StreamMessage message(StreamMessage message) { + return new StreamMessage(message.getStream(), null, message.getBody()); + } + +} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java deleted file mode 100644 index f9681b0b6..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/DropStreamMessageIdFunction.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.redis.riot.core.function; - -import java.util.Collection; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.springframework.util.CollectionUtils; - -import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; - -import io.lettuce.core.StreamMessage; - -@SuppressWarnings("unchecked") -public class DropStreamMessageIdFunction implements Function, Object> { - - @SuppressWarnings("rawtypes") - @Override - public Object apply(KeyValue t) { - if (t.getType() == Type.STREAM) { - Collection messages = (Collection) t.getValue(); - if (!CollectionUtils.isEmpty(messages)) { - return messages.stream().map(this::message).collect(Collectors.toList()); - } - } - return t.getValue(); - } - - @SuppressWarnings("rawtypes") - private StreamMessage message(StreamMessage message) { - return new StreamMessage(message.getStream(), null, message.getBody()); - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/ExpressionFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/ExpressionFunction.java index 463aab10d..80930444f 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/ExpressionFunction.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/ExpressionFunction.java @@ -6,40 +6,31 @@ import org.springframework.expression.Expression; import org.springframework.util.Assert; -import com.redis.riot.core.TemplateExpression; - public class ExpressionFunction implements Function { - private final EvaluationContext context; - - private final Expression expression; - - private final Class type; + private final EvaluationContext context; - public ExpressionFunction(EvaluationContext context, Expression expression, Class type) { - Assert.notNull(context, "A SpEL evaluation context is required."); - Assert.notNull(expression, "A SpEL expression is required."); - Assert.notNull(type, "A type is required."); - this.context = context; - this.expression = expression; - this.type = type; - } + private final Expression expression; - @Override - public R apply(T t) { - return getValue(t); - } + private final Class type; - protected R getValue(T t) { - return expression.getValue(context, t, type); - } + public ExpressionFunction(EvaluationContext context, Expression expression, Class type) { + Assert.notNull(context, "A SpEL evaluation context is required."); + Assert.notNull(expression, "A SpEL expression is required."); + Assert.notNull(type, "A type is required."); + this.context = context; + this.expression = expression; + this.type = type; + } - public static ExpressionFunction of(EvaluationContext context, Expression expression) { - return new ExpressionFunction<>(context, expression, String.class); - } + @Override + public R apply(T t) { + R value = getValue(t); + return value; + } - public static ExpressionFunction of(EvaluationContext context, TemplateExpression expression) { - return of(context, expression.getExpression()); - } + protected R getValue(T t) { + return expression.getValue(context, t, type); + } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/KeyValueOperator.java b/core/riot-core/src/main/java/com/redis/riot/core/function/KeyValueOperator.java deleted file mode 100644 index f5b1cf6e9..000000000 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/KeyValueOperator.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.redis.riot.core.function; - -import java.util.function.Function; -import java.util.function.ToLongFunction; -import java.util.function.UnaryOperator; - -import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; - -public class KeyValueOperator implements UnaryOperator> { - - private Function, String> keyFunction = KeyValue::getKey; - private ToLongFunction> ttlFunction = KeyValue::getTtl; - private Function, Type> typeFunction = KeyValue::getType; - private Function, ?> valueFunction = KeyValue::getValue; - - public void setKeyFunction(Function, String> key) { - this.keyFunction = key; - } - - public void setTtlFunction(ToLongFunction> ttl) { - this.ttlFunction = ttl; - } - - public void setTypeFunction(Function, Type> function) { - this.typeFunction = function; - } - - public void setValueFunction(Function, ?> value) { - this.valueFunction = value; - } - - @Override - public KeyValue apply(KeyValue t) { - t.setKey(keyFunction.apply(t)); - t.setTtl(ttlFunction.applyAsLong(t)); - t.setType(typeFunction.apply(t)); - t.setValue(valueFunction.apply(t)); - return t; - } - -} diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/LongExpressionFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/LongExpressionFunction.java index e90e9c63b..edb9828cf 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/LongExpressionFunction.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/LongExpressionFunction.java @@ -7,17 +7,17 @@ public class LongExpressionFunction extends ExpressionFunction implements ToLongFunction { - public LongExpressionFunction(EvaluationContext context, Expression expression) { - super(context, expression, Long.class); - } + public LongExpressionFunction(EvaluationContext context, Expression expression) { + super(context, expression, Long.class); + } - @Override - public long applyAsLong(T value) { - Long result = getValue(value); - if (result == null) { - return 0; - } - return result; - } + @Override + public long applyAsLong(T value) { + Long result = getValue(value); + if (result == null) { + return 0; + } + return result; + } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/StringKeyValueFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/StringKeyValueFunction.java index 4960adf4b..3ce4e7af6 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/StringKeyValueFunction.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/StringKeyValueFunction.java @@ -7,7 +7,7 @@ import io.lettuce.core.codec.RedisCodec; -public class StringKeyValueFunction implements Function, KeyValue> { +public class StringKeyValueFunction implements Function, KeyValue> { private final Function stringKeyFunction; @@ -17,8 +17,8 @@ public StringKeyValueFunction(RedisCodec codec) { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public KeyValue apply(KeyValue t) { - KeyValue result = new KeyValue<>((KeyValue) t); + public KeyValue apply(KeyValue t) { + KeyValue result = new KeyValue<>((KeyValue) t); result.setKey(stringKeyFunction.apply(t.getKey())); return result; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/StructToMapFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/StructToMapFunction.java index 113921a7d..b75fe530d 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/StructToMapFunction.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/StructToMapFunction.java @@ -11,7 +11,7 @@ import com.redis.lettucemod.timeseries.Sample; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; +import com.redis.spring.batch.KeyValue.DataType; import io.lettuce.core.ScoredValue; import io.lettuce.core.StreamMessage; @@ -27,7 +27,7 @@ public class StructToMapFunction implements Function, M private ZsetToMapFunction zset = new ZsetToMapFunction(); private Function> json = new StringToMapFunction(); private Function> string = new StringToMapFunction(); - private Function> defaultFunction = s -> null; + private Function> defaultFunction = s -> Collections.emptyMap(); public void setKey(Function> key) { this.key = key; @@ -65,34 +65,38 @@ public void setDefaultFunction(Function> function) { public Map apply(KeyValue t) { Map map = new LinkedHashMap<>(); map.putAll(key.apply(t.getKey())); - map.putAll(value(t.getType(), t.getValue())); + map.putAll(value(t)); return map; } @SuppressWarnings("unchecked") - private Map value(Type type, Object value) { - if (type == null || value == null) { + private Map value(KeyValue t) { + if (!KeyValue.hasType(t) || !KeyValue.hasValue(t)) { + return Collections.emptyMap(); + } + DataType type = KeyValue.type(t); + if (type == null) { return Collections.emptyMap(); } switch (type) { case HASH: - return hash.apply((Map) value); + return hash.apply((Map) t.getValue()); case LIST: - return list.apply((List) value); + return list.apply((List) t.getValue()); case SET: - return set.apply((Collection) value); + return set.apply((Collection) t.getValue()); case ZSET: - return zset.apply((List>) value); + return zset.apply((List>) t.getValue()); case STREAM: - return stream.apply((List>) value); + return stream.apply((List>) t.getValue()); case JSON: - return json.apply((String) value); + return json.apply((String) t.getValue()); case STRING: - return string.apply((String) value); + return string.apply((String) t.getValue()); case TIMESERIES: - return timeseries.apply((List) value); + return timeseries.apply((List) t.getValue()); default: - return defaultFunction.apply(value); + return defaultFunction.apply(t.getValue()); } } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/function/ToStringKeyValueFunction.java b/core/riot-core/src/main/java/com/redis/riot/core/function/ToStringKeyValueFunction.java index 8fdb43447..7b0e96c2d 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/function/ToStringKeyValueFunction.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/function/ToStringKeyValueFunction.java @@ -7,7 +7,7 @@ import io.lettuce.core.codec.RedisCodec; -public class ToStringKeyValueFunction implements Function, KeyValue> { +public class ToStringKeyValueFunction implements Function, KeyValue> { private final Function toStringKeyFunction; @@ -17,8 +17,8 @@ public ToStringKeyValueFunction(RedisCodec codec) { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public KeyValue apply(KeyValue t) { - KeyValue result = new KeyValue<>((KeyValue) t); + public KeyValue apply(KeyValue t) { + KeyValue result = new KeyValue<>((KeyValue) t); result.setKey(toStringKeyFunction.apply(t.getKey())); return result; } diff --git a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java index e52116d9b..70a80accf 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/operation/ExpireAtBuilder.java @@ -18,7 +18,7 @@ public ExpireAtBuilder ttl(String field) { protected ExpireAt> operation( Function, String> keyFunction) { ExpireAt> operation = new ExpireAt<>(keyFunction); - operation.setEpochFunction(toLong(ttl, 0)); + operation.epoch(toLong(ttl, 0)); return operation; } diff --git a/core/riot-core/src/test/java/com/redis/riot/core/FunctionTests.java b/core/riot-core/src/test/java/com/redis/riot/core/FunctionTests.java index 065ef653a..3aee7330b 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/FunctionTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/FunctionTests.java @@ -9,7 +9,7 @@ import com.redis.riot.core.function.StringToMapFunction; import com.redis.riot.core.function.StructToMapFunction; import com.redis.spring.batch.KeyValue; -import com.redis.spring.batch.KeyValue.Type; +import com.redis.spring.batch.KeyValue.DataType; class FunctionTests { @@ -18,14 +18,14 @@ void keyValueToMap() { StructToMapFunction function = new StructToMapFunction(); KeyValue string = new KeyValue<>(); string.setKey("beer:1"); - string.setType(Type.STRING); + string.setType(DataType.STRING.getString()); String value = "sdfsdf"; string.setValue(value); Map stringMap = function.apply(string); Assertions.assertEquals(value, stringMap.get(StringToMapFunction.DEFAULT_KEY)); KeyValue hash = new KeyValue<>(); hash.setKey("beer:2"); - hash.setType(Type.HASH); + hash.setType(DataType.HASH.getString()); Map map = new HashMap<>(); map.put("field1", "value1"); hash.setValue(map); diff --git a/core/riot-core/src/test/java/com/redis/riot/core/ProcessorTests.java b/core/riot-core/src/test/java/com/redis/riot/core/ProcessorTests.java index 3486ebdca..85ac470bd 100644 --- a/core/riot-core/src/test/java/com/redis/riot/core/ProcessorTests.java +++ b/core/riot-core/src/test/java/com/redis/riot/core/ProcessorTests.java @@ -10,20 +10,40 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemProcessor; import org.springframework.expression.Expression; import org.springframework.expression.spel.support.StandardEvaluationContext; +import io.lettuce.core.codec.StringCodec; + class ProcessorTests { + private static class TestExport extends AbstractExport { + + @Override + protected boolean isStruct() { + return false; + } + + @Override + protected Job job() { + return null; + } + + } + @Test void keyFilter() { KeyFilterOptions options = new KeyFilterOptions(); options.setIncludes(Arrays.asList("foo*", "bar*")); - Predicate predicate = RiotUtils.keyFilterPredicate(options); - Assertions.assertTrue(predicate.test("foobar")); - Assertions.assertTrue(predicate.test("barfoo")); - Assertions.assertFalse(predicate.test("key")); + try (TestExport export = new TestExport()) { + export.setKeyFilterOptions(options); + Predicate predicate = export.keyFilterPredicate(StringCodec.UTF8); + Assertions.assertTrue(predicate.test("foobar")); + Assertions.assertTrue(predicate.test("barfoo")); + Assertions.assertFalse(predicate.test("key")); + } } @Test diff --git a/gradle.properties b/gradle.properties index fc965bd10..a462dd72d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -38,7 +38,7 @@ latencyutilsVersion = 2.0.3 lettucemodVersion = 3.7.3 picocliVersion = 4.7.5 progressbarVersion = 0.10.0 -springBatchRedisVersion = 4.1.5 +springBatchRedisVersion = 4.2.0 testcontainersRedisVersion = 2.2.0 org.gradle.daemon = false diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java index afefc8c53..7f7e63369 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractExportCommand.java @@ -5,7 +5,7 @@ import org.springframework.batch.item.ItemReader; import com.redis.riot.core.AbstractExport; -import com.redis.riot.core.AbstractRedisRunnable; +import com.redis.riot.core.AbstractRedisCallable; import com.redis.spring.batch.RedisItemReader; import com.redis.spring.batch.RedisItemReader.ReaderMode; import com.redis.spring.batch.reader.ScanSizeEstimator; @@ -14,17 +14,21 @@ public abstract class AbstractExportCommand extends AbstractRiotCommand { - @ArgGroup(exclusive = false, heading = "Reader options%n") - RedisReaderArgs readerArgs = new RedisReaderArgs(); + @ArgGroup(exclusive = false) + private RedisReaderArgs readerArgs = new RedisReaderArgs(); - @ArgGroup(exclusive = false, heading = "Processor options%n") - KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); + @ArgGroup(exclusive = false) + private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs(); + + @ArgGroup(exclusive = false) + private KeyFilterArgs keyFilterArgs = new KeyFilterArgs(); @Override - protected AbstractRedisRunnable runnable() { + protected AbstractRedisCallable runnable() { AbstractExport export = exportRunnable(); export.setReaderOptions(readerArgs.readerOptions()); export.setProcessorOptions(processorArgs.processorOptions()); + export.setKeyFilterOptions(keyFilterArgs.keyFilterOptions()); return export; } @@ -36,11 +40,35 @@ protected LongSupplier initialMaxSupplier(String stepName, ItemReader reader) return () -> ProgressStepExecutionListener.UNKNOWN_SIZE; } ScanSizeEstimator estimator = new ScanSizeEstimator(((RedisItemReader) reader).getClient()); - estimator.setKeyPattern(readerArgs.scanMatch); - if (readerArgs.scanType != null) { - estimator.setKeyType(readerArgs.scanType.getCode()); + estimator.setKeyPattern(readerArgs.getScanMatch()); + if (readerArgs.getScanType() != null) { + estimator.setKeyType(readerArgs.getScanType()); } return estimator; } + public RedisReaderArgs getReaderArgs() { + return readerArgs; + } + + public void setReaderArgs(RedisReaderArgs readerArgs) { + this.readerArgs = readerArgs; + } + + public KeyValueProcessorArgs getProcessorArgs() { + return processorArgs; + } + + public void setProcessorArgs(KeyValueProcessorArgs processorArgs) { + this.processorArgs = processorArgs; + } + + public KeyFilterArgs getKeyFilterArgs() { + return keyFilterArgs; + } + + public void setKeyFilterArgs(KeyFilterArgs keyFilterArgs) { + this.keyFilterArgs = keyFilterArgs; + } + } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java index 176870b06..5dd649e90 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractImportCommand.java @@ -20,7 +20,7 @@ import com.redis.riot.cli.redis.TsAddCommand; import com.redis.riot.cli.redis.XaddCommand; import com.redis.riot.cli.redis.ZaddCommand; -import com.redis.riot.core.AbstractImport; +import com.redis.riot.core.AbstractMapImport; import com.redis.riot.core.ImportProcessorOptions; import com.redis.spring.batch.operation.Operation; @@ -35,13 +35,13 @@ public abstract class AbstractImportCommand extends AbstractRiotCommand { @Option(arity = "1..*", names = "--proc", description = "SpEL expressions in the form field1=\"exp\" field2=\"exp\"...", paramLabel = "") - Map processorExpressions; + private Map processorExpressions; @Option(names = "--filter", description = "Discard records using a SpEL expression.", paramLabel = "") - Expression filter; + private Expression filter; @ArgGroup(exclusive = false) - EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs(); + private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs(); /** * Initialized manually during command parsing @@ -61,8 +61,8 @@ protected List, Object>> operation } @Override - protected AbstractImport runnable() { - AbstractImport runnable = importRunnable(); + protected AbstractMapImport runnable() { + AbstractMapImport runnable = importRunnable(); runnable.setOperations(operations()); runnable.setEvaluationContextOptions(evaluationContextArgs.evaluationContextOptions()); runnable.setProcessorOptions(processorOptions()); @@ -76,7 +76,7 @@ private ImportProcessorOptions processorOptions() { return options; } - protected abstract AbstractImport importRunnable(); + protected abstract AbstractMapImport importRunnable(); @Override protected String taskName(String stepName) { diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractMainCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractMainCommand.java index 9d5747af1..97cbd4fc7 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractMainCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractMainCommand.java @@ -27,8 +27,8 @@ public abstract class AbstractMainCommand extends BaseCommand implements Runnabl PrintWriter err; - @ArgGroup(exclusive = false, heading = "Redis connection options%n") - RedisArgs redisArgs = new RedisArgs(); + @ArgGroup(exclusive = false) + private RedisArgs redisArgs = new RedisArgs(); @Override public void run() { @@ -83,4 +83,12 @@ private static int executionStrategy(ParseResult parseResult) { return new RunLast().execute(parseResult); // default execution strategy } + public RedisArgs getRedisArgs() { + return redisArgs; + } + + public void setRedisArgs(RedisArgs redisArgs) { + this.redisArgs = redisArgs; + } + } diff --git a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractRiotCommand.java b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractRiotCommand.java index ecc290935..16de97633 100644 --- a/plugins/riot/src/main/java/com/redis/riot/cli/AbstractRiotCommand.java +++ b/plugins/riot/src/main/java/com/redis/riot/cli/AbstractRiotCommand.java @@ -14,8 +14,8 @@ import org.springframework.batch.item.ItemWriter; import org.springframework.util.ClassUtils; -import com.redis.riot.core.AbstractExecutable; -import com.redis.riot.core.AbstractRedisRunnable; +import com.redis.riot.core.AbstractRiotCallable; +import com.redis.riot.core.AbstractRedisCallable; import me.tongfei.progressbar.DelegatingProgressBarConsumer; import me.tongfei.progressbar.ProgressBarBuilder; @@ -42,20 +42,20 @@ public enum ProgressStyle { long sleep; @Option(names = "--threads", description = "Number of concurrent threads to use for batch processing (default: ${DEFAULT-VALUE}).", paramLabel = "") - int threads = AbstractExecutable.DEFAULT_THREADS; + int threads = AbstractRiotCallable.DEFAULT_THREADS; @Option(names = { "-b", "--batch" }, description = "Number of items in each batch (default: ${DEFAULT-VALUE}).", paramLabel = "") - int chunkSize = AbstractExecutable.DEFAULT_CHUNK_SIZE; + int chunkSize = AbstractRiotCallable.DEFAULT_CHUNK_SIZE; @Option(names = "--dry-run", description = "Enable dummy writes.") boolean dryRun; @Option(names = "--skip-limit", description = "Max number of failed items before considering the transfer has failed (default: ${DEFAULT-VALUE}).", paramLabel = "") - int skipLimit = AbstractExecutable.DEFAULT_SKIP_LIMIT; + int skipLimit = AbstractRiotCallable.DEFAULT_SKIP_LIMIT; @Option(names = "--retry-limit", description = "Maximum number of times to try failed items. 0 and 1 both mean no retry. (default: ${DEFAULT-VALUE}).", paramLabel = "") - private int retryLimit = AbstractExecutable.DEFAULT_RETRY_LIMIT; + private int retryLimit = AbstractRiotCallable.DEFAULT_RETRY_LIMIT; @Option(names = "--progress", description = "Progress style: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "