Skip to content

Commit

Permalink
WebSockets Next: cancel returned Multi if the connection is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba committed Jun 7, 2024
1 parent 9222682 commit 2af63ef
Showing 1 changed file with 16 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,41 +319,31 @@ public Uni<Void> sendText(String message, boolean broadcast) {
return broadcast ? connection.broadcast().sendText(message) : connection.sendText(message);
}

public Uni<Void> multiText(Multi<Object> multi, Function<Object, Uni<Void>> action) {
multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
public Uni<Void> multiText(Multi<Object> multi, Function<? super Object, Uni<?>> action) {
multi
// Encode and send message
.onItem().call(action)
.onFailure().recoverWithMulti(t -> {
return doOnError(t).toMulti();
})
.subscribe().with(
m -> {
// Encode and send message
action.apply(m)
.onFailure().recoverWithUni(this::doOnError)
.subscribe()
.with(v -> LOG.debugf("Multi >> text message: %s", connection),
t -> LOG.errorf(t, "Unable to send text message from Multi: %s", connection));
},
t -> {
LOG.errorf(t, "Unable to send text message from Multi: %s ", connection);
});
m -> LOG.debugf("Multi >> text message: %s", connection),
t -> LOG.errorf(t, "Unable to send text message from Multi: %s ", connection));
return Uni.createFrom().voidItem();
}

public Uni<Void> sendBinary(Buffer message, boolean broadcast) {
return broadcast ? connection.broadcast().sendBinary(message) : connection.sendBinary(message);
}

public Uni<Void> multiBinary(Multi<Object> multi, Function<Object, Uni<Void>> action) {
multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
public Uni<Void> multiBinary(Multi<Object> multi, Function<? super Object, Uni<?>> action) {
multi
// Encode and send message
.onItem().call(action)
.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
.subscribe().with(
m -> {
// Encode and send message
action.apply(m)
.onFailure().recoverWithUni(this::doOnError)
.subscribe()
.with(v -> LOG.debugf("Multi >> binary message: %s", connection),
t -> LOG.errorf(t, "Unable to send binary message from Multi: %s", connection));
},
t -> {
LOG.errorf(t, "Unable to send text message from Multi: %s ", connection);
});
m -> LOG.debugf("Multi >> binary message: %s", connection),
t -> LOG.errorf(t, "Unable to send binary message from Multi: %s ", connection));
return Uni.createFrom().voidItem();
}
}

0 comments on commit 2af63ef

Please sign in to comment.