Skip to content

Commit

Permalink
Issue #2 - The collections are not updated until delivery to Kafka is confirmed in the Callback.
Browse files Browse the repository at this point in the history
  • Loading branch information
BillFarber committed Mar 15, 2020
1 parent 705b0e5 commit 36fa184
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/marklogic/KafkaMarklogicSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ private static void loadConfigurationFromProperties(ApplicationConfig config) {

databaseClient = new DefaultDatabaseClientCreator().createDatabaseClient(config);
databaseQuerier = new DatabaseQuerier(config, databaseClient);
Callback.initializeCallbackClass(databaseClient,
config.getString(ApplicationConfig.QUERY_TARGET_COLLECTION),
config.getString(ApplicationConfig.QUERY_SENT_COLLECTION));
}

}
18 changes: 5 additions & 13 deletions src/main/java/com/marklogic/client/DatabaseQuerier.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
public class DatabaseQuerier {
private static Logger logger = LoggerFactory.getLogger(DatabaseQuerier.class);

private static DatabaseClient databaseClient;
private static QueryManager queryMgr;
private static StringQueryDefinition stringQueryDefinition;
private static String targetCollection;
private static String sentCollection;
private static String query;
private DatabaseClient databaseClient;
private QueryManager queryMgr;
private StringQueryDefinition stringQueryDefinition;
private String query;
private String targetCollection = null;

public DatabaseQuerier(ApplicationConfig config, DatabaseClient databaseClient) {
this.databaseClient = databaseClient;
Expand All @@ -35,9 +34,6 @@ public DatabaseQuerier(ApplicationConfig config, DatabaseClient databaseClient)
targetCollection = config.getString(ApplicationConfig.QUERY_TARGET_COLLECTION);
logger.info("Query Collection: " + targetCollection);
stringQueryDefinition.setCollections(targetCollection);

sentCollection = config.getString(ApplicationConfig.QUERY_SENT_COLLECTION);
logger.info("Sent Collection: " + sentCollection);
}

public SearchHandle search() {
Expand All @@ -63,10 +59,6 @@ public ArrayList<ProducerRecord> getRecordsFromMatches(SearchHandle results, Str
record.headers().add(uriHeader);
record.headers().add(topicHeader);
records.add(record);

metadataHandle.getCollections().remove(targetCollection);
metadataHandle.getCollections().add(sentCollection);
textDocumentManager.writeMetadata(docUri, metadataHandle);
}
return records;
}
Expand Down
32 changes: 29 additions & 3 deletions src/main/java/com/marklogic/kafka/producer/Callback.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.marklogic.kafka.producer;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.UriHeader;
import com.marklogic.client.document.TextDocumentManager;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.query.QueryManager;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
Expand All @@ -11,7 +16,18 @@
public class Callback implements org.apache.kafka.clients.producer.Callback {
private static Logger logger = LoggerFactory.getLogger(Callback.class);

Headers headers;
private static DatabaseClient databaseClient = null;
private static String targetCollection = null;
private static String sentCollection = null;

private Headers headers;

/**
 * One-time static initialization shared by every Callback instance.
 * Stores the MarkLogic client plus the two collection names that
 * onCompletion uses to move a document once Kafka confirms delivery.
 *
 * NOTE(review): state is static, so the last call wins if invoked twice.
 */
public static void initializeCallbackClass(DatabaseClient client,
        String queryCollection, String deliveredCollection) {
    databaseClient = client;
    targetCollection = queryCollection;
    sentCollection = deliveredCollection;
}

public Callback(Headers headers) {
this.headers = headers;
Expand All @@ -23,9 +39,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.error(String.format("Exception reported for document '%s': %s",
new String(headers.lastHeader("URI").value()), exception));
} else {
QueryManager queryMgr = databaseClient.newQueryManager();
Iterator headersIterator = headers.iterator();
logger.info(String.format("Committed document '%s' to Kafka topic %s",
new String(headers.lastHeader("URI").value()), new String(headers.lastHeader("TOPIC").value())));
String docUri = new String(headers.lastHeader("URI").value());
String topicName = new String(headers.lastHeader("TOPIC").value());
logger.info(String.format("Committed document '%s' to Kafka topic %s", docUri, topicName));
if (logger.isDebugEnabled()) {
while (headersIterator.hasNext()) {
UriHeader uriHeader = (UriHeader) headersIterator.next();
Expand All @@ -34,6 +52,14 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.debug("Value: " + uriHeader.getValue());
}
}
TextDocumentManager textDocumentManager = databaseClient.newTextDocumentManager();
DocumentMetadataHandle metadataHandle = new DocumentMetadataHandle();
StringHandle stringHandle = new StringHandle();
String documentContent = textDocumentManager.read(docUri, metadataHandle, stringHandle).get();
metadataHandle.getCollections().remove(targetCollection);
metadataHandle.getCollections().add(sentCollection);
textDocumentManager.writeMetadata(docUri, metadataHandle);
logger.info(String.format("Collection updated for document '%s', docUri"));
}
}
}

0 comments on commit 36fa184

Please sign in to comment.