Skip to content

Commit

Permalink
no messaging between torrent loop and announcer
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Mar 2, 2019
1 parent 4038388 commit 14ed4dc
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 61 deletions.
40 changes: 11 additions & 29 deletions internal/announcer/cancelable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import (
type cancelableAnnouncer struct {
ResponseC chan *tracker.AnnounceResponse
ErrorC chan error
requestC chan *Request
getTorrent func() tracker.Torrent
newPeers chan []*net.TCPAddr
tracker tracker.Tracker
announcing bool
stopC chan struct{}
doneC chan struct{}
}

func newCancelableAnnouncer(trk tracker.Tracker, requestC chan *Request, newPeers chan []*net.TCPAddr) *cancelableAnnouncer {
func newCancelableAnnouncer(trk tracker.Tracker, getTorrent func() tracker.Torrent, newPeers chan []*net.TCPAddr) *cancelableAnnouncer {
return &cancelableAnnouncer{
tracker: trk,
requestC: requestC,
newPeers: newPeers,
ResponseC: make(chan *tracker.AnnounceResponse),
ErrorC: make(chan error),
tracker: trk,
getTorrent: getTorrent,
newPeers: newPeers,
ResponseC: make(chan *tracker.AnnounceResponse),
ErrorC: make(chan error),
}
}

Expand All @@ -33,7 +33,8 @@ func (a *cancelableAnnouncer) Announce(e tracker.Event, numWant int) {
a.announcing = true
a.stopC = make(chan struct{})
a.doneC = make(chan struct{})
go announce(a.tracker, e, numWant, a.requestC, a.newPeers, a.ResponseC, a.ErrorC, a.stopC, a.doneC)
torrent := a.getTorrent() // get latests stats
go announce(a.tracker, e, numWant, torrent, a.newPeers, a.ResponseC, a.ErrorC, a.stopC, a.doneC)
}

func (a *cancelableAnnouncer) Cancel() {
Expand All @@ -48,7 +49,7 @@ func announce(
trk tracker.Tracker,
e tracker.Event,
numWant int,
requestC chan *Request,
torrent tracker.Torrent,
newPeers chan []*net.TCPAddr,
responseC chan *tracker.AnnounceResponse,
errC chan error,
Expand All @@ -67,27 +68,8 @@ func announce(
}
}()

req := &Request{
Response: make(chan Response),
Cancel: make(chan struct{}),
}
defer close(req.Cancel)

select {
case requestC <- req:
case <-stopC:
return
}

var resp Response
select {
case resp = <-req.Response:
case <-stopC:
return
}

annReq := tracker.AnnounceRequest{
Torrent: resp.Torrent,
Torrent: torrent,
Event: e,
NumWant: numWant,
}
Expand Down
17 changes: 4 additions & 13 deletions internal/announcer/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type PeriodicalAnnouncer struct {
completedC chan struct{}
newPeers chan []*net.TCPAddr
backoff backoff.BackOff
requests chan *Request
getTorrent func() tracker.Torrent
lastAnnounce time.Time
HasAnnounced bool
closeC chan struct{}
Expand All @@ -45,16 +45,7 @@ type PeriodicalAnnouncer struct {
needMorePeersC chan struct{}
}

type Request struct {
Response chan Response
Cancel chan struct{}
}

type Response struct {
Torrent tracker.Torrent
}

func NewPeriodicalAnnouncer(trk tracker.Tracker, numWant int, minInterval time.Duration, requests chan *Request, completedC chan struct{}, newPeers chan []*net.TCPAddr, l logger.Logger) *PeriodicalAnnouncer {
func NewPeriodicalAnnouncer(trk tracker.Tracker, numWant int, minInterval time.Duration, getTorrent func() tracker.Torrent, completedC chan struct{}, newPeers chan []*net.TCPAddr, l logger.Logger) *PeriodicalAnnouncer {
return &PeriodicalAnnouncer{
Tracker: trk,
status: NotContactedYet,
Expand All @@ -64,7 +55,7 @@ func NewPeriodicalAnnouncer(trk tracker.Tracker, numWant int, minInterval time.D
log: l,
completedC: completedC,
newPeers: newPeers,
requests: requests,
getTorrent: getTorrent,
needMorePeersC: make(chan struct{}, 1),
closeC: make(chan struct{}),
doneC: make(chan struct{}),
Expand Down Expand Up @@ -120,7 +111,7 @@ func (a *PeriodicalAnnouncer) Run() {
timer := time.NewTimer(math.MaxInt64)
defer timer.Stop()

ca := newCancelableAnnouncer(a.Tracker, a.requests, a.newPeers)
ca := newCancelableAnnouncer(a.Tracker, a.getTorrent, a.newPeers)
defer ca.Cancel()

ca.Announce(tracker.EventStarted, a.numWant)
Expand Down
2 changes: 2 additions & 0 deletions torrent/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (t *torrent) handleAllocationDone(al *allocator.Allocator) {

// No need to verify files if they didn't exist when we create them.
if !al.NeedHashCheck {
t.mBitfield.Lock()
t.bitfield = bitfield.New(t.info.NumPieces)
t.mBitfield.Unlock()
t.processQueuedMessages()
t.startAcceptor()
t.startAnnouncers()
Expand Down
8 changes: 6 additions & 2 deletions torrent/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package torrent

import (
"math"
"sync/atomic"

"github.com/cenkalti/rain/internal/tracker"
)
Expand All @@ -11,13 +12,16 @@ func (t *torrent) announcerFields() tracker.Torrent {
InfoHash: t.infoHash,
PeerID: t.peerID,
Port: t.port,
BytesDownloaded: t.resumerStats.BytesDownloaded,
BytesUploaded: t.resumerStats.BytesUploaded,
BytesDownloaded: atomic.LoadInt64(&t.resumerStats.BytesDownloaded),
BytesUploaded: atomic.LoadInt64(&t.resumerStats.BytesUploaded),
}
t.mBitfield.RLock()
if t.bitfield == nil {
// Some trackers don't send any peer address if don't tell we have missing bytes.
tr.BytesLeft = math.MaxUint32
} else {
tr.BytesLeft = t.info.TotalLength - t.bytesComplete()
}
t.mBitfield.RUnlock()
return tr
}
2 changes: 0 additions & 2 deletions torrent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/cenkalti/rain/internal/addrlist"
"github.com/cenkalti/rain/internal/allocator"
"github.com/cenkalti/rain/internal/announcer"
"github.com/cenkalti/rain/internal/bitfield"
"github.com/cenkalti/rain/internal/blocklist"
"github.com/cenkalti/rain/internal/bufferpool"
Expand Down Expand Up @@ -119,7 +118,6 @@ func (o *options) NewTorrent(infoHash []byte, sto storage.Storage) (*torrent, er
outgoingHandshakers: make(map[*outgoinghandshaker.OutgoingHandshaker]struct{}),
incomingHandshakerResultC: make(chan *incominghandshaker.IncomingHandshaker),
outgoingHandshakerResultC: make(chan *outgoinghandshaker.OutgoingHandshaker),
announcerRequestC: make(chan *announcer.Request),
allocatorProgressC: make(chan allocator.Progress),
allocatorResultC: make(chan *allocator.Allocator),
verifierProgressC: make(chan verifier.Progress),
Expand Down
9 changes: 2 additions & 7 deletions torrent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net"
"time"

"github.com/cenkalti/rain/internal/announcer"
"github.com/cenkalti/rain/internal/bitfield"
"github.com/cenkalti/rain/internal/handshaker/incominghandshaker"
"github.com/cenkalti/rain/internal/handshaker/outgoinghandshaker"
Expand Down Expand Up @@ -122,12 +121,6 @@ func (t *torrent) run() {
ourExtensions,
t.config.ForceIncomingEncryption,
)
case req := <-t.announcerRequestC:
tr := t.announcerFields()
select {
case req.Response <- announcer.Response{Torrent: tr}:
case <-req.Cancel:
}
case pw := <-t.pieceWriterResultC:
pw.Piece.Writing = false

Expand All @@ -151,7 +144,9 @@ func (t *torrent) run() {
if t.bitfield.Test(pw.Piece.Index) {
panic("already have the piece")
}
t.mBitfield.Lock()
t.bitfield.Set(pw.Piece.Index)
t.mBitfield.Unlock()

if t.piecePicker != nil {
for _, pe := range t.piecePicker.RequestedPeers(pw.Piece.Index) {
Expand Down
2 changes: 1 addition & 1 deletion torrent/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (t *torrent) startNewAnnouncer(tr tracker.Tracker) {
tr,
t.config.TrackerNumWant,
t.config.TrackerMinAnnounceInterval,
t.announcerRequestC,
t.announcerFields,
t.completeC,
t.addrsFromTrackers,
t.log,
Expand Down
4 changes: 0 additions & 4 deletions torrent/stats.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package torrent

import (
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -152,9 +151,6 @@ func (t *torrent) stats() Stats {
s.PieceLength = t.info.PieceLength
s.Pieces.Total = t.info.NumPieces
} else {
// Some trackers don't send any peer address if don't tell we have missing bytes.
s.Bytes.Incomplete = math.MaxUint32

s.Name = t.name
}
if t.bitfield != nil {
Expand Down
6 changes: 3 additions & 3 deletions torrent/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type torrent struct {
// Bitfield for pieces we have. It is created after we got info.
bitfield *bitfield.Bitfield

// Protects bitfield writing from torrent loop and reading from announcer loop.
mBitfield sync.RWMutex

// Unique peer ID is generated per downloader.
peerID [20]byte

Expand Down Expand Up @@ -190,9 +193,6 @@ type torrent struct {
// When metadata of the torrent downloaded completely, a message is sent to this channel.
infoDownloaderResultC chan *infodownloader.InfoDownloader

// Announcers send a request to this channel to get information about the torrent.
announcerRequestC chan *announcer.Request

// A ticker that ticks periodically to keep a certain number of peers unchoked.
unchokeTicker *time.Ticker

Expand Down
2 changes: 2 additions & 0 deletions torrent/verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func (t *torrent) handleVerificationDone(ve *verifier.Verifier) {
}

// Now we have a constructed and verified bitfield.
t.mBitfield.Lock()
t.bitfield = ve.Bitfield
t.mBitfield.Unlock()

// Save the bitfield to resume db.
if t.resume != nil {
Expand Down

0 comments on commit 14ed4dc

Please sign in to comment.