"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
}
}
-func (t *Torrent) announceDHT(impliedPort bool) {
+// Adds peers revealed in an announce until the announce ends, or we have
+// enough peers.
+func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
cl := t.cl
+ // Count all the unique addresses we got during this announce.
+ allAddrs := make(map[string]struct{})
for {
select {
- case <-t.wantPeersEvent.LockedChan(&cl.mu):
+ case v, ok := <-pvs:
+ if !ok {
+ return
+ }
+ addPeers := make([]Peer, 0, len(v.Peers))
+ for _, cp := range v.Peers {
+ if cp.Port == 0 {
+ // Can't do anything with this.
+ continue
+ }
+ addPeers = append(addPeers, Peer{
+ IP: cp.IP[:],
+ Port: cp.Port,
+ Source: peerSourceDHT,
+ })
+ key := (&net.UDPAddr{
+ IP: cp.IP[:],
+ Port: cp.Port,
+ }).String()
+ allAddrs[key] = struct{}{}
+ }
+ cl.mu.Lock()
+ t.addPeers(addPeers)
+ numPeers := len(t.peers)
+ cl.mu.Unlock()
+ if numPeers >= torrentPeersHighWater {
+ return
+ }
case <-t.closed.LockedChan(&cl.mu):
return
}
- // log.Printf("getting peers for %q from DHT", t)
- ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
- if err != nil {
- log.Printf("error getting peers from dht: %s", err)
+ }
+}
+
+func (t *Torrent) announceDHT(impliedPort bool) (err error) {
+ cl := t.cl
+ ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
+ if err != nil {
+ return
+ }
+ t.consumeDHTAnnounce(ps.Peers)
+ ps.Close()
+ return
+}
+
+func (t *Torrent) dhtAnnouncer() {
+ cl := t.cl
+ for {
+ select {
+ case <-t.wantPeersEvent.LockedChan(&cl.mu):
+ case <-t.closed.LockedChan(&cl.mu):
return
}
- cl.mu.Lock()
- t.numDHTAnnounces++
- cl.mu.Unlock()
- // Count all the unique addresses we got during this announce.
- allAddrs := make(map[string]struct{})
- getPeers:
- for {
- select {
- case v, ok := <-ps.Peers:
- if !ok {
- break getPeers
- }
- addPeers := make([]Peer, 0, len(v.Peers))
- for _, cp := range v.Peers {
- if cp.Port == 0 {
- // Can't do anything with this.
- continue
- }
- addPeers = append(addPeers, Peer{
- IP: cp.IP[:],
- Port: cp.Port,
- Source: peerSourceDHT,
- })
- key := (&net.UDPAddr{
- IP: cp.IP[:],
- Port: cp.Port,
- }).String()
- allAddrs[key] = struct{}{}
- }
- cl.mu.Lock()
- t.addPeers(addPeers)
- numPeers := len(t.peers)
- cl.mu.Unlock()
- if numPeers >= torrentPeersHighWater {
- break getPeers
- }
- case <-t.closed.LockedChan(&cl.mu):
- ps.Close()
- return
- }
+ err := t.announceDHT(true)
+ if err == nil {
+ cl.mu.Lock()
+ t.numDHTAnnounces++
+ cl.mu.Unlock()
+ } else {
+ log.Printf("error announcing %q to DHT: %s", t, err)
+ }
+ select {
+ case <-t.closed.LockedChan(&cl.mu):
+ return
+ case <-time.After(5 * time.Minute):
}
- ps.Close()
- // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
}
}