Skip to content

Commit

Permalink
Fixes memory leaks found during stress testing #252
Browse files Browse the repository at this point in the history
- Code to proactively cleanup loaded history during pagination was not actually reducing memory pressure as the underlying slice does not shrink when taking sub-slices.

- Holding onto side effect payloads after they have been processed provides no benefit and just eats memory. We can just delete the entry once we've served the SideEffect value.
  • Loading branch information
mastermanu authored Sep 21, 2020
1 parent 9fde76d commit e1e8a42
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
4 changes: 4 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,10 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
panic(fmt.Sprintf("No cached result found for side effectID=%v. KnownSideEffects=%v",
sideEffectID, keys))
}

// Once the SideEffect has been consumed, we can free the referenced payload
// to reduce memory pressure
delete(wc.sideEffectResult, sideEffectID)
wc.logger.Debug("SideEffect returning already calculated result.",
tagSideEffectID, sideEffectID)
} else {
Expand Down
9 changes: 8 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,14 @@ OrderEvents:
}

// shrink loaded events so it can be GCed
eh.loadedEvents = eh.loadedEvents[eh.currentIndex:]
eh.loadedEvents = append(
make(
[]*historypb.HistoryEvent,
0,
len(eh.loadedEvents)-eh.currentIndex),
eh.loadedEvents[eh.currentIndex:]...,
)

eh.currentIndex = 0

return nextEvents, markers, nil
Expand Down

0 comments on commit e1e8a42

Please sign in to comment.