Skip to content

Commit

Permalink
[pinpoint-apm#11640] Async timeseries command support
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Nov 25, 2024
1 parent 93e0a08 commit 34efe38
Show file tree
Hide file tree
Showing 20 changed files with 560 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.3.2.RELEASE</version>
<version>6.5.0.RELEASE</version>
</dependency>

<dependency>
Expand Down
5 changes: 5 additions & 0 deletions redis-timeseries/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring-data.version}</version>
</dependency>

<dependency>
<groupId>com.redis</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.navercorp.pinpoint.redis.timeseries;

import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import io.lettuce.core.RedisFuture;

import java.util.List;

public interface RedisTimeseriesAsyncCommands {


RedisFuture<Long> tsAdd(String key, long timestamp, double value);

RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs addOptions);


/**
* Delete data in the range of fromTimestamp to toTimestamp.
* @param key key
* @param fromTimestamp fromTimestamp
* @param toTimestamp toTimestamp
* @return timestamp
*/
RedisFuture<Long> tsDel(String key, long fromTimestamp, long toTimestamp);

RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp);
//
RedisFuture<TimestampValuePair> tsGet(String key);
//
RedisFuture<List<TimestampValuePair>> tsRevrange(String key, long fromTimestamp, long toTimestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.navercorp.pinpoint.redis.timeseries;

import com.google.common.base.Preconditions;
import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection;
import com.navercorp.pinpoint.redis.timeseries.connection.Dispatcher;
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import io.lettuce.core.CompositeArgument;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;

import java.util.List;
import java.util.Objects;

public class RedisTimeseriesAsyncCommandsImpl implements RedisTimeseriesAsyncCommands {

private final RedisCodec<String, String> commandCodec = StringCodec.ASCII;

private final RedisCodec<String, String> outputCodec = StringCodec.UTF8;

private final AsyncConnection<String, String> connection;

private final TimeseriesCommandBuilder<String, String> builder = new TimeseriesCommandBuilder<>(commandCodec);

public RedisTimeseriesAsyncCommandsImpl(AsyncConnection<String, String> connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) {
if (options != null) {
options.build(args);
}
}

@Override
public RedisFuture<Long> tsAdd(String key, long timestamp, double value) {
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0");

return tsAdd(key, timestamp, value, null);
}

public RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs options) {
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0");

Command<String, String, Long> command = this.builder.tsAdd(key, timestamp, value, options);

return commands().dispatch(command);
}


private Dispatcher<String, String> commands() {
return connection.dispatcher();
}

@Override
public RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp) {
Preconditions.checkArgument(fromTimestamp >= 0, "fromTimestamp must be greater than or equal to 0");
Preconditions.checkArgument(toTimestamp >= 0, "toTimestamp must be greater than or equal to 0");

Command<String, String, List<TimestampValuePair>> cmd = this.builder.tsRange(key, fromTimestamp, toTimestamp);
return commands().dispatch(cmd);
}

@Override
public RedisFuture<TimestampValuePair> tsGet(String key) {
Command<String, String, TimestampValuePair> cmd = this.builder.toGet(key);
return commands().dispatch(cmd);
}

@Override
public RedisFuture<Long> tsDel(String key, long fromTimestamp, long toTimestamp) {
Command<String, String, Long> cmd = this.builder.toDel(key, fromTimestamp, toTimestamp);
return commands().dispatch(cmd);
}

@Override
public RedisFuture<List<TimestampValuePair>> tsRevrange(String key, long fromTimestamp, long toTimestamp) {
Command<String, String, List<TimestampValuePair>> cmd = this.builder.tsRevrange(key, fromTimestamp, toTimestamp);
return commands().dispatch(cmd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import java.util.List;

public interface RedisTimeseriesCommands extends AutoCloseable {
public interface RedisTimeseriesCommands {


long tsAdd(String key, long timestamp, double value);
Expand All @@ -26,6 +26,4 @@ public interface RedisTimeseriesCommands extends AutoCloseable {
TimestampValuePair tsGet(String key);

List<TimestampValuePair> tsRevrange(String key, long fromTimestamp, long toTimestamp);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import com.navercorp.pinpoint.redis.timeseries.protocol.TS;
import io.lettuce.core.CompositeArgument;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.RedisCodec;
Expand All @@ -14,16 +15,19 @@
import java.util.List;
import java.util.Objects;

public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands, AutoCloseable {
public class RedisTimeseriesCommandsImpl implements RedisTimeseriesCommands {

private final RedisCodec<String, String> commandCodec = StringCodec.ASCII;

private final RedisCodec<String, String> outputCodec = StringCodec.UTF8;

private final TimeseriesCommandBuilder<String, String> command = new TimeseriesCommandBuilder(commandCodec);

private final StatefulRedisConnection<String, String> connection;

public RedisTimeseriesCommandsImpl(StatefulRedisConnection<String, String> connection) {
this.connection = Objects.requireNonNull(connection, "connect");
public RedisTimeseriesCommandsImpl(RedisClient client) {
Objects.requireNonNull(client, "client");
this.connection = client.connect();
}

@Override
Expand All @@ -41,7 +45,6 @@ public long tsDel(String key, long fromTimestamp, long toTimestamp) {
.add(fromTimestamp)
.add(toTimestamp);


RedisCommands<String, String> commands = connection.sync();
return commands.dispatch(TS.DEL, new IntegerOutput<>(outputCodec), args);
}
Expand All @@ -61,7 +64,7 @@ public long tsAdd(String key, long timestamp, double value, TsAddArgs options) {
return commands.dispatch(TS.ADD, new IntegerOutput<>(outputCodec), args);
}

private void applyOptions(CommandArgs<String, String> args, CompositeArgument options) {
private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) {
if (options != null) {
options.build(args);
}
Expand All @@ -74,11 +77,8 @@ public List<TimestampValuePair> tsRange(String key, long fromTimestamp, long toT

@Override
public TimestampValuePair tsGet(String key) {

CommandArgs<String, String> args = new CommandArgs<>(commandCodec)
.addKey(key);


RedisCommands<String, String> commands = connection.sync();
return commands.dispatch(TS.GET, new MetricOutput<>(outputCodec), args);
}
Expand All @@ -102,8 +102,4 @@ private List<TimestampValuePair> tsRangeCommand(TS command, String key, long fro
}


@Override
public void close() {
connection.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.navercorp.pinpoint.redis.timeseries;

import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair;
import com.navercorp.pinpoint.redis.timeseries.protocol.TS;
import io.lettuce.core.CompositeArgument;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.IntegerOutput;
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;

import java.util.List;

public class TimeseriesCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {

public TimeseriesCommandBuilder(RedisCodec<K, V> codec) {
super(codec);
}

public Command<K, V, Long> tsAdd(K key, long timestamp, double value, TsAddArgs options) {
CommandArgs<K, V> args = new CommandArgs<>(codec)
.addKey(key)
.add(timestamp)
.add(value);

applyOptions(args, options);

CommandOutput<K, V, Long> output = new IntegerOutput<>(codec);

return tsCommand(TS.ADD, output, args);
}

private <T> Command<K, V, T> tsCommand(TS ts, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
return new Command<>(ts, output, args);
}


private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) {
if (options != null) {
options.build(args);
}
}

public Command<K, V, List<TimestampValuePair>> tsGet(K key) {
CommandArgs<K, V> args = new CommandArgs<>(codec)
.addKey(key);
ArrayTimestampValueOutput<K, V> output = new ArrayTimestampValueOutput<>(codec);
return tsCommand(TS.GET, output, args);
}


public Command<K, V, List<TimestampValuePair>> tsRange(K key, long fromTimestamp, long toTimestamp) {
return tsRange(TS.RANGE, key, fromTimestamp, toTimestamp);
}

public Command<K, V, List<TimestampValuePair>> tsRevrange(K key, long fromTimestamp, long toTimestamp) {
return tsRange(TS.REVRANGE, key, fromTimestamp, toTimestamp);
}

public Command<K, V, List<TimestampValuePair>> tsRange(TS ts, K key, long fromTimestamp, long toTimestamp) {
CommandArgs<K, V> args = new CommandArgs<>(codec)
.addKey(key)
.add(fromTimestamp)
.add(toTimestamp);
ArrayTimestampValueOutput<K, V> output = new ArrayTimestampValueOutput<>(codec);
return tsCommand(ts, output, args);
}

public Command<K, V, Long> toDel(K key, long fromTimestamp, long toTimestamp) {
CommandArgs<K, V> args = new CommandArgs<>(codec)
.addKey(key)
.add(fromTimestamp)
.add(toTimestamp);

return tsCommand(TS.DEL, new IntegerOutput<>(codec), args);
}

public Command<K, V, TimestampValuePair> toGet(K key) {
CommandArgs<K, V> args = new CommandArgs<>(codec)
.addKey(key);
return tsCommand(TS.GET, new MetricOutput<>(codec), args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

public interface AsyncConnection<K, V> extends AutoCloseable {
Dispatcher<K, V> dispatcher();

SyncDispatcher<K, V> sync();

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.navercorp.pinpoint.redis.timeseries.connection;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

public class AsyncDispatcher<K, V> implements Dispatcher<K, V> {

private final StatefulConnection<K, V> connection;

public AsyncDispatcher(StatefulConnection<K, V> connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

@Override
public <T> RedisFuture<T> dispatch(RedisCommand<K, V, T> command) {
AsyncCommand<K, V, T> asyncCommand = wrapAsync(command);

RedisCommand<K, V, T> result = connection.dispatch(asyncCommand);

return (AsyncCommand<K, V, T>) result;
}

private <T> AsyncCommand<K, V, T> wrapAsync(RedisCommand<K, V, T> command) {
return new AsyncCommand<>(command);
}

@Override
public <T> Collection<RedisFuture<T>> dispatch(Collection<RedisCommand<K, V, T>> command) {
List<AsyncCommand<K, V, T>> async = wrapAsyncCommands(command);

Collection<RedisCommand<K, V, ?>> result = connection.dispatch(async);

// List<? extends RedisFuture<?>> list1 = result.stream().map(c -> (RedisFuture<?>) c).toList();
return (Collection<RedisFuture<T>>) (Collection<?>) result;
}

private <T> List<AsyncCommand<K, V, T>> wrapAsyncCommands(Collection<RedisCommand<K, V, T>> command) {
List<AsyncCommand<K, V, T>> result = new ArrayList<>(command.size());
for (RedisCommand<K, V, T> redisCommand : command) {
result.add(wrapAsync(redisCommand));
}
return result;
}
}
Loading

0 comments on commit 34efe38

Please sign in to comment.