[BUG] Data Prepper processor workers stop running when an error from the routes occurs #4883

Open
dlvenable opened this issue Aug 28, 2024 · 2 comments
Assignees
Labels
bug Something isn't working

Comments

@dlvenable
Member

Describe the bug

If the routes for a sink fail, such as when the expression is invalid, the process worker will stop running. This leaves Data Prepper running without any process workers.

The buffer will fill up, and Data Prepper will have effectively been shut down.

To Reproduce

  1. Create a pipeline with conditional routes
  2. Make one of the routes have an invalid expression
  3. Run Data Prepper
  4. Ingest data
2024-08-26T23:13:58.480 [test-pipeline-processor-worker-5-thread-2] ERROR org.opensearch.dataprepper.pipeline.ProcessWorker - Encountered exception during pipeline test-pipeline processing
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@10ec1e4d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@198669bf[Wrapped task = org.opensearch.dataprepper.pipeline.Pipeline$$Lambda$1477/0x000000080136e230@138d51a1]] rejected from org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor@5af5a6fd[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2018]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[?:?]
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134) ~[?:?]
at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$6(Pipeline.java:347) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.Router.lambda$route$0(Router.java:64) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.DataFlowComponentRouter.route(DataFlowComponentRouter.java:48) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.router.Router.route(Router.java:58) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.Pipeline.publishToSinks(Pipeline.java:346) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.postToSink(ProcessWorker.java:168) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:150) ~[data-prepper-core.jar:?]
at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:68) ~[data-prepper-core.jar:?]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.base/java.lang.Thread.run(Thread.java:833) [?:?]

Expected behavior

I expect that Data Prepper will continue to run. One difficulty is deciding what to do with the data: we could drop it, or send it somewhere even though that destination may be incorrect.

Ideally, we can use the new _default route if available.
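
A minimal sketch of the expected behavior, assuming a failed route evaluation should divert the record instead of ending the worker. This is not Data Prepper code: `ResilientRoutingSketch`, `publish`, and the `fallbackSink` parameter are hypothetical names, and the fallback stands in for either dropping the record or sending it to a `_default` route.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class ResilientRoutingSketch {

    /**
     * Evaluates the route for each record. If the evaluation throws, the
     * record goes to the fallback sink and the loop keeps going, so one bad
     * expression cannot end the worker. Returns the number of failures.
     */
    static <T> int publish(List<T> records,
                           Predicate<T> route,
                           Consumer<T> matchedSink,
                           Consumer<T> fallbackSink) {
        int failures = 0;
        for (T record : records) {
            boolean matches;
            try {
                matches = route.test(record);
            } catch (RuntimeException e) {
                // Hypothetical policy: count the failure and divert the record.
                failures++;
                fallbackSink.accept(record);
                continue;
            }
            if (matches) {
                matchedSink.accept(record);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> matched = new ArrayList<>();
        List<String> fallback = new ArrayList<>();
        // Stand-in for an invalid route expression: evaluation always throws.
        Predicate<String> badRoute = r -> {
            throw new IllegalArgumentException("invalid expression");
        };
        int failures = publish(List.of("a", "b"), badRoute, matched::add, fallback::add);
        System.out.println("failures=" + failures + ", fallback=" + fallback);
    }
}
```

The route evaluation is wrapped separately from the sink call so that only expression failures are diverted; sink failures would need their own handling.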

Environment (please complete the following information):

Data Prepper 2.8

Additional context

This is very similar to #4103, but it manifests through failures in the router and/or sinks.

@dlvenable added the bug and untriaged labels and removed the untriaged label on Aug 28, 2024
@dlvenable self-assigned this on Aug 30, 2024
@dlvenable
Member Author

I have tried to reproduce this in order to find the root cause. It does not appear to come directly from the exception thrown by the routes, but possibly from some side effect.

I used the following pipeline to attempt to reproduce it, but it does not stall.

entry-pipeline:
  workers: 1
  delay: 10
  source:
    http:

  buffer:
    bounded_blocking:
      buffer_size: 2
      batch_size: 10000

  sink:
    - pipeline:
        name: second-pipeline
    - stdout:

second-pipeline:
  workers: 1
  delay: 10
  source:
    pipeline:
      name: entry-pipeline

  buffer:
    bounded_blocking:
      buffer_size: 2
      batch_size: 10000

  processor:

  routes:
    - bad_route: 'this will throw an exception'

  sink:
    - stdout:
        routes: ['bad_route']

@kkondaka
Collaborator

@dlvenable your example pipeline returns a different kind of exception. The original exception in the description says that the submission to the executor service was rejected. This usually happens when the executor service is being shut down. Are you sure that the original issue did not occur while the pipeline was shutting down?
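
To illustrate with plain `java.util.concurrent`: an executor created by `Executors.newFixedThreadPool` uses the default `AbortPolicy`, so a task submitted after `shutdown()` is rejected with a `RejectedExecutionException`, which is consistent with the `Shutting down` state in the logged message. A minimal standalone sketch (the class and method names are made up for illustration and are not Data Prepper code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedAfterShutdownDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean isRejectedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.shutdown(); // from here on, no new tasks are accepted
        try {
            executor.submit(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + isRejectedAfterShutdown());
    }
}
```

This only shows the general executor behavior. It does not establish what triggered the shutdown in the original report.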
