]> Sergey Matveev's repositories - btrtrc.git/commitdiff
dht: Provide Announce, that combines get_peers and announce_peer
authorMatt Joiner <anacrolix@gmail.com>
Thu, 29 Jan 2015 03:20:21 +0000 (14:20 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 29 Jan 2015 03:20:21 +0000 (14:20 +1100)
client.go
dht/dht.go
dht/getpeers.go

index 11be66800083fa2415fbba0422da3023ed9ce602..ff8dbf4bf65abc82cc5727831ed20efff3f744a2 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1855,7 +1855,7 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
        for cl.waitWantPeers(t) {
                log.Printf("getting peers for %q from DHT", t)
-               ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
+               ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
                        log.Printf("error getting peers from dht: %s", err)
                        return
@@ -1900,22 +1900,6 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
                ps.Close()
                log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
 
-               // After a GetPeers, we can announce on the best nodes that gave us an
-               // announce token.
-
-               port := cl.incomingPeerPort()
-               // If port is zero, then we're not listening, and there's nothing to
-               // announce.
-               if port != 0 {
-                       // We can't allow the port to be implied as long as the UTP and
-                       // DHT ports are different.
-                       err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
-                       if err != nil {
-                               log.Printf("error announcing torrent to DHT: %s", err)
-                       } else {
-                               log.Printf("announced %q to DHT", t)
-                       }
-               }
        }
 }
 
index 350d6d1dae4a883b5b8fef004e55d0cb311074b5..d50115f306837c428ca121b0c59e9bd18c2d8267 100644 (file)
@@ -254,12 +254,23 @@ func (m Msg) ID() string {
        return m[m["y"].(string)].(map[string]interface{})["id"].(string)
 }
 
-func (m Msg) Nodes() []NodeInfo {
-       var r findNodeResponse
-       if err := r.UnmarshalKRPCMsg(m); err != nil {
-               return nil
+// Suggested nodes in a response.
+func (m Msg) Nodes() (nodes []NodeInfo) {
+       b := func() string {
+               defer func() {
+                       recover()
+               }()
+               return m["r"].(map[string]interface{})["nodes"].(string)
+       }()
+       for i := 0; i < len(b); i += 26 {
+               var n NodeInfo
+               err := n.UnmarshalCompact([]byte(b[i : i+26]))
+               if err != nil {
+                       continue
+               }
+               nodes = append(nodes, n)
        }
-       return r.Nodes
+       return
 }
 
 type KRPCError struct {
@@ -292,9 +303,10 @@ func (m Msg) Error() (ret *KRPCError) {
 
 // Returns the token given in response to a get_peers request for future
 // announce_peer requests to that node.
-func (m Msg) AnnounceToken() string {
+func (m Msg) AnnounceToken() (token string, ok bool) {
        defer func() { recover() }()
-       return m["r"].(map[string]interface{})["token"].(string)
+       token, ok = m["r"].(map[string]interface{})["token"].(string)
+       return
 }
 
 type transaction struct {
@@ -795,6 +807,9 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err
 }
 
 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) error {
+       if port == 0 && !impliedPort {
+               return errors.New("nothing to announce")
+       }
        t, err := s.query(node, "announce_peer", map[string]interface{}{
                "implied_port": func() int {
                        if impliedPort {
@@ -817,38 +832,6 @@ func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token str
        return err
 }
 
-type findNodeResponse struct {
-       Nodes []NodeInfo
-}
-
-func getResponseNodes(m Msg) (s string, err error) {
-       defer func() {
-               r := recover()
-               if r == nil {
-                       return
-               }
-               err = fmt.Errorf("couldn't get response nodes: %s: %#v", r, m)
-       }()
-       s = m["r"].(map[string]interface{})["nodes"].(string)
-       return
-}
-
-func (me *findNodeResponse) UnmarshalKRPCMsg(m Msg) error {
-       b, err := getResponseNodes(m)
-       if err != nil {
-               return err
-       }
-       for i := 0; i < len(b); i += 26 {
-               var n NodeInfo
-               err := n.UnmarshalCompact([]byte(b[i : i+26]))
-               if err != nil {
-                       return err
-               }
-               me.Nodes = append(me.Nodes, n)
-       }
-       return nil
-}
-
 func (t *transaction) setOnResponse(f func(m Msg)) {
        if t.onResponse != nil {
                panic(t.onResponse)
@@ -861,23 +844,16 @@ func (s *Server) liftNodes(d Msg) {
        if d["y"] != "r" {
                return
        }
-       var r findNodeResponse
-       err := r.UnmarshalKRPCMsg(d)
-       if err != nil {
-               // log.Print(err)
-       } else {
-               for _, cni := range r.Nodes {
-                       if util.AddrPort(cni.Addr) == 0 {
-                               // TODO: Why would people even do this?
-                               continue
-                       }
-                       if s.ipBlocked(util.AddrIP(cni.Addr)) {
-                               continue
-                       }
-                       n := s.getNode(cni.Addr)
-                       n.SetIDFromBytes(cni.ID[:])
+       for _, cni := range d.Nodes() {
+               if util.AddrPort(cni.Addr) == 0 {
+                       // TODO: Why would people even do this?
+                       continue
                }
-               // log.Printf("lifted %d nodes", len(r.Nodes))
+               if s.ipBlocked(util.AddrIP(cni.Addr)) {
+                       continue
+               }
+               n := s.getNode(cni.Addr)
+               n.SetIDFromBytes(cni.ID[:])
        }
 }
 
@@ -895,7 +871,9 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err er
        return
 }
 
-func extractValues(m Msg) (vs []util.CompactPeer) {
+// In a get_peers response, the addresses of torrent clients involved with the
+// queried info-hash.
+func (m Msg) Values() (vs []util.CompactPeer) {
        r, ok := m["r"]
        if !ok {
                return
@@ -941,7 +919,10 @@ func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err er
        }
        t.setOnResponse(func(m Msg) {
                s.liftNodes(m)
-               s.getNode(addr).announceToken = m.AnnounceToken()
+               at, ok := m.AnnounceToken()
+               if ok {
+                       s.getNode(addr).announceToken = at
+               }
        })
        return
 }
index e58a0358a5679b9bc6a99dcce7d8f4ffa71171fd..74c366f38bc52a9b17b436ba6debca14566e64f9 100644 (file)
@@ -6,6 +6,8 @@ import (
        "log"
        "time"
 
+       "bitbucket.org/anacrolix/go.torrent/logonce"
+
        "bitbucket.org/anacrolix/go.torrent/util"
        "bitbucket.org/anacrolix/sync"
        "github.com/willf/bloom"
@@ -13,13 +15,22 @@ import (
 
 type peerDiscovery struct {
        *peerStream
-       triedAddrs *bloom.BloomFilter
-       pending    int
-       server     *Server
-       infoHash   string
+       triedAddrs          *bloom.BloomFilter
+       pending             int
+       server              *Server
+       infoHash            string
+       numContacted        int
+       announcePort        int
+       announcePortImplied bool
+}
+
+func (pd *peerDiscovery) NumContacted() int {
+       pd.mu.Lock()
+       defer pd.mu.Unlock()
+       return pd.numContacted
 }
 
-func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
+func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) {
        s.mu.Lock()
        startAddrs := func() (ret []dHTAddr) {
                for _, n := range s.closestGoodNodes(160, infoHash) {
@@ -39,13 +50,15 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
        }
        disc := &peerDiscovery{
                peerStream: &peerStream{
-                       Values: make(chan peerStreamValue),
+                       Values: make(chan peerStreamValue, 100),
                        stop:   make(chan struct{}),
                        values: make(chan peerStreamValue),
                },
-               triedAddrs: bloom.NewWithEstimates(1000, 0.5),
-               server:     s,
-               infoHash:   infoHash,
+               triedAddrs:          bloom.NewWithEstimates(1000, 0.5),
+               server:              s,
+               infoHash:            infoHash,
+               announcePort:        port,
+               announcePortImplied: impliedPort,
        }
        // Function ferries from values to Values until discovery is halted.
        go func() {
@@ -71,7 +84,7 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
                disc.contact(addr)
                disc.mu.Unlock()
        }
-       return disc.peerStream, nil
+       return disc, nil
 }
 
 func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
@@ -89,6 +102,7 @@ func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
 }
 
 func (me *peerDiscovery) contact(addr dHTAddr) {
+       me.numContacted++
        me.triedAddrs.Add([]byte(addr.String()))
        if err := me.getPeers(addr); err != nil {
                log.Printf("error sending get_peers request to %s: %s", addr, err)
@@ -113,6 +127,13 @@ func (me *peerDiscovery) closingCh() chan struct{} {
        return me.peerStream.stop
 }
 
+func (me *peerDiscovery) announcePeer(to dHTAddr, token string) {
+       err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
+       if err != nil {
+               logonce.Stderr.Printf("error announcing peer: %s", err)
+       }
+}
+
 func (me *peerDiscovery) getPeers(addr dHTAddr) error {
        me.server.mu.Lock()
        defer me.server.mu.Unlock()
@@ -129,17 +150,12 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error {
                                me.responseNode(n)
                        }
                        me.mu.Unlock()
-                       if vs := extractValues(m); vs != nil {
+
+                       if vs := m.Values(); vs != nil {
                                nodeInfo := NodeInfo{
                                        Addr: t.remoteAddr,
                                }
-                               id := func() string {
-                                       defer func() {
-                                               recover()
-                                       }()
-                                       return m["r"].(map[string]interface{})["id"].(string)
-                               }()
-                               copy(nodeInfo.ID[:], id)
+                               copy(nodeInfo.ID[:], m.ID())
                                select {
                                case me.peerStream.values <- peerStreamValue{
                                        Peers:    vs,
@@ -148,6 +164,10 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error {
                                case <-me.peerStream.stop:
                                }
                        }
+
+                       if at, ok := m.AnnounceToken(); ok {
+                               me.announcePeer(addr, at)
+                       }
                case <-me.closingCh():
                }
                t.Close()