Keys are not deleted from Redis when redis.command is SET #32

Open
valentinno7 opened this issue Dec 6, 2023 · 1 comment

@valentinno7

We have noticed that in v0.9.0, keys are not deleted from Redis when the message value is null and redis.command is SET. Instead of the key being deleted, its value is set to an empty string. We are currently using redis/redis-enterprise-kafka:6.7.4, which does not have this issue.
The documentation at https://redis-field-engineering.github.io/redis-kafka-connect/#_sink_string says:
"String or bytes. If value is null the key is deleted." So we assume that deleting the Redis key is the expected behaviour.
Here is an example reproduced in a Jupyter notebook:
[image: screenshot of the reproduction in a Jupyter notebook]
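
For reference, the semantics we expect from the documentation can be sketched in a few lines of Python. This is only an illustration of the documented behaviour, not the connector's actual code: `apply_set_record` is a hypothetical helper, and a plain dict stands in for Redis.

```python
from typing import Dict, Optional, Union

Value = Union[str, bytes]


def apply_set_record(store: Dict[str, Value], key: str, value: Optional[Value]) -> None:
    """Apply one sink record to `store` the way the docs describe for SET.

    Hypothetical helper: `store` is a dict standing in for Redis.
    """
    if value is None:
        # Null value (tombstone): the documented behaviour is to delete the key.
        store.pop(key, None)
    else:
        # Non-null string or bytes: store the value under the key.
        store[key] = value


store: Dict[str, Value] = {}
apply_set_record(store, "user:1", "alice")  # create/update
apply_set_record(store, "user:1", None)     # tombstone
print("user:1" in store)  # prints False: the key is gone, not set to ""
```

What we observe in v0.9.0 corresponds to the `else` branch being taken with an empty string instead of the delete.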

@CJPoll

CJPoll commented Jul 9, 2024

I'm experiencing this issue too. In fact, it kills the whole connector.

My connector configuration (defined with Terraform):

resource "kafka-connect_connector" "redis-kafka-connect" {
  name = "redis-kafka-connect"

  config = {
    "name": "redis-kafka-connect",
    "connector.class": "com.redis.kafka.connect.RedisSinkConnector",
    "tasks.max": "1",
    "topics": "${var.topic}",
    "redis.uri": "redis://redis:6379",
    "redis.command": "SET",

    # The key is serialized using Avro, but for visibility in tooling we want to convert it to just the entity UUID using a Single-Message Transform
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",

    # The value is also serialized in Avro, but we don't want to make any changes - leave it serialized in Redis
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",

    # Take the `id` field from the key and replace the entire key with just the UUID value. Picture below showing that this transform works as expected for create/update events
    "transforms": "ExtractField",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractField.field": "id"
  }
} 

This setup works for creates/updates:
[image: screenshot showing the transform working for create/update events]

But it kills the connector when the record value is null:

Error: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect  | 2024-07-09T05:05:18.013549922Z com.redis.kafka.connect.shaded.io.lettuce.core.RedisException: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis. 
connect  | 2024-07-09T05:05:18.013552003Z       at com.redis.kafka.connect.shaded.io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:106)
connect  | 2024-07-09T05:05:18.013553622Z       at com.redis.kafka.connect.shaded.com.redis.spring.batch.common.AbstractOperationExecutor.process(AbstractOperationExecutor.java:124)                                                                       
connect  | 2024-07-09T05:05:18.013555046Z       at com.redis.kafka.connect.shaded.com.redis.spring.batch.writer.AbstractOperationItemWriter.write(AbstractOperationItemWriter.java:47)
connect  | 2024-07-09T05:05:18.013556414Z       at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:328)                                                                                                                                  
connect  | 2024-07-09T05:05:18.013557686Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect  | 2024-07-09T05:05:18.013558962Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)                                                                                                                            
connect  | 2024-07-09T05:05:18.013560234Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect  | 2024-07-09T05:05:18.013561502Z       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)                                                                                                                         
connect  | 2024-07-09T05:05:18.013562770Z       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect  | 2024-07-09T05:05:18.013564125Z       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)                                                                                                                                     
connect  | 2024-07-09T05:05:18.013565430Z       at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)                                                                                                            
connect  | 2024-07-09T05:05:18.013566721Z       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect  | 2024-07-09T05:05:18.013568017Z       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                       
connect  | 2024-07-09T05:05:18.013569932Z       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)                                                                                                                
connect  | 2024-07-09T05:05:18.013571252Z       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect  | 2024-07-09T05:05:18.013572582Z       at java.base/java.lang.Thread.run(Thread.java:829)
