Skip to content

Commit

Permalink
websocket lifecycle events: add transport agnostic variants
Browse files Browse the repository at this point in the history
  • Loading branch information
mostroverkhov committed Jul 24, 2024
1 parent eda9681 commit a16a39d
Show file tree
Hide file tree
Showing 4 changed files with 516 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.jauntsdn.netty.handler.codec.http2.websocketx;

import com.jauntsdn.netty.handler.codec.http2.websocketx.WebSocketEvent.WebSocketHandshakeErrorEvent;
import com.jauntsdn.netty.handler.codec.http2.websocketx.WebSocketEvent.WebSocketHandshakeStartEvent;
import com.jauntsdn.netty.handler.codec.http2.websocketx.WebSocketEvent.WebSocketHandshakeSuccessEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
Expand Down Expand Up @@ -74,9 +77,13 @@ static void fireHandshakeStartAndError(
parentPipeline.fireUserEventTriggered(
new Http2WebSocketHandshakeStartEvent(
serial, path, subprotocols, startNanos, requestHeaders));
parentPipeline.fireUserEventTriggered(
new WebSocketHandshakeStartEvent(serial, path, subprotocols, startNanos, requestHeaders));

parentPipeline.fireUserEventTriggered(
new Http2WebSocketHandshakeErrorEvent(serial, path, subprotocols, errorNanos, null, t));
parentPipeline.fireUserEventTriggered(
new WebSocketHandshakeErrorEvent(serial, path, subprotocols, errorNanos, null, t));
return;
}
parentPipeline.fireExceptionCaught(t);
Expand All @@ -97,10 +104,15 @@ static void fireHandshakeStartAndError(
parentPipeline.fireUserEventTriggered(
new Http2WebSocketHandshakeStartEvent(
serial, path, subprotocols, startNanos, requestHeaders));
parentPipeline.fireUserEventTriggered(
new WebSocketHandshakeStartEvent(serial, path, subprotocols, startNanos, requestHeaders));

parentPipeline.fireUserEventTriggered(
new Http2WebSocketHandshakeErrorEvent(
serial, path, subprotocols, errorNanos, null, errorName, errorMessage));
parentPipeline.fireUserEventTriggered(
new WebSocketHandshakeErrorEvent(
serial, path, subprotocols, errorNanos, null, errorName, errorMessage));
}

static void fireHandshakeStartAndSuccess(
Expand All @@ -116,34 +128,47 @@ static void fireHandshakeStartAndSuccess(
ChannelPipeline parentPipeline = webSocketChannel.parent().pipeline();
ChannelPipeline webSocketPipeline = webSocketChannel.pipeline();

Http2WebSocketHandshakeStartEvent startEvent =
Http2WebSocketHandshakeStartEvent http2StartEvent =
new Http2WebSocketHandshakeStartEvent(
serial, path, subprotocols, startNanos, requestHeaders);
Http2WebSocketHandshakeSuccessEvent successEvent =
WebSocketHandshakeStartEvent startEvent =
new WebSocketHandshakeStartEvent(serial, path, subprotocols, startNanos, requestHeaders);
Http2WebSocketHandshakeSuccessEvent http2SuccessEvent =
new Http2WebSocketHandshakeSuccessEvent(
serial, path, subprotocols, subprotocol, successNanos, responseHeaders);
WebSocketHandshakeSuccessEvent successEvent =
new WebSocketHandshakeSuccessEvent(
serial, path, subprotocols, subprotocol, successNanos, responseHeaders);

parentPipeline.fireUserEventTriggered(http2StartEvent);
parentPipeline.fireUserEventTriggered(startEvent);
parentPipeline.fireUserEventTriggered(http2SuccessEvent);
parentPipeline.fireUserEventTriggered(successEvent);

webSocketPipeline.fireUserEventTriggered(http2StartEvent);
webSocketPipeline.fireUserEventTriggered(startEvent);
webSocketPipeline.fireUserEventTriggered(http2SuccessEvent);
webSocketPipeline.fireUserEventTriggered(successEvent);
}

static void fireHandshakeStart(
Http2WebSocketChannel webSocketChannel, Http2Headers requestHeaders, long timestampNanos) {
ChannelPipeline parentPipeline = webSocketChannel.parent().pipeline();
ChannelPipeline webSocketPipeline = webSocketChannel.pipeline();
int serial = webSocketChannel.serial();
String path = webSocketChannel.path();
String subprotocol = webSocketChannel.subprotocol();

Http2WebSocketHandshakeStartEvent startEvent =
Http2WebSocketHandshakeStartEvent http2StartEvent =
new Http2WebSocketHandshakeStartEvent(
webSocketChannel.serial(),
webSocketChannel.path(),
webSocketChannel.subprotocol(),
timestampNanos,
requestHeaders);
serial, path, subprotocol, timestampNanos, requestHeaders);

WebSocketHandshakeStartEvent startEvent =
new WebSocketHandshakeStartEvent(serial, path, subprotocol, timestampNanos, requestHeaders);

parentPipeline.fireUserEventTriggered(http2StartEvent);
parentPipeline.fireUserEventTriggered(startEvent);
webSocketPipeline.fireUserEventTriggered(http2StartEvent);
webSocketPipeline.fireUserEventTriggered(startEvent);
}

Expand All @@ -155,19 +180,22 @@ static void fireHandshakeError(
ChannelPipeline parentPipeline = webSocketChannel.parent().pipeline();

if (t instanceof Exception) {
String path = webSocketChannel.path();
ChannelPipeline webSocketPipeline = webSocketChannel.pipeline();
String path = webSocketChannel.path();
int serial = webSocketChannel.serial();
String subprotocol = webSocketChannel.subprotocol();

Http2WebSocketHandshakeErrorEvent errorEvent =
Http2WebSocketHandshakeErrorEvent http2ErrorEvent =
new Http2WebSocketHandshakeErrorEvent(
webSocketChannel.serial(),
path,
webSocketChannel.subprotocol(),
timestampNanos,
responseHeaders,
t);
serial, path, subprotocol, timestampNanos, responseHeaders, t);

WebSocketHandshakeErrorEvent errorEvent =
new WebSocketHandshakeErrorEvent(
serial, path, subprotocol, timestampNanos, responseHeaders, t);

parentPipeline.fireUserEventTriggered(http2ErrorEvent);
parentPipeline.fireUserEventTriggered(errorEvent);
webSocketPipeline.fireUserEventTriggered(http2ErrorEvent);
webSocketPipeline.fireUserEventTriggered(errorEvent);
return;
}
Expand All @@ -176,21 +204,23 @@ static void fireHandshakeError(

static void fireHandshakeSuccess(
Http2WebSocketChannel webSocketChannel, Http2Headers responseHeaders, long timestampNanos) {
String path = webSocketChannel.path();
String subprotocol = webSocketChannel.subprotocol();
ChannelPipeline parentPipeline = webSocketChannel.parent().pipeline();
ChannelPipeline webSocketPipeline = webSocketChannel.pipeline();
String path = webSocketChannel.path();
String subprotocol = webSocketChannel.subprotocol();
int serial = webSocketChannel.serial();

Http2WebSocketHandshakeSuccessEvent successEvent =
Http2WebSocketHandshakeSuccessEvent http2SuccessEvent =
new Http2WebSocketHandshakeSuccessEvent(
webSocketChannel.serial(),
path,
subprotocol,
subprotocol,
timestampNanos,
responseHeaders);
serial, path, subprotocol, subprotocol, timestampNanos, responseHeaders);

WebSocketHandshakeSuccessEvent successEvent =
new WebSocketHandshakeSuccessEvent(
serial, path, subprotocol, subprotocol, timestampNanos, responseHeaders);

parentPipeline.fireUserEventTriggered(http2SuccessEvent);
parentPipeline.fireUserEventTriggered(successEvent);
webSocketPipeline.fireUserEventTriggered(http2SuccessEvent);
webSocketPipeline.fireUserEventTriggered(successEvent);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2024 - present Maksym Ostroverkhov.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.jauntsdn.netty.handler.codec.http2.websocketx;

import io.netty.handler.codec.Headers;
import javax.annotation.Nullable;

/** Base type for transport agnostic websocket lifecycle events */
public abstract class WebSocketEvent extends Http2WebSocketEvent.Http2WebSocketLifecycleEvent {

WebSocketEvent(
Http2WebSocketEvent.Type type,
int id,
String path,
String subprotocols,
long timestampNanos) {
super(type, id, path, subprotocols, timestampNanos);
}

/** websocket handshake start event */
public static class WebSocketHandshakeStartEvent extends WebSocketEvent {
private final Headers<CharSequence, CharSequence, ?> requestHeaders;

WebSocketHandshakeStartEvent(
int id,
String path,
String subprotocol,
long timestampNanos,
Headers<CharSequence, CharSequence, ?> requestHeaders) {
super(Type.HANDSHAKE_START, id, path, subprotocol, timestampNanos);
this.requestHeaders = requestHeaders;
}

/** @return websocket request headers */
public Headers<CharSequence, CharSequence, ?> requestHeaders() {
return requestHeaders;
}
}

/** websocket handshake error event */
public static class WebSocketHandshakeErrorEvent extends WebSocketEvent {
private final Headers<CharSequence, CharSequence, ?> responseHeaders;
private final String errorName;
private final String errorMessage;
private final Throwable error;

WebSocketHandshakeErrorEvent(
int id,
String path,
String subprotocols,
long timestampNanos,
Headers<CharSequence, CharSequence, ?> responseHeaders,
Throwable error) {
this(id, path, subprotocols, timestampNanos, responseHeaders, error, null, null);
}

WebSocketHandshakeErrorEvent(
int id,
String path,
String subprotocols,
long timestampNanos,
Headers<CharSequence, CharSequence, ?> responseHeaders,
String errorName,
String errorMessage) {
this(id, path, subprotocols, timestampNanos, responseHeaders, null, errorName, errorMessage);
}

private WebSocketHandshakeErrorEvent(
int id,
String path,
String subprotocols,
long timestampNanos,
Headers<CharSequence, CharSequence, ?> responseHeaders,
Throwable error,
String errorName,
String errorMessage) {
super(Type.HANDSHAKE_ERROR, id, path, subprotocols, timestampNanos);
this.responseHeaders = responseHeaders;
this.errorName = errorName;
this.errorMessage = errorMessage;
this.error = error;
}

/** @return response headers of failed websocket handshake */
public Headers<CharSequence, CharSequence, ?> responseHeaders() {
return responseHeaders;
}

/**
* @return exception associated with failed websocket handshake. May be null, in this case
* {@link #errorName()} and {@link #errorMessage()} contain error details.
*/
@Nullable
public Throwable error() {
return error;
}

/**
* @return name of error associated with failed websocket handshake. May be null, in this case
* {@link #error()} contains respective exception
*/
public String errorName() {
return errorName;
}

/**
* @return message of error associated with failed websocket handshake. May be null, in this
* case {@link #error()} contains respective exception
*/
public String errorMessage() {
return errorMessage;
}
}

/** websocket handshake success event */
public static class WebSocketHandshakeSuccessEvent extends WebSocketEvent {
private final String subprotocol;
private final Headers<CharSequence, CharSequence, ?> responseHeaders;

WebSocketHandshakeSuccessEvent(
int id,
String path,
String subprotocols,
String subprotocol,
long timestampNanos,
Headers<CharSequence, CharSequence, ?> responseHeaders) {
super(Type.HANDSHAKE_SUCCESS, id, path, subprotocols, timestampNanos);
this.subprotocol = subprotocol;
this.responseHeaders = responseHeaders;
}

public String subprotocol() {
return subprotocol;
}

/** @return response headers of succeeded websocket handshake */
public Headers<CharSequence, CharSequence, ?> responseHeaders() {
return responseHeaders;
}
}
}
Loading

0 comments on commit a16a39d

Please sign in to comment.