From 3bd6872615d713f0641aacd4460bde46762172f6 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 30 Apr 2024 08:08:22 +0300 Subject: [PATCH 01/16] next development iteration --- README.md | 2 +- gradle.properties | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0c837e2..b151b6f 100644 --- a/README.md +++ b/README.md @@ -159,7 +159,7 @@ repositories { } dependencies { - implementation "com.jauntsdn.netty:netty-websocket-http1:1.1.3" + implementation "com.jauntsdn.netty:netty-websocket-http1:1.1.4" } ``` diff --git a/gradle.properties b/gradle.properties index 06fc45d..f434711 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=com.jauntsdn.netty -version=1.1.4 +version=1.1.5 googleJavaFormatPluginVersion=0.9 dependencyManagementPluginVersion=1.1.0 From 5fc287a0b9b7adcdd56217968e02b51e64cd1a8c Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 23 May 2024 15:00:30 +0300 Subject: [PATCH 02/16] update dependencies --- gradle.properties | 2 +- netty-websocket-http1-test/gradle.lockfile | 20 ++++++++++---------- netty-websocket-http1/gradle.lockfile | 16 ++++++++-------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/gradle.properties b/gradle.properties index f434711..a04c92e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,7 +7,7 @@ gitPluginVersion=0.13.0 osDetectorPluginVersion=1.7.3 versionsPluginVersion=0.45.0 -nettyVersion=4.1.109.Final +nettyVersion=4.1.110.Final nettyTcnativeVersion=2.0.65.Final hdrHistogramVersion=2.1.12 slf4jVersion=1.7.36 diff --git a/netty-websocket-http1-test/gradle.lockfile b/netty-websocket-http1-test/gradle.lockfile index 66f1289..0cd39b2 100644 --- a/netty-websocket-http1-test/gradle.lockfile +++ b/netty-websocket-http1-test/gradle.lockfile @@ -9,16 +9,16 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec-http:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-epoll:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-kqueue:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-native-unix-common:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.109.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec-http:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-epoll:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-kqueue:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-native-unix-common:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.14.11=testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath org.assertj:assertj-core:3.25.3=testCompileClasspath,testRuntimeClasspath diff --git a/netty-websocket-http1/gradle.lockfile b/netty-websocket-http1/gradle.lockfile index 99ee8e2..054601b 100644 --- a/netty-websocket-http1/gradle.lockfile +++ b/netty-websocket-http1/gradle.lockfile @@ -7,13 +7,13 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.109.Final=compileClasspath -io.netty:netty-codec-http:4.1.109.Final=compileClasspath -io.netty:netty-codec:4.1.109.Final=compileClasspath -io.netty:netty-common:4.1.109.Final=compileClasspath -io.netty:netty-handler:4.1.109.Final=compileClasspath -io.netty:netty-resolver:4.1.109.Final=compileClasspath -io.netty:netty-transport-native-unix-common:4.1.109.Final=compileClasspath -io.netty:netty-transport:4.1.109.Final=compileClasspath +io.netty:netty-buffer:4.1.110.Final=compileClasspath +io.netty:netty-codec-http:4.1.110.Final=compileClasspath +io.netty:netty-codec:4.1.110.Final=compileClasspath +io.netty:netty-common:4.1.110.Final=compileClasspath +io.netty:netty-handler:4.1.110.Final=compileClasspath +io.netty:netty-resolver:4.1.110.Final=compileClasspath +io.netty:netty-transport-native-unix-common:4.1.110.Final=compileClasspath +io.netty:netty-transport:4.1.110.Final=compileClasspath org.codehaus.mojo:animal-sniffer-annotations:1.14=googleJavaFormat1.6 empty=annotationProcessor From f7f19c31fc2d366d5ac20d01f21219b81e6b9a5e Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Fri, 24 May 2024 16:42:53 +0300 Subject: [PATCH 03/16] Server/ClientProtocolHandler: override isSharable() to get rid of unnecessary reflection, thread locals and wasted memory. --- .../http/websocketx/WebSocketClientProtocolHandler.java | 5 +++++ .../http/websocketx/WebSocketServerProtocolHandler.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java index ea31759..fdda21c 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java @@ -93,6 +93,11 @@ public ChannelFuture handshakeCompleted() { return completed; } + @Override + public boolean isSharable() { + return false; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeCompleted = ctx.newPromise(); diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index d8ae34a..49d0c76 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -87,6 +87,11 @@ public ChannelFuture handshakeCompleted() { return completed; } + @Override + public boolean isSharable() { + return false; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeCompleted = ctx.newPromise(); From b48f8dc1dc5727278ec35a9a2cb68f36327374b0 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 11 Jun 2024 11:10:13 +0300 Subject: [PATCH 04/16] tests: use actual tls certificate instead of generating one at runtime --- .../perftest/bulkencoder/server/Main.java | 5 ++++- .../perftest/encoder/server/Main.java | 5 ++++- .../src/main/resources/localhost.p12 | Bin 0 -> 4077 bytes .../http/websocketx/soaktest/server/Main.java | 4 +++- .../codec/http/websocketx/test/Security.java | 18 ++++++++++++------ 5 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 netty-websocket-http1-perftest/src/main/resources/localhost.p12 diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java index e6289b4..6359436 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/bulkencoder/server/Main.java @@ -52,6 +52,8 @@ public static void main(String[] args) throws Exception { boolean isNativeTransport = Boolean.parseBoolean(System.getProperty("NATIVE_TRANSPORT", "true")); boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + String keyStoreFile = System.getProperty("KEYSTORE", "localhost.p12"); + String keyStorePassword = System.getProperty("KEYSTORE_PASS", "localhost"); boolean isOpensslAvailable = OpenSsl.isAvailable(); boolean isEpollAvailable = Transport.isEpollAvailable(); @@ -67,7 +69,8 @@ public static void main(String[] args) throws Exception { Transport transport = Transport.get(isNativeTransport); logger.info("\n==> io transport: {}", transport.type()); - SslContext sslContext = isEncrypted ? Security.serverSslContext() : null; + SslContext sslContext = + isEncrypted ? Security.serverSslContext(keyStoreFile, keyStorePassword) : null; ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java index e029068..667450f 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java @@ -52,6 +52,8 @@ public static void main(String[] args) throws Exception { boolean isNativeTransport = Boolean.parseBoolean(System.getProperty("NATIVE_TRANSPORT", "true")); boolean isEncrypted = Boolean.parseBoolean(System.getProperty("ENCRYPT", "true")); + String keyStoreFile = System.getProperty("KEYSTORE", "localhost.p12"); + String keyStorePassword = System.getProperty("KEYSTORE_PASS", "localhost"); boolean isOpensslAvailable = OpenSsl.isAvailable(); boolean isEpollAvailable = Transport.isEpollAvailable(); @@ -67,7 +69,8 @@ public static void main(String[] args) throws Exception { Transport transport = Transport.get(isNativeTransport); logger.info("\n==> io transport: {}", transport.type()); - SslContext sslContext = isEncrypted ? Security.serverSslContext() : null; + SslContext sslContext = + isEncrypted ? Security.serverSslContext(keyStoreFile, keyStorePassword) : null; ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-perftest/src/main/resources/localhost.p12 b/netty-websocket-http1-perftest/src/main/resources/localhost.p12 new file mode 100644 index 0000000000000000000000000000000000000000..3d07e307ed4b5addc2833d8c8dc1f5f86ba399c8 GIT binary patch literal 4077 zcmVLj{#%>?ss5W{0EWqT)Yp30^TG z8S=>I9#DH4t51d7o)TZI4}|Eiy?25Y@5S3=JS~k(xL>u*(XG!zx&V1-DM7q_m@+~% zHfD}1*`ac;AB?)~J<_kt;1>Us)53ai`ZrOYtalY|5-riu?mo;bNC$3-uw#}e_sB&2;4l2i(ra9Yl^?u^ z8w&(Go8F*h?Ej8A%l#HcGKDxYjvQK`W$NskvlWT@@|AQpC(+> zdVGu2b^giC&A|XGM_~#W)^YLz242c+VvJ-iE!)y3Od%FIi9Sb;3pzEKzwc;hCWIVi zo=047s%C@(^+)2c)F9~(D=u)1a|UlUr@Tzfmh{v7bu{`agK|2hb4Zt{n_9QfLq!j0 zLW}B#vSEZV>yvv;W%&_YmdB~QdQtk6<{z}bsXxMf&0N=LX;3Hk5$z$rCfa2=+!STTgBU zX`}A5m+@5&ea`vMm<3>Y*g=ReYYk2@$DKw+b%16H#sIyHIlpb|Q*fhDq!jbv6n^zW zeDkDCo<1u4x_|G!W@&PEZ>34|3-{+{#1j-S_^*|gu46Sv$RL&>PL@if*x0ZfkLW;^ zMovfHlQq)_-X&e0D}g$-nRSHYPHE1p140JYvzRWOS`c+Lw`u~(k`$j(u--PNXZoYN zXY`QFVK4w8Yrd(eLVVf6+7;t`(Jr6IBx7-j_p6sz2nzD7Q0s3K%7zBgL8>Axv=$M* zxl?b-pH3k^y=;cp4QQcAAtQ^R$UBzKE0U9|k1RFUrn)MpLxc&QJWVb7RV|ON3i8NQ znZi5|;C)8f*bx!}P4lYjjKUb}_}odz{{@K^a1^!dCcXZc8;jE9_V1}UZ%VmWiG1b% zIuwp6bZ$1c;3Kf6Sq2@r49tG{v(r^?}BH9U){rZjUzKvDzjUJxT0qd9WWA zBRgpC@X1^D$#4OuGhNq3hZ%iNLAeqMNQf3Q;9VkVG#ZvpKA))5f91!s;xZmK(|r7* zf=YGW2nUgN9hTEzAJ+AF5GkcA90>z;>Nny!8zABB~mq zk{dQ6>G_}hG!Eh$lcGH!pK)x)uQ|NhaLko`NR)BO7bpRRqOiva^`#q|!jB8ui zKBOq`FO?QN`S4|Bj$~$8I}?|B)r_Iq@l+_amahO+>8L?i_-iswvDWtD7%yiNDyl6` zeSVA;1ii)3_56I^(}XigC^vRC;P^qo;eB{_-H@_fv}z&_D7XX2Iryiojcn@mZigve zwSsIq6^a!s5bF)i!thyW7Lr)^o6H+v$#Lp%rYTh{mX|9y}2fbxfTL9H7NhPDQcFYJo6#LHM6gC8;c;9-M1hkn$_lxGb zGgdY(|3JsY6u-8ICg-YgOc){lVdD-Ca;y+hQjzj1!E9KS7}$J2TIfPG6D^;I$xN#9 zdwLmnyuN-a|G57WDCp>jeW)IO$B@HZFoFre1_>&LNQUrK0##Jn#4jnLD!u7$}!X<0eRk7OLlw}iwlJ&&efe;=uojNK#cQU#v!@1ZrPDv4SV}X19|yu ztEu63tB`mf9;!`~N7AmiA1#1wU3Wt>r!|;ir^>DleK?w7!M00aT;Z&$0q)ukW*HX9 zjQwdAKFpl`=jl7jhSsZ~#r~Skis30~R9bzl!6<<9LPQ5$EMXWH8 zE={{(iq&6Vg)1HAYXOyAwXfs&3ZQ65|JNNkx7uLEE(ypknkH5m8kPZ=fW?vElp z5Hn_a_DU6*FHf*_y`VfGmc+RSA1x0_;T96qA?x5>3xDtRNi9nCFUZ*W6B$ZYlxfUo z`+ELQ#-4g(umIpS>iPs4vmt8ndxzjIZe(RHPiwy`yOMFY<%PtURdrF&7q0kP;)@d5-e~5kMw-zCU+T}x%7|5j$_OSNawrvZ6vof7KqP?5f`xlUTMoZqu^r0?OKy>% zOQ;mc`Wzi<22jV!xdW6w49aH!TIR^;6N#?TrVq~6scv<(X{~H?U-g^W3}Y>-Nk-EG z4@q;IOhXKZRc4#J5>jOhEha>~cvPh7f7!^Rs|4iA3(k4A#isX&1Re{n9Xe;@-Yo9! zwinJZ8ga!=u_`#IRu@lW8L?m@vctqb`YDGkvb}Za(n_P&YG2{QQO@9{F_THbZKiFK z5{WtFT`zfPfV#Dy9o5JrGS&B|;Q67Z=dw>&3oBOTwPM-H!F?&0w6rK~t;DF9m^E|4 z+eT|!b&tijehmo1(-14Bzx#4jeJU9_4YW~SJsd930Qp%6xTm#vl;F1w^6fO~u&{ROZI>!FU%78U@bFJiLne7|xd_as z)UngE5w?1AzU!6(k^(T#mbDy>wKN0>zAu5&%vn9xn-H^4E%zRX6+T1ADd5TLrZT7mvA;;`2-=k@Xm8K9J0^5tfcriJnuv!r-zwxf1 zn6^=TTL#`WC?TpO7dRBXX}WJjZ@@s+;0OTFRBU{% z88EYU_o|1@xfIzRwn5qD9Ys8a^9wInP26<34Es)5wXdv?$K6Z&pmEp3NGSGS z|5oi64iY^bX!|iW#GBv*pB)N~%1~g41-KYphgc%>XSVMSgzov}NhffG=i}wIbHUz3 zL_wcf>_yuWUEz;L>|U} zx+J3SRi@BFp1}l zv=ZOE-Mv!a47Rw>OXSXqh)Y9Xww|2t&)6%fucji#h}D1YKToR1q4uW7^nqW?o?AAEjfEQ!j*6qvdH8Qg5|KB;S!bkNG$ZW@M=yvB;@Hl3(c4Dajk$1OZBFxV9Y9?Ui6!}|0!o%)VY#IoOmPz} zks(RvNRkP%&|E{{20UYI(d+XJ!=HwDOD1Gv_p{#GKj8pE@A9 z<;o!##aKRhE0)Rm_eWF+yHUP^s&l4pP=s~lqHkPc*jIk$AA&=@BruHgMhjo+Qm}=% zb!w~Wp2)Cz(Orq{H#vTcF)9XBt6cLaIFMyM0!8amZ#+*7(+ybXUrjpC-UgR3?20AR zIhbHSg|Ub158iflXKi2*LR{L_WDx`xrd8AVl-ydtl;36-DH=lr)$1`OFe3&DDuzgg z_YDCF6)_eB6cZ4_6Sxa6(7ZTkxBXPd{Ej?>HZU io transport: {}", transport.type()); - SslContext sslContext = Security.serverSslContext(); + SslContext sslContext = Security.serverSslContext(keyStoreFile, keyStorePassword); ServerBootstrap bootstrap = new ServerBootstrap(); Channel server = diff --git a/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java b/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java index c3a8956..aa3422c 100644 --- a/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java +++ b/netty-websocket-http1-test/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/test/Security.java @@ -22,10 +22,11 @@ import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.netty.handler.ssl.util.SelfSignedCertificate; -import java.security.SecureRandom; +import java.io.InputStream; +import java.security.KeyStore; import java.util.Arrays; import java.util.List; +import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +34,16 @@ public final class Security { private static final Logger logger = LoggerFactory.getLogger(Security.class); - public static SslContext serverSslContext() throws Exception { - SecureRandom random = new SecureRandom(); - SelfSignedCertificate ssc = new SelfSignedCertificate("com.jauntsdn", random, 1024); + public static SslContext serverSslContext(String keystoreFile, String keystorePassword) + throws Exception { + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509"); + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + InputStream keystoreStream = Security.class.getClassLoader().getResourceAsStream(keystoreFile); + char[] keystorePasswordArray = keystorePassword.toCharArray(); + keyStore.load(keystoreStream, keystorePasswordArray); + keyManagerFactory.init(keyStore, keystorePasswordArray); - return SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + return SslContextBuilder.forServer(keyManagerFactory) .protocols("TLSv1.3") .sslProvider(sslProvider()) .ciphers(supportedCypherSuites(), SupportedCipherSuiteFilter.INSTANCE) From 712b0b32e248487b966bc3fe5b31b6b75a2e23e2 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 17 Jun 2024 08:06:02 +0300 Subject: [PATCH 05/16] soak test: add keystore file --- .../src/main/resources/localhost.p12 | Bin 0 -> 4077 bytes soak_client_run.sh | 3 +++ soak_server_run.sh | 5 +++++ 3 files changed, 8 insertions(+) create mode 100644 netty-websocket-http1-soaktest/src/main/resources/localhost.p12 create mode 100755 soak_client_run.sh create mode 100755 soak_server_run.sh diff --git a/netty-websocket-http1-soaktest/src/main/resources/localhost.p12 b/netty-websocket-http1-soaktest/src/main/resources/localhost.p12 new file mode 100644 index 0000000000000000000000000000000000000000..3d07e307ed4b5addc2833d8c8dc1f5f86ba399c8 GIT binary patch literal 4077 zcmVLj{#%>?ss5W{0EWqT)Yp30^TG z8S=>I9#DH4t51d7o)TZI4}|Eiy?25Y@5S3=JS~k(xL>u*(XG!zx&V1-DM7q_m@+~% zHfD}1*`ac;AB?)~J<_kt;1>Us)53ai`ZrOYtalY|5-riu?mo;bNC$3-uw#}e_sB&2;4l2i(ra9Yl^?u^ z8w&(Go8F*h?Ej8A%l#HcGKDxYjvQK`W$NskvlWT@@|AQpC(+> zdVGu2b^giC&A|XGM_~#W)^YLz242c+VvJ-iE!)y3Od%FIi9Sb;3pzEKzwc;hCWIVi zo=047s%C@(^+)2c)F9~(D=u)1a|UlUr@Tzfmh{v7bu{`agK|2hb4Zt{n_9QfLq!j0 zLW}B#vSEZV>yvv;W%&_YmdB~QdQtk6<{z}bsXxMf&0N=LX;3Hk5$z$rCfa2=+!STTgBU zX`}A5m+@5&ea`vMm<3>Y*g=ReYYk2@$DKw+b%16H#sIyHIlpb|Q*fhDq!jbv6n^zW zeDkDCo<1u4x_|G!W@&PEZ>34|3-{+{#1j-S_^*|gu46Sv$RL&>PL@if*x0ZfkLW;^ zMovfHlQq)_-X&e0D}g$-nRSHYPHE1p140JYvzRWOS`c+Lw`u~(k`$j(u--PNXZoYN zXY`QFVK4w8Yrd(eLVVf6+7;t`(Jr6IBx7-j_p6sz2nzD7Q0s3K%7zBgL8>Axv=$M* zxl?b-pH3k^y=;cp4QQcAAtQ^R$UBzKE0U9|k1RFUrn)MpLxc&QJWVb7RV|ON3i8NQ znZi5|;C)8f*bx!}P4lYjjKUb}_}odz{{@K^a1^!dCcXZc8;jE9_V1}UZ%VmWiG1b% zIuwp6bZ$1c;3Kf6Sq2@r49tG{v(r^?}BH9U){rZjUzKvDzjUJxT0qd9WWA zBRgpC@X1^D$#4OuGhNq3hZ%iNLAeqMNQf3Q;9VkVG#ZvpKA))5f91!s;xZmK(|r7* zf=YGW2nUgN9hTEzAJ+AF5GkcA90>z;>Nny!8zABB~mq zk{dQ6>G_}hG!Eh$lcGH!pK)x)uQ|NhaLko`NR)BO7bpRRqOiva^`#q|!jB8ui zKBOq`FO?QN`S4|Bj$~$8I}?|B)r_Iq@l+_amahO+>8L?i_-iswvDWtD7%yiNDyl6` zeSVA;1ii)3_56I^(}XigC^vRC;P^qo;eB{_-H@_fv}z&_D7XX2Iryiojcn@mZigve zwSsIq6^a!s5bF)i!thyW7Lr)^o6H+v$#Lp%rYTh{mX|9y}2fbxfTL9H7NhPDQcFYJo6#LHM6gC8;c;9-M1hkn$_lxGb zGgdY(|3JsY6u-8ICg-YgOc){lVdD-Ca;y+hQjzj1!E9KS7}$J2TIfPG6D^;I$xN#9 zdwLmnyuN-a|G57WDCp>jeW)IO$B@HZFoFre1_>&LNQUrK0##Jn#4jnLD!u7$}!X<0eRk7OLlw}iwlJ&&efe;=uojNK#cQU#v!@1ZrPDv4SV}X19|yu ztEu63tB`mf9;!`~N7AmiA1#1wU3Wt>r!|;ir^>DleK?w7!M00aT;Z&$0q)ukW*HX9 zjQwdAKFpl`=jl7jhSsZ~#r~Skis30~R9bzl!6<<9LPQ5$EMXWH8 zE={{(iq&6Vg)1HAYXOyAwXfs&3ZQ65|JNNkx7uLEE(ypknkH5m8kPZ=fW?vElp z5Hn_a_DU6*FHf*_y`VfGmc+RSA1x0_;T96qA?x5>3xDtRNi9nCFUZ*W6B$ZYlxfUo z`+ELQ#-4g(umIpS>iPs4vmt8ndxzjIZe(RHPiwy`yOMFY<%PtURdrF&7q0kP;)@d5-e~5kMw-zCU+T}x%7|5j$_OSNawrvZ6vof7KqP?5f`xlUTMoZqu^r0?OKy>% zOQ;mc`Wzi<22jV!xdW6w49aH!TIR^;6N#?TrVq~6scv<(X{~H?U-g^W3}Y>-Nk-EG z4@q;IOhXKZRc4#J5>jOhEha>~cvPh7f7!^Rs|4iA3(k4A#isX&1Re{n9Xe;@-Yo9! zwinJZ8ga!=u_`#IRu@lW8L?m@vctqb`YDGkvb}Za(n_P&YG2{QQO@9{F_THbZKiFK z5{WtFT`zfPfV#Dy9o5JrGS&B|;Q67Z=dw>&3oBOTwPM-H!F?&0w6rK~t;DF9m^E|4 z+eT|!b&tijehmo1(-14Bzx#4jeJU9_4YW~SJsd930Qp%6xTm#vl;F1w^6fO~u&{ROZI>!FU%78U@bFJiLne7|xd_as z)UngE5w?1AzU!6(k^(T#mbDy>wKN0>zAu5&%vn9xn-H^4E%zRX6+T1ADd5TLrZT7mvA;;`2-=k@Xm8K9J0^5tfcriJnuv!r-zwxf1 zn6^=TTL#`WC?TpO7dRBXX}WJjZ@@s+;0OTFRBU{% z88EYU_o|1@xfIzRwn5qD9Ys8a^9wInP26<34Es)5wXdv?$K6Z&pmEp3NGSGS z|5oi64iY^bX!|iW#GBv*pB)N~%1~g41-KYphgc%>XSVMSgzov}NhffG=i}wIbHUz3 zL_wcf>_yuWUEz;L>|U} zx+J3SRi@BFp1}l zv=ZOE-Mv!a47Rw>OXSXqh)Y9Xww|2t&)6%fucji#h}D1YKToR1q4uW7^nqW?o?AAEjfEQ!j*6qvdH8Qg5|KB;S!bkNG$ZW@M=yvB;@Hl3(c4Dajk$1OZBFxV9Y9?Ui6!}|0!o%)VY#IoOmPz} zks(RvNRkP%&|E{{20UYI(d+XJ!=HwDOD1Gv_p{#GKj8pE@A9 z<;o!##aKRhE0)Rm_eWF+yHUP^s&l4pP=s~lqHkPc*jIk$AA&=@BruHgMhjo+Qm}=% zb!w~Wp2)Cz(Orq{H#vTcF)9XBt6cLaIFMyM0!8amZ#+*7(+ybXUrjpC-UgR3?20AR zIhbHSg|Ub158iflXKi2*LR{L_WDx`xrd8AVl-ydtl;36-DH=lr)$1`OFe3&DDuzgg z_YDCF6)_eB6cZ4_6Sxa6(7ZTkxBXPd{Ej?>HZU Date: Fri, 28 Jun 2024 16:22:21 +0300 Subject: [PATCH 06/16] Outbound text frames support (#6) * WebSocketFrameFactory: add text frames support. * WebSocketFrameFactory.Encoder: add text frames support. * WebSocketFrameFactory.BulkEncoder: add text frames support. --- .../http/websocketx/WebSocketCodecTest.java | 530 +++++++++++++++++- .../websocketx/MaskingWebSocketEncoder.java | 67 ++- .../NonMaskingWebSocketEncoder.java | 62 +- .../websocketx/WebSocketFrameFactory.java | 31 + 4 files changed, 668 insertions(+), 22 deletions(-) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java index e4a4dd7..d725aac 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -84,7 +84,7 @@ void tearDown() { @ParameterizedTest void binaryFramesEncoder(boolean mask) throws Exception { int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesEncoderClientHandler clientHandler = new BinaryFramesEncoderClientHandler(maxFrameSize); Channel client = @@ -98,12 +98,31 @@ void binaryFramesEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesEncoder(boolean mask) throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesEncoderClientHandler clientHandler = + new TextFramesEncoderClientHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory.Encoder encoder = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(encoder).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @ValueSource(booleans = {true, false}) @ParameterizedTest void binaryFramesBulkEncoder(boolean mask) throws Exception { int maxFrameSize = 1000; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesEncoderClientBulkHandler clientHandler = new BinaryFramesEncoderClientBulkHandler(maxFrameSize); Channel client = @@ -117,6 +136,44 @@ void binaryFramesBulkEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesBulkEncoder(boolean mask) throws Exception { + int maxFrameSize = 1000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesEncoderClientBulkHandler clientHandler = + new TextFramesEncoderClientBulkHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory.BulkEncoder encoder = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(encoder).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFactory(boolean mask) throws Exception { + int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + TextFramesFactoryClientHandler clientHandler = + new TextFramesFactoryClientHandler(maxFrameSize, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory frameFactory = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(frameFactory).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @MethodSource("maskingArgs") @ParameterizedTest @@ -124,7 +181,7 @@ void allSizeBinaryFramesDefaultDecoder( boolean mask, Class webSocketFrameFactoryType, Class webSocketDecoderType) throws Exception { int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); Channel client = webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); @@ -142,7 +199,7 @@ void allSizeBinaryFramesDefaultDecoder( @Test void binaryFramesSmallDecoder() throws Exception { int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; - Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), false, false); + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), false, false); BinaryFramesTestClientHandler clientHandler = new BinaryFramesTestClientHandler(maxFrameSize); Channel client = webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); @@ -450,7 +507,7 @@ protected void initChannel(SocketChannel ch) { WebSocketDecoderConfig.newBuilder() .expectMaskedFrames(expectMaskedFrames) .allowMaskMismatch(allowMaskMismatch) - .withUTF8Validator(false) + .withUTF8Validator(true) .allowExtensions(false) .maxFramePayloadLength(65535) .build(); @@ -610,6 +667,159 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } + static class TextFramesEncoderClientBulkHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private final int framesCount; + private final char expectedAsciiChar; + private WebSocketFrameFactory.BulkEncoder textFrameEncoder; + private int receivedFrames; + private int sentFrames; + private ByteBuf outBuffer; + private volatile ChannelHandlerContext ctx; + + TextFramesEncoderClientBulkHandler(int maxFrameSize, char expectedAsciiChar) { + this.framesCount = maxFrameSize; + this.expectedAsciiChar = expectedAsciiChar; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.textFrameEncoder = webSocketFrameFactory.bulkEncoder(); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame with unexpected content: " + + ch + + ", expected: " + + expectedAsciiChar)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + int bufferSize = 4 * framesCount; + this.outBuffer = ctx.alloc().buffer(bufferSize, bufferSize); + onHandshakeComplete.complete(textFrameEncoder); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + ByteBuf out = outBuffer; + if (out != null) { + outBuffer = null; + out.release(); + } + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + WebSocketFrameFactory.BulkEncoder frameEncoder = textFrameEncoder; + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + int frameSize = frameEncoder.sizeofTextFrame(payloadSize); + ByteBuf out = outBuffer; + if (frameSize > out.capacity() - out.writerIndex()) { + int readableBytes = out.readableBytes(); + int bufferSize = 4 * framesCount; + outBuffer = c.alloc().buffer(bufferSize, bufferSize); + if (c.channel().bytesBeforeUnwritable() < readableBytes) { + c.writeAndFlush(out, c.voidPromise()); + } else { + c.write(out, c.voidPromise()); + } + out = outBuffer; + } + int mask = frameEncoder.encodeTextFramePrefix(out, payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + out.writeByte(expectedAsciiChar); + } + frameEncoder.maskTextFrame(out, mask, payloadSize); + sentFrames++; + } + ByteBuf out = outBuffer; + if (out.readableBytes() > 0) { + c.writeAndFlush(out, c.voidPromise()); + } else { + c.flush(); + } + } + } + static class BinaryFramesEncoderClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { private final CompletableFuture onHandshakeComplete = @@ -759,6 +969,314 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } + static class TextFramesEncoderClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory.Encoder textFrameEncoder; + private final int framesCount; + private final char expectedAsciiChar; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; + + TextFramesEncoderClientHandler(int maxFrameSize, char expectedAsciiChar) { + this.framesCount = maxFrameSize; + this.expectedAsciiChar = expectedAsciiChar; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.textFrameEncoder = webSocketFrameFactory.encoder(); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame with unexpected content: " + + ch + + ", expected: " + + expectedAsciiChar)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(textFrameEncoder); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory.Encoder frameEncoder = textFrameEncoder; + boolean pendingFlush = false; + ByteBufAllocator allocator = c.alloc(); + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + int frameSize = frameEncoder.sizeofTextFrame(payloadSize); + ByteBuf textFrame = allocator.buffer(frameSize); + textFrame.writerIndex(frameSize - payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + textFrame.writeByte(expectedAsciiChar); + } + ByteBuf maskedTextFrame = frameEncoder.encodeTextFrame(textFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < textFrame.capacity()) { + c.writeAndFlush(maskedTextFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedTextFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + + static class TextFramesFactoryClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory frameFactory; + private final int framesCount; + private final char expectedAsciiChar; + private int receivedFrames; + private int sentFrames; + private volatile ChannelHandlerContext ctx; + + TextFramesFactoryClientHandler(int maxFrameSize, char expectedAsciiChar) { + this.framesCount = maxFrameSize; + this.expectedAsciiChar = expectedAsciiChar; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.frameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-text frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + char ch = (char) payload.readByte(); + if (ch != expectedAsciiChar) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame with unexpected content: " + + ch + + ", expected: " + + expectedAsciiChar)); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(frameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + Channel ch = c.channel(); + WebSocketFrameFactory factory = frameFactory; + boolean pendingFlush = false; + ByteBufAllocator allocator = c.alloc(); + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + ByteBuf textFrame = factory.createTextFrame(allocator, payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + textFrame.writeByte(expectedAsciiChar); + } + ByteBuf maskedTextFrame = factory.mask(textFrame); + sentFrames++; + if (ch.bytesBeforeUnwritable() < textFrame.capacity()) { + c.writeAndFlush(maskedTextFrame, c.voidPromise()); + pendingFlush = false; + if (!ch.isWritable()) { + return; + } + } else { + c.write(maskedTextFrame, c.voidPromise()); + pendingFlush = true; + } + } + if (pendingFlush) { + c.flush(); + } + } + } + static class BinaryFramesTestClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { private final CompletableFuture onHandshakeComplete = @@ -1186,7 +1704,7 @@ private void sendFrames(ChannelHandlerContext c, int toSend) { } } - static class BinaryFramesTestServerHandler extends ChannelInboundHandlerAdapter { + static class WebSocketFramesTestServerHandler extends ChannelInboundHandlerAdapter { boolean ready = true; boolean pendingFlush; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java index a587daa..dde1868 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java @@ -20,6 +20,7 @@ import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_CLOSE; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PING; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PONG; +import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_TEXT; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -55,6 +56,8 @@ static class FrameFactory static final int PREFIX_SIZE_SMALL = 6; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int TEXT_FRAME_SMALL = + OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; @@ -65,27 +68,38 @@ static class FrameFactory static final int PREFIX_SIZE_MEDIUM = 8; static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final WebSocketFrameFactory INSTANCE = new FrameFactory(); - @Override - public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + static ByteBuf createDataFrame( + ByteBufAllocator allocator, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { return allocator .buffer(PREFIX_SIZE_SMALL + payloadSize) - .writeShort(BINARY_FRAME_SMALL | payloadSize) + .writeShort(prefixSmall | payloadSize) .readerIndex(2) .writeInt(mask()); } else if (payloadSize <= 65_535) { return allocator .buffer(PREFIX_SIZE_MEDIUM + payloadSize) - .writeLong((long) (BINARY_FRAME_MEDIUM | payloadSize) << 32 | mask()) + .writeLong((long) (prefixMedium | payloadSize) << 32 | mask()) .readerIndex(4); } else { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } } + @Override + public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf createTextFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -155,11 +169,20 @@ public BulkEncoder bulkEncoder() { @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { + return encodeDataFrame(binaryFrame, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeTextFrame(ByteBuf textFrame) { + return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 6; if (frameSize <= 125 + smallPrefixSize) { int payloadSize = frameSize - smallPrefixSize; - binaryFrame.setShort(0, BINARY_FRAME_SMALL | payloadSize); + binaryFrame.setShort(0, prefixSmall | payloadSize); int mask = mask(); binaryFrame.setInt(2, mask); return mask(mask, binaryFrame, smallPrefixSize, binaryFrame.writerIndex()); @@ -169,7 +192,7 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { if (frameSize <= 65_535 + mediumPrefixSize) { int payloadSize = frameSize - mediumPrefixSize; int mask = mask(); - binaryFrame.setLong(0, ((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask); + binaryFrame.setLong(0, ((prefixMedium | (long) payloadSize) << 32) | mask); return mask(mask, binaryFrame, mediumPrefixSize, binaryFrame.writerIndex()); } int payloadSize = frameSize - 12; @@ -178,8 +201,18 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { @Override public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + return encodeDataFramePrefix(byteBuf, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + return encodeDataFramePrefix(byteBuf, textPayloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static int encodeDataFramePrefix( + ByteBuf byteBuf, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { - byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + byteBuf.writeShort(prefixSmall | payloadSize); int mask = mask(); byteBuf.writeInt(mask); return mask; @@ -187,7 +220,7 @@ public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { if (payloadSize <= 65_535) { int mask = mask(); - byteBuf.writeLong(((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask); + byteBuf.writeLong(((prefixMedium | (long) payloadSize) << 32) | mask); return mask; } throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); @@ -195,6 +228,15 @@ public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { @Override public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { + return maskDataFrame(byteBuf, mask, payloadSize); + } + + @Override + public ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + return maskDataFrame(byteBuf, mask, textPayloadSize); + } + + static ByteBuf maskDataFrame(ByteBuf byteBuf, int mask, int payloadSize) { int end = byteBuf.writerIndex(); int start = end - payloadSize; return mask(mask, byteBuf, start, end); @@ -202,6 +244,15 @@ public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { @Override public int sizeofBinaryFrame(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + @Override + public int sizeofTextFrame(int textPayloadSize) { + return sizeOfDataFrame(textPayloadSize); + } + + static int sizeOfDataFrame(int payloadSize) { if (payloadSize <= 125) { return payloadSize + 6; } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java index a120f7e..1734904 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java @@ -20,6 +20,7 @@ import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_CLOSE; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PING; import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_PONG; +import static com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketProtocol.OPCODE_TEXT; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -53,6 +54,7 @@ static class FrameFactory WebSocketFrameFactory.BulkEncoder { static final int PREFIX_SIZE_SMALL = 2; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15; + static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15; static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15; static final int PING_FRAME = OPCODE_PING << 8 | /*FIN*/ (byte) 1 << 15; @@ -60,25 +62,36 @@ static class FrameFactory static final int PREFIX_SIZE_MEDIUM = 4; static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final WebSocketFrameFactory INSTANCE = new FrameFactory(); - @Override - public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + static ByteBuf createDataFrame( + ByteBufAllocator allocator, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { return allocator .buffer(PREFIX_SIZE_SMALL + payloadSize) - .writeShort(BINARY_FRAME_SMALL | payloadSize); + .writeShort(prefixSmall | payloadSize); } if (payloadSize <= 65_535) { return allocator .buffer(PREFIX_SIZE_MEDIUM + payloadSize) - .writeInt(BINARY_FRAME_MEDIUM | payloadSize); + .writeInt(prefixMedium | payloadSize); } throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } + @Override + public ByteBuf createBinaryFrame(ByteBufAllocator allocator, int payloadSize) { + return createDataFrame(allocator, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame(allocator, textDataSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -136,17 +149,26 @@ public BulkEncoder bulkEncoder() { @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { + return encodeDataFrame(binaryFrame, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public ByteBuf encodeTextFrame(ByteBuf textFrame) { + return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 2; if (frameSize <= 125 + smallPrefixSize) { int payloadSize = frameSize - smallPrefixSize; - return binaryFrame.setShort(0, BINARY_FRAME_SMALL | payloadSize); + return binaryFrame.setShort(0, prefixSmall | payloadSize); } int mediumPrefixSize = 4; if (frameSize <= 65_535 + mediumPrefixSize) { int payloadSize = frameSize - mediumPrefixSize; - return binaryFrame.setInt(0, BINARY_FRAME_MEDIUM | payloadSize); + return binaryFrame.setInt(0, prefixMedium | payloadSize); } int payloadSize = frameSize - 8; throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); @@ -154,10 +176,20 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { @Override public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + return encodeDataFramePrefix(byteBuf, payloadSize, BINARY_FRAME_SMALL, BINARY_FRAME_MEDIUM); + } + + @Override + public int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + return encodeDataFramePrefix(byteBuf, textPayloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); + } + + static int encodeDataFramePrefix( + ByteBuf byteBuf, int payloadSize, int prefixSmall, int prefixMedium) { if (payloadSize <= 125) { - byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + byteBuf.writeShort(prefixSmall | payloadSize); } else if (payloadSize <= 65_535) { - byteBuf.writeInt(BINARY_FRAME_MEDIUM | payloadSize); + byteBuf.writeInt(prefixMedium | payloadSize); } else { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } @@ -169,8 +201,22 @@ public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { return byteBuf; } + @Override + public ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + return byteBuf; + } + @Override public int sizeofBinaryFrame(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + + @Override + public int sizeofTextFrame(int textPayloadSize) { + return sizeOfDataFrame(textPayloadSize); + } + + static int sizeOfDataFrame(int payloadSize) { if (payloadSize <= 125) { return payloadSize + 2; } diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java index bbc8ee8..63d19a7 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java @@ -27,6 +27,11 @@ public interface WebSocketFrameFactory { ByteBuf createBinaryFrame(ByteBufAllocator allocator, int binaryDataSize); + default ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createTextFrame() not implemented"); + } + ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason); ByteBuf createPingFrame(ByteBufAllocator allocator, int binaryDataSize); @@ -47,6 +52,16 @@ interface Encoder { ByteBuf encodeBinaryFrame(ByteBuf binaryFrame); int sizeofBinaryFrame(int payloadSize); + + default ByteBuf encodeTextFrame(ByteBuf textFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.encodeTextFrame() not implemented"); + } + + default int sizeofTextFrame(int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.sizeofTextFrame() not implemented"); + } } /** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */ @@ -58,5 +73,21 @@ interface BulkEncoder { ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize); int sizeofBinaryFrame(int payloadSize); + + /** @return frame mask, or -1 if masking not applicable */ + default int encodeTextFramePrefix(ByteBuf byteBuf, int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.encodeTextFramePrefix() not implemented"); + } + + default ByteBuf maskTextFrame(ByteBuf byteBuf, int mask, int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.maskTextFrame() not implemented"); + } + + default int sizeofTextFrame(int textPayloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.BulkEncoder.sizeofTextFrame() not implemented"); + } } } From 3b8ff36d8e2b383f9de7f2172f6b00eca00938eb Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 2 Jul 2024 07:40:07 +0300 Subject: [PATCH 07/16] Add utf8 validation utility for text frames payload --- .../websocketx/WebSocketValidationTest.java | 38 ++++++++ .../websocketx/WebSocketFrameListener.java | 90 +++++++++++++++++++ 2 files changed, 128 insertions(+) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java index b4bdb5c..09439e8 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java @@ -19,6 +19,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -37,7 +39,9 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -268,6 +272,40 @@ void invalidFragmentCompletion() throws Exception { } } + @Test + void utf8TextFrameValidator() { + ByteBufAllocator alloc = ByteBufAllocator.DEFAULT; + List utf8 = + Arrays.asList( + ByteBufUtil.writeUtf8(alloc, "ab"), + ByteBufUtil.writeUtf8(alloc, "c"), + ByteBufUtil.writeUtf8(alloc, "def"), + ByteBufUtil.writeUtf8(alloc, "ghijk"), + ByteBufUtil.writeUtf8(alloc, "lmn")); + ByteBuf nonUtf8 = alloc.buffer(2).writeByte(0xc3).writeByte(0x28); + + WebSocketFrameListener.Utf8FrameValidator validator = + WebSocketFrameListener.Utf8FrameValidator.create(); + + try { + Assertions.assertThat(validator.validateTextFrame(utf8.get(0))).isTrue(); + Assertions.assertThat(validator.state).isEqualTo(0); + Assertions.assertThat(validator.codep).isEqualTo(0); + Assertions.assertThat(validator.validateTextFragmentStart(utf8.get(1))).isTrue(); + Assertions.assertThat(validator.validateFragmentContinuation(utf8.get(2))).isTrue(); + Assertions.assertThat(validator.validateFragmentEnd(utf8.get(3))).isTrue(); + Assertions.assertThat(validator.state).isEqualTo(0); + Assertions.assertThat(validator.codep).isEqualTo(0); + Assertions.assertThat(validator.validateTextFrame(utf8.get(4))).isTrue(); + Assertions.assertThat(validator.validateTextFrame(nonUtf8)).isFalse(); + } finally { + for (ByteBuf string : utf8) { + string.release(); + } + nonUtf8.release(); + } + } + static WebSocketDecoderConfig decoderConfig(int maxFramePayloadLength) { return WebSocketDecoderConfig.newBuilder() .allowMaskMismatch(true) diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java index 7c5bfdf..3842e0b 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.ByteProcessor; import io.netty.util.CharsetUtil; /** @@ -72,4 +73,93 @@ public static String reason(ByteBuf payload) { Short.BYTES, payload.readableBytes() - Short.BYTES, CharsetUtil.UTF_8); } } + + /** + * UTF8 finite state machine based implementation from + * https://bjoern.hoehrmann.de/utf-8/decoder/dfa/ + */ + final class Utf8FrameValidator implements ByteProcessor { + public static final int UTF8_VALIDATION_ERROR_CODE = 1007; + public static final String UTF8_VALIDATION_ERROR_MESSAGE = + "inbound text frame with non-utf8 contents"; + + private static final int UTF8_ACCEPT = 0; + private static final int UTF8_REJECT = 12; + + private static final byte[] TYPES = { + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, + 9, 9, 9, 9, 9, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, + 7, 7, 7, 7, 7, 7, 8, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + 2, 2, 2, 2, 2, 2, 2, 10, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 3, 3, 11, 6, 6, 6, 5, 8, 8, 8, + 8, 8, 8, 8, 8, 8, 8, 8 + }; + + private static final byte[] STATES = { + 0, 12, 24, 36, 60, 96, 84, 12, 12, 12, 48, 72, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, + 12, 0, 12, 12, 12, 12, 12, 0, 12, 0, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 24, 12, 12, + 12, 12, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 12, 12, 24, 12, + 12, 12, 12, 12, 12, 12, 12, 12, 36, 12, 36, 12, 12, 12, 36, 12, 12, 12, 12, 12, 36, 12, 36, + 12, 12, 12, 36, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12 + }; + + int state = UTF8_ACCEPT; + int codep; + + private Utf8FrameValidator() {} + + public static Utf8FrameValidator create() { + return new Utf8FrameValidator(); + } + + /** + * @param buffer text frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateTextFrame(ByteBuf buffer) { + buffer.forEachByte(this); + int st = state; + state = UTF8_ACCEPT; + codep = 0; + return st == UTF8_ACCEPT; + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateTextFragmentStart(ByteBuf buffer) { + buffer.forEachByte(this); + return state != UTF8_REJECT; + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateFragmentContinuation(ByteBuf buffer) { + return validateTextFragmentStart(buffer); + } + + /** + * @param buffer text fragment frame payload + * @return true if payload is utf8 encoded, false otherwise + */ + public boolean validateFragmentEnd(ByteBuf buffer) { + return validateTextFrame(buffer); + } + + @Override + public boolean process(byte bufferByte) { + byte type = TYPES[bufferByte & 0xFF]; + int st = state; + codep = st != UTF8_ACCEPT ? bufferByte & 0x3f | codep << 6 : 0xff >> type & bufferByte; + st = state = STATES[st + type]; + + return st != UTF8_REJECT; + } + } } From 715c5a1ea22ecbc702ef2690a56297dffaed6e35 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 2 Jul 2024 08:30:05 +0300 Subject: [PATCH 08/16] WebSocketServerProtocolHandler: avoid double copy on handling websocket handshake result --- .../websocketx/WebSocketServerProtocolHandler.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index 49d0c76..40f7adc 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -20,6 +20,8 @@ import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -182,11 +184,13 @@ private void handleHandshakeResult( if (cause != null) { handshake.tryFailure(cause); if (cause instanceof WebSocketHandshakeException) { + String errorMessage = cause.getMessage(); + ByteBuf errorContent = + errorMessage == null || errorMessage.isEmpty() + ? Unpooled.EMPTY_BUFFER + : ByteBufUtil.writeUtf8(ctx.alloc(), errorMessage); FullHttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, - HttpResponseStatus.BAD_REQUEST, - Unpooled.wrappedBuffer(cause.getMessage().getBytes())); + new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST, errorContent); ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } else { ctx.fireExceptionCaught(cause); From fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 2 Jul 2024 17:24:44 +0300 Subject: [PATCH 09/16] WebSocketFrameFactory, WebSocketFrameFactory.Encoder: add fragmentation support for outbound binary and text frames. (#7) --- .../http/websocketx/WebSocketCodecTest.java | 683 +++++++++++++++++- .../websocketx/MaskingWebSocketEncoder.java | 72 ++ .../NonMaskingWebSocketEncoder.java | 71 ++ .../websocketx/WebSocketFrameFactory.java | 53 +- 4 files changed, 869 insertions(+), 10 deletions(-) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java index d725aac..4ee69c0 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -155,6 +155,78 @@ void textFramesBulkEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationEncoderClientHandler clientHandler = + new OutboundBinaryFragmentationEncoderClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationClientHandler clientHandler = + new OutboundBinaryFragmentationClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationEncoderClientHandler clientHandler = + new OutboundTextFragmentationEncoderClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationClientHandler clientHandler = + new OutboundTextFragmentationClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @ValueSource(booleans = {true, false}) @ParameterizedTest @@ -393,8 +465,8 @@ void fragmentDefaultDecoder( int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(1500), mask, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(3333); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(3333); Channel client = webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); @@ -413,8 +485,8 @@ void fragmentSmallDecoder() throws Exception { int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(33), false, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(70); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(70); Channel client = webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); @@ -1517,7 +1589,7 @@ public void onOpen(ChannelHandlerContext ctx) { } } - static class FragmentationFramesTestClientHandler + static class InboundFragmentationFramesTestClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); @@ -1525,7 +1597,7 @@ static class FragmentationFramesTestClientHandler WebSocketFrameFactory webSocketFrameFactory; volatile ChannelHandlerContext ctx; - FragmentationFramesTestClientHandler(int frameSize) { + InboundFragmentationFramesTestClientHandler(int frameSize) { this.frameSize = frameSize; } @@ -1638,6 +1710,605 @@ private void sendFrames(ChannelHandlerContext c) { } } + static class OutboundBinaryFragmentationEncoderClientHandler + extends OutboundBinaryFragmentationClientHandler { + OutboundBinaryFragmentationEncoderClientHandler(int frameSize) { + super(frameSize); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, byte content, int size) { + int frameSize = encoder.sizeofBinaryFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + ByteBuf shortFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationEncoderClientHandler + extends OutboundTextFragmentationClientHandler { + + OutboundTextFragmentationEncoderClientHandler(int frameSize, char expectedContent) { + super(frameSize, expectedContent); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, char content, int size) { + int frameSize = encoder.sizeofTextFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + char expected = expectedContent; + ByteBuf shortFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ByteBuf longFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + final char expectedContent; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundTextFragmentationClientHandler(int frameSize, char expectedContent) { + this.frameSize = frameSize; + this.expectedContent = expectedContent; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-text opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, char content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + + static class OutboundBinaryFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundBinaryFragmentationClientHandler(int frameSize) { + this.frameSize = frameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-binary opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, byte content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + static class TextFramesTestServerHandler extends ChannelInboundHandlerAdapter { final String content; final int framesCount; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java index dde1868..3ecd570 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java @@ -59,6 +59,12 @@ static class FrameFactory static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8 | /*MASK*/ (byte) 1 << 7; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8 | /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ + (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; static final int PING_FRAME = @@ -70,6 +76,15 @@ static class FrameFactory static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; + static final WebSocketFrameFactory INSTANCE = new FrameFactory(); static ByteBuf createDataFrame( @@ -100,6 +115,33 @@ public ByteBuf createTextFrame(ByteBufAllocator allocator, int payloadSize) { return createDataFrame(allocator, payloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -177,6 +219,36 @@ public ByteBuf encodeTextFrame(ByteBuf textFrame) { return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 6; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java index 1734904..dfd3fde 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java @@ -56,6 +56,11 @@ static class FrameFactory static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15; static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15; + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = 0; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ (byte) 1 << 15; + static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15; static final int PING_FRAME = OPCODE_PING << 8 | /*FIN*/ (byte) 1 << 15; static final int PONG_FRAME = OPCODE_PONG << 8 | /*FIN*/ (byte) 1 << 15; @@ -64,6 +69,15 @@ static class FrameFactory static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; + static final WebSocketFrameFactory INSTANCE = new FrameFactory(); static ByteBuf createDataFrame( @@ -92,6 +106,33 @@ public ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { return createDataFrame(allocator, textDataSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -157,6 +198,36 @@ public ByteBuf encodeTextFrame(ByteBuf textFrame) { return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 2; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java index 63d19a7..39cab66 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java @@ -20,8 +20,8 @@ import io.netty.buffer.ByteBufAllocator; /** - * Creates frame ByteBufs containing webSocket prefix. It is user's responsibility to call ByteBuf - * mask(ByteBuf) after frame payload is written. + * Creates frame bytebuffers containing webSocket prefix. It is user's responsibility to call + * ByteBuf mask(ByteBuf) after data frame payload is written. */ public interface WebSocketFrameFactory { @@ -32,6 +32,26 @@ default ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { "WebSocketFrameFactory.createTextFrame() not implemented"); } + default ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createBinaryFragmentStart() not implemented"); + } + + default ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createTextFragmentStart() not implemented"); + } + + default ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragment() not implemented"); + } + + default ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragmentEnd() not implemented"); + } + ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason); ByteBuf createPingFrame(ByteBufAllocator allocator, int binaryDataSize); @@ -46,7 +66,7 @@ default BulkEncoder bulkEncoder() { throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented"); } - /** Encodes prefix of single binary websocket frame into provided bytebuffer. */ + /** Encodes prefix of single data websocket frame into provided bytebuffer. */ interface Encoder { ByteBuf encodeBinaryFrame(ByteBuf binaryFrame); @@ -62,9 +82,34 @@ default int sizeofTextFrame(int textPayloadSize) { throw new UnsupportedOperationException( "WebSocketFrameFactory.Encoder.sizeofTextFrame() not implemented"); } + + default ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeBinaryFragmentStart() not implemented"); + } + + default ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeTextFragmentStart() not implemented"); + } + + default ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragment() not implemented"); + } + + default ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragmentEnd() not implemented"); + } + + default int sizeofFragment(int payloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.sizeofFragment() not implemented"); + } } - /** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */ + /** Encodes prefixes of multiple data websocket frames into provided bytebuffer. */ interface BulkEncoder { /** @return frame mask, or -1 if masking not applicable */ From 5b715667f28ef022d1a1435554a0cc26f8542dfa Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 2 Jul 2024 17:59:51 +0300 Subject: [PATCH 10/16] readme --- README.md | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index b151b6f..caf9744 100644 --- a/README.md +++ b/README.md @@ -5,30 +5,33 @@ Alternative Netty implementation of [RFC6455](https://tools.ietf.org/html/rfc6455) - the WebSocket protocol. -Its advantage is significant per-core throughput improvement (1.8 - 2x) for small frames in comparison to netty's out-of-the-box -websocket codecs, and minimal heap allocations on frame path. Library is compatible with +Its advantages are significant per-core throughput improvement (1.8 - 2x) for small frames compared to netty's out-of-the-box +websocket codecs, minimal heap allocations on frame path, and compatibility with [netty-websocket-http2](https://github.com/jauntsdn/netty-websocket-http2). ### use case & scope -* Intended for efficiently encoded, dense binary data: no extensions (compression) support / outbound text frames / inbound -utf8 validation. +* Intended for dense binary data & small text messages: no extensions (compression) support. + +* No per-frame heap allocations in websocket frameFactory / decoder. * Library assumes small frames - many have payload <= 125 bytes, most are < 1500, maximum supported is 65k (65535 bytes). -* Just codec - fragments, pings, close frames are decoded & validated only. It is responsibility of user code +* Just codec - fragments, pings, close frames are decoded & protocol validated only. It is responsibility of user code to handle frames according to protocol (reassemble frame fragments, perform graceful close, -respond to pings). - -* Dedicated decoder for case of exchanging tiny messages over TLS connection: -only non-masked frames with <= 125 bytes of payload for minimal per-webSocket state (memory) overhead. - -* No per-frame heap allocations in websocket frameFactory / decoder. +respond to pings) and do utf8 validation of inbound text frames ([utility](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java#L81) is provided). * Single-threaded (transport IO event-loop) callbacks / frame factory API - -in practice user code has its own message types to carry data, external means (e.g. mpsc / spsc queues) may be used to +in practice user code has its own message types to carry data, and external means (e.g. mpsc / spsc queues) may be used to properly publish messages on eventloop thread. +* On encoder side 3 use cases are supported: frame factory [[1]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L1475) (create bytebuffer and encode frame prefix), +frame encoder [[2]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L1019) (encode frame prefix into provided bytebuffer), +frame bulk-encoder [[3]](https://github.com/jauntsdn/netty-websocket-http1/blob/fb7bbb12d4fc0e62a72845dee89fe8f1d86f9a0a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java#L707) (much more performant - encode multiple frames into provided bytebuffer). + +* Dedicated decoder for case of exchanging tiny messages over TLS connection: +only non-masked frames with <= 125 bytes of payload for minimal per-webSocket state (memory) overhead. + ### performance Per-core throughput [this codec perf-test](https://github.com/jauntsdn/netty-websocket-http1/tree/develop/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest), @@ -77,6 +80,11 @@ to create outbound frames. It is library user responsibility to mask outbound fr public interface WebSocketFrameFactory { ByteBuf createBinaryFrame(ByteBufAllocator allocator, int binaryDataSize); + + // ByteBuf createTextFrame(ByteBufAllocator allocator, int binaryDataSize); + + // ByteBuf create*Fragment*(ByteBufAllocator allocator, int textDataSize); + // create*Frame are omitted for control frames, created in similar fashion ByteBuf mask(ByteBuf frame); From badceaf1fb576b3465fdf6b19657f0d37a0ae1fb Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 11 Jul 2024 01:01:05 +0300 Subject: [PATCH 11/16] WebSocketServerProtocolHandler, WebSocketClientProtocolHandler: make WebSocketCallbacksHandler optional, instead provided via channel handler after websocket handshake completion --- .../websocketx/WebSocketHandshakeTest.java | 201 ++++++++++++++++-- .../WebSocketClientProtocolHandler.java | 14 +- .../WebSocketServerProtocolHandler.java | 17 +- 3 files changed, 196 insertions(+), 36 deletions(-) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java index cca27cf..61bf79b 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketHandshakeTest.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -172,26 +173,6 @@ void smallDecoderConfig() throws Exception { client.close(); } - @Test - void clientBuilderMissingHandler() { - org.junit.jupiter.api.Assertions.assertThrows( - IllegalStateException.class, - () -> { - WebSocketClientProtocolHandler clientProtocolHandler = - WebSocketClientProtocolHandler.create().build(); - }); - } - - @Test - void serverBuilderMissingHandler() { - org.junit.jupiter.api.Assertions.assertThrows( - IllegalStateException.class, - () -> { - WebSocketServerProtocolHandler serverProtocolHandler = - WebSocketServerProtocolHandler.create().build(); - }); - } - @Timeout(15) @Test void clientTimeout() throws InterruptedException { @@ -309,6 +290,74 @@ protected void initChannel(SocketChannel ch) { Assertions.assertThat(client.isOpen()).isFalse(); } + @Test + void noCallbackHandlerHandshake() throws Exception { + String path = "/test"; + NoCallbackServerHandler noCallbackServerHandler = new NoCallbackServerHandler(); + NoCallbackClientHandler noCallbackClientHandler = new NoCallbackClientHandler(); + + Channel s = + server = + new ServerBootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioServerSocketChannel.class) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + HttpServerCodec http1Codec = new HttpServerCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketServerProtocolHandler webSocketProtocolHandler = + WebSocketServerProtocolHandler.create() + .path(path) + .decoderConfig(webSocketDecoderConfig(true, true, 125)) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + http1Codec, + http1Aggregator, + webSocketProtocolHandler, + noCallbackServerHandler); + } + }) + .bind("localhost", 0) + .sync() + .channel(); + + Channel client = + new Bootstrap() + .group(new NioEventLoopGroup(1)) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + HttpClientCodec http1Codec = new HttpClientCodec(); + HttpObjectAggregator http1Aggregator = new HttpObjectAggregator(65536); + WebSocketClientProtocolHandler webSocketProtocolHandler = + WebSocketClientProtocolHandler.create() + .path(path) + .allowMaskMismatch(true) + .maxFramePayloadLength(125) + .mask(true) + .build(); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast( + http1Codec, + http1Aggregator, + webSocketProtocolHandler, + noCallbackClientHandler); + } + }) + .connect(s.localAddress()) + .sync() + .channel(); + + noCallbackClientHandler.exchangeCompleted.get(5, TimeUnit.SECONDS); + } + @SuppressWarnings("deprecation") @Timeout(15) @Test @@ -565,6 +614,118 @@ public void onClose(ChannelHandlerContext ctx) { } } + private static class NoCallbackClientHandler extends ChannelInboundHandlerAdapter { + Promise exchangeCompleted; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + exchangeCompleted = ctx.newPromise(); + super.handlerAdded(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + exchangeCompleted.tryFailure(new ClosedChannelException()); + } + + @SuppressWarnings("Convert2Lambda") + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt + == io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler + .ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { + WebSocketCallbacksHandler.exchange( + ctx, + new WebSocketCallbacksHandler() { + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + ctx.writeAndFlush( + webSocketFrameFactory.mask( + webSocketFrameFactory.createBinaryFrame(ctx.alloc(), 1).writeByte(0xFE))); + + return new WebSocketFrameListener() { + @Override + public void onChannelRead( + ChannelHandlerContext context, + boolean finalFragment, + int rsv, + int opcode, + ByteBuf payload) { + int readableBytes = payload.readableBytes(); + if (readableBytes != 1) { + payload.release(); + exchangeCompleted.setFailure( + new IllegalStateException("unexpected payload size: " + readableBytes)); + return; + } + byte content = payload.readByte(); + if (content != (byte) 0xFE) { + payload.release(); + exchangeCompleted.setFailure( + new IllegalStateException( + "unexpected payload content: " + Integer.toHexString(content))); + return; + } + payload.release(); + exchangeCompleted.setSuccess(null); + } + }; + } + }); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.safeRelease(msg); + } + } + + private static class NoCallbackServerHandler extends ChannelInboundHandlerAdapter { + + @SuppressWarnings("Convert2Lambda") + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt + instanceof + io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete) { + WebSocketCallbacksHandler.exchange( + ctx, + new WebSocketCallbacksHandler() { + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + return new WebSocketFrameListener() { + @Override + public void onChannelRead( + ChannelHandlerContext context, + boolean finalFragment, + int rsv, + int opcode, + ByteBuf payload) { + ByteBuf binaryFrame = + webSocketFrameFactory.mask( + webSocketFrameFactory.createBinaryFrame( + ctx.alloc(), payload.readableBytes())); + binaryFrame.writeBytes(payload); + payload.release(); + ctx.writeAndFlush(binaryFrame); + } + }; + } + }); + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.safeRelease(msg); + } + } + static WebSocketDecoderConfig webSocketDecoderConfig( boolean expectMasked, boolean allowMaskMismatch, int maxFramePayloadLength) { return WebSocketDecoderConfig.newBuilder() diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java index fdda21c..3e3a812 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java @@ -63,7 +63,7 @@ private WebSocketClientProtocolHandler( boolean allowMaskMismatch, int maxFramePayloadLength, long handshakeTimeoutMillis, - WebSocketCallbacksHandler webSocketHandler) { + @Nullable WebSocketCallbacksHandler webSocketHandler) { this.address = address; this.path = path; this.subprotocol = subprotocol; @@ -183,7 +183,10 @@ private void completeHandshake(ChannelHandlerContext ctx, FullHttpResponse respo cancelHandshakeTimeout(); } ctx.pipeline().remove(this); - WebSocketCallbacksHandler.exchange(ctx, webSocketHandler); + WebSocketCallbacksHandler handler = webSocketHandler; + if (handler != null) { + WebSocketCallbacksHandler.exchange(ctx, handler); + } handshakeCompleted.trySuccess(); ctx.fireUserEventTriggered( io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler @@ -343,16 +346,13 @@ public Builder handshakeTimeoutMillis(long handshakeTimeoutMillis) { * @param webSocketHandler handler to process successfully handshaked webSocket * @return this Builder instance */ - public Builder webSocketHandler(WebSocketCallbacksHandler webSocketHandler) { - this.webSocketHandler = Objects.requireNonNull(webSocketHandler, "webSocketHandler"); + public Builder webSocketHandler(@Nullable WebSocketCallbacksHandler webSocketHandler) { + this.webSocketHandler = webSocketHandler; return this; } /** @return new WebSocketClientProtocolHandler instance */ public WebSocketClientProtocolHandler build() { - if (webSocketHandler == null) { - throw new IllegalStateException("webSocketHandler was not provided"); - } int maxPayloadLength = maxFramePayloadLength; boolean maskMismatch = allowMaskMismatch; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index 40f7adc..9811a6d 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -67,7 +67,7 @@ private WebSocketServerProtocolHandler( String subprotocols, WebSocketDecoderConfig webSocketDecoderConfig, long handshakeTimeoutMillis, - WebSocketCallbacksHandler webSocketHandler) { + @Nullable WebSocketCallbacksHandler webSocketHandler) { this.path = path; this.subprotocols = subprotocols; this.decoderConfig = webSocketDecoderConfig; @@ -197,7 +197,10 @@ private void handleHandshakeResult( ctx.close(); } } else { - WebSocketCallbacksHandler.exchange(ctx, webSocketHandler); + WebSocketCallbacksHandler handler = webSocketHandler; + if (handler != null) { + WebSocketCallbacksHandler.exchange(ctx, handler); + } handshake.trySuccess(); ChannelPipeline p = ctx.channel().pipeline(); p.fireUserEventTriggered( @@ -303,19 +306,15 @@ public Builder handshakeTimeoutMillis(long handshakeTimeoutMillis) { * @param webSocketHandler handler to process successfully handshaked webSocket * @return this Builder instance */ - public Builder webSocketCallbacksHandler(WebSocketCallbacksHandler webSocketHandler) { - this.webSocketCallbacksHandler = Objects.requireNonNull(webSocketHandler, "webSocketHandler"); + public Builder webSocketCallbacksHandler(@Nullable WebSocketCallbacksHandler webSocketHandler) { + this.webSocketCallbacksHandler = webSocketHandler; return this; } /** @return new WebSocketServerProtocolHandler instance */ public WebSocketServerProtocolHandler build() { - WebSocketCallbacksHandler handler = webSocketCallbacksHandler; - if (handler == null) { - throw new IllegalStateException("webSocketCallbacksHandler was not provided"); - } return new WebSocketServerProtocolHandler( - path, subprotocols, decoderConfig, handshakeTimeoutMillis, handler); + path, subprotocols, decoderConfig, handshakeTimeoutMillis, webSocketCallbacksHandler); } private static long requirePositive(long val, String desc) { From 139a5d0de53131e3d883f753d8e0201bddac5807 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sat, 20 Jul 2024 20:55:29 +0300 Subject: [PATCH 12/16] Utf8FrameValidator: optimize for ASCII content --- .../websocketx/WebSocketValidationTest.java | 37 +++++++++++ .../websocketx/WebSocketFrameListener.java | 61 ++++++++++++++++--- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java index 09439e8..9843801 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketValidationTest.java @@ -39,6 +39,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -272,6 +273,42 @@ void invalidFragmentCompletion() throws Exception { } } + @Test + void utf8Validator() { + String ascii = "Are those shy Eurasian footwear, cowboy chaps, or jolly earthmoving headgear"; + String utf8 = "Чуєш їх, доцю, га? Кумедна ж ти, прощайся без ґольфів!"; + List asciiList = stringList(ByteBufAllocator.DEFAULT, ascii); + List utf8List = stringList(ByteBufAllocator.DEFAULT, utf8); + try { + WebSocketFrameListener.Utf8FrameValidator validator = + WebSocketFrameListener.Utf8FrameValidator.create(); + for (ByteBuf byteBuf : asciiList) { + Assertions.assertThat(validator.validateTextFrame(byteBuf)).isTrue(); + } + for (ByteBuf byteBuf : utf8List) { + Assertions.assertThat(validator.validateTextFrame(byteBuf)).isTrue(); + } + } finally { + for (ByteBuf byteBuf : asciiList) { + byteBuf.release(); + } + for (ByteBuf byteBuf : utf8List) { + byteBuf.release(); + } + } + } + + static List stringList(ByteBufAllocator allocator, String string) { + int length = string.length(); + List list = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + String substring = string.substring(0, i + 1); + ByteBuf byteBuf = ByteBufUtil.writeUtf8(allocator, substring); + list.add(byteBuf); + } + return list; + } + @Test void utf8TextFrameValidator() { ByteBufAllocator alloc = ByteBufAllocator.DEFAULT; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java index 3842e0b..9dd7957 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameListener.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.ByteProcessor; import io.netty.util.CharsetUtil; /** @@ -76,9 +75,9 @@ public static String reason(ByteBuf payload) { /** * UTF8 finite state machine based implementation from - * https://bjoern.hoehrmann.de/utf-8/decoder/dfa/ + * https://bjoern.hoehrmann.de/utf-8/decoder/dfa/ optimized for ASCII content. */ - final class Utf8FrameValidator implements ByteProcessor { + final class Utf8FrameValidator { public static final int UTF8_VALIDATION_ERROR_CODE = 1007; public static final String UTF8_VALIDATION_ERROR_MESSAGE = "inbound text frame with non-utf8 contents"; @@ -120,7 +119,7 @@ public static Utf8FrameValidator create() { * @return true if payload is utf8 encoded, false otherwise */ public boolean validateTextFrame(ByteBuf buffer) { - buffer.forEachByte(this); + checkUtf8(buffer); int st = state; state = UTF8_ACCEPT; codep = 0; @@ -132,7 +131,7 @@ public boolean validateTextFrame(ByteBuf buffer) { * @return true if payload is utf8 encoded, false otherwise */ public boolean validateTextFragmentStart(ByteBuf buffer) { - buffer.forEachByte(this); + checkUtf8(buffer); return state != UTF8_REJECT; } @@ -152,8 +151,56 @@ public boolean validateFragmentEnd(ByteBuf buffer) { return validateTextFrame(buffer); } - @Override - public boolean process(byte bufferByte) { + private void checkUtf8(ByteBuf buffer) { + int readableBytes = buffer.readableBytes(); + int from = buffer.readerIndex(); + int to = from + readableBytes; + boolean cont = true; + int step = Long.BYTES; + while (to - from >= step) { + long bytes = buffer.getLong(from); + if ( + /*is non-ascii*/ (bytes & 0x8080808080808080L) != 0) { + for (int i = 0; i < step; i++) { + byte b = (byte) ((bytes >> 8 * (step - (i + 1))) & 0xFF); + cont = checkUtf8(b); + if (!cont) { + break; + } + } + } + from += step; + } + if (cont) { + step = Integer.BYTES; + while (to - from >= step) { + int bytes = buffer.getInt(from); + if ( + /*is non-ascii*/ (bytes & 0x80808080) != 0) { + for (int i = 0; i < step; i++) { + byte b = (byte) ((bytes >> 8 * (step - (i + 1))) & 0xFF); + cont = checkUtf8(b); + if (!cont) { + break; + } + } + } + from += step; + } + } + if (cont) { + while (to - from >= 1) { + byte b = buffer.getByte(from); + cont = checkUtf8(b); + if (!cont) { + break; + } + from += 1; + } + } + } + + private boolean checkUtf8(byte bufferByte) { byte type = TYPES[bufferByte & 0xFF]; int st = state; codep = st != UTF8_ACCEPT ? bufferByte & 0x3f | codep << 6 : 0xff >> type & bufferByte; From 3166e7052bedd28043a152fb3036a8f29f2d1f8c Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 23 Jul 2024 15:16:36 +0300 Subject: [PATCH 13/16] ci: update os --- .github/workflows/ci-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml index fbea04c..4b80974 100644 --- a/.github/workflows/ci-build.yml +++ b/.github/workflows/ci-build.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: - os: [ ubuntu-20.04, macos-11, windows-2019 ] + os: [ ubuntu-20.04, macos-12, windows-2019 ] jdk: [ 8, 11, 17, 21 ] fail-fast: false From 4fb24d9ab56d0b416709391258608bd1a213f80a Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 23 Jul 2024 15:38:31 +0300 Subject: [PATCH 14/16] soak test: remove redundant start scripts --- netty-websocket-http1-soaktest/build.gradle | 10 ---------- soak_client.sh | 3 --- soak_server.sh | 3 --- 3 files changed, 16 deletions(-) delete mode 100755 soak_client.sh delete mode 100755 soak_server.sh diff --git a/netty-websocket-http1-soaktest/build.gradle b/netty-websocket-http1-soaktest/build.gradle index e8e5d8a..83523b1 100644 --- a/netty-websocket-http1-soaktest/build.gradle +++ b/netty-websocket-http1-soaktest/build.gradle @@ -33,16 +33,6 @@ dependencies { runtimeOnly "ch.qos.logback:logback-classic" } -task runServer(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.server.Main" -} - -task runClient(type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.client.Main" -} - task serverScripts(type: CreateStartScripts) { mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.soaktest.server.Main" applicationName = "${project.name}-server" diff --git a/soak_client.sh b/soak_client.sh deleted file mode 100755 index 73d13df..0000000 --- a/soak_client.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -./gradlew netty-websocket-http1-soaktest:runClient \ No newline at end of file diff --git a/soak_server.sh b/soak_server.sh deleted file mode 100755 index 8df1e31..0000000 --- a/soak_server.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -./gradlew netty-websocket-http1-soaktest:runServer \ No newline at end of file From 17bec65aa06dfaad5a7028326df15187b1dc4d0f Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Wed, 24 Jul 2024 06:26:35 +0300 Subject: [PATCH 15/16] update dependencies --- gradle.properties | 2 +- netty-websocket-http1-test/gradle.lockfile | 20 ++++++++++---------- netty-websocket-http1/gradle.lockfile | 16 ++++++++-------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/gradle.properties b/gradle.properties index a04c92e..cbd2d35 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,7 +7,7 @@ gitPluginVersion=0.13.0 osDetectorPluginVersion=1.7.3 versionsPluginVersion=0.45.0 -nettyVersion=4.1.110.Final +nettyVersion=4.1.112.Final nettyTcnativeVersion=2.0.65.Final hdrHistogramVersion=2.1.12 slf4jVersion=1.7.36 diff --git a/netty-websocket-http1-test/gradle.lockfile b/netty-websocket-http1-test/gradle.lockfile index 0cd39b2..e24ff6c 100644 --- a/netty-websocket-http1-test/gradle.lockfile +++ b/netty-websocket-http1-test/gradle.lockfile @@ -9,16 +9,16 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec-http:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-codec:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-common:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-handler:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-resolver:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-epoll:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-classes-kqueue:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport-native-unix-common:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath -io.netty:netty-transport:4.1.110.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec-http:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-epoll:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-kqueue:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-native-unix-common:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.112.Final=compileClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.14.11=testCompileClasspath,testRuntimeClasspath org.apiguardian:apiguardian-api:1.1.2=testCompileClasspath org.assertj:assertj-core:3.25.3=testCompileClasspath,testRuntimeClasspath diff --git a/netty-websocket-http1/gradle.lockfile b/netty-websocket-http1/gradle.lockfile index 054601b..9a50314 100644 --- a/netty-websocket-http1/gradle.lockfile +++ b/netty-websocket-http1/gradle.lockfile @@ -7,13 +7,13 @@ com.google.errorprone:javac-shaded:9+181-r4173-1=googleJavaFormat1.6 com.google.googlejavaformat:google-java-format:1.6=googleJavaFormat1.6 com.google.guava:guava:22.0=googleJavaFormat1.6 com.google.j2objc:j2objc-annotations:1.1=googleJavaFormat1.6 -io.netty:netty-buffer:4.1.110.Final=compileClasspath -io.netty:netty-codec-http:4.1.110.Final=compileClasspath -io.netty:netty-codec:4.1.110.Final=compileClasspath -io.netty:netty-common:4.1.110.Final=compileClasspath -io.netty:netty-handler:4.1.110.Final=compileClasspath -io.netty:netty-resolver:4.1.110.Final=compileClasspath -io.netty:netty-transport-native-unix-common:4.1.110.Final=compileClasspath -io.netty:netty-transport:4.1.110.Final=compileClasspath +io.netty:netty-buffer:4.1.112.Final=compileClasspath +io.netty:netty-codec-http:4.1.112.Final=compileClasspath +io.netty:netty-codec:4.1.112.Final=compileClasspath +io.netty:netty-common:4.1.112.Final=compileClasspath +io.netty:netty-handler:4.1.112.Final=compileClasspath +io.netty:netty-resolver:4.1.112.Final=compileClasspath +io.netty:netty-transport-native-unix-common:4.1.112.Final=compileClasspath +io.netty:netty-transport:4.1.112.Final=compileClasspath org.codehaus.mojo:animal-sniffer-annotations:1.14=googleJavaFormat1.6 empty=annotationProcessor From 5c1f545469860e4b7b0d57476e9cd4413d9d7183 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 25 Jul 2024 06:03:10 +0300 Subject: [PATCH 16/16] update version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index cbd2d35..40aa9bb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group=com.jauntsdn.netty -version=1.1.5 +version=1.2.0 googleJavaFormatPluginVersion=0.9 dependencyManagementPluginVersion=1.1.0