Skip to content

Commit

Permalink
WTEL-4587
Browse files Browse the repository at this point in the history
  • Loading branch information
i.navrotskyj committed May 24, 2024
1 parent 7a1ea5f commit 9367aea
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
24 changes: 24 additions & 0 deletions queue/attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Attempt struct {
stickyAgentId *int32

processingForm model.ProcessingForm
processingFields sync.Map
processingFormStarted bool
bridgedAt int64
manualDistribution bool
Expand Down Expand Up @@ -158,6 +159,25 @@ func (a *Attempt) ProcessingFormStarted() bool {
return a.processingFormStarted
}

func (a *Attempt) UpdateProcessingFields(fields map[string]string) {
for k, v := range fields {
a.processingFields.Store(k, v)
}
}

func (a *Attempt) ProcessingFields() map[string]string {
if !a.processingFormStarted {
return nil
}
res := make(map[string]string)
a.processingFields.Range(func(key, value any) bool {
res[fmt.Sprintf("%s", key)] = fmt.Sprintf("%s", value)
return true
})

return res
}

func (a *Attempt) SetMemberStopCause(cause *string) {
a.Lock()
a.memberStopCause = cause
Expand Down Expand Up @@ -340,6 +360,10 @@ func (a *Attempt) ExportSchemaVariables() map[string]string {
res[k] = fmt.Sprintf("%v", v)
}

for k, v := range a.ProcessingFields() {
res[k] = v
}

if a.member.Seq != nil {
res[model.QUEUE_ATTEMPT_SEQ] = fmt.Sprintf("%d", *a.member.Seq)
}
Expand Down
1 change: 1 addition & 0 deletions queue/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (qm *QueueManager) attemptProcessingActionForm(attemptId int64, action stri
}

if attempt.processingForm != nil && attempt.agent != nil {
attempt.UpdateProcessingFields(fields)
_, err := attempt.processingForm.ActionForm(attempt.Context, action, fields)
if err != nil {
attempt.Log(err.Error())
Expand Down

0 comments on commit 9367aea

Please sign in to comment.