func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
for cl.waitWantPeers(t) {
log.Printf("getting peers for %q from DHT", t)
- ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
+ ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
log.Printf("error getting peers from dht: %s", err)
return
ps.Close()
log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
- // After a GetPeers, we can announce on the best nodes that gave us an
- // announce token.
-
- port := cl.incomingPeerPort()
- // If port is zero, then we're not listening, and there's nothing to
- // announce.
- if port != 0 {
- // We can't allow the port to be implied as long as the UTP and
- // DHT ports are different.
- err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
- if err != nil {
- log.Printf("error announcing torrent to DHT: %s", err)
- } else {
- log.Printf("announced %q to DHT", t)
- }
- }
}
}
return m[m["y"].(string)].(map[string]interface{})["id"].(string)
}
-func (m Msg) Nodes() []NodeInfo {
- var r findNodeResponse
- if err := r.UnmarshalKRPCMsg(m); err != nil {
- return nil
+// Suggested nodes in a response.
+func (m Msg) Nodes() (nodes []NodeInfo) {
+ b := func() string {
+ defer func() {
+ recover()
+ }()
+ return m["r"].(map[string]interface{})["nodes"].(string)
+ }()
+ for i := 0; i < len(b); i += 26 {
+ var n NodeInfo
+ err := n.UnmarshalCompact([]byte(b[i : i+26]))
+ if err != nil {
+ continue
+ }
+ nodes = append(nodes, n)
}
- return r.Nodes
+ return
}
type KRPCError struct {
// Returns the token given in response to a get_peers request for future
// announce_peer requests to that node.
-func (m Msg) AnnounceToken() string {
+func (m Msg) AnnounceToken() (token string, ok bool) {
defer func() { recover() }()
- return m["r"].(map[string]interface{})["token"].(string)
+ token, ok = m["r"].(map[string]interface{})["token"].(string)
+ return
}
type transaction struct {
}
func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) error {
+ if port == 0 && !impliedPort {
+ return errors.New("nothing to announce")
+ }
t, err := s.query(node, "announce_peer", map[string]interface{}{
"implied_port": func() int {
if impliedPort {
return err
}
-type findNodeResponse struct {
- Nodes []NodeInfo
-}
-
-func getResponseNodes(m Msg) (s string, err error) {
- defer func() {
- r := recover()
- if r == nil {
- return
- }
- err = fmt.Errorf("couldn't get response nodes: %s: %#v", r, m)
- }()
- s = m["r"].(map[string]interface{})["nodes"].(string)
- return
-}
-
-func (me *findNodeResponse) UnmarshalKRPCMsg(m Msg) error {
- b, err := getResponseNodes(m)
- if err != nil {
- return err
- }
- for i := 0; i < len(b); i += 26 {
- var n NodeInfo
- err := n.UnmarshalCompact([]byte(b[i : i+26]))
- if err != nil {
- return err
- }
- me.Nodes = append(me.Nodes, n)
- }
- return nil
-}
-
func (t *transaction) setOnResponse(f func(m Msg)) {
if t.onResponse != nil {
panic(t.onResponse)
if d["y"] != "r" {
return
}
- var r findNodeResponse
- err := r.UnmarshalKRPCMsg(d)
- if err != nil {
- // log.Print(err)
- } else {
- for _, cni := range r.Nodes {
- if util.AddrPort(cni.Addr) == 0 {
- // TODO: Why would people even do this?
- continue
- }
- if s.ipBlocked(util.AddrIP(cni.Addr)) {
- continue
- }
- n := s.getNode(cni.Addr)
- n.SetIDFromBytes(cni.ID[:])
+ for _, cni := range d.Nodes() {
+ if util.AddrPort(cni.Addr) == 0 {
+ // TODO: Why would people even do this?
+ continue
}
- // log.Printf("lifted %d nodes", len(r.Nodes))
+ if s.ipBlocked(util.AddrIP(cni.Addr)) {
+ continue
+ }
+ n := s.getNode(cni.Addr)
+ n.SetIDFromBytes(cni.ID[:])
}
}
return
}
-func extractValues(m Msg) (vs []util.CompactPeer) {
+// In a get_peers response, the addresses of torrent clients involved with the
+// queried info-hash.
+func (m Msg) Values() (vs []util.CompactPeer) {
r, ok := m["r"]
if !ok {
return
}
t.setOnResponse(func(m Msg) {
s.liftNodes(m)
- s.getNode(addr).announceToken = m.AnnounceToken()
+ at, ok := m.AnnounceToken()
+ if ok {
+ s.getNode(addr).announceToken = at
+ }
})
return
}
"log"
"time"
+ "bitbucket.org/anacrolix/go.torrent/logonce"
+
"bitbucket.org/anacrolix/go.torrent/util"
"bitbucket.org/anacrolix/sync"
"github.com/willf/bloom"
type peerDiscovery struct {
*peerStream
- triedAddrs *bloom.BloomFilter
- pending int
- server *Server
- infoHash string
+ triedAddrs *bloom.BloomFilter
+ pending int
+ server *Server
+ infoHash string
+ numContacted int
+ announcePort int
+ announcePortImplied bool
+}
+
+func (pd *peerDiscovery) NumContacted() int {
+ pd.mu.Lock()
+ defer pd.mu.Unlock()
+ return pd.numContacted
}
-func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
+func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) {
s.mu.Lock()
startAddrs := func() (ret []dHTAddr) {
for _, n := range s.closestGoodNodes(160, infoHash) {
}
disc := &peerDiscovery{
peerStream: &peerStream{
- Values: make(chan peerStreamValue),
+ Values: make(chan peerStreamValue, 100),
stop: make(chan struct{}),
values: make(chan peerStreamValue),
},
- triedAddrs: bloom.NewWithEstimates(1000, 0.5),
- server: s,
- infoHash: infoHash,
+ triedAddrs: bloom.NewWithEstimates(1000, 0.5),
+ server: s,
+ infoHash: infoHash,
+ announcePort: port,
+ announcePortImplied: impliedPort,
}
// Function ferries from values to Values until discovery is halted.
go func() {
disc.contact(addr)
disc.mu.Unlock()
}
- return disc.peerStream, nil
+ return disc, nil
}
func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
}
func (me *peerDiscovery) contact(addr dHTAddr) {
+ me.numContacted++
me.triedAddrs.Add([]byte(addr.String()))
if err := me.getPeers(addr); err != nil {
log.Printf("error sending get_peers request to %s: %s", addr, err)
return me.peerStream.stop
}
+func (me *peerDiscovery) announcePeer(to dHTAddr, token string) {
+ err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
+ if err != nil {
+ logonce.Stderr.Printf("error announcing peer: %s", err)
+ }
+}
+
func (me *peerDiscovery) getPeers(addr dHTAddr) error {
me.server.mu.Lock()
defer me.server.mu.Unlock()
me.responseNode(n)
}
me.mu.Unlock()
- if vs := extractValues(m); vs != nil {
+
+ if vs := m.Values(); vs != nil {
nodeInfo := NodeInfo{
Addr: t.remoteAddr,
}
- id := func() string {
- defer func() {
- recover()
- }()
- return m["r"].(map[string]interface{})["id"].(string)
- }()
- copy(nodeInfo.ID[:], id)
+ copy(nodeInfo.ID[:], m.ID())
select {
case me.peerStream.values <- peerStreamValue{
Peers: vs,
case <-me.peerStream.stop:
}
}
+
+ if at, ok := m.AnnounceToken(); ok {
+ me.announcePeer(addr, at)
+ }
case <-me.closingCh():
}
t.Close()