From 15f7923d656f6cc9d160dbeb8ee08779490dee32 Mon Sep 17 00:00:00 2001 From: Eduardo McLean Date: Thu, 23 Apr 2015 16:30:57 -0400 Subject: [PATCH 1/4] expand mongo dbrefs when possible --- .../java/org/elasticsearch/river/mongodb/Indexer.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java index fa2ca818..a3f850c6 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java @@ -414,7 +414,14 @@ private Map createObjectMap(DBObject dbObj) { * @return */ private Map convertDbRef(DBRef ref) { - Map obj = new HashMap(); + DBObject dbObject = ref.fetch(); + if(dbObject == null){ + dbObject = ref.getDB().getMongo().getDB(definition.getMongoDb()).getCollection(ref.getRef()).findOne(new BasicDBObject("_id", ref.getId())); + } + if(dbObject != null) { + return createObjectMap(dbObject); + } + Map obj = new HashMap<>(); obj.put("id", ref.getId()); obj.put("ref", ref.getRef()); From 47406c7e3d45549fa7d21b986bd65f93f3614138 Mon Sep 17 00:00:00 2001 From: Eduardo McLean Date: Thu, 23 Apr 2015 20:08:03 -0400 Subject: [PATCH 2/4] Adding tests for dbref expansion --- .../mongodb/simple/RiverMongoDbRefTest.java | 47 ++++++++++++++++++- ...st-simple-mongodb-referenced-document.json | 5 ++ 2 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java index 1220d28c..04c96563 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java @@ -29,9 +29,10 @@ public class RiverMongoDbRefTest extends RiverMongoDBTestAbstract { private static final String TEST_DBREF_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document-with-dbref.json"; + private static final String TEST_MONGO_REFERENCED_DOCUMENT = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json"; private DB mongoDB; - private DBCollection mongoCollection; + private DBCollection mongoCollection, referencedCollection; @Factory(dataProvider = "allMongoExecutableTypes") public RiverMongoDbRefTest(ExecutableType type) { @@ -48,6 +49,7 @@ public void createDatabase() { logger.info("Start createCollection"); this.mongoCollection = mongoDB.createCollection(getCollection(), null); Assert.assertNotNull(mongoCollection); + Assert.assertNotNull(referencedCollection); } catch (Throwable t) { logger.error("createDatabase failed.", t); } @@ -76,7 +78,7 @@ public void simpleBSONObject() throws Throwable { .exists(new IndicesExistsRequest(getIndex())); assertThat(response.actionGet().isExists(), equalTo(true)); refreshIndex(); - SearchRequest search = getNode().client().prepareSearch(getIndex()) + SearchRequest search = getNode().client().prepareSearch().setIndices(getIndex()) .setQuery(QueryBuilders.queryString(categoryId).defaultField("category.id")).request(); SearchResponse searchResponse = getNode().client().search(search).actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); @@ -98,7 +100,48 @@ public void simpleBSONObject() throws Throwable { logger.error("simpleBSONObject failed.", t); t.printStackTrace(); throw t; + } finally { + reset(); } } + @Test + public void nestedDbRef() throws Throwable{ + try { + this.referencedCollection = mongoDB.createCollection("category", null); + String referencedDocument = copyToStringFromClasspath(TEST_MONGO_REFERENCED_DOCUMENT); + WriteResult result = referencedCollection.insert((DBObject) JSON.parse(referencedDocument)); + Thread.sleep(wait); + logger.info("Referenced WriteResult: {}", result.toString()); + + String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); + DBObject dbObject = (DBObject) JSON.parse(mongoDocument); + result = mongoCollection.insert(dbObject); + Thread.sleep(wait); + logger.info("WriteResult: {}", result.toString()); + + ActionFuture response = getNode().client().admin().indices() + .exists(new IndicesExistsRequest(getIndex())); + assertThat(response.actionGet().isExists(), equalTo(true)); + refreshIndex(); + + SearchRequest search = getNode().client().prepareSearch(getIndex()).setQuery(new QueryStringQueryBuilder("arbitrary").defaultField("category.name")).addField("category.name") + .request(); + SearchResponse searchResponse = getNode().client().search(search).actionGet(); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); + Assert.assertEquals("arbitrary category for a thousand", searchResponse.getHits().getAt(0).field("category.name").getValue().toString()); + + } catch (Throwable t) { + logger.error("nestedDbRef failed.", t); + t.printStackTrace(); + throw t; + } finally{ + reset(); + } + } + + void reset(){ + mongoCollection.drop(); + referencedCollection.drop(); + } } diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json new file mode 100644 index 00000000..094b8df9 --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json @@ -0,0 +1,5 @@ +{ + "_id": { "$oid" : "5194272CFDEA65E5D6000021" }, + "name": "arbitrary category for a thousand", + "postDate": { "$date": 1388853809000.000000 } +} \ No newline at end of file From 442887d25f50b5db4607dd4d3d11d5fb741b4615 Mon Sep 17 00:00:00 2001 From: Eduardo McLean Date: Mon, 27 Apr 2015 09:36:17 -0400 Subject: [PATCH 3/4] replacing deprecated methods --- .../elasticsearch/river/mongodb/Indexer.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java index a3f850c6..42fac003 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; +import com.mongodb.*; import org.bson.types.BasicBSONList; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -26,10 +27,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; -import com.mongodb.DBRef; import com.mongodb.gridfs.GridFSDBFile; class Indexer extends MongoDBRiverComponent implements Runnable { @@ -414,16 +411,17 @@ private Map createObjectMap(DBObject dbObj) { * @return */ private Map convertDbRef(DBRef ref) { - DBObject dbObject = ref.fetch(); - if(dbObject == null){ - dbObject = ref.getDB().getMongo().getDB(definition.getMongoDb()).getCollection(ref.getRef()).findOne(new BasicDBObject("_id", ref.getId())); - } - if(dbObject != null) { - return createObjectMap(dbObject); + DB refDb = ref.getDB(); + if(refDb != null) { + DBObject dbObject = refDb.getMongo().getDB(definition.getMongoDb()).getCollection(ref.getCollectionName()).findOne(new BasicDBObject("_id", ref.getId())); + if(dbObject != null) { + return createObjectMap(dbObject); + } } + Map obj = new HashMap<>(); obj.put("id", ref.getId()); - obj.put("ref", ref.getRef()); + obj.put("ref", ref.getCollectionName()); return obj; } From bf7c08377cb799475304d82eebfbd0182d6037b5 Mon Sep 17 00:00:00 2001 From: Eduardo McLean Date: Mon, 27 Apr 2015 11:35:15 -0400 Subject: [PATCH 4/4] Adding config for enabling/disabling dbref expansion --- .../elasticsearch/river/mongodb/Indexer.java | 13 ++--- .../river/mongodb/MongoDBRiverDefinition.java | 15 ++++++ .../mongodb/RiverMongoDBTestAbstract.java | 1 + .../mongodb/simple/RiverMongoDbRefTest.java | 53 ++++++++++--------- ...t-simple-mongodb-river-expand-db-refs.json | 27 ++++++++++ 5 files changed, 75 insertions(+), 34 deletions(-) create mode 100644 src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java index 42fac003..232f5dcc 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java @@ -407,16 +407,13 @@ private Map createObjectMap(DBObject dbObj) { /** * Map a DBRef to a Map for indexing * - * @param ref - * @return + * @param ref to convert + * @return Map representation of the DBRef */ private Map convertDbRef(DBRef ref) { - DB refDb = ref.getDB(); - if(refDb != null) { - DBObject dbObject = refDb.getMongo().getDB(definition.getMongoDb()).getCollection(ref.getCollectionName()).findOne(new BasicDBObject("_id", ref.getId())); - if(dbObject != null) { - return createObjectMap(dbObject); - } + if(definition.isExpandDbRefs() && ref.getDB() != null) { + DBObject dbObject = ref.getDB().getMongo().getDB(definition.getMongoDb()).getCollection(ref.getCollectionName()).findOne(new BasicDBObject("_id", ref.getId())); + return createObjectMap(dbObject); } Map obj = new HashMap<>(); diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java index 75fa217f..64917d7a 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java @@ -101,6 +101,7 @@ public class MongoDBRiverDefinition { public final static String BULK_SIZE_FIELD = "bulk_size"; public final static String BULK_TIMEOUT_FIELD = "bulk_timeout"; public final static String CONCURRENT_BULK_REQUESTS_FIELD = "concurrent_bulk_requests"; + public final static String EXPAND_DB_REFS_FIELD = "expand_db_refs"; public final static String BULK_FIELD = "bulk"; public final static String ACTIONS_FIELD = "actions"; @@ -151,6 +152,7 @@ public class MongoDBRiverDefinition { private final String statisticsTypeName; private final boolean importAllCollections; private final boolean disableIndexRefresh; + private final boolean expandDbRefs; // index private final String indexName; private final String typeName; @@ -202,6 +204,7 @@ public static class Builder { private String statisticsTypeName; private boolean importAllCollections; private boolean disableIndexRefresh; + private boolean expandDbRefs; // index private String indexName; @@ -342,6 +345,11 @@ public Builder disableIndexRefresh(boolean disableIndexRefresh) { return this; } + public Builder expandDbRefs(boolean expandDbRefs){ + this.expandDbRefs = expandDbRefs; + return this; + } + public Builder initialTimestamp(Binary initialTimestamp) { this.initialTimestamp = new Timestamp.GTID(initialTimestamp.getData(), null); return this; @@ -625,6 +633,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName // false)); builder.importAllCollections(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(IMPORT_ALL_COLLECTIONS_FIELD), false)); + builder.expandDbRefs(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(EXPAND_DB_REFS_FIELD), + false)); builder.disableIndexRefresh(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(DISABLE_INDEX_REFRESH_FIELD), false)); builder.includeCollection(XContentMapValues.nodeStringValue(mongoOptionsSettings.get(INCLUDE_COLLECTION_FIELD), "")); @@ -924,6 +934,7 @@ private MongoDBRiverDefinition(final Builder builder) { this.statisticsTypeName = builder.statisticsTypeName; this.importAllCollections = builder.importAllCollections; this.disableIndexRefresh = builder.disableIndexRefresh; + this.expandDbRefs = builder.expandDbRefs; // index this.indexName = builder.indexName; @@ -1078,6 +1089,10 @@ public boolean isDisableIndexRefresh() { return disableIndexRefresh; } + public boolean isExpandDbRefs() { + return expandDbRefs; + } + public String getIndexName() { return indexName; } diff --git a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java index e94c4e66..6791f273 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java +++ b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java @@ -180,6 +180,7 @@ protected IRuntimeConfig newRuntimeConfig() { public static final String TEST_MONGODB_RIVER_IMPORT_ALL_COLLECTION_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-import-all-collections.json"; public static final String TEST_MONGODB_RIVER_STORE_STATISTICS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-store-statistics.json"; public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document.json"; + public static final String TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json"; protected final ESLogger logger = Loggers.getLogger(getClass().getName()); protected final static long wait = 2000; diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java index 04c96563..8c8d3b47 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java @@ -13,10 +13,7 @@ import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Factory; -import org.testng.annotations.Test; +import org.testng.annotations.*; import com.mongodb.DB; import com.mongodb.DBCollection; @@ -26,6 +23,8 @@ import com.mongodb.WriteResult; import com.mongodb.util.JSON; +import java.io.IOException; + public class RiverMongoDbRefTest extends RiverMongoDBTestAbstract { private static final String TEST_DBREF_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document-with-dbref.json"; @@ -45,7 +44,6 @@ public void createDatabase() { try { mongoDB = getMongo().getDB(getDatabase()); mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); - super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); logger.info("Start createCollection"); this.mongoCollection = mongoDB.createCollection(getCollection(), null); Assert.assertNotNull(mongoCollection); @@ -65,15 +63,11 @@ public void cleanUp() { @Test public void simpleBSONObject() throws Throwable { logger.debug("Start simpleBSONObject"); + super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); try { - String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); - DBObject dbObject = (DBObject) JSON.parse(mongoDocument); - - WriteResult result = mongoCollection.insert(dbObject); - Thread.sleep(wait); + DBObject dbObject = setUp(); String id = dbObject.get("_id").toString(); String categoryId = ((DBRef) dbObject.get("category")).getId().toString(); - logger.info("WriteResult: {}", result.toString()); ActionFuture response = getNode().client().admin().indices() .exists(new IndicesExistsRequest(getIndex())); assertThat(response.actionGet().isExists(), equalTo(true)); @@ -101,25 +95,15 @@ public void simpleBSONObject() throws Throwable { t.printStackTrace(); throw t; } finally { - reset(); + tearDown(); } } @Test public void nestedDbRef() throws Throwable{ + super.createRiver(TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON); try { - this.referencedCollection = mongoDB.createCollection("category", null); - String referencedDocument = copyToStringFromClasspath(TEST_MONGO_REFERENCED_DOCUMENT); - WriteResult result = referencedCollection.insert((DBObject) JSON.parse(referencedDocument)); - Thread.sleep(wait); - logger.info("Referenced WriteResult: {}", result.toString()); - - String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); - DBObject dbObject = (DBObject) JSON.parse(mongoDocument); - result = mongoCollection.insert(dbObject); - Thread.sleep(wait); - logger.info("WriteResult: {}", result.toString()); - + DBObject dbObject = setUp(); ActionFuture response = getNode().client().admin().indices() .exists(new IndicesExistsRequest(getIndex())); assertThat(response.actionGet().isExists(), equalTo(true)); @@ -136,12 +120,29 @@ public void nestedDbRef() throws Throwable{ t.printStackTrace(); throw t; } finally{ - reset(); + tearDown(); } } - void reset(){ + DBObject setUp() throws IOException, InterruptedException { + this.referencedCollection = mongoDB.createCollection("category", null); + String referencedDocument = copyToStringFromClasspath(TEST_MONGO_REFERENCED_DOCUMENT); + WriteResult result = referencedCollection.insert((DBObject) JSON.parse(referencedDocument)); + Thread.sleep(wait); + logger.info("Referenced WriteResult: {}", result.toString()); + + String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); + DBObject dbObject = (DBObject) JSON.parse(mongoDocument); + result = mongoCollection.insert(dbObject); + Thread.sleep(wait); + logger.info("WriteResult: {}", result.toString()); + + return dbObject; + } + + void tearDown(){ mongoCollection.drop(); referencedCollection.drop(); + super.deleteRiver(); } } diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json new file mode 100644 index 00000000..7e66044c --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json @@ -0,0 +1,27 @@ +{ + "type": "mongodb", + "mongodb": { + "servers": [{ + "host": "localhost", + "port": %s + }, + { + "host": "localhost", + "port": %s + }, + { + "host": "localhost", + "port": %s + }], + "db": "%s", + "collection": "%s", + "gridfs": false, + "options": { + "expand_db_refs": true + } + }, + "index": { + "name": "%s", + "throttle_size": 2000 + } +} \ No newline at end of file