Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.6] Wait after create index complete #972

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '<rootDir>/<indexName>/<UUID>' to isolate checkpoint data.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 2000 if using aoss service, otherwise 0.
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public class FlintOptions implements Serializable {
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;


public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis";
public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0;
public static final int DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS = 2000;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public static final String BATCH_BYTES = "write.batch_bytes";
Expand Down Expand Up @@ -178,6 +182,13 @@ public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}

public int getRequestCompletionDelayMillis() {
int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName())
? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS
: DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS;
return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(defaultValue)));
}

public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
try {
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
waitRequestComplete(); // Delay to ensure create is complete before making other requests for the index
emitIndexCreationSuccessMetric(metadata.kind());
} catch (IllegalStateException ex) {
emitIndexCreationFailureMetric(metadata.kind());
Expand Down Expand Up @@ -131,6 +132,14 @@ private String sanitizeIndexName(String indexName) {
return OpenSearchClientUtils.sanitizeIndexName(indexName);
}

private void waitRequestComplete() {
try {
Thread.sleep(options.getRequestCompletionDelayMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void emitIndexCreationSuccessMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "success");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))
val REQUEST_COMPLETION_DELAY_MILLIS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}")
.datasourceOption()
.doc("delay in milliseconds after index creation is completed")
.createOptional()
val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
Expand Down Expand Up @@ -356,7 +361,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
EXCLUDE_JOB_IDS,
SCROLL_SIZE)
SCROLL_SIZE,
REQUEST_COMPLETION_DELAY_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ class FlintSparkConfSuite extends FlintSuite {
}
}

test("test request completionDelayMillis default value") {
FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0
}

test("test request completionDelayMillis default value for aoss") {
val options = FlintSparkConf(Map("auth.servicename" -> "aoss").asJava).flintOptions()
options.getRequestCompletionDelayMillis shouldBe 2000
}

test("test specified request completionDelayMillis") {
val options =
FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions()
options.getRequestCompletionDelayMillis shouldBe 1000
}

test("externalSchedulerIntervalThreshold should return default value when empty") {
val options = FlintSparkConf(Map("spark.flint.job.externalScheduler.interval" -> "").asJava)
assert(options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,27 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

it should "create index with request completion delay config" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
// Create a dummy index to avoid timing the initial overhead
flintClient.createIndex("dummy", metadata)

val indexName = "flint_test_without_request_completion_delay"
val elapsedTimeWithoutDelay = timer {
flintClient.createIndex(indexName, metadata)
}

val delayIndexName = "flint_test_with_request_completion_delay"
val delayOptions =
openSearchOptions + (FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS -> "2000")
val delayFlintOptions = new FlintOptions(delayOptions.asJava)
val delayFlintClient = new FlintOpenSearchClient(delayFlintOptions)
val elapsedTimeWithDelay = timer {
delayFlintClient.createIndex(delayIndexName, metadata)
}
elapsedTimeWithDelay - elapsedTimeWithoutDelay should be >= 1800L // allowing 200ms of wiggle room
}

it should "get all index names with the given index name pattern" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize(
"""{"properties": {"test": { "type": "integer" } } }""")
Expand Down Expand Up @@ -220,4 +241,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
def createTable(indexName: String, options: FlintOptions): Table = {
OpenSearchCluster.apply(indexName, options).asScala.head
}

def timer(block: => Unit): Long = {
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
end - start
}
}
Loading