"io"
"math"
"math/rand"
- "net"
"net/url"
"os"
"sync"
// Adds peers revealed in an announce until the announce ends, or we have
// enough peers.
-func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
+func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
cl := t.cl
- // Count all the unique addresses we got during this announce.
- allAddrs := make(map[string]struct{})
- for {
- select {
- case v, ok := <-pvs:
- if !ok {
- return
- }
- addPeers := make([]Peer, 0, len(v.Peers))
- for _, cp := range v.Peers {
- if cp.Port == 0 {
- // Can't do anything with this.
- continue
- }
- addPeers = append(addPeers, Peer{
- IP: cp.IP[:],
- Port: cp.Port,
- Source: peerSourceDHTGetPeers,
- })
- key := (&net.UDPAddr{
- IP: cp.IP[:],
- Port: cp.Port,
- }).String()
- allAddrs[key] = struct{}{}
- }
- cl.lock()
- t.addPeers(addPeers)
- numPeers := t.peers.Len()
- cl.unlock()
- if numPeers >= cl.config.TorrentPeersHighWater {
- return
+ for v := range pvs {
+ cl.lock()
+ for _, cp := range v.Peers {
+ if cp.Port == 0 {
+ // Can't do anything with this.
+ continue
}
- case <-t.closed.LockedChan(cl.locker()):
- return
+ t.addPeer(Peer{
+ IP: cp.IP[:],
+ Port: cp.Port,
+ Source: peerSourceDHTGetPeers,
+ })
}
+ cl.unlock()
}
}
-func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
- cl := t.cl
- ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
+func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
+ ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
if err != nil {
- return
+ return err
+ }
+ go t.consumeDhtAnnouncePeers(ps.Peers)
+ select {
+ case <-t.closed.LockedChan(t.cl.locker()):
+ case <-time.After(5 * time.Minute):
+ case <-t.wantPeersEvent.LockedChan(t.cl.locker()):
}
- t.consumeDHTAnnounce(ps.Peers)
ps.Close()
- return
+ return nil
}
func (t *Torrent) dhtAnnouncer(s *dht.Server) {
case <-t.closed.LockedChan(cl.locker()):
return
}
- err := t.announceDHT(true, s)
+ err := t.announceToDht(true, s)
func() {
cl.lock()
defer cl.unlock()
t.logger.Printf("error announcing %q to DHT: %s", t, err)
}
}()
- select {
- case <-t.closed.LockedChan(cl.locker()):
- return
- case <-time.After(5 * time.Minute):
- }
}
}