From: Matt Joiner Date: Thu, 29 Jan 2015 03:20:21 +0000 (+1100) Subject: dht: Provide Announce, that combines get_peers and announce_peer X-Git-Tag: v1.0.0~1356 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=b3380f1ceed535781aafebe10ce1d80ca9e2c69c;p=btrtrc.git dht: Provide Announce, that combines get_peers and announce_peer --- diff --git a/client.go b/client.go index 11be6680..ff8dbf4b 100644 --- a/client.go +++ b/client.go @@ -1855,7 +1855,7 @@ func (cl *Client) waitWantPeers(t *torrent) bool { func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { for cl.waitWantPeers(t) { log.Printf("getting peers for %q from DHT", t) - ps, err := cl.dHT.GetPeers(string(t.InfoHash[:])) + ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort) if err != nil { log.Printf("error getting peers from dht: %s", err) return @@ -1900,22 +1900,6 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { ps.Close() log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs)) - // After a GetPeers, we can announce on the best nodes that gave us an - // announce token. - - port := cl.incomingPeerPort() - // If port is zero, then we're not listening, and there's nothing to - // announce. - if port != 0 { - // We can't allow the port to be implied as long as the UTP and - // DHT ports are different. - err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString()) - if err != nil { - log.Printf("error announcing torrent to DHT: %s", err) - } else { - log.Printf("announced %q to DHT", t) - } - } } } diff --git a/dht/dht.go b/dht/dht.go index 350d6d1d..d50115f3 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -254,12 +254,23 @@ func (m Msg) ID() string { return m[m["y"].(string)].(map[string]interface{})["id"].(string) } -func (m Msg) Nodes() []NodeInfo { - var r findNodeResponse - if err := r.UnmarshalKRPCMsg(m); err != nil { - return nil +// Suggested nodes in a response. +func (m Msg) Nodes() (nodes []NodeInfo) { + b := func() string { + defer func() { + recover() + }() + return m["r"].(map[string]interface{})["nodes"].(string) + }() + for i := 0; i < len(b); i += 26 { + var n NodeInfo + err := n.UnmarshalCompact([]byte(b[i : i+26])) + if err != nil { + continue + } + nodes = append(nodes, n) } - return r.Nodes + return } type KRPCError struct { @@ -292,9 +303,10 @@ func (m Msg) Error() (ret *KRPCError) { // Returns the token given in response to a get_peers request for future // announce_peer requests to that node. -func (m Msg) AnnounceToken() string { +func (m Msg) AnnounceToken() (token string, ok bool) { defer func() { recover() }() - return m["r"].(map[string]interface{})["token"].(string) + token, ok = m["r"].(map[string]interface{})["token"].(string) + return } type transaction struct { @@ -795,6 +807,9 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err } func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) error { + if port == 0 && !impliedPort { + return errors.New("nothing to announce") + } t, err := s.query(node, "announce_peer", map[string]interface{}{ "implied_port": func() int { if impliedPort { @@ -817,38 +832,6 @@ func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token str return err } -type findNodeResponse struct { - Nodes []NodeInfo -} - -func getResponseNodes(m Msg) (s string, err error) { - defer func() { - r := recover() - if r == nil { - return - } - err = fmt.Errorf("couldn't get response nodes: %s: %#v", r, m) - }() - s = m["r"].(map[string]interface{})["nodes"].(string) - return -} - -func (me *findNodeResponse) UnmarshalKRPCMsg(m Msg) error { - b, err := getResponseNodes(m) - if err != nil { - return err - } - for i := 0; i < len(b); i += 26 { - var n NodeInfo - err := n.UnmarshalCompact([]byte(b[i : i+26])) - if err != nil { - return err - } - me.Nodes = append(me.Nodes, n) - } - return nil -} - func (t *transaction) setOnResponse(f func(m Msg)) { if t.onResponse != nil { panic(t.onResponse) @@ -861,23 +844,16 @@ func (s *Server) liftNodes(d Msg) { if d["y"] != "r" { return } - var r findNodeResponse - err := r.UnmarshalKRPCMsg(d) - if err != nil { - // log.Print(err) - } else { - for _, cni := range r.Nodes { - if util.AddrPort(cni.Addr) == 0 { - // TODO: Why would people even do this? - continue - } - if s.ipBlocked(util.AddrIP(cni.Addr)) { - continue - } - n := s.getNode(cni.Addr) - n.SetIDFromBytes(cni.ID[:]) + for _, cni := range d.Nodes() { + if util.AddrPort(cni.Addr) == 0 { + // TODO: Why would people even do this? + continue } - // log.Printf("lifted %d nodes", len(r.Nodes)) + if s.ipBlocked(util.AddrIP(cni.Addr)) { + continue + } + n := s.getNode(cni.Addr) + n.SetIDFromBytes(cni.ID[:]) } } @@ -895,7 +871,9 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err er return } -func extractValues(m Msg) (vs []util.CompactPeer) { +// In a get_peers response, the addresses of torrent clients involved with the +// queried info-hash. +func (m Msg) Values() (vs []util.CompactPeer) { r, ok := m["r"] if !ok { return @@ -941,7 +919,10 @@ func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err er } t.setOnResponse(func(m Msg) { s.liftNodes(m) - s.getNode(addr).announceToken = m.AnnounceToken() + at, ok := m.AnnounceToken() + if ok { + s.getNode(addr).announceToken = at + } }) return } diff --git a/dht/getpeers.go b/dht/getpeers.go index e58a0358..74c366f3 100644 --- a/dht/getpeers.go +++ b/dht/getpeers.go @@ -6,6 +6,8 @@ import ( "log" "time" + "bitbucket.org/anacrolix/go.torrent/logonce" + "bitbucket.org/anacrolix/go.torrent/util" "bitbucket.org/anacrolix/sync" "github.com/willf/bloom" @@ -13,13 +15,22 @@ import ( type peerDiscovery struct { *peerStream - triedAddrs *bloom.BloomFilter - pending int - server *Server - infoHash string + triedAddrs *bloom.BloomFilter + pending int + server *Server + infoHash string + numContacted int + announcePort int + announcePortImplied bool +} + +func (pd *peerDiscovery) NumContacted() int { + pd.mu.Lock() + defer pd.mu.Unlock() + return pd.numContacted } -func (s *Server) GetPeers(infoHash string) (*peerStream, error) { +func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) { s.mu.Lock() startAddrs := func() (ret []dHTAddr) { for _, n := range s.closestGoodNodes(160, infoHash) { @@ -39,13 +50,15 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) { } disc := &peerDiscovery{ peerStream: &peerStream{ - Values: make(chan peerStreamValue), + Values: make(chan peerStreamValue, 100), stop: make(chan struct{}), values: make(chan peerStreamValue), }, - triedAddrs: bloom.NewWithEstimates(1000, 0.5), - server: s, - infoHash: infoHash, + triedAddrs: bloom.NewWithEstimates(1000, 0.5), + server: s, + infoHash: infoHash, + announcePort: port, + announcePortImplied: impliedPort, } // Function ferries from values to Values until discovery is halted. go func() { @@ -71,7 +84,7 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) { disc.contact(addr) disc.mu.Unlock() } - return disc.peerStream, nil + return disc, nil } func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) { @@ -89,6 +102,7 @@ func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) { } func (me *peerDiscovery) contact(addr dHTAddr) { + me.numContacted++ me.triedAddrs.Add([]byte(addr.String())) if err := me.getPeers(addr); err != nil { log.Printf("error sending get_peers request to %s: %s", addr, err) @@ -113,6 +127,13 @@ func (me *peerDiscovery) closingCh() chan struct{} { return me.peerStream.stop } +func (me *peerDiscovery) announcePeer(to dHTAddr, token string) { + err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied) + if err != nil { + logonce.Stderr.Printf("error announcing peer: %s", err) + } +} + func (me *peerDiscovery) getPeers(addr dHTAddr) error { me.server.mu.Lock() defer me.server.mu.Unlock() @@ -129,17 +150,12 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error { me.responseNode(n) } me.mu.Unlock() - if vs := extractValues(m); vs != nil { + + if vs := m.Values(); vs != nil { nodeInfo := NodeInfo{ Addr: t.remoteAddr, } - id := func() string { - defer func() { - recover() - }() - return m["r"].(map[string]interface{})["id"].(string) - }() - copy(nodeInfo.ID[:], id) + copy(nodeInfo.ID[:], m.ID()) select { case me.peerStream.values <- peerStreamValue{ Peers: vs, @@ -148,6 +164,10 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error { case <-me.peerStream.stop: } } + + if at, ok := m.AnnounceToken(); ok { + me.announcePeer(addr, at) + } case <-me.closingCh(): } t.Close()