Skip to content

Commit

Permalink
wait after create index complete
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Nov 26, 2024
1 parent 3ff2ef2 commit 368b17d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '<rootDir>/<indexName>/<UUID>' to isolate checkpoint data.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 0.
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ public class FlintOptions implements Serializable {
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;


public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis";
public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public static final String BATCH_BYTES = "write.batch_bytes";
Expand Down Expand Up @@ -178,6 +181,10 @@ public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}

public int getRequestCompletionDelayMillis() {
return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS)));
}

public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
try {
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
waitRequestComplete(); // Delay to ensure create is complete before making other requests for the index
emitIndexCreationSuccessMetric(metadata.kind());
} catch (IllegalStateException ex) {
emitIndexCreationFailureMetric(metadata.kind());
Expand Down Expand Up @@ -131,6 +132,14 @@ private String sanitizeIndexName(String indexName) {
return OpenSearchClientUtils.sanitizeIndexName(indexName);
}

private void waitRequestComplete() {
try {
Thread.sleep(options.getRequestCompletionDelayMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void emitIndexCreationSuccessMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "success");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))
val REQUEST_COMPLETION_DELAY_MILLIS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}")
.datasourceOption()
.doc("delay in milliseconds after index creation is completed")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS))
val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
Expand Down Expand Up @@ -342,7 +347,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS,
BATCH_BYTES)
BATCH_BYTES,
REQUEST_COMPLETION_DELAY_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ class FlintSparkConfSuite extends FlintSuite {
}
}

test("test request completionDelayMillis default value") {
FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0
}

test("test specified request completionDelayMillis") {
val options =
FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions()
options.getRequestCompletionDelayMillis shouldBe 1000
}

test("externalSchedulerIntervalThreshold should return default value when empty") {
val options = FlintSparkConf(Map("spark.flint.job.externalScheduler.interval" -> "").asJava)
assert(options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

it should "create index with request completion delay config" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
// Create a dummy index to avoid timing the initial overhead
flintClient.createIndex("dummy", metadata)

val indexName = "flint_test_without_request_completion_delay"
val elapsedTimeWithoutDelay = timer {
flintClient.createIndex(indexName, metadata)
}

val delayIndexName = "flint_test_with_request_completion_delay"
val delayOptions =
openSearchOptions + (FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS -> "2000")
val delayFlintOptions = new FlintOptions(delayOptions.asJava)
val delayFlintClient = new FlintOpenSearchClient(delayFlintOptions)
val elapsedTimeWithDelay = timer {
delayFlintClient.createIndex(delayIndexName, metadata)
}
elapsedTimeWithDelay - elapsedTimeWithoutDelay should be >= 1800L // allowing 200ms of wiggle room
}

it should "get all index names with the given index name pattern" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize(
"""{"properties": {"test": { "type": "integer" } } }""")
Expand Down Expand Up @@ -220,4 +241,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
def createTable(indexName: String, options: FlintOptions): Table = {
OpenSearchCluster.apply(indexName, options).asScala.head
}

def timer(block: => Unit): Long = {
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
end - start
}
}

0 comments on commit 368b17d

Please sign in to comment.