From 57f4020afcf70e93440c9073b67f11545e865f06 Mon Sep 17 00:00:00 2001 From: Julien Ruaux Date: Tue, 17 Sep 2019 11:00:53 -0700 Subject: [PATCH] fixed connection pool in multithreaded mode --- src/docs/asciidoc/index.adoc | 4 +- .../com/redislabs/riot/cli/ImportCommand.java | 39 +------- .../redislabs/riot/cli/ProcessorOptions.java | 75 +++++++++++++++ .../riot/cli/db/DatabaseExportCommand.java | 4 +- .../riot/cli/db/DatabaseImportCommand.java | 3 +- .../redislabs/riot/cli/file/FileOptions.java | 3 +- .../cli/generator/FakerGeneratorCommand.java | 3 +- .../cli/generator/SimpleGeneratorCommand.java | 4 +- .../cli/redis/RedisConnectionOptions.java | 78 ++++++++++------ .../riot/redis/AbstractLettuceWriter.java | 92 ------------------- .../riot/redis/LettuSearchWriter.java | 34 ------- .../riot/redis/LettuceClusterWriter.java | 35 ------- .../redislabs/riot/redis/LettuceWriter.java | 90 ++++++++++++++---- .../java/com/redislabs/riot/TestFile.java | 28 +++--- src/test/resources/commands/db-export.txt | 2 +- src/test/resources/commands/db-import.txt | 2 +- .../resources/commands/file-export-csv.txt | 2 +- .../resources/commands/file-export-json.txt | 2 +- .../commands/file-export-json_gz.txt | 2 +- .../commands/file-import-csv-geo.txt | 2 +- .../commands/file-import-csv-geosearch.txt | 1 - .../commands/file-import-csv-hash-date.txt | 1 - .../commands/file-import-csv-hash.txt | 2 +- ...e-import-csv-processor-hash-dateformat.txt | 1 + .../file-import-csv-processor-search-geo.txt | 1 + .../file-import-csv-processor-search.txt | 1 + .../commands/file-import-csv-search.txt | 2 +- .../commands/file-import-json-hash.txt | 2 +- .../commands/file-import-laevents.txt | 1 - .../resources/commands/gen-faker-hash.txt | 2 +- src/test/resources/commands/gen-faker-set.txt | 2 +- .../resources/commands/gen-faker-stream.txt | 2 +- .../resources/commands/gen-faker-zset.txt | 2 +- src/test/resources/commands/gen-simple.txt | 2 +- 34 files changed, 243 insertions(+), 283 deletions(-) create mode 100644 src/main/java/com/redislabs/riot/cli/ProcessorOptions.java delete mode 100644 src/main/java/com/redislabs/riot/redis/AbstractLettuceWriter.java delete mode 100644 src/main/java/com/redislabs/riot/redis/LettuSearchWriter.java delete mode 100644 src/main/java/com/redislabs/riot/redis/LettuceClusterWriter.java delete mode 100644 src/test/resources/commands/file-import-csv-geosearch.txt delete mode 100644 src/test/resources/commands/file-import-csv-hash-date.txt create mode 100644 src/test/resources/commands/file-import-csv-processor-hash-dateformat.txt create mode 100644 src/test/resources/commands/file-import-csv-processor-search-geo.txt create mode 100644 src/test/resources/commands/file-import-csv-processor-search.txt delete mode 100644 src/test/resources/commands/file-import-laevents.txt diff --git a/src/docs/asciidoc/index.adoc b/src/docs/asciidoc/index.adoc index 0ea700f32..2a2e9425e 100644 --- a/src/docs/asciidoc/index.adoc +++ b/src/docs/asciidoc/index.adoc @@ -230,13 +230,13 @@ riot --metrics … .Strings [source,shell] ---- -riot --host redis-12000.redislabs.com --port 12000 --max-total 96 gen simple value=100 --max 100000000 --batch 500 --threads 96 -r string --string-format raw --keyspace string --keys=index --value value +$ riot -s redis-12000.redislabs.com:12000 --pool-max-total 96 gen --batch 500 --threads 96 --max 100000000 --fields value=100 -c set --string-format raw --keyspace string --keys index --string-value value ---- image::images/rs-strings.png[] .Streams [source,shell] ---- -riot --host 
redis-12000.redislabs.com --port 12000 --max-total 96 gen simple --max 100000000 --batch 500 --threads 96 -r stream --keyspace stream --keys partition +$ riot -s redis-12000.redislabs.com:12000 --pool-max-total 96 gen --batch 500 --threads 96 --max 100000000 -c xadd --keyspace stream --keys partition ---- image::images/rs-streams.png[] \ No newline at end of file diff --git a/src/main/java/com/redislabs/riot/cli/ImportCommand.java b/src/main/java/com/redislabs/riot/cli/ImportCommand.java index 875974f2b..b60b1b2e7 100644 --- a/src/main/java/com/redislabs/riot/cli/ImportCommand.java +++ b/src/main/java/com/redislabs/riot/cli/ImportCommand.java @@ -1,16 +1,10 @@ package com.redislabs.riot.cli; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.Map; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; -import org.springframework.batch.item.support.CompositeItemProcessor; -import com.redislabs.riot.batch.RegexProcessor; -import com.redislabs.riot.batch.SpelProcessor; import com.redislabs.riot.cli.redis.RediSearchCommandOptions; import com.redislabs.riot.cli.redis.RedisCommandOptions; import com.redislabs.riot.cli.redis.RedisKeyOptions; @@ -32,39 +26,12 @@ public abstract class ImportCommand extends TransferCommand, private RedisCommandOptions redis = new RedisCommandOptions(); @ArgGroup(exclusive = false, heading = "RediSearch command options%n") private RediSearchCommandOptions search = new RediSearchCommandOptions(); - @Option(names = { - "--processor" }, description = "SpEL expression to process a field", paramLabel = "") - private Map fields; - @Option(names = { "-r", - "--regex" }, description = "Extract fields from a source field using a regular expression", paramLabel = "") - private Map regexes; - @Option(names = "--processor-variable", description = "Register a variable in the processor context", paramLabel = "") - private Map variables = new LinkedHashMap(); - @Option(names = "--processor-date-format", description = "java.text.SimpleDateFormat pattern for 'date' processor variable (default: ${DEFAULT-VALUE})", paramLabel = "") - private String dateFormat = new SimpleDateFormat().toPattern(); + @ArgGroup(exclusive = false, heading = "Processor options%n") + private ProcessorOptions processorOptions = new ProcessorOptions(); @Override protected ItemProcessor, Map> processor() throws Exception { - if (fields == null) { - if (regexes == null) { - return null; - } - return regexProcessor(); - } - if (regexes == null) { - return spelProcessor(); - } - CompositeItemProcessor, Map> processor = new CompositeItemProcessor<>(); - processor.setDelegates(Arrays.asList(regexProcessor(), spelProcessor())); - return processor; - } - - private ItemProcessor, Map> regexProcessor() { - return new RegexProcessor(regexes); - } - - private ItemProcessor, Map> spelProcessor() { - return new SpelProcessor(getRedisOptions().redis(), new SimpleDateFormat(dateFormat), variables, fields); + return processorOptions.processor(getRedisOptions()); } @Override diff --git a/src/main/java/com/redislabs/riot/cli/ProcessorOptions.java b/src/main/java/com/redislabs/riot/cli/ProcessorOptions.java new file mode 100644 index 000000000..d726fba50 --- /dev/null +++ b/src/main/java/com/redislabs/riot/cli/ProcessorOptions.java @@ -0,0 +1,75 @@ +package com.redislabs.riot.cli; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; 
+import java.util.Map; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.support.CompositeItemProcessor; +import org.springframework.batch.item.support.ScriptItemProcessor; +import org.springframework.batch.item.support.builder.ScriptItemProcessorBuilder; +import org.springframework.core.io.FileSystemResource; + +import com.redislabs.riot.batch.RegexProcessor; +import com.redislabs.riot.batch.SpelProcessor; +import com.redislabs.riot.cli.redis.RedisConnectionOptions; + +import picocli.CommandLine.Option; + +public class ProcessorOptions { + + @Option(names = { "--processor-script" }, description = "Use an inline script to process items") + private String processorScript; + @Option(names = { "--processor-script-file" }, description = "Use an external script to process items") + private File processorScriptFile; + @Option(names = { + "--processor-script-language" }, description = "Language for the inline script (default: ${DEFAULT-VALUE})", paramLabel = "") + private String processorScriptLanguage = "ECMAScript"; + @Option(arity = "1..*", names = { + "--processor" }, description = "SpEL expression to process a field", paramLabel = "") + private Map fields; + @Option(arity = "1..*", names = { "-r", + "--regex" }, description = "Extract fields from a source field using a regular expression", paramLabel = "") + private Map regexes; + @Option(arity = "1..*", names = "--processor-variable", description = "Register a variable in the processor context", paramLabel = "") + private Map variables = new LinkedHashMap(); + @Option(names = "--processor-date-format", description = "java.text.SimpleDateFormat pattern for 'date' processor variable (default: ${DEFAULT-VALUE})", paramLabel = "") + private String dateFormat = new SimpleDateFormat().toPattern(); + + public ItemProcessor, Map> processor(RedisConnectionOptions redis) throws Exception { + List, Map>> processors = new ArrayList<>(); + if (regexes != null) { + processors.add(new RegexProcessor(regexes)); + } + if (processorScript != null || processorScriptFile != null) { + System.setProperty("nashorn.args", "--no-deprecation-warning"); + ScriptItemProcessorBuilder, Map> builder = new ScriptItemProcessorBuilder<>(); + builder.language(processorScriptLanguage); + if (processorScript != null) { + builder.scriptSource(processorScript); + } + if (processorScriptFile != null) { + builder.scriptResource(new FileSystemResource(processorScriptFile)); + } + ScriptItemProcessor, Map> processor = builder.build(); + processor.afterPropertiesSet(); + processors.add(processor); + } + if (fields != null) { + processors.add(new SpelProcessor(redis.redis(), new SimpleDateFormat(dateFormat), variables, fields)); + } + if (processors.isEmpty()) { + return null; + } + if (processors.size() == 1) { + return processors.get(0); + } + CompositeItemProcessor, Map> processor = new CompositeItemProcessor<>(); + processor.setDelegates(processors); + return processor; + } + +} diff --git a/src/main/java/com/redislabs/riot/cli/db/DatabaseExportCommand.java b/src/main/java/com/redislabs/riot/cli/db/DatabaseExportCommand.java index 1e7a5a13b..30f822275 100644 --- a/src/main/java/com/redislabs/riot/cli/db/DatabaseExportCommand.java +++ b/src/main/java/com/redislabs/riot/cli/db/DatabaseExportCommand.java @@ -13,14 +13,14 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; -import picocli.CommandLine.Parameters; +import picocli.CommandLine.Option; @Command(name = "db-export", description = "Export to database") public 
class DatabaseExportCommand extends ExportCommand { @Mixin private DatabaseOptions db; - @Parameters(arity = "1", description = "SQL statement e.g. \"INSERT INTO people (id, name) VALUES (:ssn, :name)\"", paramLabel = "") + @Option(required = true, names = "--sql", description = "SQL statement e.g. \"INSERT INTO people (id, name) VALUES (:ssn, :name)\"", paramLabel = "") private String sql; @Override diff --git a/src/main/java/com/redislabs/riot/cli/db/DatabaseImportCommand.java b/src/main/java/com/redislabs/riot/cli/db/DatabaseImportCommand.java index 04c17f166..929c0250c 100644 --- a/src/main/java/com/redislabs/riot/cli/db/DatabaseImportCommand.java +++ b/src/main/java/com/redislabs/riot/cli/db/DatabaseImportCommand.java @@ -12,14 +12,13 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; import picocli.CommandLine.Option; -import picocli.CommandLine.Parameters; @Command(name = "db-import", description = "Import database") public class DatabaseImportCommand extends ImportCommand { @Mixin private DatabaseOptions db; - @Parameters(arity = "1", description = "Select statement", paramLabel = "") + @Option(required = true, names = "--sql", description = "SELECT statement", paramLabel = "") private String sql; @Option(names = "--fetch", description = "A hint to the driver as to how many rows to return with each fetch", paramLabel = "") private Integer fetchSize; diff --git a/src/main/java/com/redislabs/riot/cli/file/FileOptions.java b/src/main/java/com/redislabs/riot/cli/file/FileOptions.java index e8c64f386..7a0d2eac8 100644 --- a/src/main/java/com/redislabs/riot/cli/file/FileOptions.java +++ b/src/main/java/com/redislabs/riot/cli/file/FileOptions.java @@ -18,11 +18,10 @@ import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; -import picocli.CommandLine.Parameters; public class FileOptions { - @Parameters(arity = "1", description = "File path") + @Option(required = true, names = "--file", description = "File path") private String path; @Option(names = { "-z", "--gzip" }, description = "File is gzip compressed") private boolean gzip; diff --git a/src/main/java/com/redislabs/riot/cli/generator/FakerGeneratorCommand.java b/src/main/java/com/redislabs/riot/cli/generator/FakerGeneratorCommand.java index 34b1b8742..f059d2ab9 100644 --- a/src/main/java/com/redislabs/riot/cli/generator/FakerGeneratorCommand.java +++ b/src/main/java/com/redislabs/riot/cli/generator/FakerGeneratorCommand.java @@ -17,12 +17,11 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Option; -import picocli.CommandLine.Parameters; @Command(name = "faker", description = "Import Faker-generated data") public class FakerGeneratorCommand extends ImportCommand { - @Parameters(paramLabel = "", description = "SpEL expression to generate a field") + @Option(required = true, arity = "1..*", names = "--fields", description = "SpEL expression to generate a field", paramLabel = "") private Map fields; @Option(names = { "-l", "--locale" }, description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "") diff --git a/src/main/java/com/redislabs/riot/cli/generator/SimpleGeneratorCommand.java b/src/main/java/com/redislabs/riot/cli/generator/SimpleGeneratorCommand.java index f4bc5af9b..b770a385b 100644 --- a/src/main/java/com/redislabs/riot/cli/generator/SimpleGeneratorCommand.java +++ b/src/main/java/com/redislabs/riot/cli/generator/SimpleGeneratorCommand.java @@ -8,12 +8,12 @@ import com.redislabs.riot.generator.SimpleGeneratorReader; import picocli.CommandLine.Command; -import 
picocli.CommandLine.Parameters; +import picocli.CommandLine.Option; @Command(name = "gen", description = "Import simple generated data") public class SimpleGeneratorCommand extends ImportCommand { - @Parameters(description = "Field sizes in bytes", paramLabel = "") + @Option(names = "--fields", arity = "1..*", description = "Field sizes in bytes", paramLabel = "") private Map fields = new LinkedHashMap<>(); @Override diff --git a/src/main/java/com/redislabs/riot/cli/redis/RedisConnectionOptions.java b/src/main/java/com/redislabs/riot/cli/redis/RedisConnectionOptions.java index efa4b600b..7a1b3b39b 100644 --- a/src/main/java/com/redislabs/riot/cli/redis/RedisConnectionOptions.java +++ b/src/main/java/com/redislabs/riot/cli/redis/RedisConnectionOptions.java @@ -7,18 +7,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemWriter; +import com.redislabs.lettusearch.RediSearchAsyncCommands; import com.redislabs.lettusearch.RediSearchClient; +import com.redislabs.lettusearch.StatefulRediSearchConnection; import com.redislabs.riot.redis.JedisClusterWriter; import com.redislabs.riot.redis.JedisWriter; -import com.redislabs.riot.redis.LettuSearchWriter; -import com.redislabs.riot.redis.LettuceClusterWriter; import com.redislabs.riot.redis.LettuceWriter; import com.redislabs.riot.redis.writer.AbstractRedisItemWriter; import com.redislabs.riot.redisearch.AbstractLettuSearchItemWriter; @@ -27,14 +29,21 @@ import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.SslOptions; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; import io.lettuce.core.event.DefaultEventPublisherOptions; import io.lettuce.core.event.metrics.CommandLatencyEvent; import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.lettuce.core.resource.DefaultClientResources.Builder; +import io.lettuce.core.support.ConnectionPoolSupport; +import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; @@ -74,14 +83,6 @@ public class RedisConnectionOptions { private RedisDriver driver = RedisDriver.lettuce; @Option(names = "--ssl", description = "SSL connection") private boolean ssl; - @Option(names = "--max-total", description = "Max connections that can be allocated by the pool at a given time. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "") - private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; - @Option(names = "--min-idle", description = "Min idle connections in pool. Only has an effect if >0 (default: ${DEFAULT-VALUE})", paramLabel = "") - private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; - @Option(names = "--max-idle", description = "Max idle connections in pool. 
Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "") - private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; - @Option(names = "--max-wait", description = "Max duration a connection allocation should block before throwing an exception when pool is exhausted. Use negative value to block indefinitely (default: ${DEFAULT-VALUE})", paramLabel = "") - private long maxWait = GenericObjectPoolConfig.DEFAULT_MAX_WAIT_MILLIS; @Option(names = "--ssl-provider", description = "SSL Provider: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "") private SslProvider sslProvider = SslProvider.Jdk; @Option(names = "--keystore", description = "Path to keystore", paramLabel = "") @@ -94,14 +95,32 @@ public class RedisConnectionOptions { private String truststorePassword; @Option(names = "--max-redirects", description = "Number of maximal cluster redirects (-MOVED and -ASK) to follow in case a key was moved from one node to another node (default: ${DEFAULT-VALUE})", paramLabel = "") private int maxRedirects = ClusterClientOptions.DEFAULT_MAX_REDIRECTS; + @ArgGroup(exclusive = false, heading = "Redis connection pool options%n") + private RedisConnectionPoolOptions poolOptions = new RedisConnectionPoolOptions(); + + static class RedisConnectionPoolOptions { + @Option(names = "--pool-max-total", description = "Max connections that can be allocated by the pool at a given time. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "") + private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL; + @Option(names = "--pool-min-idle", description = "Min idle connections in pool. Only has an effect if >0 (default: ${DEFAULT-VALUE})", paramLabel = "") + private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE; + @Option(names = "--pool-max-idle", description = "Max idle connections in pool. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "") + private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE; + @Option(names = "--pool-max-wait", description = "Max duration a connection allocation should block before throwing an exception when pool is exhausted. 
Use negative value to block indefinitely (default: ${DEFAULT-VALUE})", paramLabel = "") + private long maxWait = GenericObjectPoolConfig.DEFAULT_MAX_WAIT_MILLIS; + + @SuppressWarnings("rawtypes") + public T configure(T poolConfig) { + poolConfig.setMaxTotal(maxTotal); + poolConfig.setMaxIdle(maxIdle); + poolConfig.setMinIdle(minIdle); + poolConfig.setMaxWaitMillis(maxWait); + return poolConfig; + } + + public > GenericObjectPool pool(Supplier supplier) { + return ConnectionPoolSupport.createGenericObjectPool(supplier, configure(new GenericObjectPoolConfig<>())); + } - @SuppressWarnings("rawtypes") - private T configure(T poolConfig) { - poolConfig.setMaxTotal(maxTotal); - poolConfig.setMaxIdle(maxIdle); - poolConfig.setMinIdle(minIdle); - poolConfig.setMaxWaitMillis(maxWait); - return poolConfig; } private RedisURI uri() { @@ -145,7 +164,7 @@ private ClientResources resources() { } public Pool jedisPool() { - JedisPoolConfig poolConfig = configure(new JedisPoolConfig()); + JedisPoolConfig poolConfig = poolOptions.configure(new JedisPoolConfig()); if (sentinelMaster == null) { String host = endpoints.get(0).getHost(); int port = endpoints.get(0).getPort(); @@ -219,10 +238,6 @@ private SslOptions sslOptions() { return builder.build(); } - private GenericObjectPoolConfig poolConfig() { - return configure(new GenericObjectPoolConfig<>()); - } - public Object redis() { if (driver == RedisDriver.jedis) { return jedisPool().getResource(); @@ -236,7 +251,10 @@ public Object redis() { @SuppressWarnings({ "rawtypes", "unchecked" }) public ItemWriter> writer(AbstractRedisItemWriter itemWriter) { if (itemWriter instanceof AbstractLettuSearchItemWriter) { - return new LettuSearchWriter(rediSearchClient(), poolConfig(), (AbstractLettuSearchItemWriter) itemWriter); + RediSearchClient client = rediSearchClient(); + return new LettuceWriter, RediSearchAsyncCommands>( + client, client::getResources, poolOptions.pool(client::connect), itemWriter, + StatefulRediSearchConnection::async); } if (driver == RedisDriver.jedis) { if (cluster) { @@ -245,18 +263,24 @@ public ItemWriter> writer(AbstractRedisItemWriter itemWriter return new JedisWriter(jedisPool(), itemWriter); } if (cluster) { - return new LettuceClusterWriter(clusterClient(), poolConfig(), itemWriter); + RedisClusterClient client = clusterClient(); + return new LettuceWriter, RedisClusterAsyncCommands>( + client, client::getResources, poolOptions.pool(client::connect), itemWriter, + StatefulRedisClusterConnection::async); } - return new LettuceWriter(client(), poolConfig(), itemWriter); + RedisClient client = client(); + return new LettuceWriter, RedisAsyncCommands>(client, + client::getResources, poolOptions.pool(client::connect), itemWriter, StatefulRedisConnection::async); } private JedisCluster jedisCluster() { Set hostAndPort = new HashSet<>(); endpoints.forEach(node -> hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()))); + JedisPoolConfig poolConfig = poolOptions.configure(new JedisPoolConfig()); if (password == null) { - return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, poolConfig()); + return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, poolConfig); } - return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, password, poolConfig()); + return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, password, poolConfig); } } diff --git a/src/main/java/com/redislabs/riot/redis/AbstractLettuceWriter.java 
b/src/main/java/com/redislabs/riot/redis/AbstractLettuceWriter.java deleted file mode 100644 index 9d0defd28..000000000 --- a/src/main/java/com/redislabs/riot/redis/AbstractLettuceWriter.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.redislabs.riot.redis; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.item.ExecutionContext; -import org.springframework.batch.item.support.AbstractItemStreamItemWriter; -import org.springframework.util.ClassUtils; - -import com.redislabs.riot.redis.writer.LettuceItemWriter; - -import io.lettuce.core.RedisFuture; -import io.lettuce.core.api.StatefulConnection; -import io.lettuce.core.api.async.BaseRedisAsyncCommands; -import io.lettuce.core.support.ConnectionPoolSupport; - -public abstract class AbstractLettuceWriter, C extends BaseRedisAsyncCommands> - extends AbstractItemStreamItemWriter> { - - private final Logger log = LoggerFactory.getLogger(AbstractLettuceWriter.class); - - private GenericObjectPoolConfig poolConfig; - private Supplier supplier; - private GenericObjectPool pool; - private LettuceItemWriter writer; - - public AbstractLettuceWriter(GenericObjectPoolConfig poolConfig, Supplier supplier, - LettuceItemWriter writer) { - setName(ClassUtils.getShortName(this.getClass())); - this.poolConfig = poolConfig; - this.supplier = supplier; - this.writer = writer; - } - - @Override - public void open(ExecutionContext executionContext) { - log.debug("Creating Lettuce pool {}", poolConfig); - this.pool = ConnectionPoolSupport.createGenericObjectPool(supplier, poolConfig); - super.open(executionContext); - } - - @Override - public void write(List> items) throws Exception { - S connection = pool.borrowObject(); - List> futures = new ArrayList<>(); - try { - C commands = commands(connection); - commands.setAutoFlushCommands(false); - for (Map item : items) { - RedisFuture future = writer.write(commands, item); - if (future != null) { - futures.add(future); - } - } - commands.flushCommands(); - for (int index = 0; index < futures.size(); index++) { - RedisFuture future = futures.get(index); - try { - future.get(1, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("Could not write record {}: {}", items.get(index), future.getError()); - } - } - } finally { - pool.returnObject(connection); - } - } - - protected abstract C commands(S connection); - - @Override - public void close() { - // Take care of multi-threaded writer by only closing on the last call - if (pool != null && pool.getNumActive() == 0) { - log.debug("Closing pool"); - pool.close(); - shutdownClient(); - pool = null; - } - super.close(); - } - - protected abstract void shutdownClient(); - -} diff --git a/src/main/java/com/redislabs/riot/redis/LettuSearchWriter.java b/src/main/java/com/redislabs/riot/redis/LettuSearchWriter.java deleted file mode 100644 index 8ccbfd68b..000000000 --- a/src/main/java/com/redislabs/riot/redis/LettuSearchWriter.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.redislabs.riot.redis; - -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; - -import com.redislabs.lettusearch.RediSearchAsyncCommands; -import com.redislabs.lettusearch.RediSearchClient; -import com.redislabs.lettusearch.StatefulRediSearchConnection; -import 
com.redislabs.riot.redisearch.AbstractLettuSearchItemWriter; - -public class LettuSearchWriter extends - AbstractLettuceWriter, RediSearchAsyncCommands> { - - private RediSearchClient client; - - public LettuSearchWriter(RediSearchClient client, - GenericObjectPoolConfig> poolConfig, - AbstractLettuSearchItemWriter writer) { - super(poolConfig, client::connect, writer); - this.client = client; - } - - @Override - protected void shutdownClient() { - client.shutdown(); - client.getResources().shutdown(); - } - - @Override - protected RediSearchAsyncCommands commands( - StatefulRediSearchConnection connection) { - return connection.async(); - } - -} diff --git a/src/main/java/com/redislabs/riot/redis/LettuceClusterWriter.java b/src/main/java/com/redislabs/riot/redis/LettuceClusterWriter.java deleted file mode 100644 index 2061fa930..000000000 --- a/src/main/java/com/redislabs/riot/redis/LettuceClusterWriter.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.redislabs.riot.redis; - -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; - -import com.redislabs.riot.redis.writer.LettuceItemWriter; - -import io.lettuce.core.cluster.RedisClusterClient; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; - -public class LettuceClusterWriter extends - AbstractLettuceWriter, RedisClusterAsyncCommands> { - - private RedisClusterClient client; - - public LettuceClusterWriter(RedisClusterClient client, - GenericObjectPoolConfig> poolConfig, - LettuceItemWriter> writer) { - super(poolConfig, client::connect, writer); - this.client = client; - } - - @Override - protected void shutdownClient() { - client.shutdown(); - client.getResources().shutdown(); - } - - @Override - protected RedisClusterAsyncCommands commands( - StatefulRedisClusterConnection connection) { - return connection.async(); - } - -} diff --git a/src/main/java/com/redislabs/riot/redis/LettuceWriter.java b/src/main/java/com/redislabs/riot/redis/LettuceWriter.java index e327b85cd..ac13d4aa4 100644 --- a/src/main/java/com/redislabs/riot/redis/LettuceWriter.java +++ b/src/main/java/com/redislabs/riot/redis/LettuceWriter.java @@ -1,33 +1,91 @@ package com.redislabs.riot.redis; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.support.AbstractItemStreamItemWriter; +import org.springframework.util.ClassUtils; import com.redislabs.riot.redis.writer.LettuceItemWriter; -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.async.BaseRedisAsyncCommands; +import io.lettuce.core.resource.ClientResources; + +public class LettuceWriter, C extends BaseRedisAsyncCommands> + extends AbstractItemStreamItemWriter> { -public class LettuceWriter - extends AbstractLettuceWriter, RedisAsyncCommands> { + private final Logger log = LoggerFactory.getLogger(LettuceWriter.class); - private RedisClient client; + private 
AbstractRedisClient client; + private Supplier resources; + private GenericObjectPool pool; + private LettuceItemWriter writer; + private Function async; - public LettuceWriter(RedisClient client, - GenericObjectPoolConfig> poolConfig, - LettuceItemWriter> writer) { - super(poolConfig, client::connect, writer); + public LettuceWriter(AbstractRedisClient client, Supplier resources, GenericObjectPool pool, + LettuceItemWriter writer, Function async) { + setName(ClassUtils.getShortName(this.getClass())); this.client = client; + this.resources = resources; + this.pool = pool; + this.writer = writer; + this.async = async; + } + + @Override + public void open(ExecutionContext executionContext) { + super.open(executionContext); } @Override - protected void shutdownClient() { - client.shutdown(); - client.getResources().shutdown(); + public void write(List> items) throws Exception { + S connection = pool.borrowObject(); + List> futures = new ArrayList<>(); + try { + C commands = async.apply(connection); + commands.setAutoFlushCommands(false); + for (Map item : items) { + RedisFuture future = writer.write(commands, item); + if (future != null) { + futures.add(future); + } + } + commands.flushCommands(); + for (int index = 0; index < futures.size(); index++) { + RedisFuture future = futures.get(index); + try { + future.get(1, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Could not write record {}: {}", items.get(index), future.getError()); + } + } + } finally { + pool.returnObject(connection); + } } - protected RedisAsyncCommands commands(StatefulRedisConnection connection) { - return connection.async(); + @Override + public void close() { + // Take care of multi-threaded writer by only closing on the last call + if (pool != null && pool.getNumActive() == 0) { + log.debug("Closing pool"); + pool.close(); + pool = null; + client.shutdown(); + resources.get().shutdown(); + } + super.close(); } } diff --git a/src/test/java/com/redislabs/riot/TestFile.java b/src/test/java/com/redislabs/riot/TestFile.java index 5b12f2a5a..3779a347f 100644 --- a/src/test/java/com/redislabs/riot/TestFile.java +++ b/src/test/java/com/redislabs/riot/TestFile.java @@ -40,7 +40,7 @@ public class TestFile extends BaseTest { @Test - public void testExportBeersCsv() throws UnexpectedInputException, ParseException, Exception { + public void testExportCsv() throws UnexpectedInputException, ParseException, Exception { File file = new File("/tmp/beers.csv"); file.delete(); runFile("file-import-json-hash"); @@ -76,7 +76,7 @@ private List readAll(AbstractItemCountingItemStreamItemReader reader) @SuppressWarnings("rawtypes") @Test - public void testExportBeersJson() throws UnexpectedInputException, ParseException, Exception { + public void testExportJson() throws UnexpectedInputException, ParseException, Exception { File file = new File("/tmp/beers.json"); file.delete(); runFile("file-import-json-hash"); @@ -94,7 +94,7 @@ public void testExportBeersJson() throws UnexpectedInputException, ParseExceptio @SuppressWarnings("rawtypes") @Test - public void testExportBeersJsonGz() throws UnexpectedInputException, ParseException, Exception { + public void testExportJsonGz() throws UnexpectedInputException, ParseException, Exception { File file = new File("/tmp/beers.json.gz"); file.delete(); runFile("file-import-json-hash"); @@ -113,14 +113,14 @@ public void testExportBeersJsonGz() throws UnexpectedInputException, ParseExcept } @Test - public void testImportCsv() throws Exception { + public void testImportCsvHash() throws Exception 
{ runFile("file-import-csv-hash"); List keys = commands().keys("beer:*"); Assertions.assertEquals(BEER_COUNT, keys.size()); } @Test - public void testImportBeersSearch() throws Exception { + public void testImportCsvSearch() throws Exception { String FIELD_ABV = "abv"; String FIELD_NAME = "name"; String FIELD_STYLE = "style"; @@ -139,20 +139,20 @@ public void testImportBeersSearch() throws Exception { } @Test - public void testImportAirportsSearch() throws Exception { + public void testImportCsvProcessorSearchGeo() throws Exception { String INDEX = "airports"; commands().flushall(); SchemaBuilder schema = Schema.builder(); schema.field(TextField.builder().name("Name").sortable(true).build()); schema.field(GeoField.builder().name("Location").sortable(true).build()); commands().create(INDEX, schema.build()); - runFile("file-import-csv-geosearch"); + runFile("file-import-csv-processor-search-geo"); SearchResults results = commands().search(INDEX, "@Location:[-77 38 50 mi]"); Assertions.assertEquals(3, results.getCount()); } @Test - public void testImportAirports() throws Exception { + public void testImportCsvGeo() throws Exception { runFile("file-import-csv-geo"); Set results = commands().georadius("airportgeo", -122.4194, 37.7749, 20, Unit.mi); Assertions.assertTrue(results.contains("3469")); @@ -163,7 +163,7 @@ public void testImportAirports() throws Exception { @Test public void testImportElasticacheJson() throws Exception { String url = getClass().getClassLoader().getResource("es_test-index.json").getFile(); - runCommand("file-import %s --keyspace estest --keys _id", url); + runCommand("file-import --file %s --keyspace estest --keys _id", url); Assertions.assertEquals(2, commands().keys("estest:*").size()); Map doc1 = commands().hgetall("estest:doc1"); Assertions.assertEquals("ruan", doc1.get("_source.name")); @@ -171,7 +171,7 @@ public void testImportElasticacheJson() throws Exception { } @Test - public void testImportBeersJson() throws Exception { + public void testImportJsonHash() throws Exception { runFile("file-import-json-hash"); List keys = commands().keys("beer:*"); Assertions.assertEquals(4432, keys.size()); @@ -180,8 +180,8 @@ public void testImportBeersJson() throws Exception { } @Test - public void testImportCsvHashDate() throws Exception { - runFile("file-import-csv-hash-date"); + public void testImportCsvProcessorHashDateFormat() throws Exception { + runFile("file-import-csv-processor-hash-dateformat"); List keys = commands().keys("event:*"); Assertions.assertEquals(568, keys.size()); Map event = commands().hgetall("event:248206"); @@ -189,7 +189,7 @@ public void testImportCsvHashDate() throws Exception { } @Test - public void testImportLAEvents() throws Exception { + public void testImportCsvProcessorSearch() throws Exception { String INDEX = "laevents"; commands().flushall(); SchemaBuilder schema = Schema.builder(); @@ -198,7 +198,7 @@ public void testImportLAEvents() throws Exception { schema.field(NumericField.builder().name("kat").build()); schema.field(GeoField.builder().name("location").sortable(true).build()); commands().create(INDEX, schema.build()); - runFile("file-import-laevents"); + runFile("file-import-csv-processor-search"); SearchResults results = commands().search(INDEX, "@location:[-118.446014 33.998415 10 mi]"); Assertions.assertTrue(results.getCount() > 0); for (SearchResult result : results) { diff --git a/src/test/resources/commands/db-export.txt b/src/test/resources/commands/db-export.txt index 39ef762fd..a1eaff31b 100644 --- 
a/src/test/resources/commands/db-export.txt +++ b/src/test/resources/commands/db-export.txt @@ -1 +1 @@ -$ riot db-export --scan-keyspace beer --scan-keys id --driver org.hsqldb.jdbc.JDBCDriver --url jdbc:hsqldb:mem:mymemdb "INSERT INTO beers (id, name) VALUES (:id, :name)" \ No newline at end of file +$ riot db-export --scan-keyspace beer --scan-keys id --driver org.hsqldb.jdbc.JDBCDriver --url jdbc:hsqldb:mem:mymemdb --sql "INSERT INTO beers (id, name) VALUES (:id, :name)" \ No newline at end of file diff --git a/src/test/resources/commands/db-import.txt b/src/test/resources/commands/db-import.txt index 1581be57c..0553bc947 100644 --- a/src/test/resources/commands/db-import.txt +++ b/src/test/resources/commands/db-import.txt @@ -1 +1 @@ -$ riot db-import -p dbbeer -k id --driver org.hsqldb.jdbc.JDBCDriver --url jdbc:hsqldb:mem:mymemdb "SELECT * FROM beers" +$ riot db-import -p dbbeer -k id --driver org.hsqldb.jdbc.JDBCDriver --url jdbc:hsqldb:mem:mymemdb --sql "SELECT * FROM beers" diff --git a/src/test/resources/commands/file-export-csv.txt b/src/test/resources/commands/file-export-csv.txt index f29824a29..bfc2335fd 100644 --- a/src/test/resources/commands/file-export-csv.txt +++ b/src/test/resources/commands/file-export-csv.txt @@ -1 +1 @@ -$ riot file-export /tmp/beers.csv --scan-keyspace beer --scan-keys id -f id name brewery_id abv --header -d | \ No newline at end of file +$ riot file-export --file /tmp/beers.csv --scan-keyspace beer --scan-keys id -f id name brewery_id abv --header -d | \ No newline at end of file diff --git a/src/test/resources/commands/file-export-json.txt b/src/test/resources/commands/file-export-json.txt index c42ea38c8..ba4200199 100644 --- a/src/test/resources/commands/file-export-json.txt +++ b/src/test/resources/commands/file-export-json.txt @@ -1 +1 @@ -$ riot file-export /tmp/beers.json --scan-keyspace beer --scan-keys id -f id name brewery_id abv \ No newline at end of file +$ riot file-export --file /tmp/beers.json --scan-keyspace beer --scan-keys id -f id name brewery_id abv \ No newline at end of file diff --git a/src/test/resources/commands/file-export-json_gz.txt b/src/test/resources/commands/file-export-json_gz.txt index c4e1562da..43488b536 100644 --- a/src/test/resources/commands/file-export-json_gz.txt +++ b/src/test/resources/commands/file-export-json_gz.txt @@ -1 +1 @@ -$ riot file-export /tmp/beers.json.gz --scan-keyspace beer --scan-keys id -f id name brewery_id abv \ No newline at end of file +$ riot file-export --file /tmp/beers.json.gz --scan-keyspace beer --scan-keys id -f id name brewery_id abv \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-geo.txt b/src/test/resources/commands/file-import-csv-geo.txt index d9ffa2731..40ab2fe03 100644 --- a/src/test/resources/commands/file-import-csv-geo.txt +++ b/src/test/resources/commands/file-import-csv-geo.txt @@ -1 +1 @@ -$ riot file-import https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat -c geoadd -p airportgeo --members AirportID --geo-lon Longitude --geo-lat Latitude -t csv -f AirportID Name City Country IATA ICAO Latitude Longitude Altitude Timezone DST Tz Type Source \ No newline at end of file +$ riot file-import --file https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat -c geoadd -p airportgeo --members AirportID --geo-lon Longitude --geo-lat Latitude -t csv -f AirportID Name City Country IATA ICAO Latitude Longitude Altitude Timezone DST Tz Type Source \ No newline at end of file diff --git 
a/src/test/resources/commands/file-import-csv-geosearch.txt b/src/test/resources/commands/file-import-csv-geosearch.txt deleted file mode 100644 index 5b132bedd..000000000 --- a/src/test/resources/commands/file-import-csv-geosearch.txt +++ /dev/null @@ -1 +0,0 @@ -$ riot file-import https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat -c ftadd --index airports -k AirportID -t csv -f AirportID Name City Country IATA ICAO Latitude Longitude Altitude Timezone DST Tz Type Source --processor "Location=#geo(Longitude,Latitude)" \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-hash-date.txt b/src/test/resources/commands/file-import-csv-hash-date.txt deleted file mode 100644 index 141ff7cf1..000000000 --- a/src/test/resources/commands/file-import-csv-hash-date.txt +++ /dev/null @@ -1 +0,0 @@ -$ riot file-import -p event -k Id --header --processor-date-format "MM/dd/yyyy HH:mm:ss a" --processor "EventStartDate=remove('Event Start Date')" --processor "EpochStart=#date.parse(EventStartDate).getTime()" "https://data.lacity.org/api/views/rx9t-fp7k/rows.csv?accessType=DOWNLOAD" \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-hash.txt b/src/test/resources/commands/file-import-csv-hash.txt index 7e222a618..46957de27 100644 --- a/src/test/resources/commands/file-import-csv-hash.txt +++ b/src/test/resources/commands/file-import-csv-hash.txt @@ -1 +1 @@ -$ riot file-import -p beer -k id --header https://git.io/fjxPs \ No newline at end of file +$ riot file-import -p beer -k id --header --file https://git.io/fjxPs \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-processor-hash-dateformat.txt b/src/test/resources/commands/file-import-csv-processor-hash-dateformat.txt new file mode 100644 index 000000000..bbecebde2 --- /dev/null +++ b/src/test/resources/commands/file-import-csv-processor-hash-dateformat.txt @@ -0,0 +1 @@ +$ riot file-import -p event -k Id --header --processor-date-format "MM/dd/yyyy HH:mm:ss a" --processor "EventStartDate=remove('Event Start Date')" "EpochStart=#date.parse(EventStartDate).getTime()" --file "https://data.lacity.org/api/views/rx9t-fp7k/rows.csv?accessType=DOWNLOAD" \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-processor-search-geo.txt b/src/test/resources/commands/file-import-csv-processor-search-geo.txt new file mode 100644 index 000000000..0612e2833 --- /dev/null +++ b/src/test/resources/commands/file-import-csv-processor-search-geo.txt @@ -0,0 +1 @@ +$ riot file-import --file https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat -c ftadd --index airports -k AirportID -t csv -f AirportID Name City Country IATA ICAO Latitude Longitude Altitude Timezone DST Tz Type Source --processor "Location=#geo(Longitude,Latitude)" \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-processor-search.txt b/src/test/resources/commands/file-import-csv-processor-search.txt new file mode 100644 index 000000000..978a1ecb1 --- /dev/null +++ b/src/test/resources/commands/file-import-csv-processor-search.txt @@ -0,0 +1 @@ +$ riot file-import --file "https://data.lacity.org/api/views/rx9t-fp7k/rows.csv?accessType=DOWNLOAD" -c ftadd -i laevents -k Id --header --regex 'Event Location'="\((?.+),\s+(?.+)\)" --processor location=#geo(lon,lat) \ No newline at end of file diff --git a/src/test/resources/commands/file-import-csv-search.txt 
b/src/test/resources/commands/file-import-csv-search.txt index 50f010139..959f68513 100644 --- a/src/test/resources/commands/file-import-csv-search.txt +++ b/src/test/resources/commands/file-import-csv-search.txt @@ -1 +1 @@ -$ riot file-import --include 1 3 4 5 6 -c ftadd -i beers -k id --header https://git.io/fjxPs \ No newline at end of file +$ riot file-import --include 1 3 4 5 6 -c ftadd -i beers -k id --header --file https://git.io/fjxPs \ No newline at end of file diff --git a/src/test/resources/commands/file-import-json-hash.txt b/src/test/resources/commands/file-import-json-hash.txt index ec0cbaee0..ab7310620 100644 --- a/src/test/resources/commands/file-import-json-hash.txt +++ b/src/test/resources/commands/file-import-json-hash.txt @@ -1 +1 @@ -$ riot file-import -p beer -k id -t json https://git.io/fjxPZ \ No newline at end of file +$ riot file-import -p beer -k id -t json --file https://git.io/fjxPZ \ No newline at end of file diff --git a/src/test/resources/commands/file-import-laevents.txt b/src/test/resources/commands/file-import-laevents.txt deleted file mode 100644 index 0e4594ddc..000000000 --- a/src/test/resources/commands/file-import-laevents.txt +++ /dev/null @@ -1 +0,0 @@ -$ riot file-import "https://data.lacity.org/api/views/rx9t-fp7k/rows.csv?accessType=DOWNLOAD" -c ftadd -i laevents -k Id --header --regex 'Event Location'="\((?.+),\s+(?.+)\)" --processor location=#geo(lon,lat) \ No newline at end of file diff --git a/src/test/resources/commands/gen-faker-hash.txt b/src/test/resources/commands/gen-faker-hash.txt index b88c65c5f..a4b452eb5 100644 --- a/src/test/resources/commands/gen-faker-hash.txt +++ b/src/test/resources/commands/gen-faker-hash.txt @@ -1 +1 @@ -$ riot faker id=index firstName=name.firstName lastName=name.lastName address=address.fullAddress --max 100 -p person -k id \ No newline at end of file +$ riot faker --fields id=index firstName=name.firstName lastName=name.lastName address=address.fullAddress --max 100 -p person -k id \ No newline at end of file diff --git a/src/test/resources/commands/gen-faker-set.txt b/src/test/resources/commands/gen-faker-set.txt index 5af40acc0..53fe5b38d 100644 --- a/src/test/resources/commands/gen-faker-set.txt +++ b/src/test/resources/commands/gen-faker-set.txt @@ -1 +1 @@ -$ riot faker name=gameOfThrones.character --max 10000 -c sadd --members name -p got:characters \ No newline at end of file +$ riot faker --fields name=gameOfThrones.character --max 10000 -c sadd --members name -p got:characters \ No newline at end of file diff --git a/src/test/resources/commands/gen-faker-stream.txt b/src/test/resources/commands/gen-faker-stream.txt index 68b28fdc1..1b8577991 100644 --- a/src/test/resources/commands/gen-faker-stream.txt +++ b/src/test/resources/commands/gen-faker-stream.txt @@ -1 +1 @@ -$ riot faker id=index category=number.randomDigit -b 50 -m 1000 -c xadd -p teststream -k category \ No newline at end of file +$ riot faker --fields id=index category=number.randomDigit -b 50 -m 1000 -c xadd -p teststream -k category \ No newline at end of file diff --git a/src/test/resources/commands/gen-faker-zset.txt b/src/test/resources/commands/gen-faker-zset.txt index 5207341fb..4932d3afd 100644 --- a/src/test/resources/commands/gen-faker-zset.txt +++ b/src/test/resources/commands/gen-faker-zset.txt @@ -1 +1 @@ -$ riot faker ip=number.digits(4) lease=number.digits(2) time=number.digits(5) -b 50 -m 10000 -c zadd -p leases -k ip -f lease --score=time \ No newline at end of file +$ riot faker --fields ip=number.digits(4) 
lease=number.digits(2) time=number.digits(5) -b 50 -m 10000 -c zadd -p leases -k ip -f lease --score=time \ No newline at end of file diff --git a/src/test/resources/commands/gen-simple.txt b/src/test/resources/commands/gen-simple.txt index aa6b48366..630fc1844 100644 --- a/src/test/resources/commands/gen-simple.txt +++ b/src/test/resources/commands/gen-simple.txt @@ -1 +1 @@ -$ riot gen field1=100 field2=1000 -b 100 --threads 3 -m 10000 -p test -k index \ No newline at end of file +$ riot -d gen --fields field1=100 field2=1000 -b 100 --threads 3 -m 10000 -p test -k index \ No newline at end of file
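
Note (illustrative, not part of the patch): the core of this fix is in RedisConnectionOptions and the consolidated LettuceWriter above — the connection pool is now built once from the new --pool-* options and handed to a single writer instance that all step threads share, instead of each writer creating its own pool in open(). The sketch below shows that wiring in isolation. It assumes riot's LettuceWriter and LettuceItemWriter exactly as defined in this patch; the Redis URI, the pool size of 96, and the itemWriter parameter are examples only.

// Sketch only: one client, one shared pool, one shared writer for all worker threads.
import java.util.Map;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ItemWriter;

import com.redislabs.riot.redis.LettuceWriter;
import com.redislabs.riot.redis.writer.LettuceItemWriter;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.support.ConnectionPoolSupport;

public class PooledLettuceWriterExample {

	public static ItemWriter<Map<String, Object>> pooledWriter(
			LettuceItemWriter<RedisAsyncCommands<String, String>> itemWriter) {
		RedisClient client = RedisClient.create(RedisURI.create("redis://localhost:6379"));

		// Size the pool for the worker threads, e.g. --threads 96 --pool-max-total 96.
		GenericObjectPoolConfig<StatefulRedisConnection<String, String>> config = new GenericObjectPoolConfig<>();
		config.setMaxTotal(96);

		// The pool is created up front and shared, mirroring RedisConnectionPoolOptions.pool().
		GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport
				.createGenericObjectPool(client::connect, config);

		// Each write() borrows a connection, pipelines the batch with auto-flush disabled,
		// and returns the connection; close() shuts the pool and client down only once
		// no connections are active, so the last thread performs the cleanup.
		return new LettuceWriter<StatefulRedisConnection<String, String>, RedisAsyncCommands<String, String>>(
				client, client::getResources, pool, itemWriter, StatefulRedisConnection::async);
	}
}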
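
Note (illustrative, not part of the patch): the new ProcessorOptions group also introduces --processor-script / --processor-script-file, which delegate to Spring Batch's ScriptItemProcessor. The sketch below exercises the same builder calls on their own, with a hypothetical inline ECMAScript snippet and field name; Spring Batch binds the current item to the script variable named item and uses the script's result as the processed item. It requires a JDK that still ships the Nashorn engine (Java 8-14), as assumed by the patch's nashorn.args workaround.

// Sketch only: what --processor-script builds internally; script body and field are examples.
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.support.ScriptItemProcessor;
import org.springframework.batch.item.support.builder.ScriptItemProcessorBuilder;

public class ScriptProcessorExample {

	public static void main(String[] args) throws Exception {
		// Same property the patch sets before building the processor, to silence Nashorn warnings.
		System.setProperty("nashorn.args", "--no-deprecation-warning");

		ScriptItemProcessorBuilder<Map<String, Object>, Map<String, Object>> builder = new ScriptItemProcessorBuilder<>();
		builder.language("ECMAScript"); // default of --processor-script-language
		// The item is exposed as 'item'; the last expression is returned as the processed item.
		builder.scriptSource("item.put('style', item.get('style').toUpperCase()); item;");

		ScriptItemProcessor<Map<String, Object>, Map<String, Object>> processor = builder.build();
		processor.afterPropertiesSet();

		Map<String, Object> beer = new HashMap<>();
		beer.put("style", "stout");
		System.out.println(processor.process(beer)); // {style=STOUT}
	}
}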