Skip to content

Commit

Permalink
generate
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec committed Dec 18, 2024
1 parent 22c855e commit 8bb8b01
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package remote
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -43,8 +42,8 @@ type dispatcher struct {
}

type key struct {
capID string
donID uint32
capId string
donId uint32
}

var _ services.Service = &dispatcher{}
Expand Down Expand Up @@ -75,7 +74,7 @@ func (d *dispatcher) Start(ctx context.Context) error {
d.peer = d.peerWrapper.GetPeer()
d.peerID = d.peer.ID()
if d.peer == nil {
return errors.New("peer is not initialized")
return fmt.Errorf("peer is not initialized")
}
d.wg.Add(1)
go func() {
Expand Down Expand Up @@ -104,13 +103,13 @@ type receiver struct {
ch chan *types.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityID string, donID uint32, rec types.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityID, donID}
k := key{capabilityId, donId}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityID, donID)
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId)
}

receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize())
Expand All @@ -135,24 +134,23 @@ func (d *dispatcher) SetReceiver(capabilityID string, donID uint32, rec types.Re
ch: receiverCh,
}

d.lggr.Debugw("receiver set", "capabilityId", capabilityID, "donId", donID)
d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityID string, donID uint32) {
func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
d.mu.Lock()
defer d.mu.Unlock()

receiverKey := key{capabilityID, donID}
receiverKey := key{capabilityId, donId}
if receiver, ok := d.receivers[receiverKey]; ok {
receiver.cancel()
delete(d.receivers, receiverKey)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityID, "donID", donID)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
//nolint:gosec // G115
msgBody.Version = uint32(d.cfg.SupportedVersion())
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
Expand Down Expand Up @@ -196,17 +194,17 @@ func (d *dispatcher) receive() {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID)
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}

receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize())
capReceiveChannelUsage.WithLabelValues(k.capID, strconv.FormatUint(uint64(k.donID), 10)).Set(receiverQueueUsage)
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capID, "donId", k.donID)
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId)
}
}
}
Expand Down

0 comments on commit 8bb8b01

Please sign in to comment.