diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java index 786f4f326..278d078df 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java @@ -8,11 +8,22 @@ import java.util.Optional; /** - * Flint metadata log. + * Flint metadata log that provides transactional support on write API based on different storage. */ public interface FlintMetadataLog { + /** + * Add a new log entry to the metadata log. + * + * @param logEntry log entry + * @return log entry after add + */ T add(T logEntry); + /** + * Get the latest log entry in the metadata log. + * + * @return latest log entry + */ Optional getLatest(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java index aec6b6cb4..c6383f41a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java @@ -23,7 +23,8 @@ import org.opensearch.flint.core.FlintClient; /** - * Flint metadata log in OpenSearch store. + * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history + * of metadata log. */ public class FlintOpenSearchMetadataLog implements FlintMetadataLog { @@ -46,8 +47,8 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog getLatest() { + LOG.info("Fetching latest log entry with id " + latestId); try (RestHighLevelClient client = flintClient.createClient()) { GetResponse response = client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); + if (response.isExists()) { - return Optional.of( - new FlintMetadataLogEntry( - response.getId(), - response.getSeqNo(), - response.getPrimaryTerm(), - response.getSourceAsMap())); + FlintMetadataLogEntry latest = new FlintMetadataLogEntry( + response.getId(), + response.getSeqNo(), + response.getPrimaryTerm(), + response.getSourceAsMap()); + + LOG.info("Found latest log entry " + latest); + return Optional.of(latest); } else { + LOG.info("Latest log entry not found"); return Optional.empty(); } } catch (Exception e) { @@ -97,9 +103,12 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { RequestOptions.DEFAULT); // Update seqNo and primaryTerm in log entry object - return logEntry.copy( + logEntry = logEntry.copy( logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), logEntry.state(), logEntry.dataSource(), logEntry.error()); + + LOG.info("Create log entry " + logEntry); + return logEntry; } catch (OpenSearchException | IOException e) { throw new IllegalStateException("Failed to create initial log entry", e); } @@ -118,9 +127,12 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { RequestOptions.DEFAULT); // Update seqNo and primaryTerm in log entry object - return logEntry.copy( + logEntry = logEntry.copy( logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), logEntry.state(), logEntry.dataSource(), logEntry.error()); + + LOG.info("Log entry updated " + logEntry); + return logEntry; } catch (OpenSearchException | IOException e) { throw new IllegalStateException("Failed to update log entry: " + logEntry, e); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java index 881778987..5ae70fe19 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java @@ -18,8 +18,8 @@ import org.opensearch.flint.core.metadata.log.OptimisticTransaction; /** - * Optimistic transaction implementation by OpenSearch OCC. - * For now use single doc instead of maintaining history of metadata log. + * Default optimistic transaction implementation that captures the basic workflow for + * transaction support by optimistic locking. * * @param result type */