HTTP Sink MVP #1013
class KafkaConnectKeyBinding(name: Option[String]) extends KafkaConnectBaseBinding() {
  override def get(sinkRecord: SinkRecord): AnyRef = KafkaConnectExtractor.extractFromKey(sinkRecord, name).leftMap(e =>
    throw new ConnectException(s"unable to extract field $name for template, ", e),
It's a nitpick :)
I would simply pattern match on the result of extractFromKey and throw on the left branch. It's a bit more explicit behavior-wise.
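A minimal sketch of what that suggestion could look like. The `SinkRecord`, `ConnectException`, and `KafkaConnectExtractor` definitions below are simplified, hypothetical stand-ins so the snippet compiles without Kafka Connect on the classpath; only the shape of `get` is the point, and the real `extractFromKey` may return a different error type.

```scala
// Hypothetical stand-ins for the Kafka Connect and connector types (assumptions, not the real APIs).
final case class SinkRecord(key: Map[String, AnyRef])
final class ConnectException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object KafkaConnectExtractor {
  // Assumed to return an Either, as implied by the leftMap call in the diff.
  def extractFromKey(record: SinkRecord, name: Option[String]): Either[Throwable, AnyRef] =
    name match {
      case None        => Right(record.key)
      case Some(field) => record.key.get(field).toRight(new NoSuchElementException(s"missing field $field"))
    }
}

class KafkaConnectKeyBinding(name: Option[String]) {
  // Pattern match on the result: return the value on Right, throw explicitly on Left.
  def get(sinkRecord: SinkRecord): AnyRef =
    KafkaConnectExtractor.extractFromKey(sinkRecord, name) match {
      case Right(value) => value
      case Left(e)      => throw new ConnectException(s"unable to extract field $name for template", e)
    }
}
```

Compared with throwing inside `leftMap`, the match makes both outcomes visible at the call site and avoids building an `Either` whose left side is never actually produced.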
* Regex templating
* Commit Policies
* Integration testing
* Fix logging
* Error handling
* Add configuration for error threshold and upload sync period
@@ -246,6 +246,28 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
  .configureFunctionalTests()
  .enablePlugins(PackPlugin)

lazy val http = (project in file("kafka-connect-http"))
  .dependsOn(common)
  //.dependsOn(`test-common` % "fun->compile")
I guess the functional tests are to be added later? Do we need that line then?
  .configureAssembly(false)
  .configureTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
  .configureIntegrationTests(baseTestDeps ++ kafkaConnectHttpTestDeps)
  //.configureFunctionalTests(kafkaConnectS3FuncTestDeps)
I guess the functional tests are to be added later? Do we need that line then?
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushInterval
import io.lenses.streamreactor.connect.cloud.common.sink.config.FlushSettings.defaultFlushSize

object CloudCommitPolicy extends LazyLogging {
There's no need to extend LazyLogging here, right?
    }.unsafeRunSync()
    ()
  }
  _ <- IO.delay(scheduler.scheduleAtFixedRate(runnable, 0, uploadSyncPeriod.toLong, TimeUnit.SECONDS))
Let's have a look at this on Monday maybe.
Unfortunately, doing it like this means that the task will run forever and cannot be stopped.
Running the test from HttpSinkTaskIT, you can see that even though the test terminates, this process keeps running until the JVM is terminated. Ideally we would want a "stop" method that could interrupt this scheduling.
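One possible shape for such a "stop" method, sketched with plain `java.util.concurrent` only. `StoppableSchedule` is a hypothetical name, and the period is taken in milliseconds here purely to keep the example quick to run; the PR itself schedules in seconds. In a cats-effect codebase the same acquire/release pairing could instead be wrapped in a `Resource`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical wrapper: owns the scheduler and keeps the handle needed to cancel the task.
final class StoppableSchedule(task: Runnable, periodMillis: Long) {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // scheduleAtFixedRate returns a ScheduledFuture; holding on to it is what makes cancellation possible.
  private val handle: ScheduledFuture[_] =
    scheduler.scheduleAtFixedRate(task, 0L, periodMillis, TimeUnit.MILLISECONDS)

  // Cancels future runs, shuts the executor down, and waits briefly for an in-flight run to finish.
  def stop(): Unit = {
    handle.cancel(false)
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}
```

Calling `stop()` from the task's own shutdown path (for example, the sink task's `stop`) would let the integration test exit without leaving a scheduler thread alive.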
Work to date on an HTTP sink.
Not yet covered:
This is a work in progress and should not be used in a production environment.
Configuration
For speed of development, the configuration is provided via a Kafka Connect configuration property (connect.http.config) in the form of a JSON template. Here is an example:
There was a discussion about using KCQL; however, it was decided that it is not suitable for this use case.
Templating
Two different types of templates can be provided:
XML is used here as an example; in fact, any markup language or text content can be supported.
Single Template
For when you want to substitute the contents of a single message into the template content. It could be used for multiple messages, but this wouldn't make much sense as the template has no looping part.
Multi Template
For when you want to merge multiple messages inside a single HTTP request. It loops through every message.
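To make the difference between the two template types concrete, here is a rough sketch. The `{{field}}` placeholder syntax, the `TemplateSketch` object, and the header/item/footer split are all assumptions made for illustration; the connector's actual template syntax and API may differ.

```scala
// Illustrative only: placeholder syntax and function names are hypothetical.
object TemplateSketch {
  private val Placeholder = """\{\{(\w+)\}\}""".r

  // Single template: substitute the fields of one message into the content.
  def renderSingle(template: String, message: Map[String, String]): String =
    Placeholder.replaceAllIn(
      template,
      m => scala.util.matching.Regex.quoteReplacement(message.getOrElse(m.group(1), "")),
    )

  // Multi template: render the item template once per message and wrap the
  // results in a header and footer, producing one body for a single HTTP request.
  def renderMulti(
    header:       String,
    itemTemplate: String,
    footer:       String,
    messages:     Seq[Map[String, String]],
  ): String =
    messages.map(renderSingle(itemTemplate, _)).mkString(header, "", footer)
}
```

With this sketch, `renderSingle("<id>{{id}}</id>", Map("id" -> "1"))` yields `<id>1</id>`, while `renderMulti` repeats the item fragment for every message between the header and footer.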