diff --git a/.github/workflows/ci-unwelcome-words.yml b/.github/workflows/ci-unwelcome-words.yml new file mode 100644 index 0000000..a369f6e --- /dev/null +++ b/.github/workflows/ci-unwelcome-words.yml @@ -0,0 +1,23 @@ +name: CI + +on: + pull_request: + +jobs: + test: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + with: + ref: ${{ github.sha }} + - name: Checkout tool + uses: actions/checkout@v2 + with: + repository: exactpro-th2/ci-github-action + ref: master + token: ${{ secrets.PAT_CI_ACTION }} + path: ci-github-action + - name: Run CI action + uses: ./ci-github-action + with: + ref: ${{ github.sha }} \ No newline at end of file diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/dev-docker-publish.yml new file mode 100644 index 0000000..03c796d --- /dev/null +++ b/.github/workflows/dev-docker-publish.yml @@ -0,0 +1,47 @@ +name: Dev build and publish Docker distributions to Github Container Registry ghcr.io + +on: + push: + branches-ignore: + - master + - version-* + paths-ignore: + - README.md + +jobs: + build: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + # Prepare custom build version + - name: Get branch name + id: branch + run: echo ::set-output name=branch_name::${GITHUB_REF#refs/*/} + - name: Get release_version + id: ver + uses: christian-draeger/read-properties@1.0.1 + with: + path: gradle.properties + property: release_version + - name: Build custom release version + id: release_ver + run: echo ::set-output name=value::"${{ steps.ver.outputs.value }}-${{ steps.branch.outputs.branch_name }}-${{ github.run_id }}" + - name: Show custom release version + run: echo ${{ steps.release_ver.outputs.value }} + # Build and publish image + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.CR_PAT }} + - run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')" + id: meta + - name: Build and push + id: docker_build + uses: docker/build-push-action@v2 + with: + push: true + tags: ghcr.io/${{ github.repository }}:${{ steps.release_ver.outputs.value }} + labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.ver.outputs.value }} \ No newline at end of file diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 0000000..9f51020 --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,37 @@ +name: Build and publish Docker distributions to Github Container Registry ghcr.io + +on: + push: + branches: + - master + - version-* + paths: + - gradle.properties + +jobs: + build: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.CR_PAT }} + - run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')" + id: meta + - name: Read version from gradle.properties + id: read_property + uses: christian-draeger/read-properties@1.0.1 + with: + path: ./gradle.properties + property: release_version + - name: Build and push + id: docker_build + uses: docker/build-push-action@v2 + with: + push: true + tags: ghcr.io/${{ github.repository }}:${{ steps.read_property.outputs.value }} + labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.read_property.outputs.value }} \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..7bedeca --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,7 @@ +variables: + APP_NAME: th2-codec-json + +include: + - project: vivarium/th2/pipelines + ref: v2 + file: /.gitlab-ci-java.yml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9b5e925 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM gradle:7.1.0-jdk11 AS build +ARG release_version +COPY ./ . +RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} + +FROM adoptopenjdk/openjdk11:alpine +WORKDIR /home +COPY --from=build /home/gradle/build/docker . +ENTRYPOINT ["/home/service/bin/service"] \ No newline at end of file diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..3b503f0 --- /dev/null +++ b/build.gradle @@ -0,0 +1,106 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'java' + id 'org.jetbrains.kotlin.jvm' version '1.5.31' + id 'com.palantir.docker' version '0.25.0' +} + +apply plugin: 'application' +apply plugin: 'com.palantir.docker' +apply plugin: 'kotlin-kapt' + +group 'com.exactpro.th2' +version release_version + +repositories { + mavenCentral() + + maven { + name 'Sonatype_snapshots' + url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' + } + + maven { + name 'Sonatype_releases' + url 'https://s01.oss.sonatype.org/content/repositories/releases/' + } + + mavenLocal() + + configurations.all { + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' + resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' + } +} + +dependencies { + api platform('com.exactpro.th2:bom:3.1.0') + implementation 'com.google.auto.service:auto-service:1.0.1' + + implementation 'com.exactpro.th2:common:3.32.0' + implementation 'com.exactpro.th2:conn-dirty-tcp-core:0.0.5' + + implementation 'io.netty:netty-all:4.1.72.Final' + + implementation 'io.github.microutils:kotlin-logging:2.1.21' + + implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.6.0' + + testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" + + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2' + + annotationProcessor 'com.google.auto.service:auto-service:1.0.1' + kapt 'com.google.auto.service:auto-service:1.0.1' +} + +test { + useJUnitPlatform() +} + +application { + mainClassName 'com.exactpro.th2.conn.dirty.tcp.core.Main' +} + +applicationName = 'service' + +distTar { + archiveName "${applicationName}.tar" +} + +dockerPrepare { + dependsOn distTar +} + +docker { + copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) +} + +compileKotlin { + kotlinOptions { + jvmTarget = '11' + } +} + +compileTestKotlin { + kotlinOptions { + jvmTarget = '11' + } +} + diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..e7564ef --- /dev/null +++ b/gradle.properties @@ -0,0 +1 @@ +release_version=0.0.1 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..7454180 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..69a9715 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 0000000..744e882 --- /dev/null +++ b/gradlew @@ -0,0 +1,185 @@ +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MSYS* | MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=`expr $i + 1` + done + case $i in + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..c0047b4 --- /dev/null +++ b/settings.gradle @@ -0,0 +1,2 @@ +rootProject.name = 'th2-conn-dirty-pillar' + diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandler.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandler.kt new file mode 100644 index 0000000..b077a6e --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandler.kt @@ -0,0 +1,360 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler + +import com.exactpro.th2.conn.dirty.pillar.handler.util.* +import com.exactpro.th2.conn.dirty.tcp.core.api.IChannel +import com.exactpro.th2.conn.dirty.tcp.core.api.IContext +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandler +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings +import io.netty.buffer.ByteBuf +import mu.KotlinLogging +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +class PillarHandler(private val context: IContext): IProtocolHandler { + + private var state = AtomicReference(State.SESSION_CLOSE) + private val executor = Executors.newScheduledThreadPool(1) + private var clientFuture: Future<*>? = CompletableFuture.completedFuture(null) + private var serverFuture: Future<*>? = CompletableFuture.completedFuture(null) + private lateinit var streamIdRead: StreamIdEncode + private lateinit var streamIdWrite: StreamIdEncode + private var settings: PillarHandlerSettings + lateinit var channel: IChannel + private var connectRead = AtomicReference(OpenType.CLOSE) + private var connectWrite = AtomicReference(OpenType.CLOSE) + private var serverSeqNum = AtomicInteger(0) + private var clientSeqNum = AtomicInteger(0) + private val seqNum = 1000 + + init{ + settings = context.settings as PillarHandlerSettings + require(settings.heartbeatInterval > 0) { "Heartbeat sending interval must be greater than zero." } + require(settings.streamAvailInterval > 0) { "StreamAvail sending interval must be greater than zero." } + } + + override fun onOpen() { + if (state.compareAndSet(State.SESSION_CLOSE, State.SESSION_CREATED)) { + LOGGER.info { "Setting a new state -> ${state.get()}." } + + channel = context.channel + channel.send(Login(settings).login(), messageMetadata(MessageType.LOGIN), IChannel.SendMode.MANGLE) + LOGGER.info { "Message has been sent to server - Login." } + serverFuture?.cancel(false) + serverFuture = + executor.schedule(this::reconnect, settings.streamAvailInterval, TimeUnit.MILLISECONDS) + + clientFuture?.cancel(false) + clientFuture = executor.scheduleWithFixedDelay(this::startSendHeartBeats, settings.heartbeatInterval, settings.heartbeatInterval, TimeUnit.MILLISECONDS) + + LOGGER.info { "Handler is connected." } + } else LOGGER.info { "Failed to set a new state. ${State.SESSION_CLOSE} -> ${state.get()}." } + } + + override fun onReceive(buffer: ByteBuf): ByteBuf? { + val bufferLength = buffer.readableBytes() + val offset = buffer.readerIndex() + if (bufferLength == 0) { + LOGGER.warn { "Cannot parse empty buffer." } + return null + } + + if (bufferLength < 4){ + LOGGER.warn { "Not enough bytes to read header." } + return null + } + + buffer.markReaderIndex() + val messageType = buffer.readUnsignedShortLE() + val messageLength = buffer.readUnsignedShortLE() + buffer.resetReaderIndex() + + if (!MessageType.contains(messageType)){ + LOGGER.error { "Message type is not supported. Type: $messageType, length: $messageLength" } + return null + } + + if(bufferLength < messageLength) { + LOGGER.warn { "Buffer length is less than the declared one: $bufferLength -> $messageLength." } + return null + } + + if(bufferLength > messageLength) { + LOGGER.info { "Buffer length is longer than the declared one: $bufferLength -> $messageLength." } + buffer.readerIndex(buffer.writerIndex()) + return buffer.retainedSlice(offset, messageLength) + } + + buffer.readerIndex(buffer.writerIndex()) + return buffer.retainedSlice(offset, messageLength) + } + + override fun onIncoming(message: ByteBuf): Map { + val msgHeader = MsgHeader(message) + + when (val msgType = msgHeader.type) { + MessageType.LOGIN_RESPONSE.type -> { + return checkLoginResponse(message) + } + + MessageType.STREAM_AVAIL.type -> { + return checkStreamAvail(message) + } + + MessageType.OPEN_RESPONSE.type -> { + return checkOpenResponse(message) + } + + MessageType.SEQMSG.type -> { + return checkSeqMsg(message, msgHeader) + } + + MessageType.CLOSE_RESPONSE.type -> { + return checkCloseResponse(message) + } + + else -> error("Message type is not supported: $msgType.") + } + } + + private fun checkLoginResponse(message: ByteBuf): Map { + val loginResponse = LoginResponse(message) + + LOGGER.info { "Type message - LoginResponse: $loginResponse" } + when (val status = Status.getStatus(loginResponse.status)) { + Status.OK -> { + LOGGER.info { "Login successful. Server start sending heartbeats." } + + if (state.compareAndSet( + State.SESSION_CREATED, + State.LOGGED_IN + ) + ) { + LOGGER.info("Setting a new state -> ${state.get()}.") + serverFuture?.cancel(false) + serverFuture = + executor.schedule(this::receivedHeartBeats, settings.streamAvailInterval, TimeUnit.MILLISECONDS) + } else LOGGER.info { "Failed to set a new state. ${State.LOGGED_IN} -> ${state.get()}." } + } + Status.NOT_LOGGED_IN -> { + if (!state.compareAndSet(State.SESSION_CREATED, State.SESSION_CLOSE)) + LOGGER.info { "Failed to set a new state. ${State.SESSION_CLOSE} -> ${state.get()}." } + stopSendHeartBeats() + LOGGER.info("Received `not logged in` status. Fall in to error state.") + channel.send(Login(settings).login(), messageMetadata(MessageType.LOGIN), IChannel.SendMode.MANGLE) + } + else -> error("Received $status status.") + } + return messageMetadata(MessageType.LOGIN_RESPONSE) + } + + private fun checkStreamAvail(message: ByteBuf): Map { + serverFuture?.cancel(false) + serverFuture = + executor.schedule(this::receivedHeartBeats, settings.streamAvailInterval, TimeUnit.MILLISECONDS) + + val streamAvail = StreamAvail(message) + + LOGGER.info { "Type message - StreamAvail: $streamAvail" } + message.readerIndex(4) + val streamIdAvail = StreamIdEncode(StreamId(message)) + val nextSeq = streamAvail.nextSeq.toInt() + + if (streamAvail.streamId.streamType == StreamType.REF.value || streamAvail.streamId.streamType == StreamType.GT.value) { + if (connectRead.compareAndSet(OpenType.CLOSE, OpenType.SENT) || connectRead.get() == OpenType.SENT) { + streamIdRead = streamIdAvail + serverSeqNum.getAndSet(nextSeq) + channel.send( + Open( + streamIdRead.streamId, + nextSeq, + nextSeq + seqNum + ).open(), + messageMetadata(MessageType.OPEN), IChannel.SendMode.MANGLE + ) + } + } else if (streamAvail.streamId.streamType == StreamType.TG.value) { + if (connectWrite.compareAndSet(OpenType.CLOSE, OpenType.SENT) || connectWrite.get() == OpenType.SENT) { + streamIdWrite = streamIdAvail + clientSeqNum.getAndSet(nextSeq) + channel.send( + Open( + streamIdWrite.streamId, + nextSeq, + nextSeq + seqNum + ).open(), + messageMetadata(MessageType.OPEN), IChannel.SendMode.MANGLE + ) + } + } + + return messageMetadata(MessageType.STREAM_AVAIL) + } + + private fun checkOpenResponse(message: ByteBuf): Map { + + val openResponse = OpenResponse(message) + LOGGER.info { "Type message - OpenResponse: $openResponse" } + + when (Status.getStatus(openResponse.status)) { + Status.OK -> { + LOGGER.info("Open successful.") + + connectRead.compareAndSet(OpenType.SENT, OpenType.OPEN) + connectWrite.compareAndSet(OpenType.SENT, OpenType.OPEN) + + if (connectRead.get() == OpenType.OPEN && connectWrite.get() == OpenType.OPEN){ + if (state.compareAndSet(State.LOGGED_IN, State.OPEN_IN)) + LOGGER.info { "Setting state -> ${state.get()}" } + } + } + + Status.NO_STREAM_PERMISSION -> LOGGER.warn { "No stream permission." } + else -> error("This is not an OpenResponse status.") + } + + return messageMetadata(MessageType.OPEN_RESPONSE) + } + + private fun checkSeqMsg(message: ByteBuf, header: MsgHeader): Map { + val seqMsg = SeqMsg(message, header.length) + LOGGER.info { "Type message - SeqMsg: $seqMsg" } + serverSeqNum.incrementAndGet() + if(serverSeqNum.get() == seqNum){ + connectRead.compareAndSet(OpenType.OPEN, OpenType.CLOSE) + } + return messageMetadata(MessageType.SEQMSG) + } + + private fun checkCloseResponse(message: ByteBuf): Map { + val closeResponse = CloseResponse(message) + + LOGGER.info { "Type message - CloseResponse: $closeResponse" } + + when (Status.getStatus(closeResponse.status)) { + Status.OK -> { + LOGGER.info("Close successful.") + if (closeResponse.streamId == streamIdRead.streamId) + connectRead.getAndSet(OpenType.CLOSE) + if (closeResponse.streamId == streamIdWrite.streamId) + connectWrite.getAndSet(OpenType.CLOSE) + + if (connectRead.get()==OpenType.CLOSE && connectWrite.get()==OpenType.CLOSE) { + if (state.compareAndSet(State.OPEN_OUT, State.LOGGED_OUT)) { + LOGGER.info { "Setting a new state -> ${state.get()}." } + executor.shutdown() + try { + if (!executor.awaitTermination(3000, TimeUnit.MILLISECONDS)) { + executor.shutdownNow() + } + } catch (e: InterruptedException) { + executor.shutdownNow() + } + } else LOGGER.info { "Failed to set a new state ${State.LOGGED_OUT}." } + } + } + Status.STREAM_NOT_OPEN -> LOGGER.warn { "Stream not close." } + else -> error("This is not an CloseResponse status.") + } + + return messageMetadata(MessageType.CLOSE_RESPONSE) + } + + override fun onOutgoing(message: ByteBuf, metadata: Map): Map { + clientSeqNum.incrementAndGet() + SeqMsgToSend(message, clientSeqNum.get(), streamIdWrite.streamId, metadata).seqMsg() + if(clientSeqNum.get() == seqNum){ + connectWrite.compareAndSet(OpenType.OPEN, OpenType.CLOSE) + } + return metadata + } + + override fun onClose() { + if (state.compareAndSet(State.OPEN_IN, State.SESSION_CLOSE)) { + LOGGER.info { "Setting a new state -> ${state.get()}." } + LOGGER.info { "Handler is disconnected." } + connectRead.getAndSet(OpenType.CLOSE) + connectWrite.getAndSet(OpenType.CLOSE) + } else LOGGER.info { "Failed to set a new state ${State.SESSION_CLOSE}." } + } + + override fun close() { + if (state.compareAndSet(State.OPEN_IN, State.OPEN_OUT)){ + LOGGER.info { "Setting a new state -> ${state.get()}." } + if (connectRead.get() == OpenType.OPEN) { + channel.send( + Close(streamIdRead.streamIdBuf).close(), + messageMetadata(MessageType.CLOSE), + IChannel.SendMode.MANGLE + ) + } + if (connectWrite.get() == OpenType.OPEN) { + channel.send( + Close(streamIdWrite.streamIdBuf).close(), + messageMetadata(MessageType.CLOSE), + IChannel.SendMode.MANGLE + ) + } + LOGGER.info { "Message has been sent to server - Close." } + } else LOGGER.info { "Failed to set a new state ${State.OPEN_OUT}." } + } + + private fun startSendHeartBeats() { + channel.send(Heartbeat().heartbeat, messageMetadata(MessageType.HEARTBEAT), IChannel.SendMode.MANGLE) + LOGGER.info { "Message has been sent to server - HeartBeats." } + } + + private fun stopSendHeartBeats() { + clientFuture?.cancel(false) + } + + private fun reconnect() { + LOGGER.info { "Reconnect to server." } + if (state.compareAndSet(State.NOT_HEARTBEAT, State.SESSION_CREATED)) { + LOGGER.info { "Setting a new state -> ${state.get()}." } + channel.send(Login(settings).login(), messageMetadata(MessageType.LOGIN), IChannel.SendMode.MANGLE) + LOGGER.info { "Message has been sent to server - Login." } + connectRead.getAndSet(OpenType.CLOSE) + connectWrite.getAndSet(OpenType.CLOSE) + } else LOGGER.info { "Failed to set a new state ${State.SESSION_CREATED}." } + } + + private fun receivedHeartBeats() { + serverFuture?.cancel(false) + if (state.compareAndSet(state.get(), State.NOT_HEARTBEAT)) { + LOGGER.warn { "Server stopped sending heartbeat." } + LOGGER.info { "Setting a new state -> ${state.get()}." } + reconnect() + } else LOGGER.info { "Failed to set a new state ${State.NOT_HEARTBEAT}." } + } + + private fun messageMetadata(messageType: MessageType): Map{ + val metadata = mutableMapOf() + metadata[TYPE_FIELD_NAME] = messageType.type.toString() + metadata[LENGTH_FIELD_NAME] = messageType.length.toString() + return metadata + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + } +} diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerFactory.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerFactory.kt new file mode 100644 index 0000000..49ef3e7 --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerFactory.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler + +import com.exactpro.th2.conn.dirty.tcp.core.api.IContext +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandler +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerFactory +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings +import com.google.auto.service.AutoService + +@AutoService(IProtocolHandlerFactory::class) +class PillarHandlerFactory: IProtocolHandlerFactory { + override val name: String = PillarHandlerFactory::class.java.name + + override val settings: Class = PillarHandlerSettings::class.java + + override fun create(context: IContext): IProtocolHandler { + return PillarHandler(context) + } +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerSettings.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerSettings.kt new file mode 100644 index 0000000..93d580b --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarHandlerSettings.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler + +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings + +class PillarHandlerSettings: IProtocolHandlerSettings { + var username: String = "username" + + var password: String = "password" + + var mic: String = "mic" + + var version: String = "1.1" + + var heartbeatInterval: Long = 1000L + + var streamAvailInterval: Long = 5000L +} \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageDecoder.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageDecoder.kt new file mode 100644 index 0000000..8ac83fe --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageDecoder.kt @@ -0,0 +1,193 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler + +import com.exactpro.th2.conn.dirty.pillar.handler.util.MessageType +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufUtil +import org.checkerframework.checker.units.qual.Length +import java.lang.Exception +import java.math.BigDecimal +import java.math.BigInteger +import java.nio.charset.StandardCharsets +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset + +class MsgHeader(byteBuf: ByteBuf) { + val type: Int + val length: Int + + init { + try { + type = byteBuf.readUnsignedShortLE() + length = byteBuf.readUnsignedShortLE() + } + catch (e: Exception){ + throw Exception ("Unable to read header in message: ${ByteBufUtil.hexDump(byteBuf)}", e) + } + } +} + +class StreamId(byteBuf: ByteBuf){ + val envId: Short + val sessNum: Int + var streamType: Byte + val userId: Int + val subId: Int + + init { + envId = byteBuf.readUnsignedByte() + sessNum = byteBuf.readMedium() + streamType = byteBuf.readByte() + userId = byteBuf.readUnsignedShort() + subId = byteBuf.readByte().toInt() + } +} + +class LoginResponse(byteBuf: ByteBuf) { + val username: String + val status: Short + + init { + var offset = byteBuf.readerIndex() + username = byteBuf.readCharSequence(16, StandardCharsets.US_ASCII).toString().trimEnd(0.toChar()) + offset += 16 + byteBuf.readerIndex(offset) + status = byteBuf.readUnsignedByte() + require(byteBuf.readerIndex() == MessageType.LOGIN_RESPONSE.length){ "There are bytes left in buffer to read" } + } + + override fun toString(): String { + return "$username $status" + } +} + +class StreamAvail(byteBuf: ByteBuf) { + val streamId: StreamId + val nextSeq: BigDecimal + val access: Short + + init { + var offset = byteBuf.readerIndex() + streamId = StreamId(byteBuf) + + offset += 8 + byteBuf.readerIndex(offset) + val bytes = ByteArray(8) + byteBuf.readBytes(bytes) + bytes.reverse() + nextSeq = BigInteger(1, bytes).toBigDecimal() + + offset += 8 + byteBuf.readerIndex(offset) + access = byteBuf.readUnsignedByte() + + require(byteBuf.readerIndex() == MessageType.STREAM_AVAIL.length) { "There are bytes left in buffer to read" } + } + + override fun toString(): String { + return "${streamId.envId} ${streamId.sessNum} ${streamId.streamType} ${streamId.userId} ${streamId.subId} $nextSeq $access" + } +} + +class OpenResponse(byteBuf: ByteBuf) { + val streamId: StreamId + val status: Short + val access: Short + + init { + var offset = byteBuf.readerIndex() + streamId = StreamId(byteBuf) + + offset += 8 + byteBuf.readerIndex(offset) + status = byteBuf.readUnsignedByte() + + offset++ + byteBuf.readerIndex(offset) + access = byteBuf.readUnsignedByte() + require(byteBuf.readerIndex() == MessageType.OPEN_RESPONSE.length){ "There are bytes left in buffer to read" } + } + + override fun toString(): String { + return "${streamId.envId} ${streamId.sessNum} ${streamId.streamType} ${streamId.userId} ${streamId.subId} $status $access" + } +} + +class CloseResponse(byteBuf: ByteBuf){ + val streamId: StreamId + val status: Short + + init { + var offset = byteBuf.readerIndex() + streamId = StreamId(byteBuf) + + offset += 8 + byteBuf.readerIndex(offset) + status = byteBuf.readUnsignedByte() + + require(byteBuf.readerIndex() == MessageType.CLOSE_RESPONSE.length){ "There are bytes left in buffer to read" } + } + + override fun toString(): String { + return "${streamId.envId} ${streamId.sessNum} ${streamId.streamType} ${streamId.userId} ${streamId.subId} $status" + } +} + +class SeqMsg(byteBuf: ByteBuf, length: Int) { + val streamId: StreamId + val seq: BigDecimal + val reserved1: ByteBuf + val timestamp: LocalDateTime + val payload: MsgHeader + + init { + var offset = byteBuf.readerIndex() + streamId = StreamId(byteBuf) + + offset += 8 + byteBuf.readerIndex(offset) + val bytes = ByteArray(8) + byteBuf.readBytes(bytes) + bytes.reverse() + seq = BigInteger(1, bytes).toBigDecimal() + + offset += 8 + reserved1 = byteBuf.copy(offset, length - MessageType.SEQMSG.length) + + offset += length - MessageType.SEQMSG.length + byteBuf.readerIndex(offset) + + val time = byteBuf.readLongLE().toULong() + val milliseconds = time / 1_000_000UL + val nanoseconds = time % 1_000_000_000UL + timestamp = LocalDateTime.ofInstant(Instant.ofEpochMilli(milliseconds.toLong()), ZoneOffset.UTC).withNano( + nanoseconds.toInt()) + offset += 8 + byteBuf.readerIndex(offset) + payload = MsgHeader(byteBuf) + + require(byteBuf.readerIndex() == length){ "There are bytes left in buffer to read" } + } + + override fun toString(): String { + return "${streamId.envId} ${streamId.sessNum} ${streamId.streamType} ${streamId.userId} ${streamId.subId} $seq $reserved1 $timestamp" + } +} + + diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageEncoder.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageEncoder.kt new file mode 100644 index 0000000..218f858 --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/PillarMessageEncoder.kt @@ -0,0 +1,184 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler + +import com.exactpro.th2.conn.dirty.pillar.handler.util.* +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import java.time.LocalDateTime +import java.time.ZoneOffset + +class Heartbeat{ + val heartbeat: ByteBuf = Unpooled.buffer(MessageType.HEARTBEAT.length) + + init { + heartbeat.writeShortLE(MessageType.HEARTBEAT.type) + heartbeat.writeShortLE(MessageType.HEARTBEAT.length) + } +} + +class StreamIdEncode(var streamId: StreamId) { + val streamIdBuf: ByteBuf = Unpooled.buffer(8) + + init { + streamIdBuf.writeByte(streamId.envId.toInt()) + streamIdBuf.writeMedium(streamId.sessNum) + streamIdBuf.writeByte(streamId.streamType.toInt()) + streamIdBuf.writeShort(streamId.userId) + streamIdBuf.writeByte(streamId.subId) + } +} + +class Login(settings: PillarHandlerSettings) { + private val type: Int = MessageType.LOGIN.type + private val length: Int = MessageType.LOGIN.length + private val username: ByteArray + private val password: ByteArray + private val mic: ByteArray + private val version: ByteArray + + init { + username = settings.username.encodeToByteArray() + require(username.size <= 16 && username.isNotEmpty()) { "Size of username exceeds allowed size or equal to zero." } + password = settings.password.encodeToByteArray() + require(password.size <= 32 && password.isNotEmpty()) { "Size of password exceeds allowed size or equal to zero." } + mic = settings.mic.encodeToByteArray() + require(mic.size <= 4 && mic.isNotEmpty()) { "Size of mic exceeds allowed size or equal to zero." } + version = settings.version.encodeToByteArray() + require(version.size <= 20 && version.isNotEmpty()) { "Size of version exceeds allowed size or equal to zero." } + } + + fun login (): ByteBuf{ + val loginMessage: ByteBuf = Unpooled.buffer(length) + + loginMessage.markWriterIndex() + loginMessage.writeShortLE(type) + loginMessage.writeShortLE(length) + + loginMessage.writerIndex(4) + loginMessage.writeBytes(username) + + loginMessage.writerIndex(20) + loginMessage.writeBytes(password) + + loginMessage.writerIndex(52) + loginMessage.writeBytes(mic) + + loginMessage.writerIndex(56) + loginMessage.writeBytes(version) + + loginMessage.writerIndex(76) + + require ( loginMessage.writerIndex() == length){ "Message size exceeded." } + return loginMessage + } +} + +class Open(private val streamId: StreamId, + private val startSeq: Int, + private val endSeq: Int){ + private val type: Int = MessageType.OPEN.type + private val length: Int = MessageType.OPEN.length + + fun open(): ByteBuf{ + val openMessage: ByteBuf = Unpooled.buffer(length) + + openMessage.markWriterIndex() + openMessage.writeShortLE(type) + openMessage.writeShortLE(length) + + openMessage.writerIndex(4) + openMessage.writeBytes(StreamIdEncode(streamId).streamIdBuf) + + openMessage.writerIndex(12) + openMessage.writeLongLE(startSeq.toLong()) + openMessage.writerIndex(20) + openMessage.writeLongLE(endSeq.toLong()) //TODO + + openMessage.writerIndex(28) + if (streamId.streamType == StreamType.REF.value || streamId.streamType == StreamType.GT.value) + openMessage.writeByte(Access.READ.value.toInt()) + else if (streamId.streamType == StreamType.TG.value) + openMessage.writeByte(Access.WRITE.value.toInt()) + + openMessage.writeByte(MODE_LOSSY) + + require (openMessage.writerIndex() == length){ "Message size exceeded." } + return openMessage + } +} + +class SeqMsgToSend(private val message: ByteBuf, + private val num: Int, + private val streamId: StreamId, + private val metadata: Map) { + private val type: Int = MessageType.SEQMSG.type + private val length: Int = MessageType.SEQMSG.length + + fun seqMsg() { + val offset = message.readerIndex() + val size = message.readableBytes() + + val buffer: ByteBuf = message.copy(0, size) + message.writerIndex(offset) + message.writeShortLE(type) + message.writeShortLE(length + size) + + message.writeByte(streamId.envId.toInt()) + message.writeMedium(streamId.sessNum) + message.writeByte(streamId.streamType.toInt()) + message.writeShort(streamId.userId) + message.writeByte(streamId.subId) + message.writeByte(num) + message.writerIndex(20) + message.writeBytes(buffer) + + message.writerIndex(20 + size) + val time = LocalDateTime.now() + val seconds = time.toEpochSecond(ZoneOffset.UTC).toULong() + val nanoseconds = time.nano.toULong() + message.writeLongLE((seconds * 1_000_000_000UL + nanoseconds).toLong()) + + message.writerIndex(28 + size) + + if (metadata[TYPE_FIELD_NAME] != null) + message.writeShortLE(metadata[TYPE_FIELD_NAME]!!.toInt()) + else message.writeShortLE(0) + + if (metadata[LENGTH_FIELD_NAME] != null) + message.writeShortLE(metadata[LENGTH_FIELD_NAME]!!.toInt()) + else message.writeShortLE(size) + + require(message.writerIndex() == size + length) { "Message size exceeded." } + } +} + +class Close(private val streamId: ByteBuf) { + private val type: Int = MessageType.CLOSE.type + private val length: Int = MessageType.CLOSE.length + + fun close(): ByteBuf { + val closeMessage: ByteBuf = Unpooled.buffer(length) + + closeMessage.writeShortLE(type) + closeMessage.writeShortLE(length) + closeMessage.writeBytes(streamId) + closeMessage.writerIndex(length) + require (closeMessage.writerIndex() == length){ "Message size exceeded." } + return closeMessage + } +} diff --git a/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/util/PillarUtils.kt b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/util/PillarUtils.kt new file mode 100644 index 0000000..35d8770 --- /dev/null +++ b/src/main/java/com/exactpro/th2/conn/dirty/pillar/handler/util/PillarUtils.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.conn.dirty.pillar.handler.util + +import org.checkerframework.checker.units.qual.A +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.experimental.and +import kotlin.experimental.or + +const val TYPE_FIELD_NAME = "type" +const val LENGTH_FIELD_NAME = "length" +const val MODE_LOSSY = 0 + +enum class State(val value: Int){ + SESSION_CREATED(0), + SESSION_CLOSE(1), + LOGGED_IN(2), + LOGGED_OUT(3), + NOT_HEARTBEAT(4), + OPEN_IN(5), + OPEN_OUT(6); +} + +enum class Status(val value: Short) { + OK(0), + NOT_LOGGED_IN(18), + INVALID_LOGIN_DETAILS(24), + ALREADY_LOGGED_IN(27), + HEARTBEAT_TIMEOUT(28), + LOGIN_TIMEOUT(29), + INVALID_MESSAGE(33), + NO_STREAM_PERMISSION(54), + STREAM_NOT_OPEN(85); + + companion object { + fun getStatus(value: Short): Status { + return values().find { it.value == value }!! + } + } +} + +enum class Access(val value: Short) { + READ(1), + WRITE(2), + THROTTLE_REJECT(4); + + companion object { + fun getAccess(value: Short): Access? { + return values().find { it.value == value } + } + } +} + +enum class StreamType (val value: Byte){ + TG(15), + GT(13), + REF(33), + XDP(27); + companion object { + fun getStream(value: Byte): StreamType? { + return values().find { it.value == value } + } + } +} + +enum class MessageType(val type: Int, val length: Int) { + LOGIN(513, 76), + LOGIN_RESPONSE(514, 21), + STREAM_AVAIL(515, 21), + HEARTBEAT(516, 4), + OPEN(517, 30), + OPEN_RESPONSE(518, 14), + CLOSE(519, 12), + CLOSE_RESPONSE(520, 13), + SEQMSG(2309, 32); + + companion object { + fun getEnum(type: Int): MessageType? { + return values().find { it.type == type } + } + + fun contains(type: Int): Boolean { + return values().find { it.type == type } != null + } + } +} + +enum class OpenType (type: Int){ + CLOSE(0), + SENT(1), + OPEN(2); +} diff --git a/src/test/java/TestHandler.kt b/src/test/java/TestHandler.kt new file mode 100644 index 0000000..5f146f7 --- /dev/null +++ b/src/test/java/TestHandler.kt @@ -0,0 +1,495 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.grpc.EventID +import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.schema.dictionary.DictionaryType +import com.exactpro.th2.conn.dirty.pillar.handler.* +import com.exactpro.th2.conn.dirty.pillar.handler.util.LENGTH_FIELD_NAME +import com.exactpro.th2.conn.dirty.pillar.handler.util.TYPE_FIELD_NAME +import com.exactpro.th2.conn.dirty.tcp.core.TaskSequencePool +import com.exactpro.th2.conn.dirty.tcp.core.api.IContext +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandler +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolHandlerSettings +import com.exactpro.th2.conn.dirty.tcp.core.api.IProtocolMangler +import com.exactpro.th2.conn.dirty.tcp.core.api.impl.Channel +import com.exactpro.th2.conn.dirty.tcp.core.api.impl.Context +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.Mockito.mock +import com.nhaarman.mockitokotlin2.mock +import org.junit.jupiter.api.Assertions.* +import java.io.InputStream +import java.math.BigDecimal +import java.math.BigInteger +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.concurrent.Executor +import java.util.concurrent.ScheduledExecutorService + +class TestHandler { + private val address: InetSocketAddress = mock(InetSocketAddress::class.java) + private val secure = true + private val sessionAlias: String = "sessionAlias" + private val reconnectDelay: Long = 1000L + private val handler: IProtocolHandler = mock(IProtocolHandler::class.java) + private val mangler: IProtocolMangler = mock(IProtocolMangler::class.java) + private val onEvent: (event: Event, parentEventId: EventID) -> Unit = mock {} + private val onMessage: (RawMessage) -> Unit = mock { } + private val eventLoopGroup: EventLoopGroup = NioEventLoopGroup() + private val executor: ScheduledExecutorService = mock(ScheduledExecutorService::class.java) + private val sequencePool = TaskSequencePool(executor) + private val parentEventId = EventID.newBuilder().setId("root").build()!! + + private var channel = Channel( + address, + secure, + sessionAlias, + reconnectDelay, + handler, + mangler, + onEvent, + onMessage, + executor, + eventLoopGroup, + sequencePool, + parentEventId + ) + private val handlerSettings = PillarHandlerSettings() + private val readDictionary: (DictionaryType) -> InputStream = mock { } + private val sendEvent: (Event) -> Unit = mock { } + private var context: IContext = Context(handlerSettings, readDictionary, sendEvent) + private var settings = PillarHandlerSettings() + + @Test + fun `sending LoginResponse`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(514) + .writeShortLE(21) + .writeBytes("username".encodeToByteArray()) + .writerIndex(20) + .writeByte(0) + + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + val message = pillarHandler.onReceive(buffer) + assertEquals(21, buffer.readerIndex()) + + val metadata = pillarHandler.onIncoming(message!!) + message.readerIndex(4) + val loginResponseMsg = LoginResponse(message) + + assertEquals(514, metadata[TYPE_FIELD_NAME]!!.toInt()) + assertEquals(21, metadata[LENGTH_FIELD_NAME]!!.toInt()) + assertEquals("username", loginResponseMsg.username) + assertEquals(0, loginResponseMsg.status) + + buffer.writeShortLE(515) + .writeShortLE(21) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writeByte(BigDecimal.valueOf(23).toInt()) + .writerIndex(41) + .writeByte(1) + + pillarHandler.onReceive(buffer) + assertEquals(42, buffer.readerIndex()) + + var copyBuf = buffer.copy(21, 21) + copyBuf.readerIndex(4) + var streamAvail = StreamAvail(copyBuf) + assertEquals(5, streamAvail.streamId.envId) + assertEquals(4259845, streamAvail.streamId.sessNum) + assertEquals(15, streamAvail.streamId.streamType) + assertEquals(40287, streamAvail.streamId.userId) + assertEquals(4, streamAvail.streamId.subId) + assertEquals(BigDecimal.valueOf(23), streamAvail.nextSeq) + assertEquals(1, streamAvail.access) + + + buffer.writeShortLE(515) + .writeShortLE(21) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writeByte(BigDecimal.valueOf(23).toInt()) + .writerIndex(62) + .writeByte(2) + + pillarHandler.onReceive(buffer) + assertEquals(63, buffer.readerIndex()) + + copyBuf = buffer.copy(42, 21) + copyBuf.readerIndex(4) + streamAvail = StreamAvail(copyBuf) + assertEquals(5, streamAvail.streamId.envId) + assertEquals(4259845, streamAvail.streamId.sessNum) + assertEquals(15, streamAvail.streamId.streamType) + assertEquals(40287, streamAvail.streamId.userId) + assertEquals(4, streamAvail.streamId.subId) + assertEquals(BigDecimal.valueOf(23), streamAvail.nextSeq) + assertEquals(2, streamAvail.access) + } + + @Test + fun `sending StreamAvail`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(515) + .writeShortLE(21) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writeByte(BigDecimal.valueOf(23).toInt()) + .writerIndex(20) + .writeByte(1) + + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + pillarHandler.onReceive(buffer) + assertEquals(21, buffer.readerIndex()) + buffer.readerIndex(4) + val streamAvail = StreamAvail(buffer) + assertEquals(5, streamAvail.streamId.envId) + assertEquals(4259845, streamAvail.streamId.sessNum) + assertEquals(15, streamAvail.streamId.streamType) + assertEquals(40287, streamAvail.streamId.userId) + assertEquals(4, streamAvail.streamId.subId) + assertEquals(BigDecimal.valueOf(23), streamAvail.nextSeq) + assertEquals(1, streamAvail.access) + } + + @Test + fun `sending OpenResponse`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(518) + .writeShortLE(14) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writeByte(0) + .writeByte(2) + + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + val message = pillarHandler.onReceive(buffer) + assertEquals(14, buffer.readerIndex()) + + message!!.readerIndex(4) + val openResponseMsg = OpenResponse(message) + + assertEquals(5, openResponseMsg.streamId.envId) + assertEquals(4259845, openResponseMsg.streamId.sessNum) + assertEquals(15, openResponseMsg.streamId.streamType) + assertEquals(40287, openResponseMsg.streamId.userId) + assertEquals(4, openResponseMsg.streamId.subId) + assertEquals(0, openResponseMsg.status) + assertEquals(2, openResponseMsg.access) + } + + @Test + fun `sending SeqMsg`() { + val time = LocalDateTime.parse("2021-12-27T13:39:14.524104") + val seconds = time.toEpochSecond(ZoneOffset.UTC).toULong() + val nanoseconds = time.nano.toULong() + + val reserved = Unpooled.buffer().writeBytes(byteArrayOf(0, 1, 2, 3)) + + val buffer: ByteBuf = Unpooled.buffer() + .writeShortLE(2309) + .writeShortLE(36) + .writeByte(5) + .writeMedium(4259845) + .writeByte(13) + .writeShort(40287) + .writeByte(4) + .writeByte(BigDecimal.valueOf(23).toInt()) + .writerIndex(20) + .writeBytes(reserved) + .writerIndex(24) + .writeLongLE((seconds * 1_000_000_000UL + nanoseconds).toLong()) + .writerIndex(32) + .writeShortLE(22) + .writeShortLE(4) + + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + pillarHandler.onReceive(buffer) + assertEquals(36, buffer.readerIndex()) + + buffer.readerIndex(4) + val seqMsg = SeqMsg(buffer, 36) + + assertEquals(5, seqMsg.streamId.envId) + assertEquals(4259845, seqMsg.streamId.sessNum) + assertEquals(13, seqMsg.streamId.streamType) + assertEquals(40287, seqMsg.streamId.userId) + assertEquals(4, seqMsg.streamId.subId) + assertEquals(BigDecimal.valueOf(23), seqMsg.seq) + //assertEquals(reserved.array(), seqMsg.reserved1.array()) + assertEquals("2021-12-27T13:39:14.524104", seqMsg.timestamp.toString()) + assertEquals(22, seqMsg.payload.type) + assertEquals(4, seqMsg.payload.length) + } + + @Test + fun `assembling Login`() { + val settings = PillarHandlerSettings() + val login = Login(settings).login() + + assertEquals(76, login.writerIndex()) + + assertEquals(513, login.readUnsignedShortLE()) + assertEquals(76, login.readUnsignedShortLE()) + assertEquals("username", login.readCharSequence(16, StandardCharsets.US_ASCII).toString().trimEnd(0.toChar())) + assertEquals("password", login.readCharSequence(32, StandardCharsets.US_ASCII).toString().trimEnd(0.toChar())) + assertEquals("mic", login.readCharSequence(4, StandardCharsets.US_ASCII).toString().trimEnd(0.toChar())) + assertEquals("1.1", login.readCharSequence(20, StandardCharsets.US_ASCII).toString().trimEnd(0.toChar())) + } + + @Test + fun `assembling Open`() { + val stream = byteArrayOf( + 5, 65, 0, 5, 15, -99, 95, 4 + ) + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeBytes(stream) + val open = Open(StreamId(buffer), 23, 9999).open() + + assertEquals(30, open.writerIndex()) + + assertEquals(517, open.readUnsignedShortLE()) + assertEquals(30, open.readUnsignedShortLE()) + assertEquals(5, open.readUnsignedByte()) + assertEquals(4259845, open.readMedium()) + assertEquals(15, open.readByte()) + assertEquals(40287, open.readUnsignedShort()) + assertEquals(4, open.readByte().toInt()) + open.readerIndex(12) + val bytes = ByteArray(8) + open.readBytes(bytes) + bytes.reverse() + assertEquals(BigDecimal.valueOf(23), BigInteger(1, bytes).toBigDecimal()) + open.readerIndex(28) + assertEquals(2, open.readUnsignedByte()) + open.readerIndex(29) + assertEquals(0, open.readUnsignedByte()) + } + + @Test + fun `assembling SeqMsg`() { + val time = LocalDateTime.parse("2021-12-27T13:39:14.524104") + val seconds = time.toEpochSecond(ZoneOffset.UTC).toULong() + val nanoseconds = time.nano.toULong() + + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(2309) + .writeShortLE(32) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writerIndex(12) + .writeByte(BigDecimal.valueOf(23).toInt()) + .writerIndex(20) + .writeLongLE((seconds * 1_000_000_000UL + nanoseconds).toLong()) + .writerIndex(28) + .writeShortLE(22) + .writeShortLE(0) + + buffer.readerIndex(4) + + val seqMsg = SeqMsg(buffer, 32) + val message = Unpooled.buffer() + val metadata = HashMap() +// metadata[TYPE_FIELD_NAME] = 22.toString() +// metadata[LENGTH_FIELD_NAME] = 0.toString() + SeqMsgToSend(message, 1, seqMsg.streamId, metadata).seqMsg() + + assertEquals(32, message.writerIndex()) + + assertEquals(2309, message.readUnsignedShortLE()) + assertEquals(32, message.readUnsignedShortLE()) + assertEquals(5, message.readUnsignedByte()) + assertEquals(4259845, message.readMedium()) + assertEquals(15, message.readByte()) + assertEquals(40287, message.readUnsignedShort()) + assertEquals(4, message.readByte().toInt()) + message.readerIndex(28) + assertEquals(0, message.readUnsignedShortLE()) + assertEquals(0, message.readUnsignedShortLE()) + } + + @Test + fun `assembling Close`() { + val stream = byteArrayOf( + 5, 65, 0, 5, 15, -99, 95, 4 + ) + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeBytes(stream) + val streamId = StreamIdEncode(StreamId(buffer)) + val close = Close(streamId.streamIdBuf).close() + + assertEquals(12, close.writerIndex()) + + assertEquals(519, close.readUnsignedShortLE()) + assertEquals(12, close.readUnsignedShortLE()) + assertEquals(5, close.readUnsignedByte()) + assertEquals(4259845, close.readMedium()) + assertEquals(15, close.readByte()) + assertEquals(40287, close.readUnsignedShort()) + assertEquals(4, close.readByte().toInt()) + } + + @Test + fun `invalid size message`() { + val buffer: ByteBuf = Unpooled.buffer(4) + buffer.writeShortLE(2309) + .writeByte(32) + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + assertNull(pillarHandler.onReceive(buffer)) + assertEquals(0, buffer.readerIndex()) + } + + @Test + fun `empty message`() { + val buffer: ByteBuf = Unpooled.buffer() + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + assertNull(pillarHandler.onReceive(buffer)) + assertEquals(0, buffer.readerIndex()) + } + + @Test + fun `message size is less than 4`() { + val buffer: ByteBuf = Unpooled.buffer(3) + buffer.writeShortLE(514) + .writeByte(21) + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + assertNull(pillarHandler.onReceive(buffer)) + assertEquals(0, buffer.readerIndex()) + } + + @Test + fun `send onOutgoing`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(517) + .writeShortLE(21) + .writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + + val streamId: ByteBuf = Unpooled.buffer().writeByte(5) + .writeMedium(4259845) + .writeByte(15) + .writeShort(40287) + .writeByte(4) + .writeByte(BigDecimal.valueOf(23).toInt()) + + val metadata = mutableMapOf() + + metadata[TYPE_FIELD_NAME] = 500.toString() + metadata[LENGTH_FIELD_NAME] = 12.toString() + + SeqMsgToSend(buffer, 1, StreamId(streamId), metadata) + + assertNotNull(buffer) + } + + + @Test + fun `invalid type message`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(500) + .writeShortLE(32) + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + assertNull(pillarHandler.onReceive(buffer)) + assertEquals(0, buffer.readerIndex()) + } + + @Test + fun `invalid heartbeat interval`() { + handlerSettings.heartbeatInterval = 0 + val exception = assertThrows { PillarHandler(context) } + assertEquals("Heartbeat sending interval must be greater than zero.", exception.message) + } + + @Test + fun `invalid status in LoginResponse`() { + val buffer: ByteBuf = Unpooled.buffer() + buffer.writeShortLE(514) + .writeShortLE(21) + .writeBytes("username".encodeToByteArray()) + .writerIndex(20) + .writeByte(85) + + val pillarHandler = PillarHandler(context) + pillarHandler.channel = channel + val message = pillarHandler.onReceive(buffer) + val exception = assertThrows { pillarHandler.onIncoming(message!!) } + assertEquals("Received STREAM_NOT_OPEN status.", exception.message) + } + + @Test + fun `invalid username Login`() { + settings.username = "username_username" + settings.password = String() + val exception = assertThrows { Login(settings) } + assertEquals("Size of username exceeds allowed size or equal to zero.", exception.message) + } + + @Test + fun `invalid password Login`() { + settings.password = String() + val exception = assertThrows { Login(settings) } + assertEquals("Size of password exceeds allowed size or equal to zero.", exception.message) + } + + @Test + fun `invalid mic Login`() { + settings.mic = "mic_mic" + val exception = assertThrows { Login(settings) } + assertEquals("Size of mic exceeds allowed size or equal to zero.", exception.message) + } + + @Test + fun `invalid version Login`() { + settings.version = String() + val exception = assertThrows { Login(settings) } + assertEquals("Size of version exceeds allowed size or equal to zero.", exception.message) + } +}