Skip to content

Commit

Permalink
[prog] fix a stop bug (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
Asphaltt authored Aug 17, 2021
1 parent fd5c3e9 commit 8a0799a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
16 changes: 11 additions & 5 deletions perf_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type PerfEvents struct {
// PollTimeoutMs is timeout for blocking call of poll()
// Defaults to 100ms
PollTimeoutMs int
poller *perfEventPoller

perfMap Map
updatesChannel chan []byte
Expand Down Expand Up @@ -132,6 +133,8 @@ func (pe *PerfEvents) StartForAllProcessesAndCPUs(bufferSize int) (<-chan []byte

// Stop stops event polling loop
func (pe *PerfEvents) Stop() {
// Stop poller firstly
pe.poller.Stop()
// Stop poll loop
close(pe.stopChannel)
// Wait until poll loop stopped, then close updates channel
Expand All @@ -154,22 +157,25 @@ func (pe *PerfEvents) startLoop() {

func (pe *PerfEvents) loop() {
// Setup poller to poll all handlers (one handler per CPU)
poller := newPerfEventPoller()
pe.poller = newPerfEventPoller()
for _, handler := range pe.handlers {
poller.Add(handler)
pe.poller.Add(handler)
}

// Start poller
pollerCh := poller.Start(pe.PollTimeoutMs)
pollerCh := pe.poller.Start(pe.PollTimeoutMs)
defer func() {
poller.Stop()
pe.wg.Done()
}()

// Wait until at least one perf event fd becomes readable (has new data)
for {
select {
case handler := <-pollerCh:
case handler, ok := <-pollerCh:
if !ok {
return
}

pe.handlePerfEvent(handler)

case <-pe.stopChannel:
Expand Down
8 changes: 7 additions & 1 deletion perf_events_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (p *perfEventPoller) Stop() {
// Stop loop
close(p.stopChannel)
p.wg.Wait()
close(p.updateChannel)
}

func (p *perfEventPoller) loop() {
Expand All @@ -119,7 +120,12 @@ func (p *perfEventPoller) loop() {

// Send perfEventHandlers with pending updates, if any
for i := 0; i < readyCnt; i++ {
p.updateChannel <- p.items[int(p.fds[i])]
select {
case p.updateChannel <- p.items[int(p.fds[i])]:

case <-p.stopChannel:
return
}
}
}
}

0 comments on commit 8a0799a

Please sign in to comment.