Examples
Simple column update
keyspace.prepareColumnMutation(CF_STANDARD1, rowKey, "Column1")
    .putValue("1234", null)
    .execute();
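The single column mutator can also delete the column. A brief sketch, reusing the same API:
keyspace.prepareColumnMutation(CF_STANDARD1, rowKey, "Column1")
    .deleteColumn()
    .execute();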
Batch mutation
MutationBatch m = keyspace.prepareMutationBatch();

long rowKey = 1234;
long rowKey2 = 5678;

// Setting columns in a standard column family
m.withRow(CF_STANDARD1, rowKey)
    .putColumn("Column1", "X", null)
    .putColumn("Column2", "X", null);
m.withRow(CF_STANDARD1, rowKey2)
    .putColumn("Column1", "Y", null);

try {
    OperationResult<Void> result = m.execute();
} catch (ConnectionException e) {
    LOG.error(e);
}
The last argument to putColumn is the TTL, which is specified in seconds and set on a per-column basis. The Cassandra cluster automatically deletes columns after their TTL has expired, without the need for any batch cleanup jobs. Setting the TTL to null means the column never expires.
Note: It is not currently possible to set a TTL on counter columns.
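For example, a minimal sketch of writing a column that expires 60 seconds after the write (the TTL value here is illustrative):
MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(CF_STANDARD1, rowKey)
    .putColumn("Column1", "X", 60); // TTL of 60 seconds
m.execute();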
Using a write-ahead log
You can hook your own WAL (write-ahead log) implementation into a batch mutation. When executing the batch, Astyanax will first record the mutation in the WAL, then execute the operation, and finally call either remove or commit: on success the mutation is removed from the WAL, and on failure it is committed to the WAL so that it can be replayed later.
MutationBatch m = keyspace
    .prepareMutationBatch()
    .usingWriteAheadLog(writeAheadLog);

// ... fill the mutation

try {
    OperationResult<Void> result = m.execute();
} catch (ConnectionException e) {
    LOG.error(e);
}
Deleting columns and rows
MutationBatch m = keyspace.prepareMutationBatch();

// Deleting a standard column
m.withRow(CF_STANDARD1, rowKey)
    .deleteColumn("Column1");

// Deleting an entire row
m.withRow(CF_STANDARD1, rowKey2)
    .delete();

try {
    OperationResult<Void> result = m.execute();
} catch (ConnectionException e) {
    LOG.error(e);
}
Query a single column
Column<String> result = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .getColumn("Column1")
    .execute().getResult();
String value = result.getStringValue();
Query an entire row
ColumnList<String> result = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .execute().getResult();
if (!result.isEmpty()) {
    ...
}
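A brief sketch of iterating over the returned columns (this assumes the column values are Strings):
for (Column<String> c : result) {
    LOG.info(c.getName() + " = " + c.getStringValue());
}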
Retry policies
Astyanax distinguishes between failover and retry. Failover is invoked within the context of selecting a connection from the connection pool, or of failing over when a connection is terminated or times out. Retry operates on top of the connection pool failover and retries the entire operation with a new failover context; it is useful when you want to retry after all lower-level connection pool failovers have been exhausted. Retry implements backoff, whereas failover in the connection pool waits for connections to become available. Use a retry policy when you want your app to be more resilient in the face of frequent timeouts from Cassandra, a connection pool overwhelmed with requests, or a temporary Cassandra outage. A retry policy may be added to a query or a mutation immediately after preparing the query or mutation object by calling withRetryPolicy. Astyanax provides the following retry policies: RunOnce (the default), RetryNTimes (no backoff), ConstantBackoff, ExponentialBackoff, and BoundedExponentialBackoff.
ColumnList<String> result = keyspace.prepareQuery(CF_STANDARD1)
    .withRetryPolicy(new ExponentialBackoff(250, 5))
    .getKey(rowKey)
    .execute().getResult();
if (!result.isEmpty()) {
    ...
}
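A retry policy may be attached to a mutation in the same way. A short sketch using BoundedExponentialBackoff with a 250 ms base sleep, a 10 second cap, and at most 5 retries (the values are illustrative):
MutationBatch m = keyspace.prepareMutationBatch()
    .withRetryPolicy(new BoundedExponentialBackoff(250, 10000, 5));
// ... fill the mutation, then call m.execute() as usual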
Paginate through all columns in a row
ColumnList<String> columns;
int pageSize = 10;
try {
    RowQuery<String, String> query = keyspace
        .prepareQuery(CF_STANDARD1)
        .getKey("A")
        .setIsPaginating()
        .withColumnRange(new RangeBuilder().setMaxSize(pageSize).build());
    while (!(columns = query.execute().getResult()).isEmpty()) {
        for (Column<String> c : columns) {
            // Process each column in the page here
        }
    }
} catch (ConnectionException e) {
    LOG.error(e);
}
Query all with callback
This query breaks up the keys into token ranges and queries each range in a separate thread.
keyspace.prepareQuery(CF_STANDARD1)
    .getAllRows()
    .setRowLimit(100) // Read in blocks of 100
    .setRepeatLastToken(false)
    .withColumnRange(new RangeBuilder().setLimit(2).build())
    .executeWithCallback(new RowCallback<String, String>() {
        @Override
        public void success(Rows<String, String> rows) {
            // Do something with the rows that were fetched. Called once per block.
        }

        @Override
        public boolean failure(ConnectionException e) {
            return true; // Returning true will continue, false will terminate the query
        }
    });
Counting columns
Cassandra provides an API to count the number of columns in a row without returning the query data. This is not a constant-time operation, because Cassandra actually has to read the row and count the columns. This will be optimized in a future version.
int count = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .getCount()
    .execute().getResult();
Column slices
Use a column slice to narrow down the range of columns returned in a query. A column slice can be added to any of the queries by calling setColumnSlice on the query object prior to calling execute(). Column slices come in two flavors: the column slice and the column range. Use withColumnSlice to return a non-contiguous set of columns, and withColumnRange to return an ordered, contiguous range of columns.
This is the general format of a column range.
ColumnList<String> result;
result = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .withColumnRange(new RangeBuilder().setStart("firstColumn").setEnd("lastColumn").setMaxSize(100).build())
    .execute().getResult();
if (!result.isEmpty()) {
    ...
}
Let's assume you have data that looks like this:
CF_STANDARD1: {
    "Prefixes": {
        "Prefix1_a": 1,
        "Prefix1_b": 2,
        "Prefix2_a": 3
    }
}
To get a slice of columns that start with "Prefix1", perform the following query:
OperationResult<ColumnList<String>> r = keyspace.prepareQuery(CF_STANDARD1)
    .getKey("Prefixes")
    .withColumnRange(new RangeBuilder()
        .setStart("Prefix1_\u0000")
        .setEnd("Prefix1_\uffff")
        .setLimit(Integer.MAX_VALUE).build())
    .execute();
To get the first 5 columns of a row:
OperationResult<ColumnList<String>> r = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .withColumnRange(new RangeBuilder().setMaxSize(5).build())
    .execute();
To get the last 5 columns, in reverse order:
OperationResult<ColumnList<String>> r = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .withColumnRange(new RangeBuilder().setReversed().setLimit(5).build())
    .execute();
Use this type of column slice when you have fixed column names.
OperationResult<ColumnList<String>> r = keyspace.prepareQuery(CF_STANDARD1)
    .getKey(rowKey)
    .withColumnSlice("First", "Last", "Age")
    .execute();
Counter columns
First, define a column family for the counters:
public static final ColumnFamily<Long, String> CF_COUNTER1 =
    new ColumnFamily<Long, String>(
        "CounterColumnFamily",
        LongSerializer.get(),
        StringSerializer.get());
To increment using the single column mutator:
keyspace.prepareColumnMutation(CF_COUNTER1, rowKey, "CounterColumn1")
    .incrementCounterColumn(1)
    .execute();
To increment using the batch mutator:
MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(CF_COUNTER1, rowKey)
    .incrementCounterColumn("MyCounter", 100);
m.execute();
Counter column values are retrieved using the same calls as regular columns, except that any accessor other than getLongValue() will throw an exception.
Column<String> result = keyspace.prepareQuery(CF_COUNTER1)
    .getKey(rowKey)
    .getColumn("CounterColumn1")
    .execute().getResult();
Long counterValue = result.getLongValue();
To clear a counter, first read its value and then increment it by the negative of that value. Counter columns do not support TTL.
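A minimal sketch of the read-then-negate sequence (note that it is not atomic; a concurrent increment between the read and the write will be lost):
Column<String> counter = keyspace.prepareQuery(CF_COUNTER1)
    .getKey(rowKey)
    .getColumn("CounterColumn1")
    .execute().getResult();
keyspace.prepareColumnMutation(CF_COUNTER1, rowKey, "CounterColumn1")
    .incrementCounterColumn(-counter.getLongValue())
    .execute();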
Composite columns
The following example shows how a composite column is used to store session events for a user. The row key is the user id and the column name is a composite of a session id and a timestamp. Using composite columns we can search for all events for a specific session id, sorted by event timestamp.
// Annotated composite class
public class SessionEvent {
    @Component(ordinal = 0) String sessionId;
    @Component(ordinal = 1) UUID timestamp;

    // Must have a public default constructor
    public SessionEvent() {
    }
}
static AnnotatedCompositeSerializer<SessionEvent> eventSerializer
    = new AnnotatedCompositeSerializer<SessionEvent>(SessionEvent.class);
static ColumnFamily<String, SessionEvent> CF_SESSION_EVENTS
    = new ColumnFamily<String, SessionEvent>("SessionEvents",
        StringSerializer.get(), eventSerializer);
// Querying Cassandra for an entire row
OperationResult<ColumnList<SessionEvent>> result = keyspace.prepareQuery(CF_SESSION_EVENTS)
    .getKey("UserId1")
    .withColumnRange(eventSerializer.buildRange()
        .withPrefix("SessionId1")
        .greaterThanEquals(TimeUUIDUtils.getMicrosTimeUUID(startTime))
        .lessThanEquals(TimeUUIDUtils.getMicrosTimeUUID(endTime)))
    .execute();
Prefixed serializer
The PrefixedSerializer is useful when you want to store keys or column names from different data domains in the same column family but need to avoid collisions.
The following example shows how to store keys from different domains in the same column family. To do this you actually need to create a separate ColumnFamily definition for each prefix. Note that the prefixes must all have the same data type, but the wrapped serializers may differ. This approach is not suitable for column names, since column names are sorted; for column names it is better to use the CompositeSerializer (coming soon to Astyanax).
ColumnFamily<String, String> cfPrefix1 = new ColumnFamily<String, String>("Standard1",
    new PrefixedSerializer<String, String>("Prefix1_", StringSerializer.get(), StringSerializer.get()),
    StringSerializer.get(),
    ColumnType.STANDARD);
ColumnFamily<String, String> cfPrefix2 = new ColumnFamily<String, String>("Standard1",
    new PrefixedSerializer<String, String>("Prefix2_", StringSerializer.get(), StringSerializer.get()),
    StringSerializer.get(),
    ColumnType.STANDARD);

MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(cfPrefix1, "A") // This key is actually "Prefix1_A"
    .putColumn("Column1", "Value1", null);
m.withRow(cfPrefix2, "A") // This key is actually "Prefix2_A"
    .putColumn("Column1", "Value2", null);
m.execute();
Iterate all rows in a column family
The Rows object returned by the query transparently paginates through all rows in the column family. Since calls to the keyspace are actually made during iteration, it is necessary to set an ExceptionCallback for your application to handle exceptions. Return true from the callback to retry or false to exit the iteration loop.
Rows<String, String> rows;
try {
    rows = keyspace.prepareQuery(CF_STANDARD1)
        .getAllRows()
        .setBlockSize(10)
        .withColumnRange(new RangeBuilder().setMaxSize(10).build())
        .setExceptionCallback(new ExceptionCallback() {
            @Override
            public boolean onException(ConnectionException e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return true; // Retry
            }
        })
        .execute().getResult();
} catch (ConnectionException e) {
    LOG.error(e);
}

// This will never throw an exception
for (Row<String, String> row : rows) {
    LOG.info("ROW: " + row.getKey() + " " + row.getColumns().size());
}
If you just want the row keys then add a column range with a limit of 0.
OperationResult<Rows<String, String>> result =
    keyspace.prepareQuery(CF_STANDARD1)
        .getAllRows()
        .withColumnRange(new RangeBuilder().setLimit(0).build()) // RangeBuilder will be available in version 1.13
        .execute();
Copy results of a query to another column family
OperationResult<Void> result =
    keyspace.prepareQuery(CF_STANDARD1)
        .getKey(rowKey).copyTo(CF_STANDARD2, rowKey)
        .execute();
Setting a different consistency level for each call
OperationResult<Void> result =
    keyspace.prepareQuery(CF_STANDARD1)
        .setConsistencyLevel(ConsistencyLevel.CL_ONE)
        .getKey("Key1").copyTo(CF_STANDARD2, "Key2")
        .execute();
Secondary indexes
To use secondary indexes you must first configure your column family with column metadata that tells Cassandra for which columns to create a secondary index. Cassandra currently only supports the KEYS index type, which is essentially a hash lookup.
create column family UserInfo with
comparator = UTF8Type and
column_metadata =
[
{column_name: first, validation_class: UTF8Type},
{column_name: last, validation_class: UTF8Type},
{column_name: age, validation_class: UTF8Type, index_type: KEYS}
];
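The query below assumes a matching Java definition along these lines (the CF_USERINFO constant is a hypothetical name):
static ColumnFamily<String, String> CF_USERINFO =
    new ColumnFamily<String, String>("UserInfo",
        StringSerializer.get(),  // row key serializer
        StringSerializer.get()); // column name serializer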
OperationResult<Rows<String, String>> result;
result = keyspace.prepareQuery(CF_USERINFO)
    .searchWithIndex()
    .setLimit(100) // Number of rows returned
    .addExpression()
        .whereColumn("age").equals().value(26)
    .execute();
If you want to reuse an index expression, create a prepared index expression from the column family and provide it to the index query by calling addPreparedExpressions. Expressions in the list are ANDed (there is no OR in Cassandra).
PreparedIndexExpression<String, String> clause = CF_STANDARD1.newIndexClause().whereColumn("Index1").equals().value(26);

OperationResult<Rows<String, String>> result;
result = keyspace.prepareQuery(CF_STANDARD1)
    .searchWithIndex()
    .setStartKey("")
    .addPreparedExpressions(Arrays.asList(clause))
    .execute();
When using an index query against a large result set it is best to paginate through the results, otherwise you are likely to get timeout exceptions from Cassandra.
OperationResult<Rows<String, String>> result;
int pageCount = 0;
int rowCount = 0;

IndexQuery<String, String> query = keyspace.prepareQuery(CF_STANDARD1)
    .searchWithIndex()
    .setLimit(10) // This is the page size
    .setIsPaginating()
    .addExpression()
        .whereColumn("Index2").equals().value(42);

while (!(result = query.execute()).getResult().isEmpty()) {
    pageCount++;
    rowCount += result.getResult().size();
    for (Row<String, String> row : result.getResult()) {
        // Process each row in the page here
    }
}
Object mapping
A simple method for mapping between a Java bean and a column family is available. Create a Java bean whose fields map to columns in the column family; annotations denote the persistent columns and the key. E.g.
public class MyBean {
    @Id("ID")
    private int myKey;

    @Column("NAME")
    private String myName;

    @Column("TIME")
    private Date myDate;

    ... getters/setters go here ...
}
Once you have an annotated bean, use the Mapping class to manage the mapping:
Mapping<MyBean> mapper = Mapping.make(MyBean.class);
...
// reading into a bean
ColumnList<String> result = keyspace.prepareQuery(columnFamily).getKey(id).execute().getResult();
MyBean bean = mapper.newInstance(result);
// writing a bean
MutationBatch mutationBatch = keyspace.prepareMutationBatch();
ColumnListMutation<String> columnListMutation = mutationBatch.withRow(columnFamily, id);
mapper.fillMutation(myBean, columnListMutation);
mutationBatch.execute();
Because there is overhead in creating a Mapping instance, you should use the MappingCache to access Mapping instances. The modified code would be:
MappingCache cache = ... // usually a static or a field in a class
Mapping<MyBean> mapper = cache.getMapping(MyBean.class);
...
A high level abstraction for mapping is provided by the MappingUtil class. It has methods that function like a HashMap, except that values are stored in and read from Cassandra. E.g.
MappingUtil mapper = new MappingUtil(keyspace, new MappingCache());
...
mapper.put(columnFamily, myBean);
...
MyBean bean = mapper.get(columnFamily, id, MyBean.class);
...
List<MyBean> allBeans = mapper.getAll(columnFamily, MyBean.class);
...
mapper.remove(columnFamily, myBean);
CQL
CQL is the Cassandra equivalent of SQL but supports only a small subset of the SQL syntax.
Query for row data from Cassandra. Notice that this query always returns a list of Rows, even if the query is for a specific column.
try {
    OperationResult<CqlResult<String, String>> result
        = keyspace.prepareQuery(CF_STANDARD1)
            .withCql("SELECT * FROM Standard1;")
            .execute();
    for (Row<String, String> row : result.getResult().getRows()) {
        // Process each row here
    }
} catch (ConnectionException e) {
    LOG.error(e);
}
Query for count of columns
try {
    OperationResult<CqlResult<String, String>> result
        = keyspace.prepareQuery(CF_STANDARD1)
            .withCql("SELECT count(*) FROM Standard1 WHERE KEY='A';")
            .execute();
    System.out.println("CQL Count: " + result.getResult().getNumber());
} catch (ConnectionException e) {
    LOG.error(e);
}
Reverse indexes
One of the pain points of using Cassandra is the inability to do an efficient range search, for example SELECT * FROM CF_USERS WHERE AGE >= 10 AND AGE <= 40. To do this with secondary indexes you need to throw in an equality clause and then have Cassandra read all rows matching the equality and evaluate the range linearly, which can be very inefficient. The alternative of reading all rows and applying the range filter in Java is even less efficient.
This can be solved with custom reverse indexes. These indexes have to be built manually but can be very powerful. The idea behind a reverse index is to create a separate column family where the row key is the index name (usually sharded over multiple keys) and each column name is a composite of the value plus the foreign key (yes, a foreign key!). The benefit is that each index shard is sorted by value (the value from the data column family). For the above example the data and index column families would be:
CF_USERS
    RowKey: username (UTF8Type)
    ComparatorType: UTF8Type

    Example:
    "elandau" : {
        "FIRST" : "Eran",
        "LAST"  : "Landau",
        "AGE"   : 34
    }

CF_INDEX
    RowKey: AGE_[0..10] (11 shards here)
    ComparatorType: CompositeType(LongType, UTF8Type)

    Example:
    "AGE_0" : {
        5:"ariellelandau" : null,
        34:"elandau"      : null,
        60:"lindalandau"  : null
    }
    "AGE_1" : {
        12:"amitlandau"  : null,
        33:"nettalandau" : null
    }
Notice that the first part of the composite has the type of the indexed value and the second part has the type of the row key in the data column family.
To use the reverse index you first search CF_INDEX, reading all rows in the shard set and applying a column range from 10 to 40. Once you have the index entries you extract the foreign key from each column name and do a get on the CF_USERS column family. The complexity lies in constructing the query efficiently: you may need to break up the index shard query into small key slices that can be executed in parallel, and once you start reading a shard you may need to paginate through it if it holds many columns.
Astyanax provides a recipe to do all of this in one call.
ReverseIndexQuery.newQuery(keyspace, CF_USERS, INDEX_CF_NAME, LongSerializer.get())
    .withIndexShards(new Shards.StringShardBuilder().setPrefix("AGE_").setShardCount(11).build())
    .fromIndexValue(10L)
    .toIndexValue(40L)
    .withColumnSlice(Arrays.asList("FIRST", "LAST", "AGE"))
    .forEach(new Callback<Row<String, String>>() {
        @Override
        public void handle(Row<String, String> row) {
            // Process your data rows here: the row should have columns "FIRST", "LAST", "AGE"
        }
    })
    .execute();
What's missing? You currently need to build the custom index on your own when you update your data. In the future Astyanax will have a simple DAO that does this transparently. Also, a future version of Cassandra will provide trigger functionality which could be used to build the reverse index for you.
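Until then, a minimal sketch of maintaining the index by hand, writing the data row and its index entry in one mutation batch (the IndexEntry class, CF_INDEX constant, and hash-based shard selection are illustrative assumptions, not part of the recipe):
// Hypothetical composite column name for the index: (value, foreign key)
public class IndexEntry {
    @Component(ordinal = 0) Long value;
    @Component(ordinal = 1) String foreignKey;

    public IndexEntry() {
    }
    public IndexEntry(Long value, String foreignKey) {
        this.value = value;
        this.foreignKey = foreignKey;
    }
}

static AnnotatedCompositeSerializer<IndexEntry> indexEntrySerializer
    = new AnnotatedCompositeSerializer<IndexEntry>(IndexEntry.class);
static ColumnFamily<String, IndexEntry> CF_INDEX
    = new ColumnFamily<String, IndexEntry>("UserIndex",
        StringSerializer.get(), indexEntrySerializer);

// Write the data row and its index entry in the same batch
MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(CF_USERS, "elandau")
    .putColumn("AGE", 34L, null);
m.withRow(CF_INDEX, "AGE_" + (Math.abs("elandau".hashCode()) % 11)) // pick one of the 11 shards
    .putEmptyColumn(new IndexEntry(34L, "elandau"), null);
m.execute();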
Uniqueness constraint
Cassandra does not provide a built-in uniqueness constraint on row keys. Astyanax provides a recipe which follows a write/read sequence that guarantees uniqueness. This is a suboptimal use case for Cassandra and should be used sparingly, only where guaranteeing uniqueness cannot be avoided.
The following example shows how to set up a uniqueness constraint on a column family with a Long key type and String column names. The assumption here is that the same column family is used to store the data as well as the columns used to guarantee uniqueness.
UniquenessConstraintWithPrefix<Long> unique = new UniquenessConstraintWithPrefix<Long>(keyspace, CF_DATA)
    .setPrefix("unique_") // Optional: distinguishes the unique column name from real columns
    .setTtl(60);          // Optional

try {
    String column = unique.isUnique(someRowKey);
    if (column == null) {
        // Not unique
    }
    else {
        // Is unique.
        // Make sure to store the returned column with your data, otherwise it will
        // expire after the TTL and uniqueness will be lost.
    }
} catch (ConnectionException e) {
    LOG.error(e);
}
QUORUM consistency is used by default but can be changed by calling .setConsistencyLevel (e.g. ConsistencyLevel.CL_EACH_QUORUM).