Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mongo dbref expansion #525

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/main/java/org/elasticsearch/river/mongodb/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;

import com.mongodb.*;
import org.bson.types.BasicBSONList;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
Expand All @@ -26,10 +27,6 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.DBRef;
import com.mongodb.gridfs.GridFSDBFile;

class Indexer extends MongoDBRiverComponent implements Runnable {
Expand Down Expand Up @@ -410,13 +407,18 @@ private Map<String, Object> createObjectMap(DBObject dbObj) {
/**
 * Map a DBRef to a Map for indexing.
 *
 * <p>When {@code expand_db_refs} is enabled and the reference carries a live DB
 * handle, the referenced document is fetched from the river's configured
 * database and indexed inline; otherwise only the reference coordinates
 * ({@code id} and {@code ref}) are indexed.
 *
 * @param ref the DBRef to convert
 * @return Map representation of the DBRef (either the expanded referenced
 *         document or a {id, ref} stub)
 */
private Map<String, Object> convertDbRef(DBRef ref) {
    if (definition.isExpandDbRefs() && ref.getDB() != null) {
        // Resolve the reference against the river's own database; a DBRef may
        // point at any collection, so look it up by the ref's collection name.
        DBObject dbObject = ref.getDB().getMongo().getDB(definition.getMongoDb())
                .getCollection(ref.getCollectionName())
                .findOne(new BasicDBObject("_id", ref.getId()));
        return createObjectMap(dbObject);
    }

    Map<String, Object> obj = new HashMap<>();
    obj.put("id", ref.getId());
    obj.put("ref", ref.getCollectionName());

    return obj;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class MongoDBRiverDefinition {
public final static String BULK_SIZE_FIELD = "bulk_size";
public final static String BULK_TIMEOUT_FIELD = "bulk_timeout";
public final static String CONCURRENT_BULK_REQUESTS_FIELD = "concurrent_bulk_requests";
public final static String EXPAND_DB_REFS_FIELD = "expand_db_refs";

public final static String BULK_FIELD = "bulk";
public final static String ACTIONS_FIELD = "actions";
Expand Down Expand Up @@ -151,6 +152,7 @@ public class MongoDBRiverDefinition {
private final String statisticsTypeName;
private final boolean importAllCollections;
private final boolean disableIndexRefresh;
private final boolean expandDbRefs;
// index
private final String indexName;
private final String typeName;
Expand Down Expand Up @@ -202,6 +204,7 @@ public static class Builder {
private String statisticsTypeName;
private boolean importAllCollections;
private boolean disableIndexRefresh;
private boolean expandDbRefs;

// index
private String indexName;
Expand Down Expand Up @@ -342,6 +345,11 @@ public Builder disableIndexRefresh(boolean disableIndexRefresh) {
return this;
}

/**
 * Enables or disables expansion of DBRef fields into the referenced
 * documents at indexing time ({@code expand_db_refs} river option).
 *
 * @param expandDbRefs {@code true} to expand DBRefs inline
 * @return this builder, for chaining
 */
public Builder expandDbRefs(boolean expandDbRefs) {
    this.expandDbRefs = expandDbRefs;
    return this;
}

public Builder initialTimestamp(Binary initialTimestamp) {
this.initialTimestamp = new Timestamp.GTID(initialTimestamp.getData(), null);
return this;
Expand Down Expand Up @@ -625,6 +633,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(String riverName
// false));
builder.importAllCollections(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(IMPORT_ALL_COLLECTIONS_FIELD),
false));
builder.expandDbRefs(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(EXPAND_DB_REFS_FIELD),
false));
builder.disableIndexRefresh(XContentMapValues.nodeBooleanValue(mongoOptionsSettings.get(DISABLE_INDEX_REFRESH_FIELD), false));
builder.includeCollection(XContentMapValues.nodeStringValue(mongoOptionsSettings.get(INCLUDE_COLLECTION_FIELD), ""));

Expand Down Expand Up @@ -924,6 +934,7 @@ private MongoDBRiverDefinition(final Builder builder) {
this.statisticsTypeName = builder.statisticsTypeName;
this.importAllCollections = builder.importAllCollections;
this.disableIndexRefresh = builder.disableIndexRefresh;
this.expandDbRefs = builder.expandDbRefs;

// index
this.indexName = builder.indexName;
Expand Down Expand Up @@ -1078,6 +1089,10 @@ public boolean isDisableIndexRefresh() {
return disableIndexRefresh;
}

/**
 * @return {@code true} when the river should expand DBRef fields into the
 *         referenced documents ({@code expand_db_refs} option).
 */
public boolean isExpandDbRefs() {
    return expandDbRefs;
}

public String getIndexName() {
return indexName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ protected IRuntimeConfig newRuntimeConfig() {
public static final String TEST_MONGODB_RIVER_IMPORT_ALL_COLLECTION_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-import-all-collections.json";
public static final String TEST_MONGODB_RIVER_STORE_STATISTICS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-store-statistics.json";
public static final String TEST_SIMPLE_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document.json";
public static final String TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-river-expand-db-refs.json";

protected final ESLogger logger = Loggers.getLogger(getClass().getName());
protected final static long wait = 2000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import org.testng.annotations.*;

import com.mongodb.DB;
import com.mongodb.DBCollection;
Expand All @@ -26,12 +23,15 @@
import com.mongodb.WriteResult;
import com.mongodb.util.JSON;

import java.io.IOException;

public class RiverMongoDbRefTest extends RiverMongoDBTestAbstract {

private static final String TEST_DBREF_MONGODB_DOCUMENT_JSON = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-document-with-dbref.json";
private static final String TEST_MONGO_REFERENCED_DOCUMENT = "/org/elasticsearch/river/mongodb/simple/test-simple-mongodb-referenced-document.json";

private DB mongoDB;
private DBCollection mongoCollection;
private DBCollection mongoCollection, referencedCollection;

@Factory(dataProvider = "allMongoExecutableTypes")
public RiverMongoDbRefTest(ExecutableType type) {
Expand All @@ -44,10 +44,10 @@ public void createDatabase() {
try {
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON);
logger.info("Start createCollection");
this.mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
Assert.assertNotNull(referencedCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
}
Expand All @@ -63,20 +63,16 @@ public void cleanUp() {
@Test
public void simpleBSONObject() throws Throwable {
logger.debug("Start simpleBSONObject");
super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON);
try {
String mongoDocument = copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);

WriteResult result = mongoCollection.insert(dbObject);
Thread.sleep(wait);
DBObject dbObject = setUp();
String id = dbObject.get("_id").toString();
String categoryId = ((DBRef) dbObject.get("category")).getId().toString();
logger.info("WriteResult: {}", result.toString());
ActionFuture<IndicesExistsResponse> response = getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()));
assertThat(response.actionGet().isExists(), equalTo(true));
refreshIndex();
SearchRequest search = getNode().client().prepareSearch(getIndex())
SearchRequest search = getNode().client().prepareSearch().setIndices(getIndex())
.setQuery(QueryBuilders.queryString(categoryId).defaultField("category.id")).request();
SearchResponse searchResponse = getNode().client().search(search).actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1l));
Expand All @@ -98,7 +94,55 @@ public void simpleBSONObject() throws Throwable {
logger.error("simpleBSONObject failed.", t);
t.printStackTrace();
throw t;
} finally {
tearDown();
}
}

/**
 * Verifies DBRef expansion end-to-end: with a river configured via
 * {@code expand_db_refs: true}, the document referenced through a DBRef
 * must be indexed inline, making the referenced category's {@code name}
 * searchable under {@code category.name}.
 */
@Test
public void nestedDbRef() throws Throwable {
    super.createRiver(TEST_MONGODB_RIVER_EXPAND_DB_REFS_JSON);
    try {
        // Insert the referenced category and the parent document; the
        // returned parent document is not needed here.
        setUp();
        ActionFuture<IndicesExistsResponse> response = getNode().client().admin().indices()
                .exists(new IndicesExistsRequest(getIndex()));
        assertThat(response.actionGet().isExists(), equalTo(true));
        refreshIndex();

        SearchRequest search = getNode().client().prepareSearch(getIndex())
                .setQuery(new QueryStringQueryBuilder("arbitrary").defaultField("category.name"))
                .addField("category.name")
                .request();
        SearchResponse searchResponse = getNode().client().search(search).actionGet();
        assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
        Assert.assertEquals("arbitrary category for a thousand",
                searchResponse.getHits().getAt(0).field("category.name").getValue().toString());

    } catch (Throwable t) {
        // Log once via the logger (stack trace included); no printStackTrace.
        logger.error("nestedDbRef failed.", t);
        throw t;
    } finally {
        tearDown();
    }
}

/**
 * Seeds the test data: creates the "category" collection, inserts the
 * referenced document, then inserts the parent document that points at it
 * via a DBRef. Sleeps after each insert so the river can observe the oplog
 * change before the test continues.
 *
 * @return the parent document that was inserted
 */
DBObject setUp() throws IOException, InterruptedException {
    this.referencedCollection = mongoDB.createCollection("category", null);

    DBObject referenced = (DBObject) JSON.parse(copyToStringFromClasspath(TEST_MONGO_REFERENCED_DOCUMENT));
    WriteResult writeResult = referencedCollection.insert(referenced);
    Thread.sleep(wait);
    logger.info("Referenced WriteResult: {}", writeResult.toString());

    DBObject parent = (DBObject) JSON.parse(copyToStringFromClasspath(TEST_DBREF_MONGODB_DOCUMENT_JSON));
    writeResult = mongoCollection.insert(parent);
    Thread.sleep(wait);
    logger.info("WriteResult: {}", writeResult.toString());

    return parent;
}

// Drops both test collections and removes the river so each test starts clean.
void tearDown(){
    mongoCollection.drop();
    referencedCollection.drop();
    super.deleteRiver();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"_id": { "$oid" : "5194272CFDEA65E5D6000021" },
"name": "arbitrary category for a thousand",
"postDate": { "$date": 1388853809000.000000 }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"type": "mongodb",
"mongodb": {
"servers": [{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
}],
"db": "%s",
"collection": "%s",
"gridfs": false,
"options": {
"expand_db_refs": true
}
},
"index": {
"name": "%s",
"throttle_size": 2000
}
}