Skip to content

Commit

Permalink
fix: Added missing --source-client option
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 26, 2024
1 parent 37d7b4b commit 9e63d3d
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
public abstract class AbstractRedisCommand extends AbstractJobCommand {

@ArgGroup(exclusive = false)
private SimpleRedisArgs redisArgs = new SimpleRedisArgs();
private RedisArgs redisArgs = new RedisArgs();

private RedisContext redisContext;

@Override
protected void execute() throws Exception {
redisContext = redisArgs.redisContext();
redisContext = RedisContext.create(redisArgs.redisURI(), redisArgs.isCluster(), redisArgs.getProtocolVersion(),
redisArgs.getSslArgs());
try {
super.execute();
} finally {
Expand All @@ -31,11 +32,11 @@ protected void configure(RedisItemWriter<?, ?, ?> writer) {
redisContext.configure(writer);
}

public SimpleRedisArgs getRedisArgs() {
public RedisArgs getRedisArgs() {
return redisArgs;
}

public void setRedisArgs(SimpleRedisArgs clientArgs) {
public void setRedisArgs(RedisArgs clientArgs) {
this.redisArgs = clientArgs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public abstract class AbstractRedisExportCommand extends AbstractExportCommand {

@ArgGroup(exclusive = false)
private SimpleRedisArgs redisArgs = new SimpleRedisArgs();
private RedisArgs redisArgs = new RedisArgs();

@Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE;
Expand All @@ -28,7 +28,8 @@ public abstract class AbstractRedisExportCommand extends AbstractExportCommand {

@Override
protected RedisContext sourceRedisContext() {
return redisArgs.redisContext();
return RedisContext.create(redisArgs.redisURI(), redisArgs.isCluster(), redisArgs.getProtocolVersion(),
redisArgs.getSslArgs());
}

@Override
Expand All @@ -46,11 +47,11 @@ protected ItemProcessor<KeyValue<String, Object>, Map<String, Object>> mapProces
return new FunctionItemProcessor<>(mapFunction);
}

public SimpleRedisArgs getRedisArgs() {
public RedisArgs getRedisArgs() {
return redisArgs;
}

public void setRedisArgs(SimpleRedisArgs clientArgs) {
public void setRedisArgs(RedisArgs clientArgs) {
this.redisArgs = clientArgs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public abstract class AbstractRedisImportCommand extends AbstractImportCommand {

@ArgGroup(exclusive = false)
private SimpleRedisArgs redisArgs = new SimpleRedisArgs();
private RedisArgs redisArgs = new RedisArgs();

@Option(names = "--pool", description = "Max number of Redis connections in pool (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private int poolSize = RedisItemWriter.DEFAULT_POOL_SIZE;
Expand All @@ -22,14 +22,15 @@ protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {

@Override
protected RedisContext targetRedisContext() {
return redisArgs.redisContext();
return RedisContext.create(redisArgs.redisURI(), redisArgs.isCluster(), redisArgs.getProtocolVersion(),
redisArgs.getSslArgs());
}

public SimpleRedisArgs getRedisArgs() {
public RedisArgs getRedisArgs() {
return redisArgs;
}

public void setRedisArgs(SimpleRedisArgs clientArgs) {
public void setRedisArgs(RedisArgs clientArgs) {
this.redisArgs = clientArgs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ protected void execute() throws Exception {
@Override
protected RedisContext sourceRedisContext() {
log.info("Creating source Redis context with {} {} {}", sourceRedisUri, sourceRedisArgs, sslArgs);
return sourceRedisArgs.redisContext(sourceRedisUri, sslArgs);
return RedisContext.create(sourceRedisArgs.redisURI(sourceRedisUri), sourceRedisArgs.isCluster(),
sourceRedisArgs.getProtocolVersion(), sslArgs);
}

private RedisContext targetRedisContext() {
log.info("Creating target Redis context with {} {} {}", targetRedisUri, targetRedisArgs, sslArgs);
return targetRedisArgs.redisContext(targetRedisUri, sslArgs);
return RedisContext.create(targetRedisArgs.redisURI(targetRedisUri), targetRedisArgs.isCluster(),
targetRedisArgs.getProtocolVersion(), sslArgs);
}

@Override
Expand Down
228 changes: 148 additions & 80 deletions plugins/riot/src/main/java/com/redis/riot/RedisArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,131 +2,199 @@

import java.time.Duration;

import com.redis.lettucemod.RedisModulesClientBuilder;
import com.redis.lettucemod.RedisURIBuilder;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.RiotVersion;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ClientOptions.Builder;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslVerifyMode;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.protocol.ProtocolVersion;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;

public interface RedisArgs {
public class RedisArgs {

boolean DEFAULT_CLUSTER = false;
String DEFAULT_HOST = RedisURIBuilder.DEFAULT_HOST;
int DEFAULT_PORT = RedisURIBuilder.DEFAULT_PORT;
long DEFAULT_TIMEOUT = RedisURIBuilder.DEFAULT_TIMEOUT;
String DEFAULT_CLIENT_NAME = RiotVersion.riotVersion();
ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.RESP2;
int DEFAULT_DATABASE = 0;
boolean DEFAULT_INSECURE = false;
boolean DEFAULT_TLS = false;
@Option(names = { "-u", "--uri" }, description = "Server URI.", paramLabel = "<uri>")
private RedisURI uri;

default RedisURI getUri() {
return null;
@Option(names = { "-h",
"--host" }, description = "Server hostname (default: ${DEFAULT-VALUE}).", paramLabel = "<host>")
private String host = RedisURIBuilder.DEFAULT_HOST;

@Option(names = { "-p", "--port" }, description = "Server port (default: ${DEFAULT-VALUE}).", paramLabel = "<port>")
private int port = RedisURIBuilder.DEFAULT_PORT;

@Option(names = { "-s",
"--socket" }, description = "Server socket (overrides hostname and port).", paramLabel = "<socket>")
private String socket;

@Option(names = "--user", description = "ACL style 'AUTH username pass'. Needs password.", paramLabel = "<name>")
private String username;

@Option(names = { "-a",
"--pass" }, arity = "0..1", interactive = true, description = "Password to use when connecting to the server.", paramLabel = "<password>")
private char[] password;

@Option(names = "--timeout", description = "Redis command timeout in seconds (default: ${DEFAULT-VALUE}).", paramLabel = "<sec>")
private long timeout = RedisURIBuilder.DEFAULT_TIMEOUT;

@Option(names = { "-n", "--db" }, description = "Database number.", paramLabel = "<db>")
private int database;

@Option(names = "--tls", description = "Establish a secure TLS connection.")
private boolean tls;

@Option(names = "--insecure", description = "Allow insecure TLS connection by skipping cert validation.")
private boolean insecure;

@Option(names = "--client", description = "Client name used to connect to Redis (default: ${DEFAULT-VALUE}).", paramLabel = "<name>")
private String clientName = RiotVersion.riotVersion();

@Option(names = { "-c", "--cluster" }, description = "Enable cluster mode.")
private boolean cluster;

@Option(names = "--resp", description = "Redis protocol version used to connect to Redis: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<ver>")
private ProtocolVersion protocolVersion = ProtocolVersion.RESP2;

@ArgGroup(exclusive = false, heading = "TLS options%n")
private SslArgs sslArgs = new SslArgs();

public RedisURI redisURI() {
RedisURIBuilder builder = new RedisURIBuilder();
builder.clientName(clientName);
builder.database(database);
builder.host(host);
builder.password(password);
builder.port(port);
builder.socket(socket);
builder.timeout(Duration.ofSeconds(timeout));
builder.tls(tls);
builder.uri(uri);
builder.username(username);
if (insecure) {
builder.verifyMode(SslVerifyMode.NONE);
}
return builder.build();
}

default boolean isCluster() {
return DEFAULT_CLUSTER;
public boolean isCluster() {
return cluster;
}

default ProtocolVersion getProtocolVersion() {
return DEFAULT_PROTOCOL_VERSION;
public void setCluster(boolean cluster) {
this.cluster = cluster;
}

default SslArgs getSslArgs() {
return null;
public ProtocolVersion getProtocolVersion() {
return protocolVersion;
}

default String getHost() {
return DEFAULT_HOST;
public void setProtocolVersion(ProtocolVersion version) {
this.protocolVersion = version;
}

default int getPort() {
return DEFAULT_PORT;
public SslArgs getSslArgs() {
return sslArgs;
}

default String getSocket() {
return null;
public void setSslArgs(SslArgs sslArgs) {
this.sslArgs = sslArgs;
}

default String getUsername() {
return null;
public RedisURI getUri() {
return uri;
}

default char[] getPassword() {
return null;
public void setUri(RedisURI uri) {
this.uri = uri;
}

/**
*
* @return timeout duration in seconds
*/
default long getTimeout() {
return DEFAULT_TIMEOUT;
public String getHost() {
return host;
}

default int getDatabase() {
return DEFAULT_DATABASE;
public void setHost(String host) {
this.host = host;
}

default boolean isTls() {
return DEFAULT_TLS;
public int getPort() {
return port;
}

default boolean isInsecure() {
return DEFAULT_INSECURE;
public void setPort(int port) {
this.port = port;
}

default String getClientName() {
return DEFAULT_CLIENT_NAME;
public String getSocket() {
return socket;
}

default RedisURI redisURI() {
return redisURI(getUri());
public void setSocket(String socket) {
this.socket = socket;
}

default RedisURI redisURI(RedisURI uri) {
RedisURIBuilder builder = new RedisURIBuilder();
builder.clientName(getClientName());
builder.database(getDatabase());
builder.host(getHost());
builder.password(getPassword());
builder.port(getPort());
builder.socket(getSocket());
builder.timeout(Duration.ofSeconds(getTimeout()));
builder.tls(isTls());
builder.uri(uri);
builder.username(getUsername());
if (isInsecure()) {
builder.verifyMode(SslVerifyMode.NONE);
}
return builder.build();
public String getUsername() {
return username;
}

default RedisContext redisContext() {
return redisContext(getUri());
public void setUsername(String username) {
this.username = username;
}

default RedisContext redisContext(RedisURI uri) {
return redisContext(uri, getSslArgs());
public char[] getPassword() {
return password;
}

default RedisContext redisContext(RedisURI uri, SslArgs sslArgs) {
RedisURI finalUri = redisURI(uri);
RedisModulesClientBuilder clientBuilder = new RedisModulesClientBuilder();
clientBuilder.cluster(isCluster());
Builder options = isCluster() ? ClusterClientOptions.builder() : ClientOptions.builder();
options.protocolVersion(getProtocolVersion());
if (sslArgs != null) {
options.sslOptions(sslArgs.sslOptions());
}
clientBuilder.clientOptions(options.build());
clientBuilder.uri(finalUri);
return new RedisContext(finalUri, clientBuilder.build());
public void setPassword(char[] password) {
this.password = password;
}

public long getTimeout() {
return timeout;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

public int getDatabase() {
return database;
}

public void setDatabase(int database) {
this.database = database;
}

public boolean isTls() {
return tls;
}

public void setTls(boolean tls) {
this.tls = tls;
}

public boolean isInsecure() {
return insecure;
}

public void setInsecure(boolean insecure) {
this.insecure = insecure;
}

public String getClientName() {
return clientName;
}

public void setClientName(String clientName) {
this.clientName = clientName;
}

@Override
public String toString() {
return "SimpleRedisArgs [uri=" + uri + ", host=" + host + ", port=" + port + ", socket=" + socket
+ ", username=" + username + ", password=" + RiotUtils.mask(password) + ", timeout=" + timeout
+ ", database=" + database + ", tls=" + tls + ", insecure=" + insecure + ", clientName=" + clientName
+ ", cluster=" + cluster + ", protocolVersion=" + protocolVersion + ", sslArgs=" + sslArgs + "]";
}

}
Loading

0 comments on commit 9e63d3d

Please sign in to comment.