Running processor on exception: startConsumeFromSequence has to be a positive number #103

Closed
AndryHTC opened this issue Jul 25, 2023 · 6 comments · Fixed by #104

Comments

@AndryHTC

After updating to 0.2.3, I get this error:
Running processor on exception: startConsumeFromSequence has to be a positive number

@yanivbh1
Contributor

Adding @bazen-teklehaymanot to help.
Thank you, @AndryHTC.

@tbazen
Collaborator

tbazen commented Jul 26, 2023

Hi @AndryHTC, can you please provide more detail on how to reproduce the issue?

Thanks!

@AndryHTC
Author

Well... This is the code I had before updating:

    private static async Task SetupAndStartConsumer(string stationName, string consumerName,
        Func<MemphisMessage, Task<string>> processMessage)
    {
        var consumerOptions = new MemphisConsumerOptions
        {
            StationName = stationName,
            ConsumerName = consumerName,
            ConsumerGroup = "",
            PullIntervalMs = 100
        };

        var consumer = await _memphisClient!.CreateConsumer(consumerOptions);
        consumer.MessageReceived += async (_, e) =>
        {
            if (e.Exception != null)
            {
                Console.Error.WriteLine(e.Exception);
                return;
            }
            
            var producerOptions = new MemphisProducerOptions
            {
                StationName = "pontis-meilisearch-events",
                ProducerName = "debezium-to-meilisearch-producer",
                GenerateUniqueSuffix = true
            };
            _producer = await _memphisClient.CreateProducer(producerOptions);
            var commonHeaders = new NameValueCollection { { "produced-by", "debezium-to-meilisearch-producer" } };

            var messageProcessingTasks = new Dictionary<Task<string>, MemphisMessage>();
            foreach (var msg in e.MessageList)
            {
                var task = processMessage(msg);
                messageProcessingTasks.Add(task, msg);
            }

            while (messageProcessingTasks.Count > 0)
            {
                var completedTask = await Task.WhenAny(messageProcessingTasks.Keys);
                var msg = messageProcessingTasks[completedTask];

                if (completedTask.Status == TaskStatus.RanToCompletion)
                {
                    await _producer.ProduceAsync(
                        Encoding.UTF8.GetBytes(completedTask.Result),
                        commonHeaders
                    );
                    msg.Ack();
                }
                else
                {
                    await Console.Error.WriteLineAsync($"Message processing failed: {completedTask.Exception}");
                    SentrySdk.CaptureException(completedTask.Exception!);
                }

                messageProcessingTasks.Remove(completedTask);
            }
        };

        consumer.ConsumeAsync();
    }

After updating to 0.2.3 it gives me that error, so I rolled back to 0.2.2. I don't have much other information.
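
Editorial sketch: the thread does not state the root cause, so the following is only one plausible reading of the error message. The options object above never sets a start sequence. If the newer SDK validates such a value as strictly positive while an unset integer defaults to 0, a default-constructed options object would be rejected. `SketchConsumerOptions` and `SketchValidator` are hypothetical stand-ins written for this illustration. They are not the Memphis.Client types, and the real check may look different.

```csharp
using System;

// Hypothetical stand-in for consumer options; NOT the real Memphis.Client type.
public sealed class SketchConsumerOptions
{
    public string StationName { get; set; } = "";
    public string ConsumerName { get; set; } = "";

    // Assumption for illustration: an int option that stays at the
    // C# default of 0 when the caller never assigns it.
    public int StartConsumeFromSequence { get; set; }
}

public static class SketchValidator
{
    // Mirrors the wording of the reported error; the SDK's actual check may differ.
    public static void EnsureValid(SketchConsumerOptions options)
    {
        if (options.StartConsumeFromSequence <= 0)
        {
            throw new ArgumentException("startConsumeFromSequence has to be a positive number");
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Options built the same way as in the report: no start sequence assigned.
        var untouched = new SketchConsumerOptions { StationName = "s", ConsumerName = "c" };
        try
        {
            SketchValidator.EnsureValid(untouched);
        }
        catch (ArgumentException ex)
        {
            Console.WriteLine($"Rejected: {ex.Message}");
        }

        // Same options with an explicit positive start sequence pass the check.
        var explicitStart = new SketchConsumerOptions
        {
            StationName = "s",
            ConsumerName = "c",
            StartConsumeFromSequence = 1
        };
        SketchValidator.EnsureValid(explicitStart);
        Console.WriteLine("Accepted");
    }
}
```

If that reading is right, assigning a positive start sequence explicitly might sidestep the check, provided the real options type exposes such a property. The thread itself only confirms rolling back to 0.2.2 and waiting for the fix in #104.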

@tbazen
Collaborator

tbazen commented Jul 27, 2023

Thanks @AndryHTC, I'm merging the fix; it will be available in the next release.

@AndryHTC
Author

Any idea when the next release will be published?

@yanivbh1
Contributor

Hey @AndryHTC, Aug 7.
