This repository has been archived by the owner on Jan 24, 2023. It is now read-only.

Commit

Added support for DUPLICATE_POLICY / ON_DUPLICATE keywords (TS.INFO updated accordingly) (#66)

* [add] Added support for DUPLICATE_POLICY / ON_DUPLICATE keywords ( TS.INFO update accordingly )

* [fix] Fixed CI build issues around pytest not being present
filipecosta90 authored Sep 21, 2020
1 parent 040eaa2 commit b451b43
Showing 3 changed files with 112 additions and 13 deletions.
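
In practical terms, the change adds a duplicate_policy keyword argument to create/alter/add and surfaces the configured policy through TS.INFO. A minimal usage sketch (illustrative only: key names are made up, the default Client() connection to localhost:6379 is assumed, and the server must run RedisTimeSeries >= v1.4):

    from redistimeseries.client import Client

    rts = Client()
    rts.create('temperature', duplicate_policy='last')         # TS.CREATE ... DUPLICATE_POLICY last
    rts.add('temperature', 1, 20.0)
    rts.add('temperature', 1, 21.5, duplicate_policy='last')   # TS.ADD ... ON_DUPLICATE last
    print(rts.info('temperature').duplicate_policy)            # 'last'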
2 changes: 0 additions & 2 deletions .circleci/config.yml
@@ -58,8 +58,6 @@ jobs:
. venv/bin/activate
REDIS_PORT=6379 coverage run test_commands.py
- early_return_for_forked_pull_requests

- run:
name: codecove
command: |
68 changes: 60 additions & 8 deletions redistimeseries/client.py
@@ -16,6 +16,7 @@ class TSInfo(object):
# As of RedisTimeseries >= v1.4 max_samples_per_chunk is deprecated in favor of chunk_size
max_samples_per_chunk = None
chunk_size = None
duplicate_policy = None


def __init__(self, args):
@@ -34,6 +35,10 @@ def __init__(self, args):
self.chunk_size = self.max_samples_per_chunk * 16 # backward compatible changes
if 'chunkSize' in response:
self.chunk_size = response['chunkSize']
if 'duplicatePolicy' in response:
self.duplicate_policy = response['duplicatePolicy']
if type(self.duplicate_policy) == bytes:
self.duplicate_policy = self.duplicate_policy.decode()

def list_to_dict(aList):
return {nativestr(aList[i][0]):nativestr(aList[i][1])
@@ -163,7 +168,15 @@ def appendChunkSize(params, chunk_size):
if chunk_size is not None:
params.extend(['CHUNK_SIZE', chunk_size])

def create(self, key, retention_msecs=None, uncompressed=False, labels={}, chunk_size=None):
@staticmethod
def appendDuplicatePolicy(params, command, duplicate_policy):
if duplicate_policy is not None:
if command == 'TS.ADD':
params.extend(['ON_DUPLICATE', duplicate_policy])
else:
params.extend(['DUPLICATE_POLICY', duplicate_policy])
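# Illustration only (not part of the diff): the helper picks the keyword per command,
# since TS.ADD takes ON_DUPLICATE while TS.CREATE / TS.ALTER take DUPLICATE_POLICY, e.g.
#   params = []; Client.appendDuplicatePolicy(params, 'TS.ADD', 'last')     # -> ['ON_DUPLICATE', 'last']
#   params = []; Client.appendDuplicatePolicy(params, 'TS.CREATE', 'last')  # -> ['DUPLICATE_POLICY', 'last']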

def create(self, key, **kwargs):
"""
Create a new time-series.
@@ -177,28 +190,45 @@ def create(self, key, retention_msecs=None, uncompressed=False, labels={}, chunk
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-series uses chunks of memory of fixed size for its samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in bytes).
duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy, i.e. what to do when a sample with an already existing timestamp is added.
Can be one of:
- 'block': an error will occur for any out of order sample
- 'first': ignore the new value
- 'last': override with latest value
- 'min': only override if the value is lower than the existing value
- 'max': only override if the value is higher than the existing value
When this is not set, the server-wide default will be used.
"""
retention_msecs = kwargs.get('retention_msecs', None)
uncompressed = kwargs.get('uncompressed', False)
labels = kwargs.get('labels', {})
chunk_size = kwargs.get('chunk_size', None)
duplicate_policy = kwargs.get('duplicate_policy', None)
params = [key]
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendDuplicatePolicy(params, self.CREATE_CMD, duplicate_policy)
self.appendLabels(params, labels)

return self.redis.execute_command(self.CREATE_CMD, *params)
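# Usage sketch (illustrative, assumes RedisTimeSeries >= v1.4):
#   rts.create('ts', retention_msecs=5, labels={'Time': 'Series'}, duplicate_policy='max')
#   # sends: TS.CREATE ts RETENTION 5 DUPLICATE_POLICY max LABELS Time Series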

def alter(self, key, retention_msecs=None, labels={}):
def alter(self, key, **kwargs):
"""
Update the retention, labels of an existing key. The parameters
are the same as TS.CREATE.
"""
retention_msecs = kwargs.get('retention_msecs', None)
labels = kwargs.get('labels', {})
duplicate_policy = kwargs.get('duplicate_policy', None)
params = [key]
self.appendRetention(params, retention_msecs)
self.appendDuplicatePolicy(params, self.ALTER_CMD, duplicate_policy)
self.appendLabels(params, labels)

return self.redis.execute_command(self.ALTER_CMD, *params)
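# Usage sketch (illustrative): rts.alter('ts', duplicate_policy='min')
# sends: TS.ALTER ts DUPLICATE_POLICY min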

def add(self, key, timestamp, value, retention_msecs=None,
uncompressed=False, labels={}, chunk_size=None):
def add(self, key, timestamp, value, **kwargs):
"""
Append (or create and append) a new sample to the series.
@@ -214,11 +244,25 @@ def add(self, key, timestamp, value, retention_msecs=None,
labels: Set of label-value pairs that represent metadata labels of the key.
chunk_size: Each time-series uses chunks of memory of fixed size for its samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in bytes).
duplicate_policy: since RedisTimeSeries v1.4 you can specify the duplicate sample policy, i.e. what to do when a sample with an already existing timestamp is added.
Can be one of:
- 'block': an error will occur for any out of order sample
- 'first': ignore the new value
- 'last': override with latest value
- 'min': only override if the value is lower than the existing value
- 'max': only override if the value is higher than the existing value
When this is not set, the server-wide default will be used.
"""
retention_msecs = kwargs.get('retention_msecs', None)
uncompressed = kwargs.get('uncompressed', False)
labels = kwargs.get('labels', {})
chunk_size = kwargs.get('chunk_size', None)
duplicate_policy = kwargs.get('duplicate_policy', None)
params = [key, timestamp, value]
self.appendRetention(params, retention_msecs)
self.appendUncompressed(params, uncompressed)
self.appendChunkSize(params, chunk_size)
self.appendDuplicatePolicy(params, self.ADD_CMD, duplicate_policy)
self.appendLabels(params, labels)

return self.redis.execute_command(self.ADD_CMD, *params)
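# Usage sketch (illustrative): for TS.ADD the policy is sent as ON_DUPLICATE, e.g.
#   rts.add('ts', 1, 5.0)
#   rts.add('ts', 1, 10.0, duplicate_policy='min')   # sends: TS.ADD ts 1 10.0 ON_DUPLICATE min
#   rts.get('ts')                                     # (1, 5.0) -- 'min' keeps the lower value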
@@ -237,8 +281,7 @@ def madd(self, ktv_tuples):

return self.redis.execute_command(self.MADD_CMD, *params)

def incrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}, chunk_size=None):
def incrby(self, key, value, **kwargs):
"""
Increment (or create a time-series and increment) the latest sample of a series.
This command can be used as a counter or gauge that automatically gets history as a time series.
@@ -256,6 +299,11 @@ def incrby(self, key, value, timestamp=None, retention_msecs=None,
chunk_size: Each time-series uses chunks of memory of fixed size for its samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in bytes).
"""
timestamp = kwargs.get('timestamp', None)
retention_msecs = kwargs.get('retention_msecs', None)
uncompressed = kwargs.get('uncompressed', False)
labels = kwargs.get('labels', {})
chunk_size = kwargs.get('chunk_size', None)
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
@@ -265,8 +313,7 @@ def decrby(self, key, value, timestamp=None, retention_msecs=None,

return self.redis.execute_command(self.INCRBY_CMD, *params)
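# Usage sketch (illustrative): the **kwargs refactor keeps the old keyword-argument call style, e.g.
#   rts.incrby('counter', 2, timestamp=10)   # sends: TS.INCRBY counter 2 TIMESTAMP 10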

def decrby(self, key, value, timestamp=None, retention_msecs=None,
uncompressed=False, labels={}, chunk_size=None):
def decrby(self, key, value, **kwargs):
"""
Decrement (or create a time-series and decrement) the latest sample of a series.
This command can be used as a counter or gauge that automatically gets history as a time series.
Expand All @@ -284,6 +331,11 @@ def decrby(self, key, value, timestamp=None, retention_msecs=None,
chunk_size: Each time-series uses chunks of memory of fixed size for its samples.
You can alter the default TSDB chunk size by passing the chunk_size argument (in bytes).
"""
timestamp = kwargs.get('timestamp', None)
retention_msecs = kwargs.get('retention_msecs', None)
uncompressed = kwargs.get('uncompressed', False)
labels = kwargs.get('labels', {})
chunk_size = kwargs.get('chunk_size', None)
params = [key, value]
self.appendTimestamp(params, timestamp)
self.appendRetention(params, retention_msecs)
55 changes: 52 additions & 3 deletions test_commands.py
@@ -40,21 +40,35 @@ def testCreate(self):
info = rts.info("time-serie-1")
self.assertEqual(128, info.chunk_size)

# Test for duplicate policy
for duplicate_policy in ["block","last","first","min","max"]:
ts_name = "time-serie-ooo-{0}".format(duplicate_policy)
self.assertTrue(rts.create(ts_name, duplicate_policy=duplicate_policy))
info = rts.info(ts_name)
self.assertEqual(duplicate_policy, info.duplicate_policy)

def testAlter(self):
'''Test TS.ALTER calls'''

rts.create(1)
self.assertTrue(rts.create(1))
self.assertEqual(0, rts.info(1).retention_msecs)
rts.alter(1, retention_msecs=10)
self.assertTrue(rts.alter(1, retention_msecs=10))
self.assertEqual({}, rts.info(1).labels)
self.assertEqual(10, rts.info(1).retention_msecs)
rts.alter(1, labels={'Time':'Series'})
self.assertTrue(rts.alter(1, labels={'Time':'Series'}))
self.assertEqual('Series', rts.info(1).labels['Time'])
self.assertEqual(10, rts.info(1).retention_msecs)
pipe = rts.pipeline()
self.assertTrue(pipe.create(2))

if version is None or version < 14000:
return
info = rts.info(1)
self.assertEqual(None, info.duplicate_policy)
self.assertTrue(rts.alter(1, duplicate_policy='min'))
info = rts.info(1)
self.assertEqual('min', info.duplicate_policy)

def testAdd(self):
'''Test TS.ADD calls'''

@@ -76,6 +90,34 @@ def testAdd(self):
info = rts.info("time-serie-1")
self.assertEqual(128, info.chunk_size)

# Test for duplicate policy BLOCK
self.assertEqual(1, rts.add("time-serie-add-ooo-block", 1, 5.0))
try:
rts.add("time-serie-add-ooo-block", 1, 5.0, duplicate_policy='block')
except Exception as e:
self.assertEqual("TSDB: Error at upsert, update is not supported in BLOCK mode",e.__str__())

# Test for duplicate policy LAST
self.assertEqual(1, rts.add("time-serie-add-ooo-last", 1, 5.0))
self.assertEqual(1, rts.add("time-serie-add-ooo-last", 1, 10.0, duplicate_policy='last'))
self.assertEqual(10.0, rts.get("time-serie-add-ooo-last")[1])

# Test for duplicate policy FIRST
self.assertEqual(1, rts.add("time-serie-add-ooo-first", 1, 5.0))
self.assertEqual(1, rts.add("time-serie-add-ooo-first", 1, 10.0, duplicate_policy='first'))
self.assertEqual(5.0, rts.get("time-serie-add-ooo-first")[1])

# Test for duplicate policy MAX
self.assertEqual(1, rts.add("time-serie-add-ooo-max", 1, 5.0))
self.assertEqual(1, rts.add("time-serie-add-ooo-max", 1, 10.0, duplicate_policy='max'))
self.assertEqual(10.0, rts.get("time-serie-add-ooo-max")[1])

# Test for duplicate policy MIN
self.assertEqual(1, rts.add("time-serie-add-ooo-min", 1, 5.0))
self.assertEqual(1, rts.add("time-serie-add-ooo-min", 1, 10.0, duplicate_policy='min'))
self.assertEqual(5.0, rts.get("time-serie-add-ooo-min")[1])


def testMAdd(self):
'''Test TS.MADD calls'''

@@ -259,6 +301,13 @@ def testInfo(self):
info = rts.info(1)
self.assertEqual(5, info.retention_msecs)
self.assertEqual(info.labels['currentLabel'], 'currentData')
if version is None or version < 14000:
return
self.assertEqual(None, info.duplicate_policy)

rts.create('time-serie-2', duplicate_policy='min')
info = rts.info('time-serie-2')
self.assertEqual('min', info.duplicate_policy)

def testQueryIndex(self):
'''Test TS.QUERYINDEX calls'''
