From 73978d7b7cebf06e77842a021de00ede52891c2d Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Thu, 21 Nov 2024 21:05:24 -0800 Subject: [PATCH 1/6] When leveraging Kafka Pubsub Avro serialization, caching should be enabled by default with a TTL of 5min Signed-off-by: Patrick Assuied --- common/component/kafka/kafka.go | 11 +++++++++++ common/component/kafka/metadata.go | 2 ++ 2 files changed, 13 insertions(+) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 2f3b67be0d..94083215a2 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -208,14 +208,17 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { k.consumeRetryInterval = meta.ConsumeRetryInterval if meta.SchemaRegistryURL != "" { + k.logger.Debugf("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL) k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL) // Empty password is a possibility if meta.SchemaRegistryAPIKey != "" { k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret) } + k.logger.Debugf("Schema caching enabled: %v", meta.SchemaCachingEnabled) k.srClient.CachingEnabled(meta.SchemaCachingEnabled) if meta.SchemaCachingEnabled { k.latestSchemaCache = make(map[string]SchemaCacheEntry) + k.logger.Debugf("Schema cache TTL: %v", meta.SchemaLatestVersionCacheTTL) k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL } } @@ -313,6 +316,12 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, return nil, nil, err } + // measure the time taken to get the schema + start := time.Now() + defer func() { + k.logger.Debugf("Time taken to get latest schema for topic %s: %v", topic, time.Since(start)) + }() + subject := getSchemaSubject(topic) if k.schemaCachingEnabled { k.latestSchemaCacheReadLock.Lock() @@ -321,8 +330,10 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, 
*goavro.Codec, // Cache present and not expired if ok && cacheEntry.expirationTime.After(time.Now()) { + k.logger.Debugf("Schema cache hit for subject %s", subject) return cacheEntry.schema, cacheEntry.codec, nil } + k.logger.Debugf("Cache not found or expired for subject %s. Fetching from registry...", subject) schema, errSchema := srClient.GetLatestSchema(subject) if errSchema != nil { return nil, nil, errSchema diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index c4d0a6bb56..5f0011be9f 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -163,6 +163,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval, HeartbeatInterval: 3 * time.Second, SessionTimeout: 10 * time.Second, + SchemaCachingEnabled: true, + SchemaLatestVersionCacheTTL: 5 * time.Minute, EscapeHeaders: false, } From 83338e3b4fb05dbb53b7c88698f3b8eff094aa6c Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 22 Nov 2024 15:09:30 -0800 Subject: [PATCH 2/6] - Address another issue where poison pill messages could be skipped in some weird circumstances - To avoid complexity, always exiting the for loop in case the RetryNotify fails... 
- Let dapr re-initiate the session in those cases - Address PR feedback and fix breaking tests Signed-off-by: Patrick Assuied --- common/component/kafka/consumer.go | 2 ++ common/component/kafka/kafka.go | 5 ++--- common/component/kafka/kafka_test.go | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 7cb923a455..caf994668a 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -91,6 +91,8 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) }); err != nil { consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) + // Once reached here, exit the loop to avoid processing more messages and potentially skipping the poison pill message. + return nil } } else { err := consumer.doCallback(session, message) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 94083215a2..515c8e9d12 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -208,13 +208,13 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { k.consumeRetryInterval = meta.ConsumeRetryInterval if meta.SchemaRegistryURL != "" { - k.logger.Debugf("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL) + k.logger.Infof("Schema registry URL '%s' provided. 
Configuring the Schema Registry client.", meta.SchemaRegistryURL) k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL) // Empty password is a possibility if meta.SchemaRegistryAPIKey != "" { k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret) } - k.logger.Debugf("Schema caching enabled: %v", meta.SchemaCachingEnabled) + k.logger.Infof("Schema caching enabled: %v", meta.SchemaCachingEnabled) k.srClient.CachingEnabled(meta.SchemaCachingEnabled) if meta.SchemaCachingEnabled { k.latestSchemaCache = make(map[string]SchemaCacheEntry) @@ -330,7 +330,6 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, // Cache present and not expired if ok && cacheEntry.expirationTime.After(time.Now()) { - k.logger.Debugf("Schema cache hit for subject %s", subject) return cacheEntry.schema, cacheEntry.codec, nil } k.logger.Debugf("Cache not found or expired for subject %s. Fetching from registry...", subject) diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index 3fbe8c7a2e..a65fb29124 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks" + "github.com/dapr/kit/logger" ) func TestGetValueSchemaType(t *testing.T) { @@ -62,6 +63,7 @@ func TestDeserializeValue(t *testing.T) { k := Kafka{ srClient: registry, schemaCachingEnabled: true, + logger: logger.NewLogger("kafka_test"), } schemaIDBytes := make([]byte, 4) @@ -175,6 +177,7 @@ func TestSerializeValueCachingDisabled(t *testing.T) { k := Kafka{ srClient: registry, schemaCachingEnabled: false, + logger: logger.NewLogger("kafka_test"), } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { @@ -250,6 +253,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) { schemaCachingEnabled: true, latestSchemaCache: 
make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Minute * 5, + logger: logger.NewLogger("kafka_test"), } t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) { @@ -280,6 +284,7 @@ func TestLatestSchemaCaching(t *testing.T) { schemaCachingEnabled: true, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Second * 10, + logger: logger.NewLogger("kafka_test"), } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(1) @@ -302,6 +307,7 @@ func TestLatestSchemaCaching(t *testing.T) { schemaCachingEnabled: true, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: time.Second * 1, + logger: logger.NewLogger("kafka_test"), } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2) @@ -326,6 +332,7 @@ func TestLatestSchemaCaching(t *testing.T) { schemaCachingEnabled: false, latestSchemaCache: make(map[string]SchemaCacheEntry), latestSchemaCacheTTL: 0, + logger: logger.NewLogger("kafka_test"), } m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2) From 5810dfdadfe89eeeadbe8aff7a5709a62522555d Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 22 Nov 2024 17:09:02 -0800 Subject: [PATCH 3/6] Taking a different approach to handle weird behavior that skips messages in some weird circumstances Signed-off-by: Patrick Assuied --- common/component/kafka/consumer.go | 33 ++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index caf994668a..8d2320cf67 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -14,6 +14,7 @@ limitations under the License. 
package kafka import ( + "context" "errors" "fmt" "net/url" @@ -32,6 +33,28 @@ type consumer struct { mutex sync.Mutex } +func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, b backoff.BackOff) error { + for { + if err := retry.NotifyRecover(func() error { + return consumer.doCallback(session, message) + }, b, func(err error, d time.Duration) { + consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) + }, func() { + consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) + }); err != nil { + // If the retry policy got interrupted, it could mean that either + // the policy has reached its maximum number of attempts or the context has been cancelled. + // There is a weird edge case where the error returned is a 'context canceled' error but the session.Context is not done. + // This is a workaround to handle that edge case and reprocess the current message. + if err == context.Canceled && session.Context().Err() == nil { + consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. 
Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) + continue + } + return err + } + } +} + func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context()) isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic()) @@ -83,16 +106,8 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai } if consumer.k.consumeRetryEnabled { - if err := retry.NotifyRecover(func() error { - return consumer.doCallback(session, message) - }, b, func(err error, d time.Duration) { - consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) - }, func() { - consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key)) - }); err != nil { + if err := notifyRecover(consumer, message, session, b); err != nil { consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) - // Once reached here, exit the loop to avoid processing more messages and potentially skipping the poison pill message. 
- return nil } } else { err := consumer.doCallback(session, message) From 6b8091bfc03c87cd7c343fc00ac87a9fa2bc9cdf Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 22 Nov 2024 18:05:45 -0800 Subject: [PATCH 4/6] missed exit path Signed-off-by: Patrick Assuied --- common/component/kafka/consumer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 8d2320cf67..5c1aebdbd1 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -52,6 +52,8 @@ func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session } return err } + return nil + } } From e6c2d4b61bf1439a98b425af21fc1afdd3246c74 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 22 Nov 2024 18:06:27 -0800 Subject: [PATCH 5/6] format Signed-off-by: Patrick Assuied --- common/component/kafka/consumer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index 5c1aebdbd1..b7ea51240c 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -53,7 +53,6 @@ func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session return err } return nil - } } From 9a57158d416f4f7392c8e1dee283dc26a8ad1d91 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Tue, 26 Nov 2024 11:04:05 -0800 Subject: [PATCH 6/6] Remove timing stat debug per PR feedback Signed-off-by: Patrick Assuied --- common/component/kafka/kafka.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 515c8e9d12..c4d9989fb2 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -316,12 +316,6 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, return nil, nil, err } - // measure the time taken to get the schema - start := time.Now() - defer func() { - 
k.logger.Debugf("Time taken to get latest schema for topic %s: %v", topic, time.Since(start)) - }() - subject := getSchemaSubject(topic) if k.schemaCachingEnabled { k.latestSchemaCacheReadLock.Lock()