]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix dht.Server.Announce never stopping if the starting addresses can't be contacted
authorMatt Joiner <anacrolix@gmail.com>
Sat, 7 May 2016 08:58:43 +0000 (18:58 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 7 May 2016 08:58:43 +0000 (18:58 +1000)
dht/announce.go
dht/dht_test.go

index 68567fbe12ed0203c3fbfd6c6a0b9eaaf3efbf53..cff0ac731f0951777789f112d7f83a74379f3aec 100644 (file)
@@ -18,14 +18,25 @@ type Announce struct {
        mu    sync.Mutex
        Peers chan PeersValues
        // Inner chan is set to nil when on close.
-       values              chan PeersValues
-       stop                chan struct{}
-       triedAddrs          *bloom.BloomFilter
-       pending             int
-       server              *Server
-       infoHash            string
-       numContacted        int
-       announcePort        int
+       values     chan PeersValues
+       stop       chan struct{}
+       triedAddrs *bloom.BloomFilter
+       // True when contact with all starting addrs has been initiated. This
+       // prevents a race where the first transaction finishes before the rest
+       // have been opened, sees no other transactions are pending and ends the
+       // announce.
+       contactedStartAddrs bool
+       // How many transactions are still ongoing.
+       pending  int
+       server   *Server
+       infoHash string
+       // Count of (probably) distinct addresses we've sent get_peers requests
+       // to.
+       numContacted int
+       // The torrent port that we're announcing.
+       announcePort int
+       // The torrent port should be determined by the receiver in case we're
+       // being NATed.
        announcePortImplied bool
 }
 
@@ -84,17 +95,25 @@ func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announc
                        }
                }
        }()
-       for i, addr := range startAddrs {
-               if i != 0 {
-                       time.Sleep(time.Millisecond)
+       go func() {
+               for i, addr := range startAddrs {
+                       if i != 0 {
+                               time.Sleep(time.Millisecond)
+                       }
+                       disc.mu.Lock()
+                       disc.contact(addr)
+                       disc.mu.Unlock()
                }
-               disc.mu.Lock()
-               disc.contact(addr)
-               disc.mu.Unlock()
-       }
+               disc.contactedStartAddrs = true
+               // If we failed to contact any of the starting addrs, no transactions
+               // will complete triggering a check that there are no pending
+               // responses.
+               disc.maybeClose()
+       }()
        return disc, nil
 }
 
+// TODO: Merge this with maybeGetPeersFromAddr.
 func (a *Announce) gotNodeAddr(addr Addr) {
        if addr.UDPAddr().Port == 0 {
                // Not a contactable address.
@@ -115,6 +134,7 @@ func (a *Announce) gotNodeAddr(addr Addr) {
        a.contact(addr)
 }
 
+// TODO: Merge this with maybeGetPeersFromAddr.
 func (a *Announce) contact(addr Addr) {
        a.numContacted++
        a.triedAddrs.Add([]byte(addr.String()))
@@ -125,14 +145,17 @@ func (a *Announce) contact(addr Addr) {
        a.pending++
 }
 
-func (a *Announce) transactionClosed() {
-       a.pending--
-       if a.pending == 0 {
+func (a *Announce) maybeClose() {
+       if a.contactedStartAddrs && a.pending == 0 {
                a.close()
-               return
        }
 }
 
+func (a *Announce) transactionClosed() {
+       a.pending--
+       a.maybeClose()
+}
+
 func (a *Announce) responseNode(node NodeInfo) {
        a.gotNodeAddr(node.Addr)
 }
index 8464d8053461aaa6c0b16333927fef4e7919aa90..166e8f0cf4a7e3ed5d71981e7b6cd8ff901769e9 100644 (file)
@@ -9,6 +9,7 @@ import (
        "testing"
        "time"
 
+       _ "github.com/anacrolix/envpprof"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )