Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-249 Added properties for handling backpressure in KplMessageHandler #250

Closed
wants to merge 20 commits into from

Conversation

siddharthjain210
Copy link
Contributor

Added 2 properties for handling backpressure.
Added test cases for KPL Message Handler

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks promising.
Please, consider my review.
Thanks

@siddharthjain210
Copy link
Contributor Author

@artembilan : I have pushed the changes as requested, please take a look at the test cases to understand the behaviour.

@siddharthjain210
Copy link
Contributor Author

@artembilan : I have worked on the most of the tasks, but not clear on the the 2, which are left open.

…o maxInFlightRecordsInitialBackoffDuration. Corrected capacity loop entry condition to use greater than zero.
@siddharthjain210
Copy link
Contributor Author

siddharthjain210 commented Dec 27, 2024

understood, and makes sense. Having a KPLBackpressureHandlerAdvice extending the spring retry advice is good. I can
extract the code from the current KPLMessageHandler into KPLBackpressureHandlerAdvice.

public class KPLBackpressureHandlerAdvice extends RequestHandlerRetryAdvice {

@Override
	protected Object doInvoke(final ExecutionCallback callback, Object target, final Message<?> message) throws KPLBackpressureException {
		if (this.maxInFlightRecords > 0) {
			waitForCapacityInUserRecordsBuffer();
		}

		return super.doInvoke(callback, target, message);
	}
}

The advices are configured in AbstractReplyProducingMessageHandler,

public abstract class AbstractReplyProducingMessageHandler extends AbstractMessageProducingHandler
		implements BeanClassLoaderAware {

	private final List<Advice> adviceChain = new LinkedList<>();

but the current outbound adapters are extending AbstractAwsMessageHandler which internally extends AbstractMessageProducingHandler. AbstractMessageProducingHandler doesn't have any advices configured.


public abstract class AbstractAwsMessageHandler<H> extends AbstractMessageProducingHandler {
}

If we can change the AbstractAwsMessageHandler`` to extend AbstractReplyProducingMessageHandlerfromAbstractMessageProducingHandler, it would allow users to define Advice Chain on their own. Otherwise, we can add the support for advice in AbstractAwsMessageHandler```.

@artembilan : Please advice the next steps and if the direction I am heading into seems correct.

@artembilan
Copy link
Member

I might not be clear, but I didn't ask for any custom Advice.
That use-case specific KPLBackpressureHandlerAdvice would not make sense outside of the KPLMessageHandler.

My point is to not have any opinion, but rather just fails with an error when condition has not met yet according to the configuration.
From here it might even better sound like backPressureThreshold instead of maxInFlightRecords.
Then we just throw that specific KplBackPressureException - and everything else is up to end-user how to handle such a situation.
The RequestHandlerRetryAdvice would call the handler for the same message again and again.
The RequestHandlerCircuitBreakerAdvice would not let the message to pass down to the handler if it is in the open state.
You may combine them as well.

The advice is applied from the endpoint.
See more info in docs https://docs.spring.io/spring-integration/reference/handler-advice.html.
We don't need to do anything with the AbstractAwsMessageHandler since you simply can proxy the whole handler or rely on the logic from Spring Integration.

@siddharthjain210
Copy link
Contributor Author

siddharthjain210 commented Dec 27, 2024

Sorry, if i understood correctly this time. I am getting confused between 2 contexts - consumer and library provider context.

If the threshold has breached, we throw a KPLbackPressureException.
The user/consumer of KPLMessageHandler can configure any number of advice [retry, timeout, circuit breaker, combinations] as required whenever the above exception is thrown.

We [library providers] don't really need to define and write any custom resilience pattern in the KPL message handler. A plain condition would be simpler.

@artembilan
Copy link
Member

A plain condition would be simpler.

Yes, that's my idea.
I might miss something, but feels like everywhere else.
We fail with some exception because we cannot handle yet.
"Back pressure" in your terms.
So, that is up to user of this service how to deal with such an error.
And lucky we have all the possible tools with those out-of-the-box advices.

@siddharthjain210
Copy link
Contributor Author

@artembilan: I have done the requested changes. Can you please review.

  1. Simplified the condition, removed the retry and exp backoff handling as discussed above.
  2. Introduced a KPLBackpressureException.
  3. Rename maxRecordsInFlight to backpressureThreshold.
  4. Refactored Tests.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the update !
Please, don’t comment like Fixed, Done etc. if you agree with me and going to push. I definitely will it is fixed in your next commit(s). Otherwise it feels like a spam in my mail box and contrpriductive.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there.
Thank you for the update!

* On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown.
* If Backpressure handling is enabled, {@link KplBackpressureException} must be handled.
*
* @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we need the first part since you just nailed it with the second sentence.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant remove Defaulted to 0.
Doesn't sounds good and covered with the next sentence.

* On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown.
* If Backpressure handling is enabled, {@link KplBackpressureException} must be handled.
*
* @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant remove Defaulted to 0.
Doesn't sounds good and covered with the next sentence.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Pulling locally for final review and merge.

@artembilan
Copy link
Member

Merged as b5f6c39 after some minor clean up.
Thank you for contribution; looking forward for more!

@artembilan artembilan closed this Jan 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants