Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages randomly not delivered #738

Open
nicolasassi opened this issue Sep 25, 2022 · 7 comments
Open

Messages randomly not delivered #738

nicolasassi opened this issue Sep 25, 2022 · 7 comments

Comments

@nicolasassi
Copy link

nicolasassi commented Sep 25, 2022

Hi!

I'm using elasticmq with Go and currently this is my pooling implementation:

func listenForLocalSQSEvents(urls []string) {
	ctx := context.Background()
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			CredentialsChainVerboseErrors: aws.Bool(true),
			Endpoint:                      aws.String(env.GetString("ELASTICMQ_URI")),
			Region:                        aws.String("ELASTICMQ_REGION"),
		},
	}))
	svc := sqs.New(sess)
	for _, url := range urls {
			go func(url string) {
				for {
					log.Printf("requesting at: %v", time.Now())
					msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
						AttributeNames: []*string{
							aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
						},
						MessageAttributeNames: []*string{
							aws.String(sqs.QueueAttributeNameAll),
						},
						QueueUrl:            aws.String(url),
						MaxNumberOfMessages: aws.Int64(10),
						WaitTimeSeconds:     aws.Int64(5),
					})
					if err != nil {
						panic(err)
					}
					log.Printf("msgResult: %+v", msgResult.String())
					if err := eventsLambda.Consume(ctx, msgResult); err != nil {
						log.Printf("local sqs consumer error: %v", err)
						continue
					}
				}
			}(v)
	}
}

The problem is, some messages are never delivered and some are delivered with no delay at all.
Using the UI I noticed that the messages which are never delivered increase the counter on the Approximate number of not visible Messages column.

@micossow
Copy link
Contributor

micossow commented Nov 1, 2022

Hi @nicolasassi, if you want us to investigate the issue, please provide an application reproducing the problem, preferably in a form of GH repo, that we'd be able to run locally.

@sergiuionescu
Copy link

I've experience similar behaviour, I have a hunch this is related to the dedletter queue functionality and the way messages ids are being assigned.

I've noticed that the messages are pushed directly to the deadletter queue.

Was not quite able to confirm the root cause.

Removing the deadletter queue from the configuration or raising maxReceiveCount seems to adress the issue so it's not breaking our functional tests.

@sergiuionescu
Copy link

@micossow If you could help me with a logback configuration that would log client calls and internals to the console output I could try to get more insight into this.

So far, my configuration only outputs initialisation logs. on 1.3.14:

`

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <layout class="ch.qos.logback.classic.PatternLayout">
        <Pattern>
            %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
        </Pattern>
    </layout>
</appender>

<logger name="com.mkyong" level="debug" additivity="false">
    <appender-ref ref="CONSOLE"/>
</logger>

<root level="error">
    <appender-ref ref="CONSOLE"/>
</root>
`

@micossow
Copy link
Contributor

You can try with debug log messages from akka and elasticmq:

    <logger name="org.elasticmq" additivity="false" level="DEBUG">
        <appender-ref ref="CONSOLE"/>
    </logger>

    <logger name="akka" additivity="false" level="DEBUG">
        <appender-ref ref="CONSOLE"/>
    </logger>

@sergiuionescu
Copy link

sergiuionescu commented Mar 31, 2023

Thanks I got more log data with this.

With the following queue config:

local-test-notification {
        defaultVisibilityTimeout = 0 seconds
        delay = 0 seconds
        receiveMessageWait = 0 seconds

        deadLettersQueue {
            name = "local-test-notification-deadletter"
            maxReceiveCount = 3
        }
    }

    local-test-notification-deadletter {
        defaultVisibilityTimeout = 10 seconds
        delay = 0 seconds
        receiveMessageWait = 0 seconds
    }
sqs  | 14:32:58.022 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Sent message with id 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1
sqs  | 14:32:58.023 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Receiving message 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1
sqs  | 14:32:58.024 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Awaiting messages: replying to sequence 0 with 1 messages.
sqs  | 14:32:58.025 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Receiving message 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1
sqs  | 14:32:58.026 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Awaiting messages: replying to sequence 1 with 1 messages.
sqs  | 14:32:58.035 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Receiving message 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1
sqs  | 14:32:58.039 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: Awaiting messages: replying to sequence 2 with 1 messages.
sqs  | 14:32:58.059 [elasticmq-akka.actor.default-dispatcher-12] DEBUG o.elasticmq.actor.queue.QueueActor - local-test-notification: send message InternalMessage(0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1,ArrayBuffer(0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1#bcac6d6b-4661-426f-ad81-4c1c1ec59c72, 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1#9e7393fe-224e-4b7b-9e7a-358ec72b3ec6, 0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1#e0764a3d-3e1c-4384-8a03-e60ac5912aa7),1680273178035,O:36:\"Symfony\\Component\\Messenger\\Envelope\":2:{s:44:\"\0Symfony\\Component\\Messenger\\Envelope\0stamps\";a:1:{s:46:\"Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\";a:1:{i:0;O:46:\"Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\":1:{s:55:\"\0Symfony\\Component\\Messenger\\Stamp\\BusNameStamp\0busName\";s:21:\"messenger.bus.default\";}}}s:45:\"\0Symfony\\Component\\Messenger\\Envelope\0message\";O:63:\"Message\\Notification\":1:{s:83:\"\0Message\\Notification\0id\";s:36:\"973261f1-7cdc-467d-9f02-97812bb49e67\";}},Map(),2023-03-31T14:32:58.022Z,0,OnDateTimeReceived(2023-03-31T14:32:58.023Z),3,false,None,None,None,None) to dead letters actor Some(Actor[akka://elasticmq/user/$a/$c#2062594922])
sqs  | 14:32:58.070 [elasticmq-akka.actor.default-dispatcher-8] DEBUG o.elasticmq.actor.queue.QueueActor - Moved message (0abdd44f-ad6f-4c1a-8b7d-2fd965d411f1) to local-test-notification-deadletter

The message seems to be moved directly to the deadletter queue, but I still do not see the reason.

@micossow
Copy link
Contributor

Well, that's how the SQS works. If you receive the message maxReceiveCount times, but don't delete it using DeleteMessage, it gets moved to dead letter queue.
SQS messages do not get deleted automatically.

@sergiuionescu
Copy link

Thanks @micossow there is definetly some eagger and unsolicited fetching from happening on the client library side that I have to track down.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants