Skip to content

Commit

Permalink
fixed connection pool in multithreaded mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 17, 2019
1 parent 7dde43b commit 57f4020
Show file tree
Hide file tree
Showing 34 changed files with 243 additions and 283 deletions.
4 changes: 2 additions & 2 deletions src/docs/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ riot --metrics …
.Strings
[source,shell]
----
riot --host redis-12000.redislabs.com --port 12000 --max-total 96 gen simple value=100 --max 100000000 --batch 500 --threads 96 -r string --string-format raw --keyspace string --keys=index --value value
$ riot -s redis-12000.redislabs.com:12000 --pool-max-total 96 gen --batch 500 --threads 96 --max 100000000 --fields value=100 -c set --string-format raw --keyspace string --keys index --string-value value
----
image::images/rs-strings.png[]

.Streams
[source,shell]
----
riot --host redis-12000.redislabs.com --port 12000 --max-total 96 gen simple --max 100000000 --batch 500 --threads 96 -r stream --keyspace stream --keys partition
$ riot -s redis-12000.redislabs.com:12000 --pool-max-total 96 gen --batch 500 --threads 96 --max 100000000 -c xadd --keyspace stream --keys partition
----
image::images/rs-streams.png[]
39 changes: 3 additions & 36 deletions src/main/java/com/redislabs/riot/cli/ImportCommand.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package com.redislabs.riot.cli;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;

import com.redislabs.riot.batch.RegexProcessor;
import com.redislabs.riot.batch.SpelProcessor;
import com.redislabs.riot.cli.redis.RediSearchCommandOptions;
import com.redislabs.riot.cli.redis.RedisCommandOptions;
import com.redislabs.riot.cli.redis.RedisKeyOptions;
Expand All @@ -32,39 +26,12 @@ public abstract class ImportCommand extends TransferCommand<Map<String, Object>,
private RedisCommandOptions redis = new RedisCommandOptions();
@ArgGroup(exclusive = false, heading = "RediSearch command options%n")
private RediSearchCommandOptions search = new RediSearchCommandOptions();
@Option(names = {
"--processor" }, description = "SpEL expression to process a field", paramLabel = "<name=expression>")
private Map<String, String> fields;
@Option(names = { "-r",
"--regex" }, description = "Extract fields from a source field using a regular expression", paramLabel = "<source=regex>")
private Map<String, String> regexes;
@Option(names = "--processor-variable", description = "Register a variable in the processor context", paramLabel = "<name=expression>")
private Map<String, String> variables = new LinkedHashMap<String, String>();
@Option(names = "--processor-date-format", description = "java.text.SimpleDateFormat pattern for 'date' processor variable (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
private String dateFormat = new SimpleDateFormat().toPattern();
@ArgGroup(exclusive = false, heading = "Processor options%n")
private ProcessorOptions processorOptions = new ProcessorOptions();

@Override
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() throws Exception {
if (fields == null) {
if (regexes == null) {
return null;
}
return regexProcessor();
}
if (regexes == null) {
return spelProcessor();
}
CompositeItemProcessor<Map<String, Object>, Map<String, Object>> processor = new CompositeItemProcessor<>();
processor.setDelegates(Arrays.asList(regexProcessor(), spelProcessor()));
return processor;
}

private ItemProcessor<Map<String, Object>, Map<String, Object>> regexProcessor() {
return new RegexProcessor(regexes);
}

private ItemProcessor<Map<String, Object>, Map<String, Object>> spelProcessor() {
return new SpelProcessor(getRedisOptions().redis(), new SimpleDateFormat(dateFormat), variables, fields);
return processorOptions.processor(getRedisOptions());
}

@Override
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/com/redislabs/riot/cli/ProcessorOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.redislabs.riot.cli;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.ScriptItemProcessor;
import org.springframework.batch.item.support.builder.ScriptItemProcessorBuilder;
import org.springframework.core.io.FileSystemResource;

import com.redislabs.riot.batch.RegexProcessor;
import com.redislabs.riot.batch.SpelProcessor;
import com.redislabs.riot.cli.redis.RedisConnectionOptions;

import picocli.CommandLine.Option;

public class ProcessorOptions {

@Option(names = { "--processor-script" }, description = "Use an inline script to process items")
private String processorScript;
@Option(names = { "--processor-script-file" }, description = "Use an external script to process items")
private File processorScriptFile;
@Option(names = {
"--processor-script-language" }, description = "Language for the inline script (default: ${DEFAULT-VALUE})", paramLabel = "<lang>")
private String processorScriptLanguage = "ECMAScript";
@Option(arity = "1..*", names = {
"--processor" }, description = "SpEL expression to process a field", paramLabel = "<name=expression>")
private Map<String, String> fields;
@Option(arity = "1..*", names = { "-r",
"--regex" }, description = "Extract fields from a source field using a regular expression", paramLabel = "<source=regex>")
private Map<String, String> regexes;
@Option(arity = "1..*", names = "--processor-variable", description = "Register a variable in the processor context", paramLabel = "<name=expression>")
private Map<String, String> variables = new LinkedHashMap<String, String>();
@Option(names = "--processor-date-format", description = "java.text.SimpleDateFormat pattern for 'date' processor variable (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
private String dateFormat = new SimpleDateFormat().toPattern();

public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(RedisConnectionOptions redis) throws Exception {
List<ItemProcessor<Map<String, Object>, Map<String, Object>>> processors = new ArrayList<>();
if (regexes != null) {
processors.add(new RegexProcessor(regexes));
}
if (processorScript != null || processorScriptFile != null) {
System.setProperty("nashorn.args", "--no-deprecation-warning");
ScriptItemProcessorBuilder<Map<String, Object>, Map<String, Object>> builder = new ScriptItemProcessorBuilder<>();
builder.language(processorScriptLanguage);
if (processorScript != null) {
builder.scriptSource(processorScript);
}
if (processorScriptFile != null) {
builder.scriptResource(new FileSystemResource(processorScriptFile));
}
ScriptItemProcessor<Map<String, Object>, Map<String, Object>> processor = builder.build();
processor.afterPropertiesSet();
processors.add(processor);
}
if (fields != null) {
processors.add(new SpelProcessor(redis.redis(), new SimpleDateFormat(dateFormat), variables, fields));
}
if (processors.isEmpty()) {
return null;
}
if (processors.size() == 1) {
return processors.get(0);
}
CompositeItemProcessor<Map<String, Object>, Map<String, Object>> processor = new CompositeItemProcessor<>();
processor.setDelegates(processors);
return processor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Parameters;
import picocli.CommandLine.Option;

@Command(name = "db-export", description = "Export to database")
public class DatabaseExportCommand extends ExportCommand {

@Mixin
private DatabaseOptions db;
@Parameters(arity = "1", description = "SQL statement e.g. \"INSERT INTO people (id, name) VALUES (:ssn, :name)\"", paramLabel = "<sql>")
@Option(required = true, names = "--sql", description = "SQL statement e.g. \"INSERT INTO people (id, name) VALUES (:ssn, :name)\"", paramLabel = "<sql>")
private String sql;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

@Command(name = "db-import", description = "Import database")
public class DatabaseImportCommand extends ImportCommand {

@Mixin
private DatabaseOptions db;
@Parameters(arity = "1", description = "Select statement", paramLabel = "<sql>")
@Option(required = true, names = "--sql", description = "SELECT statement", paramLabel = "<sql>")
private String sql;
@Option(names = "--fetch", description = "A hint to the driver as to how many rows to return with each fetch", paramLabel = "<size>")
private Integer fetchSize;
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/redislabs/riot/cli/file/FileOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

public class FileOptions {

@Parameters(arity = "1", description = "File path")
@Option(required = true, names = "--file", description = "File path")
private String path;
@Option(names = { "-z", "--gzip" }, description = "File is gzip compressed")
private boolean gzip;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

@Command(name = "faker", description = "Import Faker-generated data")
public class FakerGeneratorCommand extends ImportCommand {

@Parameters(paramLabel = "<name=SpEL>", description = "SpEL expression to generate a field")
@Option(required = true, arity = "1..*", names = "--fields", description = "SpEL expression to generate a field", paramLabel = "<name=SpEL>")
private Map<String, String> fields;
@Option(names = { "-l",
"--locale" }, description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "<tag>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import com.redislabs.riot.generator.SimpleGeneratorReader;

import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;
import picocli.CommandLine.Option;

@Command(name = "gen", description = "Import simple generated data")
public class SimpleGeneratorCommand extends ImportCommand {

@Parameters(description = "Field sizes in bytes", paramLabel = "<field=size>")
@Option(names = "--fields", arity = "1..*", description = "Field sizes in bytes", paramLabel = "<field=size>")
private Map<String, Integer> fields = new LinkedHashMap<>();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

import com.redislabs.lettusearch.RediSearchAsyncCommands;
import com.redislabs.lettusearch.RediSearchClient;
import com.redislabs.lettusearch.StatefulRediSearchConnection;
import com.redislabs.riot.redis.JedisClusterWriter;
import com.redislabs.riot.redis.JedisWriter;
import com.redislabs.riot.redis.LettuSearchWriter;
import com.redislabs.riot.redis.LettuceClusterWriter;
import com.redislabs.riot.redis.LettuceWriter;
import com.redislabs.riot.redis.writer.AbstractRedisItemWriter;
import com.redislabs.riot.redisearch.AbstractLettuSearchItemWriter;
Expand All @@ -27,14 +29,21 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslOptions;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.event.DefaultEventPublisherOptions;
import io.lettuce.core.event.metrics.CommandLatencyEvent;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import io.lettuce.core.resource.DefaultClientResources.Builder;
import io.lettuce.core.support.ConnectionPoolSupport;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
Expand Down Expand Up @@ -74,14 +83,6 @@ public class RedisConnectionOptions {
private RedisDriver driver = RedisDriver.lettuce;
@Option(names = "--ssl", description = "SSL connection")
private boolean ssl;
@Option(names = "--max-total", description = "Max connections that can be allocated by the pool at a given time. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
@Option(names = "--min-idle", description = "Min idle connections in pool. Only has an effect if >0 (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
@Option(names = "--max-idle", description = "Max idle connections in pool. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
@Option(names = "--max-wait", description = "Max duration a connection allocation should block before throwing an exception when pool is exhausted. Use negative value to block indefinitely (default: ${DEFAULT-VALUE})", paramLabel = "<millis>")
private long maxWait = GenericObjectPoolConfig.DEFAULT_MAX_WAIT_MILLIS;
@Option(names = "--ssl-provider", description = "SSL Provider: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
private SslProvider sslProvider = SslProvider.Jdk;
@Option(names = "--keystore", description = "Path to keystore", paramLabel = "<file>")
Expand All @@ -94,14 +95,32 @@ public class RedisConnectionOptions {
private String truststorePassword;
@Option(names = "--max-redirects", description = "Number of maximal cluster redirects (-MOVED and -ASK) to follow in case a key was moved from one node to another node (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int maxRedirects = ClusterClientOptions.DEFAULT_MAX_REDIRECTS;
@ArgGroup(exclusive = false, heading = "Redis connection pool options%n")
private RedisConnectionPoolOptions poolOptions = new RedisConnectionPoolOptions();

static class RedisConnectionPoolOptions {
@Option(names = "--pool-max-total", description = "Max connections that can be allocated by the pool at a given time. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
@Option(names = "--pool-min-idle", description = "Min idle connections in pool. Only has an effect if >0 (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
@Option(names = "--pool-max-idle", description = "Max idle connections in pool. Use negative value for no limit (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
@Option(names = "--pool-max-wait", description = "Max duration a connection allocation should block before throwing an exception when pool is exhausted. Use negative value to block indefinitely (default: ${DEFAULT-VALUE})", paramLabel = "<millis>")
private long maxWait = GenericObjectPoolConfig.DEFAULT_MAX_WAIT_MILLIS;

@SuppressWarnings("rawtypes")
public <T extends GenericObjectPoolConfig> T configure(T poolConfig) {
poolConfig.setMaxTotal(maxTotal);
poolConfig.setMaxIdle(maxIdle);
poolConfig.setMinIdle(minIdle);
poolConfig.setMaxWaitMillis(maxWait);
return poolConfig;
}

public <T extends StatefulConnection<String, String>> GenericObjectPool<T> pool(Supplier<T> supplier) {
return ConnectionPoolSupport.createGenericObjectPool(supplier, configure(new GenericObjectPoolConfig<>()));
}

@SuppressWarnings("rawtypes")
private <T extends GenericObjectPoolConfig> T configure(T poolConfig) {
poolConfig.setMaxTotal(maxTotal);
poolConfig.setMaxIdle(maxIdle);
poolConfig.setMinIdle(minIdle);
poolConfig.setMaxWaitMillis(maxWait);
return poolConfig;
}

private RedisURI uri() {
Expand Down Expand Up @@ -145,7 +164,7 @@ private ClientResources resources() {
}

public Pool<Jedis> jedisPool() {
JedisPoolConfig poolConfig = configure(new JedisPoolConfig());
JedisPoolConfig poolConfig = poolOptions.configure(new JedisPoolConfig());
if (sentinelMaster == null) {
String host = endpoints.get(0).getHost();
int port = endpoints.get(0).getPort();
Expand Down Expand Up @@ -219,10 +238,6 @@ private SslOptions sslOptions() {
return builder.build();
}

private <T> GenericObjectPoolConfig<T> poolConfig() {
return configure(new GenericObjectPoolConfig<>());
}

public Object redis() {
if (driver == RedisDriver.jedis) {
return jedisPool().getResource();
Expand All @@ -236,7 +251,10 @@ public Object redis() {
@SuppressWarnings({ "rawtypes", "unchecked" })
public ItemWriter<Map<String, Object>> writer(AbstractRedisItemWriter itemWriter) {
if (itemWriter instanceof AbstractLettuSearchItemWriter) {
return new LettuSearchWriter(rediSearchClient(), poolConfig(), (AbstractLettuSearchItemWriter) itemWriter);
RediSearchClient client = rediSearchClient();
return new LettuceWriter<StatefulRediSearchConnection<String, String>, RediSearchAsyncCommands<String, String>>(
client, client::getResources, poolOptions.pool(client::connect), itemWriter,
StatefulRediSearchConnection::async);
}
if (driver == RedisDriver.jedis) {
if (cluster) {
Expand All @@ -245,18 +263,24 @@ public ItemWriter<Map<String, Object>> writer(AbstractRedisItemWriter itemWriter
return new JedisWriter(jedisPool(), itemWriter);
}
if (cluster) {
return new LettuceClusterWriter(clusterClient(), poolConfig(), itemWriter);
RedisClusterClient client = clusterClient();
return new LettuceWriter<StatefulRedisClusterConnection<String, String>, RedisClusterAsyncCommands<String, String>>(
client, client::getResources, poolOptions.pool(client::connect), itemWriter,
StatefulRedisClusterConnection::async);
}
return new LettuceWriter(client(), poolConfig(), itemWriter);
RedisClient client = client();
return new LettuceWriter<StatefulRedisConnection<String, String>, RedisAsyncCommands<String, String>>(client,
client::getResources, poolOptions.pool(client::connect), itemWriter, StatefulRedisConnection::async);
}

private JedisCluster jedisCluster() {
Set<HostAndPort> hostAndPort = new HashSet<>();
endpoints.forEach(node -> hostAndPort.add(new HostAndPort(node.getHost(), node.getPort())));
JedisPoolConfig poolConfig = poolOptions.configure(new JedisPoolConfig());
if (password == null) {
return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, poolConfig());
return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, poolConfig);
}
return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, password, poolConfig());
return new JedisCluster(hostAndPort, connectTimeout, socketTimeout, maxRedirects, password, poolConfig);
}

}
Loading

0 comments on commit 57f4020

Please sign in to comment.