
Can't find the way to setup one KCQL statement for multiple topics using the [topics] setting #4

Open
mauro-dellachiesa-bl opened this issue Jul 18, 2024 · 1 comment

Comments

mauro-dellachiesa-bl commented Jul 18, 2024

I'm trying to set up a single instance of the S3 Sink connector to back up around 15 topics.
Looking at the documentation here, I'm not able to figure out exactly what KCQL statement is needed.
As an example, given topic1, topic2, and topic3, these are the different settings I have tried:

topics setting attempted

  • topics=topic1, topic2, topic3
  • topics=topic1;topic2;topic3

connect.s3.kcql setting attempted

aBucket:andItsPath is not the actual value; I replaced it for the purpose of documenting this issue.

  • INSERT INTO aBucket:andItsPath SELECT * FROM TOPICNAME STOREAS BYTES PROPERTIES ('flush.count'=1)
  • INSERT INTO aBucket:andItsPath SELECT * FROM * STOREAS BYTES PROPERTIES ('flush.count'=1)
  • INSERT INTO aBucket:andItsPath SELECT * FROM {TOPIC} STOREAS BYTES PROPERTIES ('flush.count'=1)
  • INSERT INTO aBucket:andItsPath SELECT * FROM some-topic STOREAS BYTES PROPERTIES ('flush.count'=1)

The error I got most often is this one:
Can't find the KCQL for source topic [Topic(topic1)]. The topics defined via [topics] or [topics.regex] need to have an equivalent KCQL statement: INSERT INTO {DESTINATION} SELECT * FROM {TOPIC}.

In terms of the connector version (kafka-connect-aws-s3-assembly-7.3.2.jar), this is what I see in the logs:
Instantiated connector aConnector with version 7.3.2 of type class io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:284)

Combining the information from the error logs and the documentation, I cannot figure out a way to achieve the configuration I need.
Is it possible to have such a configuration, or should I replicate the KCQL statement for each topic I want to back up, separating the statements with ;?

Thanks in advance!


mauro-dellachiesa-bl commented Jul 18, 2024

With this KCQL I'm now getting an AWS-related error, which makes me think I found the correct statement:
INSERT INTO aBucket:andItsPath SELECT * FROM `*` STOREAS BYTES PROPERTIES ('flush.count'=1)
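For reference, a minimal sketch of the full sink configuration this implies, based on the settings quoted above. The connector name, bucket, path, and topic names are placeholders, not values confirmed to work in any specific environment; the backticked `*` lets one KCQL statement cover every topic listed in `topics`:

```properties
# Hypothetical connector config; names and paths are placeholders.
name=aConnector
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
# Standard Kafka Connect topic list: comma-separated, no spaces.
topics=topic1,topic2,topic3
# One KCQL statement applied to all topics via the backticked wildcard.
connect.s3.kcql=INSERT INTO aBucket:andItsPath SELECT * FROM `*` STOREAS BYTES PROPERTIES ('flush.count'=1)
```

Per the error message quoted earlier, without the wildcard (or a `;`-separated statement per topic) each entry in `topics` would need its own matching KCQL statement.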
