
[Bug]: Java BundleProcessorCache evicts all processors for a bundle descriptor id after 1 minute of idleness #29797

Open
1 of 16 tasks
scwhittle opened this issue Dec 18, 2023 · 4 comments · May be fixed by #33175

@scwhittle
Contributor

What happened?

A timeout of 1 minute is specified here:
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java#L923

The key of this cache is the bundle descriptor id (fused stage) and the value is a list of cached BundleProcessors. This means that if a stage is not processed for a minute, all of its bundle processors are destroyed. For long-lived streaming pipelines this is wasteful: stages are processed in parallel, and a stage that was processed before will likely be processed again. Creating a BundleProcessor involves constructing the user DoFns and running their Setup methods, so it is non-trivial.
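To make the failure mode concrete, here is a minimal, self-contained sketch of the behavior described above. It is not Beam's actual code (the real cache lives in ProcessBundleHandler and uses a cache library's expire-after-access policy); the class and method names are illustrative, and `String` stands in for `BundleProcessor`. The point it demonstrates: the idle timer is per descriptor id, so when an entry expires, every cached processor for that stage is discarded at once.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the current behavior: the cache maps a bundle
// descriptor id to a queue of processors, and an idle entry expires as a
// whole, dropping every cached processor for that stage together.
// (Not thread-safe around lastAccessMillis; this is a sketch, not the impl.)
public class DescriptorCache {
  static final class Entry {
    final Queue<String> processors = new ArrayDeque<>(); // stand-in for BundleProcessor
    long lastAccessMillis;
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final long timeoutMillis;

  public DescriptorCache(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Returns a cached processor for this descriptor, or null on a miss. */
  public String take(String descriptorId, long nowMillis) {
    Entry e = cache.computeIfAbsent(descriptorId, id -> new Entry());
    e.lastAccessMillis = nowMillis;
    return e.processors.poll();
  }

  /** Returns a processor to the cache after a successful bundle. */
  public void release(String descriptorId, String processor, long nowMillis) {
    Entry e = cache.computeIfAbsent(descriptorId, id -> new Entry());
    e.lastAccessMillis = nowMillis;
    e.processors.offer(processor);
  }

  /** Mimics expire-after-access: drops the whole entry for an idle descriptor. */
  public void evictIdle(long nowMillis) {
    cache.entrySet()
        .removeIf(en -> nowMillis - en.getValue().lastAccessMillis >= timeoutMillis);
  }

  public int cachedCount(String descriptorId) {
    Entry e = cache.get(descriptorId);
    return e == null ? 0 : e.processors.size();
  }
}
```

With a 60s timeout, one idle sweep past the deadline empties the stage's entire pool, so the next bundle pays full construction and Setup cost again.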

Some improvements:

  • increase the timeout for streaming pipelines by default, or make it runner-configurable
  • don't throw away the entire list of cached BundleProcessors on expiration; instead, perhaps remove just the last processor.
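The second suggestion could look something like the sketch below: each idle sweep trims at most one processor from the cold end of the deque, so the pool shrinks gradually toward steady-state demand instead of being emptied all at once. Names are illustrative, not Beam's.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of gradual trimming: take from the hot (most recently
// returned) end, trim from the cold end, and discard only one processor per
// idle interval instead of the whole list.
public class GradualPool {
  private final Deque<String> idleProcessors = new ArrayDeque<>();

  /** Returns a processor to the pool after a successful bundle. */
  public void release(String processor) {
    idleProcessors.addLast(processor);
  }

  /** Takes the most recently used processor, or null if the pool is empty. */
  public String take() {
    return idleProcessors.pollLast();
  }

  /** Called once per idle interval: discards only the coldest processor. */
  public void trimOne() {
    idleProcessors.pollFirst();
  }

  public int size() {
    return idleProcessors.size();
  }
}
```

A pool of N processors would then take N idle intervals to drain completely, rather than one, while a stage that resumes within that window keeps most of its warm processors.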

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@gurjarutkarsh

Could introduce a way to track how frequently each BundleProcessor is used, and only remove those that haven't been utilized for a longer period, rather than applying a strict one-minute rule.

@kennknowles
Member

What's the comparable lifetime in other contexts, e.g. Dataflow legacy worker streaming? Is there any eviction at all?

@scwhittle
Contributor Author

The Dataflow streaming runner never times these out, so that is certainly a valid option in my opinion.

Queue is here:

Stored in map here:

private final ConcurrentMap<String, ComputationState> computationMap = new ConcurrentHashMap<>();

Neither of which have a timeout.

We add all successfully processed DoFns back to the queue:
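The pattern described in this comment, a per-computation map with no eviction, where processors are taken for a bundle and offered back only after success, can be sketched roughly as follows. This is a simplified stand-in, not the Dataflow worker's actual code: the class and method names are hypothetical, and `String` again stands in for the executable bundle state.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical sketch of the no-timeout pattern: a ConcurrentMap of
// per-computation processor queues with no eviction at all. A long-lived
// streaming worker therefore never re-runs DoFn Setup for a warm stage.
public class NoTimeoutPool {
  private final Map<String, Queue<String>> byComputation = new ConcurrentHashMap<>();

  /** Reuses a cached processor, or builds one (running Setup) only on a miss. */
  public String acquire(String computationId, Supplier<String> factory) {
    String cached = byComputation
        .computeIfAbsent(computationId, id -> new ConcurrentLinkedQueue<>())
        .poll();
    return cached != null ? cached : factory.get();
  }

  /** Returns the processor to its computation's queue after a successful bundle. */
  public void releaseAfterSuccess(String computationId, String processor) {
    byComputation
        .computeIfAbsent(computationId, id -> new ConcurrentLinkedQueue<>())
        .offer(processor);
  }
}
```

Note that a failed bundle simply never calls `releaseAfterSuccess`, so broken processors fall out of the pool naturally; the pool itself is bounded by the finite set of stages in the graph.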

@kennknowles
Member

Yeah, I mean for today we do know that there's a finite number of them, since the graph cannot change dynamically. I don't have historical context, nor have I looked at the commit history, so I don't know if there is a specific motivation here. Lacking that, I would also favor copying something that is known good, aka let us just not time them out. Or we could at least set it to, say, an hour. I suppose you can close up and free bounded sources that have completed, if that is not done some other way.
