From 1d87653738dd5a044b40b598e7f40722eebd8f90 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 7 May 2016 18:58:43 +1000 Subject: [PATCH] Fix dht.Server.Announce never stopping if the starting addresses can't be contacted --- dht/announce.go | 61 ++++++++++++++++++++++++++++++++++--------------- dht/dht_test.go | 1 + 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/dht/announce.go b/dht/announce.go index 68567fbe..cff0ac73 100644 --- a/dht/announce.go +++ b/dht/announce.go @@ -18,14 +18,25 @@ type Announce struct { mu sync.Mutex Peers chan PeersValues // Inner chan is set to nil when on close. - values chan PeersValues - stop chan struct{} - triedAddrs *bloom.BloomFilter - pending int - server *Server - infoHash string - numContacted int - announcePort int + values chan PeersValues + stop chan struct{} + triedAddrs *bloom.BloomFilter + // True when contact with all starting addrs has been initiated. This + // prevents a race where the first transaction finishes before the rest + // have been opened, sees no other transactions are pending and ends the + // announce. + contactedStartAddrs bool + // How many transactions are still ongoing. + pending int + server *Server + infoHash string + // Count of (probably) distinct addresses we've sent get_peers requests + // to. + numContacted int + // The torrent port that we're announcing. + announcePort int + // The torrent port should be determined by the receiver in case we're + // being NATed. announcePortImplied bool } @@ -84,17 +95,25 @@ func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announc } } }() - for i, addr := range startAddrs { - if i != 0 { - time.Sleep(time.Millisecond) + go func() { + for i, addr := range startAddrs { + if i != 0 { + time.Sleep(time.Millisecond) + } + disc.mu.Lock() + disc.contact(addr) + disc.mu.Unlock() } - disc.mu.Lock() - disc.contact(addr) - disc.mu.Unlock() - } + disc.contactedStartAddrs = true + // If we failed to contact any of the starting addrs, no transactions + // will complete triggering a check that there are no pending + // responses. + disc.maybeClose() + }() return disc, nil } +// TODO: Merge this with maybeGetPeersFromAddr. func (a *Announce) gotNodeAddr(addr Addr) { if addr.UDPAddr().Port == 0 { // Not a contactable address. @@ -115,6 +134,7 @@ func (a *Announce) gotNodeAddr(addr Addr) { a.contact(addr) } +// TODO: Merge this with maybeGetPeersFromAddr. func (a *Announce) contact(addr Addr) { a.numContacted++ a.triedAddrs.Add([]byte(addr.String())) @@ -125,14 +145,17 @@ func (a *Announce) contact(addr Addr) { a.pending++ } -func (a *Announce) transactionClosed() { - a.pending-- - if a.pending == 0 { +func (a *Announce) maybeClose() { + if a.contactedStartAddrs && a.pending == 0 { a.close() - return } } +func (a *Announce) transactionClosed() { + a.pending-- + a.maybeClose() +} + func (a *Announce) responseNode(node NodeInfo) { a.gotNodeAddr(node.Addr) } diff --git a/dht/dht_test.go b/dht/dht_test.go index 8464d805..166e8f0c 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + _ "github.com/anacrolix/envpprof" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -- 2.48.1