diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java index fa2ca818..232f5dcc 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; +import com.mongodb.*; import org.bson.types.BasicBSONList; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -26,10 +27,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; -import com.mongodb.DBRef; import com.mongodb.gridfs.GridFSDBFile; class Indexer extends MongoDBRiverComponent implements Runnable { @@ -410,13 +407,18 @@ private Map createObjectMap(DBObject dbObj) { /** * Map a DBRef to a Map for indexing * - * @param ref - * @return + * @param ref to convert + * @return Map representation of the DBRef */ private Map convertDbRef(DBRef ref) { - Map obj = new HashMap(); + if(definition.isExpandDbRefs() && ref.getDB() != null) { + DBObject dbObject = ref.getDB().getMongo().getDB(definition.getMongoDb()).getCollection(ref.getCollectionName()).findOne(new BasicDBObject("_id", ref.getId())); + return createObjectMap(dbObject); + } + + Map obj = new HashMap<>(); obj.put("id", ref.getId()); - obj.put("ref", ref.getRef()); + obj.put("ref", ref.getCollectionName()); return obj; } diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java index 75fa217f..64917d7a 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java @@ -101,6 +101,7 @@ public class MongoDBRiverDefinition { public final static String BULK_SIZE_FIELD = "bulk_size"; public final static String BULK_TIMEOUT_FIELD = "bulk_timeout"; public final static String CONCURRENT_BULK_REQUESTS_FIELD = "concurrent_bulk_requests"; + public final static String EXPAND_DB_REFS_FIELD = "expand_db_refs"; public final static String BULK_FIELD = "bulk"; public final static String ACTIONS_FIELD = "actions"; @@ -151,6 +152,7 @@ public class MongoDBRiverDefinition { private final String statisticsTypeName; private final boolean importAllCollections; private final boolean disableIndexRefresh; + private final boolean expandDbRefs; // index private final String indexName; private final String typeName; @@ -202,6 +204,7 @@ public static class Builder { private String statisticsTypeName; private boolean importAllCollections; private boolean disableIndexRefresh; + private boolean expandDbRefs; // index private String indexName; @@ -342,6 +345,11 @@ public Builder disableIndexRefresh(boolean disableIndexRefresh) { return this; } + public Builder expandDbRefs(boolean expandDbRefs){ + this.expandDbRefs = expandDbRefs; + return this; + } + public Builder initialTimestamp(Binary initialTimestamp) { this.initialTimestamp = new Timestamp.GTID(initialTimestamp.getData(), null); return this; @@ -625,6 +633,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName // false)); builder.importAllCollections(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(IMPORT_ALL_COLLECTIONS_FIELD), false)); + builder.expandDbRefs(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(EXPAND_DB_REFS_FIELD), + false)); builder.disableIndexRefresh(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(DISABLE_INDEX_REFRESH_FIELD), false)); builder.includeCollection(XContentMapValues.nodeStringValue(mongoOptionsSettings.get(INCLUDE_COLLECTION_FIELD), "")); @@ -924,6 +934,7 @@ private MongoDBRiverDefinition(final Builder builder) { this.statisticsTypeName = builder.statisticsTypeName; this.importAllCollections = builder.importAllCollections; this.disableIndexRefresh = builder.disableIndexRefresh; + this.expandDbRefs = builder.expandDbRefs; // index this.indexName = builder.indexName; @@ -1078,6 +1089,10 @@ public boolean isDisableIndexRefresh() { return disableIndexRefresh; } + public boolean isExpandDbRefs() { + return expandDbRefs; + } + public String getIndexName() { return indexName; } diff --git a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java index e94c4e66..6791f273 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java +++ b/src/test/java/org/elasticsearch/river/mongodb/RiverMongoDBTestAbstract.java @@ -180,6 +180,7 @@ protected IRuntimeConfig newRuntimeConfig() { public static final String TEST_MONGODB_RIVER_IMPORT_ALL_COLLECTION_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-import-all-collections.json"; public static final String TEST_MONGODB_RIVER_STORE_STATISTICS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-store-statistics.json"; public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document.json"; + public static final String TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json"; protected final ESLogger logger = Loggers.getLogger(getClass().getName()); protected final static long wait = 2000; diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java index 1220d28c..8c8d3b47 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDbRefTest.java @@ -13,10 +13,7 @@ import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Factory; -import org.testng.annotations.Test; +import org.testng.annotations.*; import com.mongodb.DB; import com.mongodb.DBCollection; @@ -26,12 +23,15 @@ import com.mongodb.WriteResult; import com.mongodb.util.JSON; +import java.io.IOException; + public class RiverMongoDbRefTest extends RiverMongoDBTestAbstract { private static final String TEST_DBREF_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document-with-dbref.json"; + private static final String TEST_MONGO_REFERENCED_DOCUMENT = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json"; private DB mongoDB; - private DBCollection mongoCollection; + private DBCollection mongoCollection, referencedCollection; @Factory(dataProvider = "allMongoExecutableTypes") public RiverMongoDbRefTest(ExecutableType type) { @@ -44,10 +44,10 @@ public void createDatabase() { try { mongoDB = getMongo().getDB(getDatabase()); mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); - super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); logger.info("Start createCollection"); this.mongoCollection = mongoDB.createCollection(getCollection(), null); Assert.assertNotNull(mongoCollection); + Assert.assertNotNull(referencedCollection); } catch (Throwable t) { logger.error("createDatabase failed.", t); } @@ -63,20 +63,16 @@ public void cleanUp() { @Test public void simpleBSONObject() throws Throwable { logger.debug("Start simpleBSONObject"); + super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); try { - String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); - DBObject dbObject = (DBObject) JSON.parse(mongoDocument); - - WriteResult result = mongoCollection.insert(dbObject); - Thread.sleep(wait); + DBObject dbObject = setUp(); String id = dbObject.get("_id").toString(); String categoryId = ((DBRef) dbObject.get("category")).getId().toString(); - logger.info("WriteResult: {}", result.toString()); ActionFuture response = getNode().client().admin().indices() .exists(new IndicesExistsRequest(getIndex())); assertThat(response.actionGet().isExists(), equalTo(true)); refreshIndex(); - SearchRequest search = getNode().client().prepareSearch(getIndex()) + SearchRequest search = getNode().client().prepareSearch().setIndices(getIndex()) .setQuery(QueryBuilders.queryString(categoryId).defaultField("category.id")).request(); SearchResponse searchResponse = getNode().client().search(search).actionGet(); assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); @@ -98,7 +94,55 @@ public void simpleBSONObject() throws Throwable { logger.error("simpleBSONObject failed.", t); t.printStackTrace(); throw t; + } finally { + tearDown(); } } + @Test + public void nestedDbRef() throws Throwable{ + super.createRiver(TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON); + try { + DBObject dbObject = setUp(); + ActionFuture response = getNode().client().admin().indices() + .exists(new IndicesExistsRequest(getIndex())); + assertThat(response.actionGet().isExists(), equalTo(true)); + refreshIndex(); + + SearchRequest search = getNode().client().prepareSearch(getIndex()).setQuery(new QueryStringQueryBuilder("arbitrary").defaultField("category.name")).addField("category.name") + .request(); + SearchResponse searchResponse = getNode().client().search(search).actionGet(); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l)); + Assert.assertEquals("arbitrary category for a thousand", searchResponse.getHits().getAt(0).field("category.name").getValue().toString()); + + } catch (Throwable t) { + logger.error("nestedDbRef failed.", t); + t.printStackTrace(); + throw t; + } finally{ + tearDown(); + } + } + + DBObject setUp() throws IOException, InterruptedException { + this.referencedCollection = mongoDB.createCollection("category", null); + String referencedDocument = copyToStringFromClasspath(TEST_MONGO_REFERENCED_DOCUMENT); + WriteResult result = referencedCollection.insert((DBObject) JSON.parse(referencedDocument)); + Thread.sleep(wait); + logger.info("Referenced WriteResult: {}", result.toString()); + + String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON); + DBObject dbObject = (DBObject) JSON.parse(mongoDocument); + result = mongoCollection.insert(dbObject); + Thread.sleep(wait); + logger.info("WriteResult: {}", result.toString()); + + return dbObject; + } + + void tearDown(){ + mongoCollection.drop(); + referencedCollection.drop(); + super.deleteRiver(); + } } diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json new file mode 100644 index 00000000..094b8df9 --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json @@ -0,0 +1,5 @@ +{ + "_id": { "$oid" : "5194272CFDEA65E5D6000021" }, + "name": "arbitrary category for a thousand", + "postDate": { "$date": 1388853809000.000000 } +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json new file mode 100644 index 00000000..7e66044c --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json @@ -0,0 +1,27 @@ +{ + "type": "mongodb", + "mongodb": { + "servers": [{ + "host": "localhost", + "port": %s + }, + { + "host": "localhost", + "port": %s + }, + { + "host": "localhost", + "port": %s + }], + "db": "%s", + "collection": "%s", + "gridfs": false, + "options": { + "expand_db_refs": true + } + }, + "index": { + "name": "%s", + "throttle_size": 2000 + } +} \ No newline at end of file