From 34efe38c6dfa6119cab45c88882f83e29f0b9c07 Mon Sep 17 00:00:00 2001 From: emeroad Date: Tue, 5 Nov 2024 10:57:35 +0900 Subject: [PATCH] [#11640] Async timeseries command support --- pom.xml | 2 +- redis-timeseries/pom.xml | 5 + .../RedisTimeseriesAsyncCommands.java | 31 ++++ .../RedisTimeseriesAsyncCommandsImpl.java | 83 +++++++++++ .../timeseries/RedisTimeseriesCommands.java | 4 +- .../RedisTimeseriesCommandsImpl.java | 20 +-- .../timeseries/TimeseriesCommandBuilder.java | 84 +++++++++++ .../connection/AsyncConnection.java | 10 ++ .../connection/AsyncDispatcher.java | 51 +++++++ .../connection/ClusterAsyncConnection.java | 38 +++++ .../connection/ConnectionFactory.java | 5 + .../timeseries/connection/Dispatcher.java | 17 +++ .../connection/SimpleAsyncConnection.java | 32 ++++ .../timeseries/connection/SyncDispatcher.java | 18 +++ .../timeseries/model/TimestampValuePair.java | 2 +- .../redis/timeseries/RedisServer.java | 4 +- .../RedisTimeseriesAsyncCommandsTest.java | 139 ++++++++++++++++++ .../RedisTimeseriesCommandsTest.java | 42 ++++-- .../src/test/resources/log4j2-test.xml | 2 +- .../redis/pubsub/RedisPubChannelProvider.java | 3 + 20 files changed, 560 insertions(+), 32 deletions(-) create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommands.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncConnection.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ClusterAsyncConnection.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ConnectionFactory.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/Dispatcher.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SimpleAsyncConnection.java create mode 100644 redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SyncDispatcher.java create mode 100644 redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsTest.java diff --git a/pom.xml b/pom.xml index efa9b78a5efd..2f088ec6d89d 100644 --- a/pom.xml +++ b/pom.xml @@ -1652,7 +1652,7 @@ io.lettuce lettuce-core - 6.3.2.RELEASE + 6.5.0.RELEASE diff --git a/redis-timeseries/pom.xml b/redis-timeseries/pom.xml index d0cef42e9925..2d6d47e7366a 100644 --- a/redis-timeseries/pom.xml +++ b/redis-timeseries/pom.xml @@ -43,6 +43,11 @@ org.apache.logging.log4j log4j-slf4j2-impl + + org.springframework.data + spring-data-redis + ${spring-data.version} + com.redis diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommands.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommands.java new file mode 100644 index 000000000000..99b19f536919 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommands.java @@ -0,0 +1,31 @@ +package com.navercorp.pinpoint.redis.timeseries; + +import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; +import io.lettuce.core.RedisFuture; + +import java.util.List; + +public interface RedisTimeseriesAsyncCommands { + + + RedisFuture tsAdd(String key, long timestamp, double value); + + RedisFuture tsAdd(String key, long timestamp, double value, TsAddArgs addOptions); + + + /** + * Delete data in the range of fromTimestamp to toTimestamp. + * @param key key + * @param fromTimestamp fromTimestamp + * @param toTimestamp toTimestamp + * @return timestamp + */ + RedisFuture tsDel(String key, long fromTimestamp, long toTimestamp); + + RedisFuture> tsRange(String key, long fromTimestamp, long toTimestamp); +// + RedisFuture tsGet(String key); +// + RedisFuture> tsRevrange(String key, long fromTimestamp, long toTimestamp); + +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java new file mode 100644 index 000000000000..434ff6933bad --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java @@ -0,0 +1,83 @@ +package com.navercorp.pinpoint.redis.timeseries; + +import com.google.common.base.Preconditions; +import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection; +import com.navercorp.pinpoint.redis.timeseries.connection.Dispatcher; +import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; +import io.lettuce.core.CompositeArgument; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandArgs; + +import java.util.List; +import java.util.Objects; + +public class RedisTimeseriesAsyncCommandsImpl implements RedisTimeseriesAsyncCommands { + + private final RedisCodec commandCodec = StringCodec.ASCII; + + private final RedisCodec outputCodec = StringCodec.UTF8; + + private final AsyncConnection connection; + + private final TimeseriesCommandBuilder builder = new TimeseriesCommandBuilder<>(commandCodec); + + public RedisTimeseriesAsyncCommandsImpl(AsyncConnection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + } + + private void applyOptions(CommandArgs args, CompositeArgument options) { + if (options != null) { + options.build(args); + } + } + + @Override + public RedisFuture tsAdd(String key, long timestamp, double value) { + Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0"); + + return tsAdd(key, timestamp, value, null); + } + + public RedisFuture tsAdd(String key, long timestamp, double value, TsAddArgs options) { + Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0"); + + Command command = this.builder.tsAdd(key, timestamp, value, options); + + return commands().dispatch(command); + } + + + private Dispatcher commands() { + return connection.dispatcher(); + } + + @Override + public RedisFuture> tsRange(String key, long fromTimestamp, long toTimestamp) { + Preconditions.checkArgument(fromTimestamp >= 0, "fromTimestamp must be greater than or equal to 0"); + Preconditions.checkArgument(toTimestamp >= 0, "toTimestamp must be greater than or equal to 0"); + + Command> cmd = this.builder.tsRange(key, fromTimestamp, toTimestamp); + return commands().dispatch(cmd); + } + + @Override + public RedisFuture tsGet(String key) { + Command cmd = this.builder.toGet(key); + return commands().dispatch(cmd); + } + + @Override + public RedisFuture tsDel(String key, long fromTimestamp, long toTimestamp) { + Command cmd = this.builder.toDel(key, fromTimestamp, toTimestamp); + return commands().dispatch(cmd); + } + + @Override + public RedisFuture> tsRevrange(String key, long fromTimestamp, long toTimestamp) { + Command> cmd = this.builder.tsRevrange(key, fromTimestamp, toTimestamp); + return commands().dispatch(cmd); + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommands.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommands.java index 884c666cdf01..fd4be8aabd4e 100644 --- a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommands.java +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommands.java @@ -4,7 +4,7 @@ import java.util.List; -public interface RedisTimeseriesCommands extends AutoCloseable { +public interface RedisTimeseriesCommands { long tsAdd(String key, long timestamp, double value); @@ -26,6 +26,4 @@ public interface RedisTimeseriesCommands extends AutoCloseable { TimestampValuePair tsGet(String key); List tsRevrange(String key, long fromTimestamp, long toTimestamp); - - void close(); } diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsImpl.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsImpl.java index 1728dd6c77a9..f56c28e35a35 100644 --- a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsImpl.java +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsImpl.java @@ -4,6 +4,7 @@ import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; import com.navercorp.pinpoint.redis.timeseries.protocol.TS; import io.lettuce.core.CompositeArgument; +import io.lettuce.core.RedisClient; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.codec.RedisCodec; @@ -14,16 +15,19 @@ import java.util.List; import java.util.Objects; -public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands, AutoCloseable { +public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands { private final RedisCodec commandCodec = StringCodec.ASCII; private final RedisCodec outputCodec = StringCodec.UTF8; + private final TimeseriesCommandBuilder command = new TimeseriesCommandBuilder(commandCodec); + private final StatefulRedisConnection connection; - public RedisTimeseriesCommandsImpl(StatefulRedisConnection connection) { - this.connection = Objects.requireNonNull(connection, "connect"); + public RedisTimeseriesCommandsImpl(RedisClient client) { + Objects.requireNonNull(client, "client"); + this.connection = client.connect(); } @Override @@ -41,7 +45,6 @@ public long tsDel(String key, long fromTimestamp, long toTimestamp) { .add(fromTimestamp) .add(toTimestamp); - RedisCommands commands = connection.sync(); return commands.dispatch(TS.DEL, new IntegerOutput<>(outputCodec), args); } @@ -61,7 +64,7 @@ public long tsAdd(String key, long timestamp, double value, TsAddArgs options) { return commands.dispatch(TS.ADD, new IntegerOutput<>(outputCodec), args); } - private void applyOptions(CommandArgs args, CompositeArgument options) { + private void applyOptions(CommandArgs args, CompositeArgument options) { if (options != null) { options.build(args); } @@ -74,11 +77,8 @@ public List tsRange(String key, long fromTimestamp, long toT @Override public TimestampValuePair tsGet(String key) { - CommandArgs args = new CommandArgs<>(commandCodec) .addKey(key); - - RedisCommands commands = connection.sync(); return commands.dispatch(TS.GET, new MetricOutput<>(outputCodec), args); } @@ -102,8 +102,4 @@ private List tsRangeCommand(TS command, String key, long fro } - @Override - public void close() { - connection.close(); - } } diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java new file mode 100644 index 000000000000..0bb8ed04423e --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java @@ -0,0 +1,84 @@ +package com.navercorp.pinpoint.redis.timeseries; + +import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; +import com.navercorp.pinpoint.redis.timeseries.protocol.TS; +import io.lettuce.core.CompositeArgument; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.output.IntegerOutput; +import io.lettuce.core.protocol.BaseRedisCommandBuilder; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandArgs; + +import java.util.List; + +public class TimeseriesCommandBuilder extends BaseRedisCommandBuilder { + + public TimeseriesCommandBuilder(RedisCodec codec) { + super(codec); + } + + public Command tsAdd(K key, long timestamp, double value, TsAddArgs options) { + CommandArgs args = new CommandArgs<>(codec) + .addKey(key) + .add(timestamp) + .add(value); + + applyOptions(args, options); + + CommandOutput output = new IntegerOutput<>(codec); + + return tsCommand(TS.ADD, output, args); + } + + private Command tsCommand(TS ts, CommandOutput output, CommandArgs args) { + return new Command<>(ts, output, args); + } + + + private void applyOptions(CommandArgs args, CompositeArgument options) { + if (options != null) { + options.build(args); + } + } + + public Command> tsGet(K key) { + CommandArgs args = new CommandArgs<>(codec) + .addKey(key); + ArrayTimestampValueOutput output = new ArrayTimestampValueOutput<>(codec); + return tsCommand(TS.GET, output, args); + } + + + public Command> tsRange(K key, long fromTimestamp, long toTimestamp) { + return tsRange(TS.RANGE, key, fromTimestamp, toTimestamp); + } + + public Command> tsRevrange(K key, long fromTimestamp, long toTimestamp) { + return tsRange(TS.REVRANGE, key, fromTimestamp, toTimestamp); + } + + public Command> tsRange(TS ts, K key, long fromTimestamp, long toTimestamp) { + CommandArgs args = new CommandArgs<>(codec) + .addKey(key) + .add(fromTimestamp) + .add(toTimestamp); + ArrayTimestampValueOutput output = new ArrayTimestampValueOutput<>(codec); + return tsCommand(ts, output, args); + } + + public Command toDel(K key, long fromTimestamp, long toTimestamp) { + CommandArgs args = new CommandArgs<>(codec) + .addKey(key) + .add(fromTimestamp) + .add(toTimestamp); + + return tsCommand(TS.DEL, new IntegerOutput<>(codec), args); + } + + public Command toGet(K key) { + CommandArgs args = new CommandArgs<>(codec) + .addKey(key); + return tsCommand(TS.GET, new MetricOutput<>(codec), args); + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncConnection.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncConnection.java new file mode 100644 index 000000000000..10fe3e798492 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncConnection.java @@ -0,0 +1,10 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +public interface AsyncConnection extends AutoCloseable { + Dispatcher dispatcher(); + + SyncDispatcher sync(); + + @Override + void close(); +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java new file mode 100644 index 000000000000..45724681ebfb --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java @@ -0,0 +1,51 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.protocol.RedisCommand; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +public class AsyncDispatcher implements Dispatcher { + + private final StatefulConnection connection; + + public AsyncDispatcher(StatefulConnection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + } + + @Override + public RedisFuture dispatch(RedisCommand command) { + AsyncCommand asyncCommand = wrapAsync(command); + + RedisCommand result = connection.dispatch(asyncCommand); + + return (AsyncCommand) result; + } + + private AsyncCommand wrapAsync(RedisCommand command) { + return new AsyncCommand<>(command); + } + + @Override + public Collection> dispatch(Collection> command) { + List> async = wrapAsyncCommands(command); + + Collection> result = connection.dispatch(async); + +// List> list1 = result.stream().map(c -> (RedisFuture) c).toList(); + return (Collection>) (Collection) result; + } + + private List> wrapAsyncCommands(Collection> command) { + List> result = new ArrayList<>(command.size()); + for (RedisCommand redisCommand : command) { + result.add(wrapAsync(redisCommand)); + } + return result; + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ClusterAsyncConnection.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ClusterAsyncConnection.java new file mode 100644 index 000000000000..faabb8ba76c6 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ClusterAsyncConnection.java @@ -0,0 +1,38 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; + +import java.util.Objects; + +public class ClusterAsyncConnection implements AsyncConnection { + private final StatefulRedisClusterConnection connection; + private final AsyncDispatcher dispatcher; + private final SyncDispatcher sync; + + public ClusterAsyncConnection(StatefulRedisClusterConnection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + this.dispatcher = new AsyncDispatcher<>(connection); + this.sync = new SyncDispatcher<>(connection.sync()); + } + + @Override + public Dispatcher dispatcher() { + return dispatcher; + } + + public SyncDispatcher sync() { + return sync; + } + + RedisClusterAsyncCommands commands() { + // pipelining + // connection.setAutoFlushCommands(false); + return connection.async(); + } + + @Override + public void close() { + connection.close(); + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ConnectionFactory.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ConnectionFactory.java new file mode 100644 index 000000000000..fc111c6fb69d --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/ConnectionFactory.java @@ -0,0 +1,5 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +public interface ConnectionFactory { + AsyncConnection getConnection(); +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/Dispatcher.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/Dispatcher.java new file mode 100644 index 000000000000..594ed22e5003 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/Dispatcher.java @@ -0,0 +1,17 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +import io.lettuce.core.RedisFuture; +import io.lettuce.core.protocol.RedisCommand; + +import java.util.Collection; + +public interface Dispatcher { + + RedisFuture dispatch(RedisCommand command); + + Collection> dispatch(Collection> commands); + +// default Collection> dispatch(Collection> commands) { +// return dispatch(commands); +// } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SimpleAsyncConnection.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SimpleAsyncConnection.java new file mode 100644 index 000000000000..0d62be66af68 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SimpleAsyncConnection.java @@ -0,0 +1,32 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +import io.lettuce.core.api.StatefulRedisConnection; + +import java.util.Objects; + +public class SimpleAsyncConnection implements AsyncConnection { + private final StatefulRedisConnection connection; + private final AsyncDispatcher dispatcher; + private final SyncDispatcher sync; + + public SimpleAsyncConnection(StatefulRedisConnection connection) { + this.connection = Objects.requireNonNull(connection, "connection"); + this.dispatcher = new AsyncDispatcher<>(connection); + this.sync = new SyncDispatcher<>(connection.sync()); + } + + public Dispatcher dispatcher() { + return dispatcher; + } + + public SyncDispatcher sync() { + // pipelining + // connection.setAutoFlushCommands(false); + return sync; + } + + @Override + public void close() { + connection.close(); + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SyncDispatcher.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SyncDispatcher.java new file mode 100644 index 000000000000..c2679f87c938 --- /dev/null +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/SyncDispatcher.java @@ -0,0 +1,18 @@ +package com.navercorp.pinpoint.redis.timeseries.connection; + +import io.lettuce.core.cluster.api.sync.RedisClusterCommands; +import io.lettuce.core.protocol.Command; + +import java.util.Objects; + +public class SyncDispatcher { + private final RedisClusterCommands command; + + public SyncDispatcher(RedisClusterCommands command) { + this.command = Objects.requireNonNull(command, "command"); + } + + public T dispatch(Command cmd) { + return command.dispatch(cmd.getType(), cmd.getOutput(), cmd.getArgs()); + } +} diff --git a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/model/TimestampValuePair.java b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/model/TimestampValuePair.java index dd4a4356ed20..a9584fca9f9c 100644 --- a/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/model/TimestampValuePair.java +++ b/redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/model/TimestampValuePair.java @@ -43,7 +43,7 @@ public TimestampValuePair buildAndClear() { @Override public String toString() { - return "TimestampValuePair{" + + return "TimestampValue{" + timestamp + '=' + value + '}'; diff --git a/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisServer.java b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisServer.java index b92661c4aa6b..0e006e04b74f 100644 --- a/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisServer.java +++ b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisServer.java @@ -7,7 +7,7 @@ public class RedisServer { @SuppressWarnings("resource") public static RedisContainer newRedisServer() { - return new RedisContainer(DockerImageName.parse("redis:8.0-M01")) - .withExposedPorts(6379); + RedisContainer container = new RedisContainer(DockerImageName.parse("redis:8.0-M02")); + return container.withExposedPorts(6379); } } diff --git a/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsTest.java b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsTest.java new file mode 100644 index 000000000000..c4d69cdd3ba5 --- /dev/null +++ b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsTest.java @@ -0,0 +1,139 @@ +package com.navercorp.pinpoint.redis.timeseries; + +import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection; +import com.navercorp.pinpoint.redis.timeseries.connection.ClusterAsyncConnection; +import com.navercorp.pinpoint.redis.timeseries.connection.SimpleAsyncConnection; +import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; +import com.navercorp.pinpoint.redis.timeseries.protocol.OnDuplicate; +import com.navercorp.pinpoint.redis.timeseries.protocol.Retention; +import com.redis.testcontainers.RedisContainer; +import io.lettuce.core.LettuceFutures; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class RedisTimeseriesAsyncCommandsTest { + + @AutoClose + private static RedisContainer server; + + private final Logger logger = LogManager.getLogger(this.getClass()); + + static RedisClient client; + static RedisClusterClient clusterClient; + + AsyncConnection connection; + + RedisTimeseriesAsyncCommandsImpl commands; + + @BeforeAll + static void beforeAll() { + Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker not enabled"); + server = RedisServer.newRedisServer(); + server.start(); + RedisURI redisURI = RedisURI.create(server.getRedisURI()); + client = RedisClient.create(redisURI); + +// RedisClusterClient.create(redisURI); +// clusterClient = RedisClusterClient.create(redisURI); +// final RedisClusterNode node = new RedisClusterNode(); +// node.setUri(redisURI); +// node.setSlots(IntStream.range(1, 2).boxed().collect(Collectors.toList())); +// +// final Partitions partitions = new Partitions(); +// partitions.add(node); +// +// ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() +// .enablePeriodicRefresh(10, TimeUnit.MINUTES) +// .build(); +// +// clusterClient.setOptions(ClusterClientOptions.builder() +// .topologyRefreshOptions(topologyRefreshOptions) +// .build()); + } + + @AfterAll + static void afterAll() { + if (client != null) { + client.close(); + } + if (clusterClient != null) { + clusterClient.close(); + } + } + + @AfterEach + void tearDown() { + if (connection != null) { + connection.close(); + } + } + + @BeforeEach + void setUp() { + if (client != null) { + this.connection = new SimpleAsyncConnection<>(client.connect()); + } else { + this.connection = new ClusterAsyncConnection<>(clusterClient.connect()); + } + this.commands = new RedisTimeseriesAsyncCommandsImpl(connection); + } + + + @Test + public void ts_add() throws ExecutionException, InterruptedException { + TsAddArgs options = new TsAddArgs() + .onDuplicate(OnDuplicate.last()) + .retention(Retention.of(3, TimeUnit.SECONDS)); + + RedisFuture f1 = commands.tsAdd("test1", 1000, 1, options); + RedisFuture f2 = commands.tsAdd("test1", 2000, 2, options); + RedisFuture f3 = commands.tsAdd("test1", 1000, 4, options); + Long l1 = LettuceFutures.awaitOrCancel(f1, 100, TimeUnit.MILLISECONDS); + Long l2 = LettuceFutures.awaitOrCancel(f2, 100, TimeUnit.MILLISECONDS); + Long l3 = LettuceFutures.awaitOrCancel(f3, 100, TimeUnit.MILLISECONDS); + Assertions.assertEquals(1000, l1); + Assertions.assertEquals(2000, l2); + Assertions.assertEquals(1000, l3); + } + + + @Test + public void ts_range() throws ExecutionException, InterruptedException { + TsAddArgs options = new TsAddArgs() + .onDuplicate(OnDuplicate.last()) + .retention(Retention.of(3, TimeUnit.SECONDS)); + + RedisFuture f1 = commands.tsAdd("test1", 1000, 1, options); + RedisFuture f2 = commands.tsAdd("test1", 2000, 2, options); + RedisFuture f3 = commands.tsAdd("test1", 3000, 3, options); + + Assertions.assertTrue(LettuceFutures.awaitAll(1000L, TimeUnit.MILLISECONDS, f1, f2, f3)); + + RedisFuture> timeFuture = commands.tsRange("test1", 0, 3000); + List timestampValuePairs = timeFuture.get(); + Assertions.assertEquals(3, timestampValuePairs.size()); + Assertions.assertEquals(1, timestampValuePairs.get(0).value()); + + RedisFuture test1 = commands.tsGet("test1"); + TimestampValuePair pair = LettuceFutures.awaitOrCancel(test1, 1000, TimeUnit.MILLISECONDS); + System.out.println(pair); + } + +} diff --git a/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsTest.java b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsTest.java index 4d375a83091b..52d936afa80c 100644 --- a/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsTest.java +++ b/redis-timeseries/src/test/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesCommandsTest.java @@ -16,17 +16,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.AutoClose; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; import java.util.List; import java.util.concurrent.TimeUnit; public class RedisTimeseriesCommandsTest { - private static RedisContainer server = RedisServer.newRedisServer(); + @AutoClose + private static RedisContainer server; private final Logger logger = LogManager.getLogger(this.getClass()); @@ -36,8 +39,9 @@ public class RedisTimeseriesCommandsTest { @BeforeAll static void beforeAll() { + Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker not enabled"); + server = RedisServer.newRedisServer(); server.start(); - RedisURI redisURI = RedisURI.create(server.getRedisURI()); client = RedisClient.create(redisURI); } @@ -47,27 +51,41 @@ static void afterAll() { if (client != null) { client.shutdown(); } - server.stop(); } @BeforeEach void setUp() { if (client != null) { - commands = new RedisTimeseriesCommandsImpl(client.connect()); + commands = new RedisTimeseriesCommandsImpl(client); } } - @AfterEach - void tearDown() { - if (commands != null) { - commands.close(); - } - } + @Test public void ts_add() { - RedisTimeseriesCommands commands = new RedisTimeseriesCommandsImpl(client.connect()); + + TsAddArgs options = new TsAddArgs() + .onDuplicate(OnDuplicate.last()) + .retention(Retention.of(3, TimeUnit.SECONDS)); + + commands.tsAdd("test1", 1000, 1, options); + commands.tsAdd("test1", 2000, 2, options); + commands.tsAdd("test1", 1000, 4, options); + TimestampValuePair tsGet = commands.tsGet("test1"); + logger.debug("tsGet:{}", tsGet); + + List revResult = commands.tsRevrange("test1", 0, 3000); + logger.debug("rev:{}", revResult); + } + + @Test + public void ts_add2() { + +// RedisTimeseriesCommands commands = new RedisTimeseriesCommandsImpl(client.connect()); +// connection.execute("TS.ADD", null, "test1", "1000", "1", "RETENTION", "); +// connection.getNativeConnection(); TsAddArgs options = new TsAddArgs() .onDuplicate(OnDuplicate.last()) diff --git a/redis-timeseries/src/test/resources/log4j2-test.xml b/redis-timeseries/src/test/resources/log4j2-test.xml index 753dafc93bc2..294a62d94a2e 100644 --- a/redis-timeseries/src/test/resources/log4j2-test.xml +++ b/redis-timeseries/src/test/resources/log4j2-test.xml @@ -26,7 +26,7 @@ - + diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubChannelProvider.java index 441d6c192a6c..87114b73a1ec 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubChannelProvider.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.channel.PubChannel; import com.navercorp.pinpoint.channel.PubChannelProvider; import org.springframework.data.redis.core.ReactiveRedisTemplate; +import reactor.core.publisher.Mono; import java.util.Objects; @@ -34,6 +35,8 @@ class RedisPubChannelProvider implements PubChannelProvider { @Override public PubChannel getPubChannel(String key) { + Mono mono = this.template.opsForList().leftPush(key, "init"); + mono.block(); return new RedisPubChannel(this.template, key); }