Skip to content

Commit

Permalink
Update javadoc and more logging
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 30, 2023
1 parent 379b78f commit 2f26640
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@
import java.util.Optional;

/**
* Flint metadata log.
* Flint metadata log that provides transactional support on write API based on different storage.
*/
public interface FlintMetadataLog<T> {

/**
* Add a new log entry to the metadata log.
*
* @param logEntry log entry
* @return log entry after add
*/
T add(T logEntry);

/**
* Get the latest log entry in the metadata log.
*
* @return latest log entry
*/
Optional<T> getLatest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.opensearch.flint.core.FlintClient;

/**
* Flint metadata log in OpenSearch store.
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
* of metadata log.
*/
public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadataLogEntry> {

Expand All @@ -46,8 +47,8 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadat

public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String metadataLogIndexName) {
this.flintClient = flintClient;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
this.metadataLogIndexName = metadataLogIndexName;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
}

@Override
Expand All @@ -64,17 +65,22 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {

@Override
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT);

if (response.isExists()) {
return Optional.of(
new FlintMetadataLogEntry(
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getSourceAsMap()));
FlintMetadataLogEntry latest = new FlintMetadataLogEntry(
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getSourceAsMap());

LOG.info("Found latest log entry " + latest);
return Optional.of(latest);
} else {
LOG.info("Latest log entry not found");
return Optional.empty();
}
} catch (Exception e) {
Expand All @@ -97,9 +103,12 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
RequestOptions.DEFAULT);

// Update seqNo and primaryTerm in log entry object
return logEntry.copy(
logEntry = logEntry.copy(
logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(),
logEntry.state(), logEntry.dataSource(), logEntry.error());

LOG.info("Create log entry " + logEntry);
return logEntry;
} catch (OpenSearchException | IOException e) {
throw new IllegalStateException("Failed to create initial log entry", e);
}
Expand All @@ -118,9 +127,12 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
RequestOptions.DEFAULT);

// Update seqNo and primaryTerm in log entry object
return logEntry.copy(
logEntry = logEntry.copy(
logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(),
logEntry.state(), logEntry.dataSource(), logEntry.error());

LOG.info("Log entry updated " + logEntry);
return logEntry;
} catch (OpenSearchException | IOException e) {
throw new IllegalStateException("Failed to update log entry: " + logEntry, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

/**
* Optimistic transaction implementation by OpenSearch OCC.
* For now use single doc instead of maintaining history of metadata log.
* Default optimistic transaction implementation that captures the basic workflow for
* transaction support by optimistic locking.
*
* @param <T> result type
*/
Expand Down

0 comments on commit 2f26640

Please sign in to comment.