From a0440f4bf748f6a7b30274c6f0b695ac0cbd7de9 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 18 Nov 2022 20:47:41 +0200 Subject: [PATCH] wip Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/internal/UnboundedProcessor.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 520ff318a..4c755edb8 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -23,8 +23,10 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Stream; -import org.reactivestreams.Subscription; +import org.reactivestreams.Suwipbscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.Exceptions; @@ -97,7 +99,7 @@ public UnboundedProcessor() { this(() -> {}); } - public UnboundedProcessor(Runnable onFinalizedHook) { + public UnboundedProcessor(Runnable onFinalizedHook, Consumer onValueDelivered) { this.onFinalizedHook = onFinalizedHook; this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); @@ -121,6 +123,9 @@ public Object scanUnsafe(Attr key) { return null; } + public boolean tryEmitNext + + @Deprecated public void onNextPrioritized(ByteBuf t) { if (this.done || this.cancelled) { release(t); @@ -157,6 +162,7 @@ public void onNextPrioritized(ByteBuf t) { } @Override + @Deprecated public void onNext(ByteBuf t) { if (this.done || this.cancelled) { release(t); @@ -193,6 +199,7 @@ public void onNext(ByteBuf t) { } @Override + @Deprecated public void onError(Throwable t) { if (this.done || this.cancelled) { Operators.onErrorDropped(t, currentContext()); @@ -235,6 +242,7 @@ public void onError(Throwable t) { } @Override + @Deprecated public void onComplete() { if (this.done || this.cancelled) { return;