Skip to content

Commit

Permalink
[pinpoint-apm#11640] Async timeseries command support
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 7, 2024
1 parent 499294e commit 1076fd7
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,7 @@
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.3.2.RELEASE</version>
<version>6.5.0.RELEASE</version>
</dependency>

<dependency>
Expand Down
10 changes: 10 additions & 0 deletions redis-timeseries/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring-data.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>com.redis</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.navercorp.pinpoint.redis.timeseries;

import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import io.lettuce.core.RedisFuture;

import java.util.List;

public interface RedisTimeseriesAsyncCommands {


RedisFuture<Long> tsAdd(String key, long timestamp, double value);

RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs addOptions);


/**
* Delete data in the range of fromTimestamp to toTimestamp.
* @param key key
* @param fromTimestamp fromTimestamp
* @param toTimestamp toTimestamp
* @return timestamp
*/
// long tsDel(String key, long fromTimestamp, long toTimestamp);
//
RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp);
//
// TimestampValuePair tsGet(String key);
//
// List<TimestampValuePair> tsRevrange(String key, long fromTimestamp, long toTimestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.navercorp.pinpoint.redis.timeseries;

import com.google.common.base.Preconditions;
import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection;
import com.navercorp.pinpoint.redis.timeseries.connection.CommandDispatcher;
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import com.navercorp.pinpoint.redis.timeseries.protocol.TS;
import io.lettuce.core.CompositeArgument;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.IntegerOutput;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.RedisCommand;

import java.util.List;
import java.util.Objects;

public class RedisTimeseriesAsyncCommandsImpl implements RedisTimeseriesAsyncCommands {

private final RedisCodec<String, String> commandCodec = StringCodec.ASCII;

private final RedisCodec<String, String> outputCodec = StringCodec.UTF8;

private final AsyncConnection<String, String> connection;

public RedisTimeseriesAsyncCommandsImpl(AsyncConnection<String, String> connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

public RedisFuture<Long> tsAdd(String key, long timestamp, double value) {
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0");

return tsAdd(key, timestamp, value, null);
}

public RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs options) {
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0");

CommandArgs<String, String> args = new CommandArgs<>(commandCodec)
.addKey(key)
.add(timestamp)
.addValue(Double.toString(value));

applyOptions(args, options);

final CommandOutput<String, String, Long> output = new IntegerOutput<>(outputCodec);

RedisCommand<String, String, Long> command = new Command<>(TS.ADD, output, args);

return commands().dispatch(command);
}

private CommandDispatcher<String, String> commands() {
return connection.async();
}

private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) {
if (options != null) {
options.build(args);
}
}

@Override
public RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp) {
Preconditions.checkArgument(fromTimestamp >= 0, "fromTimestamp must be greater than or equal to 0");
Preconditions.checkArgument(toTimestamp >= 0, "toTimestamp must be greater than or equal to 0");

CommandArgs<String, String> args = new CommandArgs<>(commandCodec)
.addKey(key)
.add(fromTimestamp)
.add(toTimestamp);
ArrayTimestampValueOutput<String, String> output = new ArrayTimestampValueOutput<>(outputCodec);

RedisCommand<String, String, List<TimestampValuePair>> command = new Command<>(TS.RANGE, output, args);
return commands().dispatch(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import java.util.List;

public interface RedisTimeseriesCommands extends AutoCloseable {
public interface RedisTimeseriesCommands {


long tsAdd(String key, long timestamp, double value);
Expand All @@ -26,6 +26,4 @@ public interface RedisTimeseriesCommands extends AutoCloseable {
TimestampValuePair tsGet(String key);

List<TimestampValuePair> tsRevrange(String key, long fromTimestamp, long toTimestamp);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import com.navercorp.pinpoint.redis.timeseries.protocol.TS;
import io.lettuce.core.CompositeArgument;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.RedisCodec;
Expand All @@ -14,16 +15,18 @@
import java.util.List;
import java.util.Objects;

public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands, AutoCloseable {
public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands {

private final RedisCodec<String, String> commandCodec = StringCodec.ASCII;

private final RedisCodec<String, String> outputCodec = StringCodec.UTF8;

private final StatefulRedisConnection<String, String> connection;

public RedisTimeseriesCommandsImpl(StatefulRedisConnection<String, String> connection) {
this.connection = Objects.requireNonNull(connection, "connect");
private StatefulRedisConnection<String, String> connection;

public RedisTimeseriesCommandsImpl(RedisClient client) {
Objects.requireNonNull(client, "client");
this.connection = client.connect();
}

@Override
Expand Down Expand Up @@ -61,7 +64,57 @@ public long tsAdd(String key, long timestamp, double value, TsAddArgs options) {
return commands.dispatch(TS.ADD, new IntegerOutput<>(outputCodec), args);
}

private void applyOptions(CommandArgs<String, String> args, CompositeArgument options) {


// public RedisFuture<Long> tsAddExecute(String key, long timestamp, double value, TsAddArgs options) {
// Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0");
//
// CommandArgs<byte[], byte[]> args = new CommandArgs<>(byteArrayCodec)
// .addKey(key.getBytes(StandardCharsets.UTF_8))
// .add(timestamp)
// .add(value);
//
// applyOptions(args, options);
//
// LettuceConnection lettuce = getConnection();
//
// final CommandOutput<byte[], byte[], Long> stringStringIntegerOutput = new IntegerOutput<>(byteArrayCodec);
//
//// lettuce.invoke().just(t -> {
//// return new LettuceInvoker.ConnectionFunction0<Long>() {
//// @Override
//// public RedisFuture<Long> apply(RedisClusterAsyncCommands<byte[], byte[]> connection) {
//// RedisFuture<Long> dispatch = t.dispatch(TS.ADD, stringStringIntegerOutput, args);
//// return dispatch;
//// }
//// };
//// });
// LettuceInvoker.SingleInvocationSpec<Long> from = lettuce.invoke().from(t -> {
// return t.dispatch(TS.ADD, stringStringIntegerOutput, args);
// });
// Long l = from.get(Converters.identityConverter());
//
// return null;
// }

// public <T> T doBatchExecute(RedisCallback<T> action) {
// LettuceConnection connection = getConnection();
//
// connection.openPipeline();
// try {
// action.doInRedis(connection);
// } finally {
// connection.closePipeline();
// // RedisConnectionUtils.releaseConnection(connection, factory);
// }
// return null;
// }

// private LettuceConnection getConnection() {
// return (LettuceConnection) RedisConnectionUtils.getConnection(factory, false);
// }

private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) {
if (options != null) {
options.build(args);
}
Expand Down Expand Up @@ -102,8 +155,4 @@ private List<TimestampValuePair> tsRangeCommand(TS command, String key, long fro
}


@Override
public void close() {
connection.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

public interface AsyncConnection<K, V> extends AutoCloseable {
CommandDispatcher<K, V> async();

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;

import java.util.Collection;
import java.util.List;
import java.util.Objects;

public class ClusterAsyncConnection<K, V> implements AsyncConnection<K, V> {
private final StatefulRedisClusterConnection<K, V> connection;

public ClusterAsyncConnection(StatefulRedisClusterConnection<K, V> connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

@Override
public CommandDispatcher<K, V> async() {
return new CommandDispatcher<>() {
@Override
public <T> RedisFuture<T> dispatch(RedisCommand<K, V, T> command) {
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(command);

RedisCommand<K, V, T> result = connection.dispatch(asyncCommand);

return (AsyncCommand<K, V, T>) result;
}

@Override
public <T> Collection<RedisFuture<T>> dispatch(Collection<RedisCommand<K, V, T>> command) {
List<AsyncCommand<K, V, T>> async = command.stream().map(AsyncCommand::new).toList();

Collection<RedisCommand<K, V, ?>> dispatch = connection.dispatch(async);

return dispatch.stream().map(c -> (RedisFuture<T>) c).toList();
}
};
}

private RedisClusterAsyncCommands<K, V> commands() {
// pipelining
// connection.setAutoFlushCommands(false);
return connection.async();
}

@Override
public void close() {
connection.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.protocol.RedisCommand;

import java.util.Collection;

public interface CommandDispatcher<K, V> {

<T> RedisFuture<T> dispatch(RedisCommand<K, V, T> command);

<T> Collection<RedisFuture<T>> dispatch(Collection<RedisCommand<K, V, T>> commands);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

public interface ConnectionFactory<K, V> {
AsyncConnection<K, V> getConnection();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;

import java.util.Collection;
import java.util.List;
import java.util.Objects;

public class SimpleAsyncConnection<K, V> implements AsyncConnection<K, V> {
private final StatefulRedisConnection<K, V> connection;

public SimpleAsyncConnection(StatefulRedisConnection<K, V> connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

public CommandDispatcher<K, V> async() {
return new CommandDispatcher<>() {
@Override
public <T> RedisFuture<T> dispatch(RedisCommand<K, V, T> command) {
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(command);
RedisCommand<K, V, T> result = connection.dispatch(asyncCommand);
return (AsyncCommand<K, V, T>) result;
}

@Override
public <T> Collection<RedisFuture<T>> dispatch(Collection<RedisCommand<K, V, T>> command) {
List<AsyncCommand<K, V, T>> async = command.stream().map(AsyncCommand::new).toList();

Collection<RedisCommand<K, V, ?>> dispatch = connection.dispatch(async);

return dispatch.stream().map(c -> (RedisFuture<T>) c).toList();
}
};
}

private RedisClusterAsyncCommands<K, V> commands() {
// pipelining
// connection.setAutoFlushCommands(false);
return connection.async();
}

@Override
public void close() {
connection.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class RedisServer {

@SuppressWarnings("resource")
public static RedisContainer newRedisServer() {
return new RedisContainer(DockerImageName.parse("redis:8.0-M01"))
return new RedisContainer(DockerImageName.parse("redis:8.0-M02"))
.withExposedPorts(6379);
}
}
Loading

0 comments on commit 1076fd7

Please sign in to comment.