diff --git a/entitydb-entity-store/entitydb-entity-store-cassandra/src/main/java/ai/philterd/entitydb/entitystore/cassandra/CassandraEntityStore.java b/entitydb-entity-store/entitydb-entity-store-cassandra/src/main/java/ai/philterd/entitydb/entitystore/cassandra/CassandraEntityStore.java index aa5d094..df8f8ac 100644 --- a/entitydb-entity-store/entitydb-entity-store-cassandra/src/main/java/ai/philterd/entitydb/entitystore/cassandra/CassandraEntityStore.java +++ b/entitydb-entity-store/entitydb-entity-store-cassandra/src/main/java/ai/philterd/entitydb/entitystore/cassandra/CassandraEntityStore.java @@ -25,21 +25,11 @@ import ai.philterd.entitydb.model.entitystore.EntityIdGenerator; import ai.philterd.entitydb.model.entitystore.EntityStore; import ai.philterd.entitydb.model.entitystore.QueryResult; -import ai.philterd.entitydb.model.eql.EntityMetadataFilter; import ai.philterd.entitydb.model.eql.EntityQuery; import ai.philterd.entitydb.model.exceptions.EntityStoreException; import ai.philterd.entitydb.model.exceptions.MalformedAclException; import ai.philterd.entitydb.model.exceptions.NonexistantEntityException; import ai.philterd.entitydb.model.search.IndexedEntity; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; -import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; @@ -49,6 +39,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -59,16 +50,6 @@ import java.util.Set; import java.util.UUID; -import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; -import static com.datastax.driver.core.querybuilder.QueryBuilder.contains; -import static com.datastax.driver.core.querybuilder.QueryBuilder.containsKey; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.gte; -import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; -import static com.datastax.driver.core.querybuilder.QueryBuilder.lte; -import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static com.datastax.driver.core.querybuilder.QueryBuilder.set; - /** * Implementation of {@link EntityStore} that uses a Cassandra database. * Query support is provided through the {@link EntityQuery} class. @@ -100,14 +81,12 @@ public CassandraEntityStore(String host, int port, String keySpace) { this.host = host; this.keySpace = keySpace; - final Cluster cluster = Cluster.builder() - .addContactPoint(host) - .withPort(port) - .withQueryOptions(new QueryOptions().setFetchSize(DEFAULT_PAGE_SIZE)) + this.session = CqlSession.builder() + .addContactPoint(new InetSocketAddress(host, port)) + .withKeyspace(keySpace) + .withLocalDatacenter("datacenter1") .build(); - this.session = CqlSession.builder().build(); - } /** @@ -117,7 +96,7 @@ public CassandraEntityStore(String host, int port, String keySpace) { */ public CassandraEntityStore(CqlSession session, String keySpace) { - this.host = session.getContext().getSessionName(); + this.host = session.getName(); this.keySpace = keySpace; this.session = session; @@ -139,18 +118,13 @@ public List getNonIndexedEntities(int limit) { // The "indexed" column is indexed, but we can only use one index. // Because of being restricted to a single index, the "visible" // column is checked on each returned entity and not in the query. - // That's why the "visible" condition is commented out in the below query. - - final SimpleStatement simpleStatement = SimpleStatement.newInstance("SELECT * FROM my_table WHERE id = 1").setKeyspace(CqlIdentifier.fromCql("my_keyspace")); - final Select select = QueryBuilder.select() - .all() - .from(keySpace, TABLE_NAME) - .where(eq("indexed", Long.valueOf(0))) - //.and(eq("visible", Long.valueOf(1))) - .limit(limit); + final SimpleStatement statement = SimpleStatement.builder("SELECT * FROM " + TABLE_NAME + " WHERE indexed = ?") + .addPositionalValues("0") + .setKeyspace(keySpace) + .build(); - final ResultSet resultSet = session.execute(simpleStatement); + final ResultSet resultSet = session.execute(statement); final List cassandraStoredEntities = new LinkedList(); @@ -185,17 +159,18 @@ public boolean markEntityAsIndexed(String entityId) { if(entity != null) { - final Statement statement = QueryBuilder - .update(keySpace, TABLE_NAME) - .with(set("indexed", System.currentTimeMillis())) - .where(eq("id", entityId)); + final SimpleStatement statement = SimpleStatement.builder("UPDATE " + TABLE_NAME + " SET indexed = ? WHERE id = ?") + .addPositionalValues("0") + .addPositionalValues(entityId) + .setKeyspace(keySpace) + .build(); try { final ResultSet resultSet = session.execute(statement); // wasApplied() is not the right function here because - // this is not a conditional update but it does indicate + // this is not a conditional update, but it does indicate // that execution completed. result = resultSet.wasApplied(); @@ -239,46 +214,27 @@ public String storeEntity(Entity entity, String acl) throws EntityStoreException try { - final Date timestamp = new Date(); - - // Executing in a batch doesn't help performance - it provides atomicity for the insert. - final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED); - - final PreparedStatement entityInsert = session.prepare(insertInto(keySpace, TABLE_NAME) - .value("id", bindMarker()) - .value("text", bindMarker()) - .value("confidence", bindMarker()) - .value("type", bindMarker()) - .value("context", bindMarker()) - .value("documentid", bindMarker()) - .value("uri", bindMarker()) - .value("language", bindMarker()) - .value("extractiondate", bindMarker()) - .value("acl", bindMarker()) - .value("metadata", bindMarker()) - .value("timestamp", bindMarker()) - .value("visible", bindMarker()) - .value("indexed", bindMarker())); - - final BoundStatement boundEntityInsert = entityInsert.bind( - entityId, - entity.getText(), - entity.getConfidence(), - entity.getType(), - entity.getContext(), - entity.getDocumentId(), - entity.getUri(), - entity.getLanguageCode(), - timestamp.getTime(), - acl, - entity.getMetadata(), - System.currentTimeMillis(), - 1, - 0L); - - batchStatement.add(boundEntityInsert); - - session.execute(batchStatement); + final Date timestamp = new Date(); + + final SimpleStatement statement = SimpleStatement.builder("INSERT INTO " + TABLE_NAME + " id, text, confidence, type, context, documentid, uri, language, extractiondate, acl, metadata, timestamp, visible, indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + .addPositionalValues(entityId) + .addPositionalValues(entity.getText()) + .addPositionalValues(entity.getConfidence()) + .addPositionalValues(entity.getType()) + .addPositionalValues(entity.getContext()) + .addPositionalValues(entity.getDocumentId()) + .addPositionalValues(entity.getUri()) + .addPositionalValues(entity.getLanguageCode()) + .addPositionalValues(timestamp.getTime()) + .addPositionalValues(acl) + .addPositionalValues(entity.getMetadata()) + .addPositionalValues(System.currentTimeMillis()) + .addPositionalValues(1) + .addPositionalValues(0L) + .setKeyspace(keySpace) + .build(); + + session.execute(statement); } catch (Exception ex) { @@ -319,60 +275,34 @@ public String updateAcl(String entityId, String acl) throws EntityStoreException // Make the ID for the new entity and set it. newEntityId = EntityIdGenerator.generateEntityId(cloned.getText(), cloned.getConfidence(), cloned.getLanguage(), cloned.getContext(), cloned.getDocumentId(), acl); cloned.setId(newEntityId); - - // Executing in a batch doesn't help performance - it provides atomicity for the insert. - final BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.LOGGED); - - final PreparedStatement entityInsert = session.prepare(insertInto(keySpace, TABLE_NAME) - .value("id", bindMarker()) - .value("text", bindMarker()) - .value("confidence", bindMarker()) - .value("type", bindMarker()) - .value("context", bindMarker()) - .value("documentid", bindMarker()) - .value("uri", bindMarker()) - .value("language", bindMarker()) - .value("extractiondate", bindMarker()) - .value("acl", bindMarker()) - .value("visible", bindMarker()) - .value("timestamp", bindMarker()) - .value("indexed", bindMarker()) - .value("metadata", bindMarker())); - - final BoundStatement boundEntityInsert = entityInsert.bind( - newEntityId, - cloned.getText(), - cloned.getConfidence(), - cloned.getType(), - cloned.getContext(), - cloned.getDocumentId(), - cloned.getUri(), - cloned.getLanguage(), - cloned.getExtractionDate(), - acl, - cloned.getVisible(), - System.currentTimeMillis(), - cloned.getIndexed(), - cloned.getMetadata()); - - final Statement updateEntityStatement = - QueryBuilder - .update(keySpace, TABLE_NAME) - .with(set("visible", 0)) - .where(eq("id", entityId)); - - batchStatement.add(updateEntityStatement); - batchStatement.add(boundEntityInsert); - - try { - - session.execute(batchStatement); - - } catch (Exception ex) { - - LOGGER.error("Entity " + entityId + " could not be set as not visible.", ex); - - } + + final SimpleStatement newEntityStatement = SimpleStatement.builder("INSERT INTO " + TABLE_NAME + " id, text, confidence, type, context, documentid, uri, language, extractiondate, acl, metadata, timestamp, visible, indexed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + .addPositionalValues(newEntityId) + .addPositionalValues(cloned.getText()) + .addPositionalValues(cloned.getConfidence()) + .addPositionalValues(cloned.getType()) + .addPositionalValues(cloned.getContext()) + .addPositionalValues(cloned.getDocumentId()) + .addPositionalValues(cloned.getUri()) + .addPositionalValues(cloned.getLanguage()) + .addPositionalValues(cloned.getExtractionDate()) + .addPositionalValues(acl) + .addPositionalValues(cloned.getMetadata()) + .addPositionalValues(System.currentTimeMillis()) + .addPositionalValues(1) + .addPositionalValues(0L) + .setKeyspace(keySpace) + .build(); + + final SimpleStatement hideEntityStatement = SimpleStatement.builder("UPDATE " + TABLE_NAME + " SET visible = ? WHERE id = ?") + .addPositionalValues("0") + .addPositionalValues(entityId) + .setKeyspace(keySpace) + .build(); + + // TODO: Execute these statements in a transaction. + session.execute(newEntityStatement); + session.execute(hideEntityStatement); } else { @@ -406,9 +336,10 @@ public Map storeEntities(Set entities, String acl) throw @Override public void deleteEntity(String entityId) { - final Statement statement = QueryBuilder.delete() - .from(keySpace, TABLE_NAME) - .where(eq("id", entityId)); + final SimpleStatement statement = SimpleStatement.builder("DELETE FROM " + TABLE_NAME + " WHERE id = ?") + .addPositionalValues(entityId) + .setKeyspace(keySpace) + .build(); session.execute(statement); @@ -447,78 +378,70 @@ public List getEntitiesByIds(List entityIds, bool @Override public QueryResult query(EntityQuery entityQuery) throws EntityStoreException { - final Select select = QueryBuilder.select().from(keySpace, TABLE_NAME); - - if(StringUtils.isNotEmpty(entityQuery.getText())) { - - select.where(eq("text", entityQuery.getText())); - - } - - if(StringUtils.isNotEmpty(entityQuery.getType())) { - - select.where(eq("type", entityQuery.getType())); - - } - - if(StringUtils.isNotEmpty(entityQuery.getLanguageCode())) { - - select.where(eq("language", entityQuery.getLanguageCode())); - - } - - if(StringUtils.isNotEmpty(entityQuery.getUri())) { - - select.where(eq("uri", entityQuery.getUri())); - - } - - if(StringUtils.isNotEmpty(entityQuery.getContext())) { - - select.where(eq("context", entityQuery.getContext())); - - } - - if(StringUtils.isNotEmpty(entityQuery.getDocumentId())) { - - select.where(eq("documentid", entityQuery.getDocumentId())); - - } - - if(entityQuery.getConfidenceRange() != null) { - - select.where(gte("confidence", entityQuery.getConfidenceRange().getMinimum())); - select.where(lte("confidence", entityQuery.getConfidenceRange().getMaximum())); - - } - - if(CollectionUtils.isNotEmpty(entityQuery.getEntityMetadataFilters())) { - - for(EntityMetadataFilter entityMetadataFilter : entityQuery.getEntityMetadataFilters()) { - - select.where(containsKey("metadata", entityMetadataFilter.getName())) - .and(contains("metadata", entityMetadataFilter.getValue())); - - } - - } - - /* - * TODO: Ordering of Cassandra query results. - * The entity store query() functions are not called externally. All querying is done against - * Elasticsearch instead. However, it would be nice if all entity stores behaved the same. - * So ordering of Cassandra query results needs implemented at some point. - */ - - // The fetchsize must be sufficient to support this query. - final int fetchSize = entityQuery.getOffset() + entityQuery.getLimit(); - select.setFetchSize(fetchSize); - - // Some queries are not optimal. We won't prevent them but they are discouraged. - // See the table schema to understand the bad queries. - select.allowFiltering(); +// final Select select = QueryBuilder.select().from(keySpace, TABLE_NAME); +// +// if(StringUtils.isNotEmpty(entityQuery.getText())) { +// select.where(eq("text", entityQuery.getText())); +// } +// +// if(StringUtils.isNotEmpty(entityQuery.getType())) { +// select.where(eq("type", entityQuery.getType())); +// } +// +// if(StringUtils.isNotEmpty(entityQuery.getLanguageCode())) { +// select.where(eq("language", entityQuery.getLanguageCode())); +// +// } +// +// if(StringUtils.isNotEmpty(entityQuery.getUri())) { +// select.where(eq("uri", entityQuery.getUri())); +// } +// +// if(StringUtils.isNotEmpty(entityQuery.getContext())) { +// select.where(eq("context", entityQuery.getContext())); +// } +// +// if(StringUtils.isNotEmpty(entityQuery.getDocumentId())) { +// select.where(eq("documentid", entityQuery.getDocumentId())); +// } +// +// if(entityQuery.getConfidenceRange() != null) { +// select.where(gte("confidence", entityQuery.getConfidenceRange().getMinimum())); +// select.where(lte("confidence", entityQuery.getConfidenceRange().getMaximum())); +// } +// +// if(CollectionUtils.isNotEmpty(entityQuery.getEntityMetadataFilters())) { +// +// for(final EntityMetadataFilter entityMetadataFilter : entityQuery.getEntityMetadataFilters()) { +// +// select.where(containsKey("metadata", entityMetadataFilter.getName())) +// .and(contains("metadata", entityMetadataFilter.getValue())); +// +// } +// +// } +// +// /* +// * TODO: Ordering of Cassandra query results. +// * The entity store query() functions are not called externally. All querying is done against +// * Elasticsearch instead. However, it would be nice if all entity stores behaved the same. +// * So ordering of Cassandra query results needs implemented at some point. +// */ +// +// // The fetchsize must be sufficient to support this query. +// final int fetchSize = entityQuery.getOffset() + entityQuery.getLimit(); +// select.setFetchSize(fetchSize); +// +// // Some queries are not optimal. We won't prevent them but they are discouraged. +// // See the table schema to understand the bad queries. +// select.allowFiltering(); + + // TODO: Build this query using the WHERE conditions commented out above. + final SimpleStatement statement = SimpleStatement.builder("SELECT * FROM " + TABLE_NAME) + .setKeyspace(keySpace) + .build(); - final ResultSet resultSet = session.execute(select); + final ResultSet resultSet = session.execute(statement); List cassandraStoredEntities = new LinkedList(); @@ -582,13 +505,14 @@ public QueryResult query(EntityQuery entityQuery) throws EntityStoreException { @Override - public CassandraStoredEntity getEntityById(String id) { + public CassandraStoredEntity getEntityById(String entityId) { - final Select select = QueryBuilder.select().from(keySpace, TABLE_NAME); - - select.where(eq("id", id)); + final SimpleStatement statement = SimpleStatement.builder("SELECT * FROM " + TABLE_NAME + " WHERE id = ?") + .addPositionalValues(entityId) + .setKeyspace(keySpace) + .build(); - final ResultSet resultSet = session.execute(select); + final ResultSet resultSet = session.execute(statement); final List cassandraStoredEntities = new LinkedList(); @@ -624,15 +548,16 @@ public long getEntityCount() throws EntityStoreException { long count = 0; - final PreparedStatement select = session.prepare(select().countAll().from(keySpace, TABLE_NAME)); - final Row row = session.execute(select.bind()).one(); - - if(row != null) { - - count = row.getLong(0); - - } - + // TODO: Update this for the new version. +// final PreparedStatement select = session.prepare(select().countAll().from(keySpace, TABLE_NAME)); +// final Row row = session.execute(select.bind()).one(); +// +// if(row != null) { +// +// count = row.getLong(0); +// +// } +// return count; } catch (Exception ex) { @@ -651,14 +576,15 @@ public long getEntityCount(String context) throws EntityStoreException { long count = 0; - final PreparedStatement statement = session.prepare(select().countAll().from(keySpace, TABLE_NAME).where(eq("context", bindMarker()))); - final Row row = session.execute(statement.bind(context)).one(); - - if(row != null) { - - count = row.getLong(0); - - } + // TODO: Update this for the new version. +// final PreparedStatement statement = session.prepare(select().countAll().from(keySpace, TABLE_NAME).where(eq("context", bindMarker()))); +// final Row row = session.execute(statement.bind(context)).one(); +// +// if(row != null) { +// +// count = row.getLong(0); +// +// } return count; @@ -687,10 +613,11 @@ public void deleteContext(String context) throws EntityStoreException { try { - final Statement statement = QueryBuilder.delete() - .from(keySpace, TABLE_NAME) - .where(eq("context", context)); - + final SimpleStatement statement = SimpleStatement.builder("DELETE FROM " + TABLE_NAME + " WHERE context = ?") + .addPositionalValues(context) + .setKeyspace(keySpace) + .build(); + session.execute(statement); } catch (Exception ex) { @@ -707,10 +634,11 @@ public void deleteDocument(String documentId) throws EntityStoreException { try { - final Statement statement = QueryBuilder.delete() - .from(keySpace, TABLE_NAME) - .where(eq("documentid", documentId)); - + final SimpleStatement statement = SimpleStatement.builder("DELETE FROM " + TABLE_NAME + " WHERE documentid = ?") + .addPositionalValues(documentId) + .setKeyspace(keySpace) + .build(); + session.execute(statement); } catch (Exception ex) { diff --git a/entitydb-entity-store/entitydb-entity-store-cassandra/src/test/java/ai/philterd/test/entitydb/entitystore/cassandra/CassandraEntityStoreTest.java b/entitydb-entity-store/entitydb-entity-store-cassandra/src/test/java/ai/philterd/test/entitydb/entitystore/cassandra/CassandraEntityStoreTest.java index 0866859..dc404a5 100644 --- a/entitydb-entity-store/entitydb-entity-store-cassandra/src/test/java/ai/philterd/test/entitydb/entitystore/cassandra/CassandraEntityStoreTest.java +++ b/entitydb-entity-store/entitydb-entity-store-cassandra/src/test/java/ai/philterd/test/entitydb/entitystore/cassandra/CassandraEntityStoreTest.java @@ -45,9 +45,8 @@ public class CassandraEntityStoreTest extends AbstractEntityStoreTest getEntityStore() { final CqlSession session = embeddedCassandra.getSession(); - final CassandraEntityStore cassandraEntityStore = new CassandraEntityStore(session, KEYSPACE); - - return cassandraEntityStore; + + return new CassandraEntityStore(session, KEYSPACE); }