Write metadata cache data to mappings _meta with refresh time update (#805)

* [0.5-nexus] Write mock metadata cache data to mappings _meta (#744)

* write mock metadata cache data to mappings _meta

Signed-off-by: Sean Kao <[email protected]>

* Enable write to cache by default

Signed-off-by: Sean Kao <[email protected]>

* bugfix: _meta.latestId missing when creating index

Signed-off-by: Sean Kao <[email protected]>

* set and unset config in test suite

Signed-off-by: Sean Kao <[email protected]>

* fix: use member flintSparkConf

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>

* default metadata cache write disabled

Signed-off-by: Sean Kao <[email protected]>

* remove string literal "external" in index builder

Signed-off-by: Sean Kao <[email protected]>

* track refreshInterval and lastRefreshTime

Signed-off-by: Sean Kao <[email protected]>

* add last refresh timestamps to metadata log entry

Signed-off-by: Sean Kao <[email protected]>

* update metadata cache test case: should pass

Signed-off-by: Sean Kao <[email protected]>

* move to spark package; get refresh interval

Signed-off-by: Sean Kao <[email protected]>

* parse refresh interval

Signed-off-by: Sean Kao <[email protected]>

* minor syntax fix on FlintSpark.createIndex

Signed-off-by: Sean Kao <[email protected]>

* strategize cache writer interface

Signed-off-by: Sean Kao <[email protected]>

* update refresh timestamps in FlintSpark

Signed-off-by: Sean Kao <[email protected]>

* add test cases

Signed-off-by: Sean Kao <[email protected]>

* IT test for refresh timestamp update

Signed-off-by: Sean Kao <[email protected]>

* add doc for spark conf

Signed-off-by: Sean Kao <[email protected]>

* change mock table name

Signed-off-by: Sean Kao <[email protected]>

* add IT test at FlintSpark level

Signed-off-by: Sean Kao <[email protected]>

* test with external scheduler

Signed-off-by: Sean Kao <[email protected]>

* refactor refreshIndex method; add test for modes

Signed-off-by: Sean Kao <[email protected]>

* fix typo

Signed-off-by: Sean Kao <[email protected]>

* fix failed test caused by refactoring

Signed-off-by: Sean Kao <[email protected]>

* rename method; add comment

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
seankao-az authored Oct 29, 2024
1 parent cd730cb commit a07f88f
Showing 23 changed files with 1,053 additions and 71 deletions.
1 change: 1 addition & 0 deletions docs/index.md
@@ -549,6 +549,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
- `spark.flint.metadataCacheWrite.enabled`: Enable writing metadata to index mappings `_meta` as a read cache for frontend users to access. Default value is false. Do not use in production; this setting will be removed in a later version.
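
As a quick illustration, a minimal sketch of enabling the flag when building a session, assuming a Flint-enabled Spark build; the app name is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup; only the config key and its default come from this change.
val spark = SparkSession
  .builder()
  .appName("flint-metadata-cache-demo") // illustrative name
  .config("spark.flint.metadataCacheWrite.enabled", "true")
  .getOrCreate()
```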

#### Data Type Mapping

@@ -18,6 +18,10 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
* log entry id
* @param state
* Flint index state
* @param lastRefreshStartTime
* timestamp when last refresh started for manual or external scheduler refresh
* @param lastRefreshCompleteTime
* timestamp when last refresh completed for manual or external scheduler refresh
* @param entryVersion
* entry version fields for consistency control
* @param error
@@ -28,10 +32,12 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
case class FlintMetadataLogEntry(
id: String,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
* This is currently used as streaming job start time for internal scheduler. In future, this
* should represent the create timestamp of the log entry
*/
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: Map[String, Any],
error: String,
@@ -40,26 +46,48 @@ case class FlintMetadataLogEntry(
def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties.asScala.toMap)
}

def this(
id: String,
createTime: Long,
lastRefreshStartTime: Long,
lastRefreshCompleteTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
this(
id,
createTime,
lastRefreshStartTime,
lastRefreshCompleteTime,
state,
entryVersion.asScala.toMap,
error,
properties)
}
}

object FlintMetadataLogEntry {

val EMPTY_TIMESTAMP = 0L

/**
* Flint index state enum.
*/
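For orientation, a hedged sketch of constructing the widened log entry; the field order follows the case class above, the id and property values are placeholders, and both refresh timestamps stay at `EMPTY_TIMESTAMP` until a refresh has run:

```scala
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState

// Sketch only: a fresh entry before any refresh has started or completed.
val entry = FlintMetadataLogEntry(
  id = "myLatestId", // placeholder
  createTime = System.currentTimeMillis(),
  lastRefreshStartTime = FlintMetadataLogEntry.EMPTY_TIMESTAMP,
  lastRefreshCompleteTime = FlintMetadataLogEntry.EMPTY_TIMESTAMP,
  state = IndexState.EMPTY,
  entryVersion = Map("seqNo" -> -2L, "primaryTerm" -> 0L), // unassigned placeholders
  error = "",
  properties = Map("dataSourceName" -> "myDataSource")) // placeholder
```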
@@ -86,6 +86,8 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
initialLog = initialLog.copy(
initialLog.id(),
initialLog.createTime(),
initialLog.lastRefreshStartTime(),
initialLog.lastRefreshCompleteTime(),
initialLog.state(),
latest.entryVersion(),
initialLog.error(),
@@ -101,14 +101,16 @@ public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();

json.put("version", "1.0");
json.put("version", "1.1");
json.put("latestId", logEntry.id());
json.put("type", "flintindexstate");
json.put("state", logEntry.state().toString());
json.put("applicationId", applicationId);
json.put("jobId", jobId);
json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString());
json.put("jobStartTime", logEntry.createTime());
json.put("lastRefreshStartTime", logEntry.lastRefreshStartTime());
json.put("lastRefreshCompleteTime", logEntry.lastRefreshCompleteTime());
json.put("lastUpdateTime", lastUpdateTime);
json.put("error", logEntry.error());

@@ -138,6 +140,8 @@ public static FlintMetadataLogEntry constructLogEntry(
id,
/* sourceMap may use Integer or Long even though it's always long in index mapping */
((Number) sourceMap.get("jobStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshStartTime")).longValue(),
((Number) sourceMap.get("lastRefreshCompleteTime")).longValue(),
FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")),
Map.of("seqNo", seqNo, "primaryTerm", primaryTerm),
(String) sourceMap.get("error"),
@@ -132,7 +132,9 @@ public void purge() {
public FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
0L,
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.EMPTY_TIMESTAMP(),
FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),
Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM),
"",
@@ -146,6 +148,8 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
logEntry.copy(
latestId,
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
logEntry.entryVersion(),
logEntry.error(),
@@ -184,6 +188,8 @@ private FlintMetadataLogEntry writeLogEntry(
logEntry = new FlintMetadataLogEntry(
logEntry.id(),
logEntry.createTime(),
logEntry.lastRefreshStartTime(),
logEntry.lastRefreshCompleteTime(),
logEntry.state(),
Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()),
logEntry.error(),
@@ -25,6 +25,10 @@ class FlintMetadataLogEntryOpenSearchConverterTest
val sourceMap = JMap.of(
"jobStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890123L.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890123L.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
@@ -36,6 +40,8 @@
when(mockLogEntry.id).thenReturn("id")
when(mockLogEntry.state).thenReturn(FlintMetadataLogEntry.IndexState.ACTIVE)
when(mockLogEntry.createTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshStartTime).thenReturn(1234567890123L)
when(mockLogEntry.lastRefreshCompleteTime).thenReturn(1234567890123L)
when(mockLogEntry.error).thenReturn("")
when(mockLogEntry.properties).thenReturn(Map("dataSourceName" -> "testDataSource"))
}
@@ -45,14 +51,16 @@
val expectedJsonWithoutLastUpdateTime =
s"""
|{
| "version": "1.0",
| "version": "1.1",
| "latestId": "id",
| "type": "flintindexstate",
| "state": "active",
| "applicationId": "unknown",
| "jobId": "unknown",
| "dataSourceName": "testDataSource",
| "jobStartTime": 1234567890123,
| "lastRefreshStartTime": 1234567890123,
| "lastRefreshCompleteTime": 1234567890123,
| "error": ""
|}
|""".stripMargin
@@ -67,15 +75,22 @@
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890123L
logEntry.lastRefreshStartTime shouldBe 1234567890123L
logEntry.lastRefreshCompleteTime shouldBe 1234567890123L
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
}

it should "construct log entry with integer jobStartTime value" in {
it should "construct log entry with integer timestamp value" in {
// Use Integer instead of Long for timestamps
val testSourceMap = JMap.of(
"jobStartTime",
1234567890.asInstanceOf[Object], // Integer instead of Long
1234567890.asInstanceOf[Object],
"lastRefreshStartTime",
1234567890.asInstanceOf[Object],
"lastRefreshCompleteTime",
1234567890.asInstanceOf[Object],
"state",
"active".asInstanceOf[Object],
"dataSourceName",
Expand All @@ -87,6 +102,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest
logEntry shouldBe a[FlintMetadataLogEntry]
logEntry.id shouldBe "id"
logEntry.createTime shouldBe 1234567890
logEntry.lastRefreshStartTime shouldBe 1234567890
logEntry.lastRefreshCompleteTime shouldBe 1234567890
logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE
logEntry.error shouldBe ""
logEntry.properties.get("dataSourceName").get shouldBe "testDataSource"
@@ -253,6 +253,10 @@ object FlintSparkConf {
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for metadata access permission")
.createOptional()
val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
.doc("Enable Flint metadata cache write to Flint index mappings")
.createWithDefault("false")

val CUSTOM_SESSION_MANAGER =
FlintConfig("spark.flint.job.customSessionManager")
.createOptional()
@@ -309,6 +313,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
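Finally, a hedged sketch of consulting the new `isMetadataCacheWriteEnabled` accessor; constructing `FlintSparkConf` straight from a Java map here is purely illustrative:

```scala
import java.util.{Map => JMap}

// Illustrative only: real callers derive FlintSparkConf from Spark settings.
val conf = FlintSparkConf(JMap.of("spark.flint.metadataCacheWrite.enabled", "true"))
if (conf.isMetadataCacheWriteEnabled) {
  println("metadata cache write is on") // placeholder for the cache write path
}
```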