Skip to content

Commit

Permalink
Merge pull request #136 from lensesio-dev/chore/refactor-simplify-state-management
Browse files Browse the repository at this point in the history

HTTP Sink - Avoid State Conflicts
  • Loading branch information
davidsloan authored Oct 16, 2024
2 parents 6b3805f + 4cad579 commit 5b910a3
Show file tree
Hide file tree
Showing 11 changed files with 640 additions and 241 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/func-only.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Functional Tests Only

on:
  workflow_dispatch:
    # Declare the optional `version` input so `${{ inputs.version }}` below can
    # actually be supplied on a manual run. On pull_request it is empty and the
    # git-describe fallback is used instead.
    inputs:
      version:
        description: 'Version to build; defaults to `git describe --tags --always`'
        required: false
        type: string
  pull_request:

jobs:
  run-func-tests:
    timeout-minutes: 30
    # Pinned to 22.04 rather than ubuntu-latest for reproducible runs.
    runs-on: ubuntu-22.04
    strategy:
      matrix:
        module: ["http"]
        connectImageVersion: [ 7.3.1, 6.2.2 ]

    steps:
      - uses: actions/checkout@v4
      - name: Set up JDK 17
        uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'temurin'
          cache: 'sbt'
      - name: Get version
        id: version
        shell: bash
        run: |
          if [ "${{ inputs.version }}" != "" ]; then
            echo "version=${{ inputs.version }}" >> $GITHUB_OUTPUT
          else
            echo "version=$(git describe --tags --always)" >> $GITHUB_OUTPUT
          fi
      - name: Build assembly
        env:
          JVM_OPTS: -Xmx3200m
          VERSION: ${{ steps.version.outputs.version }}
        # Disable assembly tests; they run separately in the next step.
        run: sbt "project ${{ matrix.module }};set assembly / test := {}" assembly
      - name: Run tests
        run: sbt "project ${{ matrix.module }}" fun:test
        env:
          JVM_OPTS: -Xmx3200m
          CONNECT_IMAGE_VERSION: ${{matrix.connectImageVersion}}
      - name: Publish test results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          files: "**/target/**/test-reports/*.xml"
          check_name: ${{ matrix.module }}-${{ matrix.connectImageVersion }}-fun-results
          comment_mode: off

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ trait HttpConfiguration extends LazyLogging {
HttpSinkConfigDef.HttpRequestHeadersProp -> ConfigValue(headerTemplates.mkString(",")),
HttpSinkConfigDef.AuthenticationTypeProp -> ConfigValue("none"), //NoAuthentication
HttpSinkConfigDef.BatchCountProp -> ConfigValue(batchSize),
HttpSinkConfigDef.BatchSizeProp -> ConfigValue(100_000_000),
HttpSinkConfigDef.TimeIntervalProp -> ConfigValue(100_000_000),
HttpSinkConfigDef.JsonTidyProp -> ConfigValue(jsonTidy),
ERROR_REPORTING_ENABLED_PROP -> ConfigValue("false"),
SUCCESS_REPORTING_ENABLED_PROP -> ConfigValue("false"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.scalatest.time.Seconds
import org.scalatest.time.Span

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.jdk.CollectionConverters.ListHasAsScala

class HttpSinkTest
Expand Down Expand Up @@ -193,10 +195,14 @@ class HttpSinkTest
Order(2, "another product", 1.4d, 109),
).asserting {
requests =>
logger.info("Requests size: {}", requests.size)

requests.size should be(1)
val firstRequest = requests.head
firstRequest.getMethod should be(RequestMethod.POST)
new String(firstRequest.getBody) should be(
logger.info("First request: {}", firstRequest)
logger.info("First request body: {}", firstRequest.getBodyAsString)
firstRequest.getBodyAsString should be(
"{\"data\":[{\"id\":1,\"product\":\"myOrder product\",\"price\":1.3,\"qty\":10,\"created\":null},{\"id\":2,\"product\":\"another product\",\"price\":1.4,\"qty\":109,\"created\":null}]}",
)
}
Expand Down Expand Up @@ -250,22 +256,29 @@ class HttpSinkTest
producer =>
createConnectorResource(randomTestId, topic, contentTemplate, converters, batchSize, jsonTidy).use {
_ =>
record.map {
rec => IO(sendRecord[K, V](topic, producer, rec))
}.sequence
.map { _ =>
eventually(timeout(Span(10, Seconds)), interval(Span(500, Millis))) {
verify(postRequestedFor(urlEqualTo(s"/$randomTestId")))
findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList
}
record.map(
sendRecord[K, V](topic, producer, _)
.handleError { error =>
logger.error("Error encountered sending record via producer", error)
fail("Error encountered sending record via producer")
},
).sequence.map { _ =>
eventually(timeout(Span(10, Seconds)), interval(Span(500, Millis))) {
verify(postRequestedFor(urlEqualTo(s"/$randomTestId")))
findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList
}
}

}
}
private def sendRecord[K, V](topic: String, producer: KafkaProducer[K, V], record: V): Unit = {
producer.send(new ProducerRecord[K, V](topic, record)).get
producer.flush()
}

  /**
   * Sends a single record to the given topic and flushes the producer, wrapped in `IO`.
   *
   * @param topic    the Kafka topic to produce to
   * @param producer the Kafka producer used to send the record
   * @param record   the record value to send (key is left unset)
   * @return an `IO` that completes once the send has been acknowledged (or timed out) and the producer flushed
   */
  private def sendRecord[K, V](topic: String, producer: KafkaProducer[K, V], record: V): IO[Unit] =
    for {
      // Record construction is pure, so IO.pure is safe here.
      producerRecord <- IO.pure(new ProducerRecord[K, V](topic, record))
      // Blocking send-and-await (10s cap) wrapped in a Future for IO.fromFuture.
      // NOTE(review): this relies on an implicit ExecutionContext being in scope and
      // blocks a thread of that pool on `.get` — IO.blocking may be preferable; confirm.
      scalaFuture = IO(Future(producer.send(producerRecord).get(10, TimeUnit.SECONDS)))
      _ <- IO.fromFuture(scalaFuture)
      // Flush so the record is actually transmitted before this IO completes.
      _ <- IO(producer.flush())
    } yield ()

def createConnectorResource(
randomTestId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class HttpSinkTaskIT extends AsyncFunSuite with AsyncIOSpec with Eventually {
config: Map[String, String] = Map(
HttpSinkConfigDef.HttpMethodProp -> HttpMethod.Post.toString,
HttpSinkConfigDef.HttpEndpointProp -> s"http://$Host:${server.port()}/awesome/endpoint",
HttpSinkConfigDef.HttpRequestContentProp -> "test",
HttpSinkConfigDef.HttpRequestContentProp -> "test {{value.name}}",
HttpSinkConfigDef.AuthenticationTypeProp -> noAuthentication,
HttpSinkConfigDef.BatchCountProp -> "1",
ERROR_REPORTING_ENABLED_PROP -> "false",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink

import cats.data.NonEmptySeq
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord

/**
 * `BatchInfo` represents information about a batch of records in the queue.
 *
 * Sealed so that the compiler can check exhaustiveness when matching on the
 * two implementations, `EmptyBatchInfo` and `NonEmptyBatchInfo`.
 */
sealed trait BatchInfo {

  /**
   * Get the total number of records in the queue.
   *
   * @return queueSize the total number of records in the queue.
   */
  def queueSize: Int
}

/**
 * EmptyBatchInfo may be returned in the following circumstances:
 *  - there is no data in the queue to process
 *  - there is data in the queue however not enough for a batch (the commit has not been triggered per the CommitPolicy)
 *
 * @param queueSize the total number of records in the queue.
 */
// `final` keeps the sealed ADT closed: case classes should not be extended.
final case class EmptyBatchInfo(queueSize: Int) extends BatchInfo

/**
 * NonEmptyBatchInfo is returned for an actual batch of data.
 *
 * @param batch                the RenderedRecords to return
 * @param updatedCommitContext the amended commit context
 * @param queueSize            the total number of records in the queue.
 */
// `final` keeps the sealed ADT closed: case classes should not be extended.
final case class NonEmptyBatchInfo(
  batch:                NonEmptySeq[RenderedRecord],
  updatedCommitContext: HttpCommitContext,
  queueSize:            Int,
) extends BatchInfo
Loading

0 comments on commit 5b910a3

Please sign in to comment.