Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance index monitor to terminate streaming job on consecutive errors #346

Conversation

dai-chen
Copy link
Collaborator

@dai-chen dai-chen commented May 17, 2024

Description

This PR introduces enhancements to the FlintSparkIndexMonitor to improve the robustness and observability of index monitoring tasks. By tracking consecutive errors, this update helps in better managing the streaming tasks and resource utilization.

  1. Add new Spark conf for monitor initial delay, interval and max error count
  2. Add FlintSparkIndexMonitorTask with counter tracking number of consecutive errors
  3. Add streaming job and monitor task terminating logic once max error count reached

Testing with EMR and AOS:

scala> sc.setLogLevel("INFO")

# Start index refresh streaming job
sql("CREATE SKIPPING INDEX ON myglue.ds_tables.http_logs_perf_tiny (status VALUE_SET) WITH (auto_refresh = true)")

24/05/17 21:28:13 INFO FlintSparkIndexMonitor: Starting index monitor for flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index with configuration:
 - Initial delay: 15 seconds
 - Interval: 60 seconds
 - Max error count: 5

# Set read only on AOS
PUT _cluster/settings
{
   "persistent":{
      "cluster.blocks.read_only": true
   }
}

# Monitor start failing
24/05/17 21:28:24 ERROR FlintSparkIndexMonitor: Failed to update index log entry, consecutive errors: 1

24/05/17 21:29:24 ERROR FlintSparkIndexMonitor: Failed to update index log entry, consecutive errors: 2
...

# Remove read only on AOS
PUT _cluster/settings
{
   "persistent":{
      "cluster.blocks.read_only": false
   }
}

# Monitor can update successfully
24/05/17 21:30:24 INFO FlintSparkIndexMonitor: Scheduler trigger index monitor task for flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index
24/05/17 21:30:24 INFO FlintSparkIndexMonitor: Streaming job is still active

# Set read only on AOS again
PUT _cluster/settings
{
   "persistent":{
      "cluster.blocks.read_only": true
   }
}

# Counter is reset and restarts from 1
24/05/17 21:31:25 ERROR FlintSparkIndexMonitor: Failed to update index log entry, consecutive errors: 1

24/05/17 21:32:25 ERROR FlintSparkIndexMonitor: Failed to update index log entry, consecutive errors: 2

...

# Monitor stops streaming job and itself once max error count reached
24/05/17 21:35:26 ERROR FlintSparkIndexMonitor: Failed to update index log entry, consecutive errors: 5
java.lang.IllegalStateException: Failed to commit transaction operation
	at org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction.commit(DefaultOptimisticTransaction.java:125) ~[flint-spark-integration-assembly-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
	at org.opensearch.flint.spark.FlintSparkIndexMonitor$FlintSparkIndexMonitorTask.run(FlintSparkIndexMonitor.scala:106) ~[flint-spark-integration-assembly-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
24/05/17 21:35:26 INFO FlintSparkIndexMonitor: Terminating streaming job and index monitor for flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index
24/05/17 21:35:26 INFO DAGScheduler: Asked to cancel job group 02c0af66-b3a9-4611-8fc2-9259206f83a4
24/05/17 21:35:26 ERROR MicroBatchExecution: Query flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index [id = ba336769-f33d-4da8-8ba2-46630cbdcd92, runId = 02c0af66-b3a9-4611-8fc2-9259206f83a4] terminated with error ...
24/05/17 21:35:26 INFO DAGScheduler: Asked to cancel job group 02c0af66-b3a9-4611-8fc2-9259206f83a4
24/05/17 21:35:26 INFO MicroBatchExecution: Query flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index [id = ba336769-f33d-4da8-8ba2-46630cbdcd92, runId = 02c0af66-b3a9-4611-8fc2-9259206f83a4] was stopped
24/05/17 21:35:26 INFO FlintSparkIndexMonitor: Cancelling scheduled task for index flint_myglue_ds_tables_http_logs_perf_tiny_skipping_index
24/05/17 21:35:26 INFO FlintSparkIndexMonitor: Streaming job and index monitor terminated

Issues Resolved

#344

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@dai-chen dai-chen added enhancement New feature or request 0.5 backport 0.4 labels May 17, 2024
@dai-chen dai-chen self-assigned this May 17, 2024
@dai-chen dai-chen marked this pull request as ready for review May 17, 2024 20:55
@dai-chen dai-chen merged commit 9de4f28 into opensearch-project:main May 17, 2024
4 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request May 17, 2024
#346)

* Add error counter and terminate logic in index monitor

Signed-off-by: Chen Dai <[email protected]>

* Add new Spark conf for max error count and interval

Signed-off-by: Chen Dai <[email protected]>

* Add new Spark conf for initial delay too

Signed-off-by: Chen Dai <[email protected]>

* Update user manual

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit 9de4f28)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
dai-chen pushed a commit that referenced this pull request May 18, 2024
#346) (#347)

* Add error counter and terminate logic in index monitor



* Add new Spark conf for max error count and interval



* Add new Spark conf for initial delay too



* Update user manual



---------


(cherry picked from commit 9de4f28)

Signed-off-by: Chen Dai <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@dai-chen dai-chen deleted the terminate-streaming-job-in-index-monitor branch May 20, 2024 23:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Flint index refresh job continues despite critical dependency unavailable
2 participants