-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package com.navercorp.pinpoint.redis.timeseries; | ||
|
||
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; | ||
import io.lettuce.core.RedisFuture; | ||
|
||
import java.util.List; | ||
|
||
public interface RedisTimeseriesAsyncCommands { | ||
|
||
|
||
RedisFuture<Long> tsAdd(String key, long timestamp, double value); | ||
|
||
RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs addOptions); | ||
|
||
|
||
/** | ||
* Delete data in the range of fromTimestamp to toTimestamp. | ||
* @param key key | ||
* @param fromTimestamp fromTimestamp | ||
* @param toTimestamp toTimestamp | ||
* @return timestamp | ||
*/ | ||
RedisFuture<Long> tsDel(String key, long fromTimestamp, long toTimestamp); | ||
|
||
RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp); | ||
// | ||
RedisFuture<TimestampValuePair> tsGet(String key); | ||
// | ||
RedisFuture<List<TimestampValuePair>> tsRevrange(String key, long fromTimestamp, long toTimestamp); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package com.navercorp.pinpoint.redis.timeseries; | ||
|
||
import com.google.common.base.Preconditions; | ||
import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection; | ||
import com.navercorp.pinpoint.redis.timeseries.connection.Dispatcher; | ||
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; | ||
import io.lettuce.core.CompositeArgument; | ||
import io.lettuce.core.RedisFuture; | ||
import io.lettuce.core.codec.RedisCodec; | ||
import io.lettuce.core.codec.StringCodec; | ||
import io.lettuce.core.protocol.Command; | ||
import io.lettuce.core.protocol.CommandArgs; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
public class RedisTimeseriesAsyncCommandsImpl implements RedisTimeseriesAsyncCommands { | ||
|
||
private final RedisCodec<String, String> commandCodec = StringCodec.ASCII; | ||
|
||
private final RedisCodec<String, String> outputCodec = StringCodec.UTF8; | ||
|
||
private final AsyncConnection<String, String> connection; | ||
|
||
private final TimeseriesCommandBuilder<String, String> builder = new TimeseriesCommandBuilder<>(commandCodec); | ||
|
||
public RedisTimeseriesAsyncCommandsImpl(AsyncConnection<String, String> connection) { | ||
this.connection = Objects.requireNonNull(connection, "connection"); | ||
} | ||
|
||
private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) { | ||
if (options != null) { | ||
options.build(args); | ||
Check warning on line 33 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java#L33
|
||
} | ||
} | ||
Check warning on line 35 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java#L35
|
||
|
||
@Override | ||
public RedisFuture<Long> tsAdd(String key, long timestamp, double value) { | ||
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0"); | ||
|
||
return tsAdd(key, timestamp, value, null); | ||
Check warning on line 41 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java#L41
|
||
} | ||
|
||
public RedisFuture<Long> tsAdd(String key, long timestamp, double value, TsAddArgs options) { | ||
Preconditions.checkArgument(timestamp >= 0, "timestamp must be greater than or equal to 0"); | ||
|
||
Command<String, String, Long> command = this.builder.tsAdd(key, timestamp, value, options); | ||
|
||
return commands().dispatch(command); | ||
} | ||
|
||
|
||
private Dispatcher<String, String> commands() { | ||
return connection.dispatcher(); | ||
} | ||
|
||
@Override | ||
public RedisFuture<List<TimestampValuePair>> tsRange(String key, long fromTimestamp, long toTimestamp) { | ||
Preconditions.checkArgument(fromTimestamp >= 0, "fromTimestamp must be greater than or equal to 0"); | ||
Preconditions.checkArgument(toTimestamp >= 0, "toTimestamp must be greater than or equal to 0"); | ||
|
||
Command<String, String, List<TimestampValuePair>> cmd = this.builder.tsRange(key, fromTimestamp, toTimestamp); | ||
return commands().dispatch(cmd); | ||
} | ||
|
||
@Override | ||
public RedisFuture<TimestampValuePair> tsGet(String key) { | ||
Command<String, String, TimestampValuePair> cmd = this.builder.toGet(key); | ||
return commands().dispatch(cmd); | ||
} | ||
|
||
@Override | ||
public RedisFuture<Long> tsDel(String key, long fromTimestamp, long toTimestamp) { | ||
Command<String, String, Long> cmd = this.builder.toDel(key, fromTimestamp, toTimestamp); | ||
return commands().dispatch(cmd); | ||
Check warning on line 75 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java#L74-L75
|
||
} | ||
|
||
@Override | ||
public RedisFuture<List<TimestampValuePair>> tsRevrange(String key, long fromTimestamp, long toTimestamp) { | ||
Command<String, String, List<TimestampValuePair>> cmd = this.builder.tsRevrange(key, fromTimestamp, toTimestamp); | ||
return commands().dispatch(cmd); | ||
Check warning on line 81 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/RedisTimeseriesAsyncCommandsImpl.java#L80-L81
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package com.navercorp.pinpoint.redis.timeseries; | ||
|
||
import com.navercorp.pinpoint.redis.timeseries.model.TimestampValuePair; | ||
import com.navercorp.pinpoint.redis.timeseries.protocol.TS; | ||
import io.lettuce.core.CompositeArgument; | ||
import io.lettuce.core.codec.RedisCodec; | ||
import io.lettuce.core.output.CommandOutput; | ||
import io.lettuce.core.output.IntegerOutput; | ||
import io.lettuce.core.protocol.BaseRedisCommandBuilder; | ||
import io.lettuce.core.protocol.Command; | ||
import io.lettuce.core.protocol.CommandArgs; | ||
|
||
import java.util.List; | ||
|
||
public class TimeseriesCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> { | ||
|
||
public TimeseriesCommandBuilder(RedisCodec<K, V> codec) { | ||
super(codec); | ||
} | ||
|
||
public Command<K, V, Long> tsAdd(K key, long timestamp, double value, TsAddArgs options) { | ||
CommandArgs<K, V> args = new CommandArgs<>(codec) | ||
.addKey(key) | ||
.add(timestamp) | ||
.add(value); | ||
|
||
applyOptions(args, options); | ||
|
||
CommandOutput<K, V, Long> output = new IntegerOutput<>(codec); | ||
|
||
return tsCommand(TS.ADD, output, args); | ||
} | ||
|
||
private <T> Command<K, V, T> tsCommand(TS ts, CommandOutput<K, V, T> output, CommandArgs<K, V> args) { | ||
return new Command<>(ts, output, args); | ||
} | ||
|
||
|
||
private <K, V> void applyOptions(CommandArgs<K, V> args, CompositeArgument options) { | ||
if (options != null) { | ||
options.build(args); | ||
} | ||
} | ||
|
||
public Command<K, V, List<TimestampValuePair>> tsGet(K key) { | ||
CommandArgs<K, V> args = new CommandArgs<>(codec) | ||
.addKey(key); | ||
ArrayTimestampValueOutput<K, V> output = new ArrayTimestampValueOutput<>(codec); | ||
return tsCommand(TS.GET, output, args); | ||
Check warning on line 49 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java#L46-L49
|
||
} | ||
|
||
|
||
public Command<K, V, List<TimestampValuePair>> tsRange(K key, long fromTimestamp, long toTimestamp) { | ||
return tsRange(TS.RANGE, key, fromTimestamp, toTimestamp); | ||
} | ||
|
||
public Command<K, V, List<TimestampValuePair>> tsRevrange(K key, long fromTimestamp, long toTimestamp) { | ||
return tsRange(TS.REVRANGE, key, fromTimestamp, toTimestamp); | ||
Check warning on line 58 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java#L58
|
||
} | ||
|
||
public Command<K, V, List<TimestampValuePair>> tsRange(TS ts, K key, long fromTimestamp, long toTimestamp) { | ||
CommandArgs<K, V> args = new CommandArgs<>(codec) | ||
.addKey(key) | ||
.add(fromTimestamp) | ||
.add(toTimestamp); | ||
ArrayTimestampValueOutput<K, V> output = new ArrayTimestampValueOutput<>(codec); | ||
return tsCommand(ts, output, args); | ||
} | ||
|
||
public Command<K, V, Long> toDel(K key, long fromTimestamp, long toTimestamp) { | ||
CommandArgs<K, V> args = new CommandArgs<>(codec) | ||
.addKey(key) | ||
.add(fromTimestamp) | ||
.add(toTimestamp); | ||
Check warning on line 74 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java#L71-L74
|
||
|
||
return tsCommand(TS.DEL, new IntegerOutput<>(codec), args); | ||
Check warning on line 76 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/TimeseriesCommandBuilder.java#L76
|
||
} | ||
|
||
public Command<K, V, TimestampValuePair> toGet(K key) { | ||
CommandArgs<K, V> args = new CommandArgs<>(codec) | ||
.addKey(key); | ||
return tsCommand(TS.GET, new MetricOutput<>(codec), args); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package com.navercorp.pinpoint.redis.timeseries.connection; | ||
|
||
public interface AsyncConnection<K, V> extends AutoCloseable { | ||
Dispatcher<K, V> dispatcher(); | ||
|
||
SyncDispatcher<K, V> sync(); | ||
|
||
@Override | ||
void close(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package com.navercorp.pinpoint.redis.timeseries.connection; | ||
|
||
import io.lettuce.core.RedisFuture; | ||
import io.lettuce.core.api.StatefulConnection; | ||
import io.lettuce.core.protocol.AsyncCommand; | ||
import io.lettuce.core.protocol.RedisCommand; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Objects; | ||
|
||
public class AsyncDispatcher<K, V> implements Dispatcher<K, V> { | ||
|
||
private final StatefulConnection<K, V> connection; | ||
|
||
public AsyncDispatcher(StatefulConnection<K, V> connection) { | ||
this.connection = Objects.requireNonNull(connection, "connection"); | ||
} | ||
|
||
@Override | ||
public <T> RedisFuture<T> dispatch(RedisCommand<K, V, T> command) { | ||
AsyncCommand<K, V, T> asyncCommand = wrapAsync(command); | ||
|
||
RedisCommand<K, V, T> result = connection.dispatch(asyncCommand); | ||
|
||
return (AsyncCommand<K, V, T>) result; | ||
} | ||
|
||
private <T> AsyncCommand<K, V, T> wrapAsync(RedisCommand<K, V, T> command) { | ||
return new AsyncCommand<>(command); | ||
} | ||
|
||
@Override | ||
public <T> Collection<RedisFuture<T>> dispatch(Collection<RedisCommand<K, V, T>> command) { | ||
List<AsyncCommand<K, V, T>> async = wrapAsyncCommands(command); | ||
Check warning on line 36 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java#L36
|
||
|
||
Collection<RedisCommand<K, V, ?>> result = connection.dispatch(async); | ||
Check warning on line 38 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java#L38
|
||
|
||
// List<? extends RedisFuture<?>> list1 = result.stream().map(c -> (RedisFuture<?>) c).toList(); | ||
return (Collection<RedisFuture<T>>) (Collection<?>) result; | ||
Check warning on line 41 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java#L41
|
||
} | ||
|
||
private <T> List<AsyncCommand<K, V, T>> wrapAsyncCommands(Collection<RedisCommand<K, V, T>> command) { | ||
List<AsyncCommand<K, V, T>> result = new ArrayList<>(command.size()); | ||
Check warning on line 45 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java#L45
|
||
for (RedisCommand<K, V, T> redisCommand : command) { | ||
result.add(wrapAsync(redisCommand)); | ||
} | ||
return result; | ||
Check warning on line 49 in redis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java Codecov / codecov/patchredis-timeseries/src/main/java/com/navercorp/pinpoint/redis/timeseries/connection/AsyncDispatcher.java#L47-L49
|
||
} | ||
} |