From 975772e37a8ca164c293574b06a4e341244cc8ad Mon Sep 17 00:00:00 2001 From: John Wehr Date: Fri, 27 Jan 2012 14:23:05 -0500 Subject: [PATCH 1/7] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d641c45..a298910 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup( name='telephus', - version='0.8.0~beta1', + version='1.0.0', description='connection pooled, low-level client API for Cassandra in Twisted python', author='brandon@faltering.com', url='http://github.com/driftx/Telephus', From 627ffda7b4d6b7bc0ace9a522c7c03b710f8b516 Mon Sep 17 00:00:00 2001 From: John Wehr Date: Fri, 10 Feb 2012 16:46:21 -0500 Subject: [PATCH 2/7] Batched multi-key inserts, batch adds, and batch multi-key adds. Tests for these functions. Fixed Cassandra version checking in the tests. --- setup.py | 2 +- telephus/client.py | 66 ++++++++++++++++++++++++++++++++++++ telephus/translate.py | 2 +- test/test_cassandraclient.py | 40 ++++++++++++++++++++++ 4 files changed, 108 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index a298910..111e7e2 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/python -from distutils.core import setup +from setuptools import setup setup( name='telephus', version='1.0.0', diff --git a/telephus/client.py b/telephus/client.py index 8093396..71f0d28 100644 --- a/telephus/client.py +++ b/telephus/client.py @@ -179,6 +179,34 @@ def remove_counter(self, key=None, column_family=None, column=None, super_column req = ManagedThriftRequest('remove_counter', key, cp, consistency) return self.manager.pushRequest(req, retries=retries) + @requirekwargs('column_family', 'mapping') + def batch_multikey_add(self, column_family=None, mapping=None, consistency=None, + retries=None): + consistency = consistency or self.consistency + mutmap = dict([(k, {column_family: self._mk_counter_cols_or_supers(v)}) + for k, v in mapping.items()]) + return self.batch_mutate_counters(mutmap, consistency=consistency, retries=retries) + + @requirekwargs('key', 'column_family', 'mapping') + def batch_add(self, key=None, column_family=None, mapping=None, consistency=None, + retries=None): + consistency = consistency or self.consistency + mutmap = {key: {column_family: self._mk_counter_cols_or_supers(mapping)}} + return self.batch_mutate_counters(mutmap, consistency=consistency, retries=retries) + + @requirekwargs('column_family', 'mapping') + def batch_multikey_insert(self, column_family=None, mapping=None, timestamp=None, + consistency=None, retries=None, ttl=None): + for value in mapping.values(): + if isinstance(value, list) and timestamp is not None: + raise RuntimeError('Timestamp cannot be specified with a list of Mutations') + timestamp = timestamp or self._time() + consistency = consistency or self.consistency + mutmap = dict([(k, {column_family: self._mk_cols_or_supers(v, timestamp, ttl)}) + for k, v in mapping.items()]) + return self.batch_mutate(mutmap, timestamp=timestamp, consistency=consistency, + retries=retries) + @requirekwargs('key', 'column_family', 'mapping') def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=None, consistency=None, retries=None, ttl=None): @@ -204,6 +232,25 @@ def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None, req = ManagedThriftRequest('batch_mutate', mutmap, consistency) return self.manager.pushRequest(req, retries=retries) + @requirekwargs('mutationmap') + def batch_mutate_counters(self, mutationmap=None, consistency=None, retries=None): + consistency = consistency or self.consistency + mutmap = defaultdict(dict) + for key, cfmap in mutationmap.iteritems(): + for cf, colmap in cfmap.iteritems(): + cols_or_supers = self._mk_counter_cols_or_supers(colmap) + muts = [] + for c in cols_or_supers: + if isinstance(c, CounterSuperColumn): + muts.append(Mutation(ColumnOrSuperColumn(counter_super_column=c))) + elif isinstance(c, CounterColumn): + muts.append(Mutation(ColumnOrSuperColumn(counter_column=c))) + else: + muts.append(c) + mutmap[key][cf] = muts + req = ManagedThriftRequest('batch_mutate', mutmap, consistency) + return self.manager.pushRequest(req, retries=retries) + @requirekwargs('mutationmap') def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retries=None, ttl=None): timestamp = timestamp or self._time() @@ -226,6 +273,25 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri req = ManagedThriftRequest('batch_mutate', mutmap, consistency) return self.manager.pushRequest(req, retries=retries) + def _mk_counter_cols_or_supers(self, mapping): + if isinstance(mapping, list): + return mapping + colsorsupers = [] + if isinstance(mapping, dict): + first = mapping.keys()[0] + if isinstance(mapping[first], dict): + for name in mapping: + cols = [] + for col,val in mapping[name].iteritems(): + cols.append(CounterColumn(col, val)) + colsorsupers.append(CounterSuperColumn(name=name, columns=cols)) + else: + for col, val in mapping.iteritems(): + colsorsupers.append(CounterColumn(col, val)) + else: + raise TypeError('dict (of dicts) or list of CounterColumn/CounterSuperColumn expected') + return colsorsupers + def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False): if isinstance(mapping, list): return mapping diff --git a/telephus/translate.py b/telephus/translate.py index a74a2d2..b25c56c 100644 --- a/telephus/translate.py +++ b/telephus/translate.py @@ -2,8 +2,8 @@ from telephus.cassandra.c08.constants import VERSION as CASSANDRA_08_VERSION supported_versions = ( - ('0.7', CASSANDRA_07_VERSION), ('0.8', CASSANDRA_08_VERSION), + ('0.7', CASSANDRA_07_VERSION), ) class APIMismatch(Exception): diff --git a/test/test_cassandraclient.py b/test/test_cassandraclient.py index cdcc3a6..ade6396 100644 --- a/test/test_cassandraclient.py +++ b/test/test_cassandraclient.py @@ -114,6 +114,18 @@ def test_insert_get(self): res = yield self.client.get('test2', SCF, column=COLUMN, super_column=SCOLUMN) self.assertEqual(res.column.value, 'superval2') + @defer.inlineCallbacks + def test_batch_multikey_insert_get_slice_and_count(self): + mapping = {'testA':{COLUMN: 'column1A', COLUMN2: 'column2A'}, + 'testB':{COLUMN: 'column1B', COLUMN2: 'column2B'}} + yield self.client.batch_multikey_insert(CF, mapping) + res = yield self.client.get_slice('testA', CF, names=(COLUMN, COLUMN2)) + self.assertEqual(res[0].column.value, 'column1A') + self.assertEqual(res[1].column.value, 'column2A') + res = yield self.client.get_slice('testB', CF, names=(COLUMN, COLUMN2)) + self.assertEqual(res[0].column.value, 'column1B') + self.assertEqual(res[1].column.value, 'column2B') + @defer.inlineCallbacks def test_batch_insert_get_slice_and_count(self): yield self.client.batch_insert('test', CF, @@ -198,6 +210,34 @@ def test_indexed_slices(self): res = yield self.client.get_indexed_slices(IDX_CF, expressions, start_key='') self.assertEquals(res[0].columns[0].column.value,'two') + @defer.inlineCallbacks + def test_counter_batch_multikey_add(self): + if self.version != CASSANDRA_08_VERSION: + raise unittest.SkipTest('Counters are not supported in 0.7') + mapping = { + "keyA":{"col1A":1, "col2A":5}, + "keyB":{"col1B":2, "col2B":3}} + yield self.client.batch_multikey_add(COUNTER_CF, mapping) + res = yield self.client.get('keyA', COUNTER_CF, column='col1A') + self.assertEquals(res.counter_column.value, 1) + res = yield self.client.get('keyA', COUNTER_CF, column='col2A') + self.assertEquals(res.counter_column.value, 5) + res = yield self.client.get('keyB', COUNTER_CF, column='col1B') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('keyB', COUNTER_CF, column='col2B') + self.assertEquals(res.counter_column.value, 3) + + @defer.inlineCallbacks + def test_counter_batch_add(self): + if self.version != CASSANDRA_08_VERSION: + raise unittest.SkipTest('Counters are not supported in 0.7') + mapping = {"col1":1, "col2":5} + yield self.client.batch_add('test', COUNTER_CF, mapping) + res = yield self.client.get('test', COUNTER_CF, column='col1') + self.assertEquals(res.counter_column.value, 1) + res = yield self.client.get('test', COUNTER_CF, column='col2') + self.assertEquals(res.counter_column.value, 5) + @defer.inlineCallbacks def test_counter_add(self): if self.version != CASSANDRA_08_VERSION: From db716a7cf866533c665e6f27c3d4c3f1a647e1d2 Mon Sep 17 00:00:00 2001 From: John Wehr Date: Mon, 13 Feb 2012 16:10:21 -0500 Subject: [PATCH 3/7] Batched deletions. --- telephus/client.py | 11 +++++++++++ test/test_cassandraclient.py | 20 +++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/telephus/client.py b/telephus/client.py index 71f0d28..aa1333c 100644 --- a/telephus/client.py +++ b/telephus/client.py @@ -218,6 +218,17 @@ def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=Non return self.batch_mutate(mutmap, timestamp=timestamp, consistency=consistency, retries=retries) + @requirekwargs('cfmap') + def batch_remove_rows(self, cfmap=None, consistency=None, timestamp=None, retries=None): + timestamp = timestamp or self._time() + consistency = consistency or self.consistency + mutmap = defaultdict(dict) + for cf, keys in cfmap.iteritems(): + for key in keys: + mutmap[key][cf] = [Mutation(deletion=Deletion(timestamp))] + req = ManagedThriftRequest('batch_mutate', mutmap, consistency) + return self.manager.pushRequest(req, retries=retries) + @requirekwargs('cfmap') def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None, reverse=False, consistency=None, timestamp=None, supercolumn=None, diff --git a/test/test_cassandraclient.py b/test/test_cassandraclient.py index ade6396..79952d1 100644 --- a/test/test_cassandraclient.py +++ b/test/test_cassandraclient.py @@ -226,7 +226,19 @@ def test_counter_batch_multikey_add(self): self.assertEquals(res.counter_column.value, 2) res = yield self.client.get('keyB', COUNTER_CF, column='col2B') self.assertEquals(res.counter_column.value, 3) - + mapping = { + "keyA":{"col1A":1, "col2A":1}, + "keyB":{"col1B":1, "col2B":1}} + yield self.client.batch_multikey_add(COUNTER_CF, mapping) + res = yield self.client.get('keyA', COUNTER_CF, column='col1A') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('keyA', COUNTER_CF, column='col2A') + self.assertEquals(res.counter_column.value, 6) + res = yield self.client.get('keyB', COUNTER_CF, column='col1B') + self.assertEquals(res.counter_column.value, 3) + res = yield self.client.get('keyB', COUNTER_CF, column='col2B') + self.assertEquals(res.counter_column.value, 4) + @defer.inlineCallbacks def test_counter_batch_add(self): if self.version != CASSANDRA_08_VERSION: @@ -237,6 +249,12 @@ def test_counter_batch_add(self): self.assertEquals(res.counter_column.value, 1) res = yield self.client.get('test', COUNTER_CF, column='col2') self.assertEquals(res.counter_column.value, 5) + mapping = {"col1":1, "col2":1} + yield self.client.batch_add('test', COUNTER_CF, mapping) + res = yield self.client.get('test', COUNTER_CF, column='col1') + self.assertEquals(res.counter_column.value, 2) + res = yield self.client.get('test', COUNTER_CF, column='col2') + self.assertEquals(res.counter_column.value, 6) @defer.inlineCallbacks def test_counter_add(self): From 707786d72313b25870c407b72a72048ee6ac197f Mon Sep 17 00:00:00 2001 From: John Wehr Date: Thu, 16 Feb 2012 16:00:41 -0500 Subject: [PATCH 4/7] Back to distutils. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 111e7e2..a298910 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/python -from setuptools import setup +from distutils.core import setup setup( name='telephus', version='1.0.0', From 2486e973062d418ff39de69c596b04ea7cab506f Mon Sep 17 00:00:00 2001 From: John Wehr Date: Thu, 16 Feb 2012 16:16:59 -0500 Subject: [PATCH 5/7] batch_remove_rows test. --- test/test_cassandraclient.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/test_cassandraclient.py b/test/test_cassandraclient.py index 79952d1..1341553 100644 --- a/test/test_cassandraclient.py +++ b/test/test_cassandraclient.py @@ -114,6 +114,15 @@ def test_insert_get(self): res = yield self.client.get('test2', SCF, column=COLUMN, super_column=SCOLUMN) self.assertEqual(res.column.value, 'superval2') + @defer.inlineCallbacks + def test_batch_remove_rows(self): + yield self.client.insert('test', CF, 'testval', column=COLUMN) + yield self.client.insert('test2', CF, 'testval2', column=COLUMN) + yield self.client.batch_remove_rows({CF:["test", "test2"]}) + res = yield self.client.multiget(['test', 'test2'], CF, column=COLUMN) + self.assertEqual(len(res['test']), 0) + self.assertEqual(len(res['test2']), 0) + @defer.inlineCallbacks def test_batch_multikey_insert_get_slice_and_count(self): mapping = {'testA':{COLUMN: 'column1A', COLUMN2: 'column2A'}, From 4ce6d5883fbd2f50b4c684dd10f5cc42ad5ab406 Mon Sep 17 00:00:00 2001 From: John Wehr Date: Thu, 16 Feb 2012 16:31:21 -0500 Subject: [PATCH 6/7] And back to 1.0 beta version. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a298910..64e49e1 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup( name='telephus', - version='1.0.0', + version='1.0.0_beta1', description='connection pooled, low-level client API for Cassandra in Twisted python', author='brandon@faltering.com', url='http://github.com/driftx/Telephus', From 1b6fae266515fc8d006922febf4616b74e0e3e03 Mon Sep 17 00:00:00 2001 From: John Wehr Date: Thu, 23 Feb 2012 01:06:11 -0500 Subject: [PATCH 7/7] Version number once again giving SetupTools fits. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 64e49e1..c0dd041 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup( name='telephus', - version='1.0.0_beta1', + version='1.0.0beta1', description='connection pooled, low-level client API for Cassandra in Twisted python', author='brandon@faltering.com', url='http://github.com/driftx/Telephus',