From 6b81d57ca211f39a11f0ec250e704bd7d6c0a237 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 1 Apr 2015 17:29:55 +1100 Subject: [PATCH] dht: Clean-up interface, add loads of documentation --- README.md | 13 ++- client.go | 14 ++-- cmd/dht-get-peers/main.go | 9 +- cmd/dht-ping/main.go | 2 +- cmd/dht-server/main.go | 7 +- dht/announce.go | 85 +++++++++---------- dht/dht.go | 167 +++++++++++++++++++++++--------------- dht/dht_test.go | 4 +- 8 files changed, 170 insertions(+), 131 deletions(-) diff --git a/README.md b/README.md index 47b6d280..2354fe9c 100644 --- a/README.md +++ b/README.md @@ -55,4 +55,15 @@ torrentfs mounts a FUSE filesystem at `-mountDir`. The contents are the torrents Creates a magnet link from a torrent file. Note the extracted trackers, display name, and info hash. $ godo github.com/anacrolix/torrent/cmd/torrent-magnet < ubuntu-14.04.2-desktop-amd64.iso.torrent - magnet:?xt=urn:btih:546cf15f724d19c4319cc17b179d7e035f89c1f4&dn=ubuntu-14.04.2-desktop-amd64.iso&tr=http%3A%2F%2Ftorrent.ubuntu.com%3A6969%2Fannounce&tr=http%3A%2F%2Fipv6.torrent.ubuntu.com%3A6969%2Fannounce \ No newline at end of file + magnet:?xt=urn:btih:546cf15f724d19c4319cc17b179d7e035f89c1f4&dn=ubuntu-14.04.2-desktop-amd64.iso&tr=http%3A%2F%2Ftorrent.ubuntu.com%3A6969%2Fannounce&tr=http%3A%2F%2Fipv6.torrent.ubuntu.com%3A6969%2Fannounce + +### dht-ping + +Pings DHT nodes with the given network addresses. + + $ godo ./cmd/dht-ping router.bittorrent.com:6881 router.utorrent.com:6881 + 2015/04/01 17:21:23 main.go:33: dht server on [::]:60058 + 32f54e697351ff4aec29cdbaabf2fbe3467cc267 (router.bittorrent.com:6881): 648.218621ms + ebff36697351ff4aec29cdbaabf2fbe3467cc267 (router.utorrent.com:6881): 873.864706ms + 2/2 responses (100.000000%) + diff --git a/client.go b/client.go index a3785d42..f6533a64 100644 --- a/client.go +++ b/client.go @@ -231,11 +231,11 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID) if cl.dHT != nil { dhtStats := cl.dHT.Stats() - fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes) - fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString()) - fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr())) - fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces) - fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.NumOutstandingTransactions) + fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.Nodes, dhtStats.GoodNodes) + fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID()) + fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.Addr())) + fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces) + fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions) } fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents)) fmt.Fprintln(w) @@ -1269,7 +1269,7 @@ func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) { if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil { conn.Post(pp.Message{ Type: pp.Port, - Port: uint16(AddrPort(me.dHT.LocalAddr())), + Port: uint16(AddrPort(me.dHT.Addr())), }) } } @@ -2407,7 +2407,7 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { getPeers: for { select { - case v, ok := <-ps.Values: + case v, ok := <-ps.Peers: if !ok { break getPeers } diff --git a/cmd/dht-get-peers/main.go b/cmd/dht-get-peers/main.go index edea0f46..d1f3a116 100644 --- a/cmd/dht-get-peers/main.go +++ b/cmd/dht-get-peers/main.go @@ -16,11 +16,6 @@ import ( _ "github.com/anacrolix/torrent/util/profile" ) -type pingResponse struct { - addr string - krpc dht.Msg -} - var ( tableFileName = flag.String("tableFile", "", "name of file for storing node info") serveAddr = flag.String("serveAddr", ":0", "local UDP address") @@ -89,7 +84,7 @@ func init() { if err != nil { log.Fatalf("error loading table: %s", err) } - log.Printf("dht server on %s, ID is %x", s.LocalAddr(), s.IDString()) + log.Printf("dht server on %s, ID is %x", s.Addr(), s.ID()) setupSignals() } @@ -141,7 +136,7 @@ getPeers: values: for { select { - case v, ok := <-ps.Values: + case v, ok := <-ps.Peers: if !ok { break values } diff --git a/cmd/dht-ping/main.go b/cmd/dht-ping/main.go index bdb3fa65..9f8d0734 100644 --- a/cmd/dht-ping/main.go +++ b/cmd/dht-ping/main.go @@ -30,7 +30,7 @@ func main() { if err != nil { log.Fatal(err) } - log.Printf("dht server on %s", s.LocalAddr()) + log.Printf("dht server on %s", s.Addr()) pingResponses := make(chan pingResponse) timeoutChan := make(chan struct{}) go func() { diff --git a/cmd/dht-server/main.go b/cmd/dht-server/main.go index 3bcf5b03..19799ce8 100644 --- a/cmd/dht-server/main.go +++ b/cmd/dht-server/main.go @@ -11,11 +11,6 @@ import ( "github.com/anacrolix/torrent/dht" ) -type pingResponse struct { - addr string - krpc dht.Msg -} - var ( tableFileName = flag.String("tableFile", "", "name of file for storing node info") serveAddr = flag.String("serveAddr", ":0", "local UDP address") @@ -71,7 +66,7 @@ func init() { if err != nil { log.Fatalf("error loading table: %s", err) } - log.Printf("dht server on %s, ID is %q", s.LocalAddr(), s.IDString()) + log.Printf("dht server on %s, ID is %q", s.Addr(), s.ID()) setupSignals() } diff --git a/dht/announce.go b/dht/announce.go index db221a64..4aadd454 100644 --- a/dht/announce.go +++ b/dht/announce.go @@ -13,8 +13,14 @@ import ( "github.com/anacrolix/torrent/util" ) -type peerDiscovery struct { - *peerStream +// Maintains state for an ongoing Announce operation. An Announce is started +// by calling Server.Announce. +type Announce struct { + mu sync.Mutex + Peers chan PeersValues + // Inner chan is set to nil when on close. + values chan PeersValues + stop chan struct{} triedAddrs *bloom.BloomFilter pending int server *Server @@ -24,13 +30,18 @@ type peerDiscovery struct { announcePortImplied bool } -func (pd *peerDiscovery) NumContacted() int { - pd.mu.Lock() - defer pd.mu.Unlock() - return pd.numContacted +// Returns the number of distinct remote addresses the announce has queried. +func (me *Announce) NumContacted() int { + me.mu.Lock() + defer me.mu.Unlock() + return me.numContacted } -func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) { +// This is kind of the main thing you want to do with DHT. It traverses the +// graph toward nodes that store peers for the infohash, streaming them to the +// caller, and announcing the local node to each node if allowed and +// specified. +func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announce, error) { s.mu.Lock() startAddrs := func() (ret []dHTAddr) { for _, n := range s.closestGoodNodes(160, infoHash) { @@ -48,12 +59,10 @@ func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDis startAddrs = append(startAddrs, newDHTAddr(addr)) } } - disc := &peerDiscovery{ - peerStream: &peerStream{ - Values: make(chan peerStreamValue, 100), - stop: make(chan struct{}), - values: make(chan peerStreamValue), - }, + disc := &Announce{ + Peers: make(chan PeersValues, 100), + stop: make(chan struct{}), + values: make(chan PeersValues), triedAddrs: bloom.NewWithEstimates(1000, 0.5), server: s, infoHash: infoHash, @@ -62,12 +71,12 @@ func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDis } // Function ferries from values to Values until discovery is halted. go func() { - defer close(disc.Values) + defer close(disc.Peers) for { select { case psv := <-disc.values: select { - case disc.Values <- psv: + case disc.Peers <- psv: case <-disc.stop: return } @@ -87,7 +96,7 @@ func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDis return disc, nil } -func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) { +func (me *Announce) gotNodeAddr(addr dHTAddr) { if util.AddrPort(addr) == 0 { // Not a contactable address. return @@ -101,7 +110,7 @@ func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) { me.contact(addr) } -func (me *peerDiscovery) contact(addr dHTAddr) { +func (me *Announce) contact(addr dHTAddr) { me.numContacted++ me.triedAddrs.Add([]byte(addr.String())) if err := me.getPeers(addr); err != nil { @@ -111,7 +120,7 @@ func (me *peerDiscovery) contact(addr dHTAddr) { me.pending++ } -func (me *peerDiscovery) transactionClosed() { +func (me *Announce) transactionClosed() { me.pending-- if me.pending == 0 { me.close() @@ -119,15 +128,15 @@ func (me *peerDiscovery) transactionClosed() { } } -func (me *peerDiscovery) responseNode(node NodeInfo) { +func (me *Announce) responseNode(node NodeInfo) { me.gotNodeAddr(node.Addr) } -func (me *peerDiscovery) closingCh() chan struct{} { - return me.peerStream.stop +func (me *Announce) closingCh() chan struct{} { + return me.stop } -func (me *peerDiscovery) announcePeer(to dHTAddr, token string) { +func (me *Announce) announcePeer(to dHTAddr, token string) { me.server.mu.Lock() err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied) me.server.mu.Unlock() @@ -136,7 +145,7 @@ func (me *peerDiscovery) announcePeer(to dHTAddr, token string) { } } -func (me *peerDiscovery) getPeers(addr dHTAddr) error { +func (me *Announce) getPeers(addr dHTAddr) error { me.server.mu.Lock() defer me.server.mu.Unlock() t, err := me.server.getPeers(addr, me.infoHash) @@ -157,11 +166,11 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error { } copy(nodeInfo.ID[:], m.ID()) select { - case me.peerStream.values <- peerStreamValue{ + case me.values <- PeersValues{ Peers: vs, NodeInfo: nodeInfo, }: - case <-me.peerStream.stop: + case <-me.stop: } } @@ -176,28 +185,22 @@ func (me *peerDiscovery) getPeers(addr dHTAddr) error { return nil } -type peerStreamValue struct { +// Corresponds to the "values" key in a get_peers KRPC response. A list of +// peers that a node has reported as being in the swarm for a queried info +// hash. +type PeersValues struct { Peers []util.CompactPeer // Peers given in get_peers response. NodeInfo // The node that gave the response. } -// TODO: This was to be the shared publicly accessible part returned by DHT -// functions that stream peers. Possibly not necessary anymore. -type peerStream struct { - mu sync.Mutex - Values chan peerStreamValue - // Inner chan is set to nil when on close. - values chan peerStreamValue - stop chan struct{} -} - -func (ps *peerStream) Close() { - ps.mu.Lock() - defer ps.mu.Unlock() - ps.close() +// Stop the announce. +func (me *Announce) Close() { + me.mu.Lock() + defer me.mu.Unlock() + me.close() } -func (ps *peerStream) close() { +func (ps *Announce) close() { select { case <-ps.stop: default: diff --git a/dht/dht.go b/dht/dht.go index 5f709934..737449e5 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -1,3 +1,8 @@ +// Package DHT implements a DHT for use with the BitTorrent protocol, +// described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html. +// +// Standard use involves creating a NewServer, and calling Announce on it with +// the details of your local torrent client and infohash of interest. package dht import ( @@ -36,15 +41,15 @@ type transactionKey struct { type Server struct { id string socket net.PacketConn - transactions map[transactionKey]*transaction + transactions map[transactionKey]*Transaction transactionIDInt uint64 - nodes map[string]*Node // Keyed by dHTAddr.String(). + nodes map[string]*node // Keyed by dHTAddr.String(). mu sync.Mutex closed chan struct{} passive bool // Don't respond to queries. ipBlockList *iplist.IPList - NumConfirmedAnnounces int + numConfirmedAnnounces int } type dHTAddr interface { @@ -74,31 +79,42 @@ func newDHTAddr(addr net.Addr) dHTAddr { } type ServerConfig struct { - Addr string - Conn net.PacketConn - Passive bool // Don't respond to queries. -} - -type serverStats struct { - NumGoodNodes int - NumNodes int - NumOutstandingTransactions int -} - -func (s *Server) Stats() (ss serverStats) { + Addr string // Listen address. Used if Conn is nil. + Conn net.PacketConn + // Don't respond to queries from other nodes. + Passive bool +} + +type ServerStats struct { + // Count of nodes in the node table that responded to our last query or + // haven't yet been queried. + GoodNodes int + // Count of nodes in the node table. + Nodes int + // Transactions awaiting a response. + OutstandingTransactions int + // Individual announce_peer requests that got a success response. + ConfirmedAnnounces int +} + +// Returns statistics for the server. +func (s *Server) Stats() (ss ServerStats) { s.mu.Lock() defer s.mu.Unlock() for _, n := range s.nodes { if n.DefinitelyGood() { - ss.NumGoodNodes++ + ss.GoodNodes++ } } - ss.NumNodes = len(s.nodes) - ss.NumOutstandingTransactions = len(s.transactions) + ss.Nodes = len(s.nodes) + ss.OutstandingTransactions = len(s.transactions) + ss.ConfirmedAnnounces = s.numConfirmedAnnounces return } -func (s *Server) LocalAddr() net.Addr { +// Returns the listen address for the server. Packets arriving to this address +// are processed by the server (unless aliens are involved). +func (s *Server) Addr() net.Addr { return s.socket.LocalAddr() } @@ -111,6 +127,7 @@ func makeSocket(addr string) (socket *net.UDPConn, err error) { return } +// Create a new DHT server. func NewServer(c *ServerConfig) (s *Server, err error) { if c == nil { c = &ServerConfig{} @@ -149,6 +166,7 @@ func NewServer(c *ServerConfig) (s *Server, err error) { return } +// Returns a description of the Server. Python repr-style. func (s *Server) String() string { return fmt.Sprintf("dht server on %s", s.socket.LocalAddr()) } @@ -184,7 +202,7 @@ func (nid *nodeID) String() string { return string(nid.i.Bytes()) } -type Node struct { +type node struct { addr dHTAddr id nodeID announceToken string @@ -194,24 +212,24 @@ type Node struct { lastSentQuery time.Time } -func (n *Node) idString() string { +func (n *node) idString() string { return n.id.String() } -func (n *Node) SetIDFromBytes(b []byte) { +func (n *node) SetIDFromBytes(b []byte) { n.id.i.SetBytes(b) n.id.set = true } -func (n *Node) SetIDFromString(s string) { +func (n *node) SetIDFromString(s string) { n.id.i.SetBytes([]byte(s)) } -func (n *Node) IDNotSet() bool { +func (n *node) IDNotSet() bool { return n.id.i.Int64() == 0 } -func (n *Node) NodeInfo() (ret NodeInfo) { +func (n *node) NodeInfo() (ret NodeInfo) { ret.Addr = n.addr if n := copy(ret.ID[:], n.idString()); n != 20 { panic(n) @@ -219,7 +237,7 @@ func (n *Node) NodeInfo() (ret NodeInfo) { return } -func (n *Node) DefinitelyGood() bool { +func (n *node) DefinitelyGood() bool { if len(n.idString()) != 20 { return false } @@ -234,6 +252,10 @@ func (n *Node) DefinitelyGood() bool { return true } +// A wrapper around the unmarshalled KRPC dict that constitutes messages in +// the DHT. There are various helpers for extracting common data from the +// message. In normal use, Msg is abstracted away for you, but it can be of +// interest. type Msg map[string]interface{} var _ fmt.Stringer = Msg{} @@ -316,7 +338,7 @@ func (m Msg) AnnounceToken() (token string, ok bool) { return } -type transaction struct { +type Transaction struct { mu sync.Mutex remoteAddr dHTAddr t string @@ -331,14 +353,15 @@ type transaction struct { userOnResponse func(Msg) } -func (t *transaction) SetResponseHandler(f func(Msg)) { +// Set a function to be called with the response. +func (t *Transaction) SetResponseHandler(f func(Msg)) { t.mu.Lock() defer t.mu.Unlock() t.userOnResponse = f t.tryHandleResponse() } -func (t *transaction) tryHandleResponse() { +func (t *Transaction) tryHandleResponse() { if t.userOnResponse == nil { return } @@ -351,7 +374,7 @@ func (t *transaction) tryHandleResponse() { } } -func (t *transaction) Key() transactionKey { +func (t *Transaction) key() transactionKey { return transactionKey{ t.remoteAddr.String(), t.t, @@ -362,11 +385,11 @@ func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duratio return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus))) } -func (t *transaction) startTimer() { +func (t *Transaction) startTimer() { t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback) } -func (t *transaction) timerCallback() { +func (t *Transaction) timerCallback() { t.mu.Lock() defer t.mu.Unlock() select { @@ -385,7 +408,7 @@ func (t *transaction) timerCallback() { } } -func (t *transaction) sendQuery() error { +func (t *Transaction) sendQuery() error { err := t.s.writeToNode(t.queryPacket, t.remoteAddr) if err != nil { return err @@ -394,7 +417,7 @@ func (t *transaction) sendQuery() error { return nil } -func (t *transaction) timeout() { +func (t *Transaction) timeout() { go func() { t.s.mu.Lock() defer t.s.mu.Unlock() @@ -403,7 +426,7 @@ func (t *transaction) timeout() { t.close() } -func (t *transaction) close() { +func (t *Transaction) close() { if t.closing() { return } @@ -419,7 +442,7 @@ func (t *transaction) close() { }() } -func (t *transaction) closing() bool { +func (t *Transaction) closing() bool { select { case <-t.done: return true @@ -428,13 +451,14 @@ func (t *transaction) closing() bool { } } -func (t *transaction) Close() { +// Abandon the transaction. +func (t *Transaction) Close() { t.mu.Lock() defer t.mu.Unlock() t.close() } -func (t *transaction) handleResponse(m Msg) { +func (t *Transaction) handleResponse(m Msg) { t.mu.Lock() if t.closing() { t.mu.Unlock() @@ -475,10 +499,11 @@ func (s *Server) setDefaults() (err error) { } s.id = string(id[:]) } - s.nodes = make(map[string]*Node, 10000) + s.nodes = make(map[string]*node, 10000) return } +// Packets to and from any address matching a range in the list are dropped. func (s *Server) SetIPBlockList(list *iplist.IPList) { s.mu.Lock() defer s.mu.Unlock() @@ -491,7 +516,7 @@ func (s *Server) init() (err error) { return } s.closed = make(chan struct{}) - s.transactions = make(map[transactionKey]*transaction) + s.transactions = make(map[transactionKey]*Transaction) return } @@ -558,11 +583,12 @@ func (s *Server) ipBlocked(ip net.IP) bool { return s.ipBlockList.Lookup(ip) != nil } +// Adds directly to the node table. func (s *Server) AddNode(ni NodeInfo) { s.mu.Lock() defer s.mu.Unlock() if s.nodes == nil { - s.nodes = make(map[string]*Node) + s.nodes = make(map[string]*node) } n := s.getNode(ni.Addr) if n.IDNotSet() { @@ -570,7 +596,7 @@ func (s *Server) AddNode(ni NodeInfo) { } } -func (s *Server) nodeByID(id string) *Node { +func (s *Server) nodeByID(id string) *node { for _, node := range s.nodes { if node.idString() == id { return node @@ -656,7 +682,7 @@ func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) { if r == nil { r = make(map[string]interface{}, 1) } - r["id"] = s.IDString() + r["id"] = s.ID() m := map[string]interface{}{ "t": t, "y": "r", @@ -672,11 +698,11 @@ func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) { } } -func (s *Server) getNode(addr dHTAddr) (n *Node) { +func (s *Server) getNode(addr dHTAddr) (n *node) { addrStr := addr.String() n = s.nodes[addrStr] if n == nil { - n = &Node{ + n = &node{ addr: addr, } if len(s.nodes) < maxNodes { @@ -718,7 +744,7 @@ func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) { return } -func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *transaction { +func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction { return s.transactions[transactionKey{ sourceNode.String(), transactionID}] @@ -731,30 +757,32 @@ func (s *Server) nextTransactionID() string { return string(b[:n]) } -func (s *Server) deleteTransaction(t *transaction) { - delete(s.transactions, t.Key()) +func (s *Server) deleteTransaction(t *Transaction) { + delete(s.transactions, t.key()) } -func (s *Server) addTransaction(t *transaction) { - if _, ok := s.transactions[t.Key()]; ok { +func (s *Server) addTransaction(t *Transaction) { + if _, ok := s.transactions[t.key()]; ok { panic("transaction not unique") } - s.transactions[t.Key()] = t + s.transactions[t.key()] = t } -func (s *Server) IDString() string { +// Returns the 20-byte server ID. This is the ID used to communicate with the +// DHT network. +func (s *Server) ID() string { if len(s.id) != 20 { panic("bad node id") } return s.id } -func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *transaction, err error) { +func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) { tid := s.nextTransactionID() if a == nil { a = make(map[string]interface{}, 1) } - a["id"] = s.IDString() + a["id"] = s.ID() d := map[string]interface{}{ "t": tid, "y": "q", @@ -765,7 +793,7 @@ func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onRespo if err != nil { return } - t = &transaction{ + t = &Transaction{ remoteAddr: node, t: tid, response: make(chan Msg, 1), @@ -784,6 +812,7 @@ func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onRespo return } +// The size in bytes of a NodeInfo in its compact binary representation. const CompactNodeInfoLen = 26 type NodeInfo struct { @@ -791,6 +820,8 @@ type NodeInfo struct { Addr dHTAddr } +// Writes the node info to its compact binary representation in b. See +// CompactNodeInfoLen. func (ni *NodeInfo) PutCompact(b []byte) error { if n := copy(b[:], ni.ID[:]); n != 20 { panic(n) @@ -818,7 +849,8 @@ func (cni *NodeInfo) UnmarshalCompact(b []byte) error { return nil } -func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) { +// Sends a ping query to the address given. +func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) { s.mu.Lock() defer s.mu.Unlock() return s.query(newDHTAddr(node), "ping", nil, nil) @@ -861,7 +893,7 @@ func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token str logonce.Stderr.Printf("announce_peer response: %s", err) return } - s.NumConfirmedAnnounces++ + s.numConfirmedAnnounces++ }) return } @@ -885,7 +917,7 @@ func (s *Server) liftNodes(d Msg) { } // Sends a find_node query to addr. targetID is the node we're looking for. -func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err error) { +func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) { t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) { // Scrape peers from the response to put in the server's table before // handing the response back to the caller. @@ -934,7 +966,7 @@ func (m Msg) Values() (vs []util.CompactPeer) { return } -func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err error) { +func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) { if len(infoHash) != 20 { err = fmt.Errorf("infohash has bad length") return @@ -972,7 +1004,7 @@ func (s *Server) addRootNodes() error { return err } for _, addr := range addrs { - s.nodes[addr.String()] = &Node{ + s.nodes[addr.String()] = &node{ addr: newDHTAddr(addr), } } @@ -992,7 +1024,7 @@ func (s *Server) bootstrap() (err error) { for { var outstanding sync.WaitGroup for _, node := range s.nodes { - var t *transaction + var t *Transaction t, err = s.findNode(node.addr, s.id) if err != nil { err = fmt.Errorf("error sending find_node: %s", err) @@ -1034,12 +1066,14 @@ func (s *Server) numGoodNodes() (num int) { return } +// Returns how many nodes are in the node table. func (s *Server) NumNodes() int { s.mu.Lock() defer s.mu.Unlock() return len(s.nodes) } +// Exports the current node table. func (s *Server) Nodes() (nis []NodeInfo) { s.mu.Lock() defer s.mu.Unlock() @@ -1058,6 +1092,7 @@ func (s *Server) Nodes() (nis []NodeInfo) { return } +// Stops the server network activity. This is all that's required to clean-up a Server. func (s *Server) Close() { s.mu.Lock() select { @@ -1076,13 +1111,13 @@ func init() { maxDistance.SetBit(&zero, 160, 1) } -func (s *Server) closestGoodNodes(k int, targetID string) []*Node { - return s.closestNodes(k, nodeIDFromString(targetID), func(n *Node) bool { return n.DefinitelyGood() }) +func (s *Server) closestGoodNodes(k int, targetID string) []*node { + return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() }) } -func (s *Server) closestNodes(k int, target nodeID, filter func(*Node) bool) []*Node { +func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node { sel := newKClosestNodesSelector(k, target) - idNodes := make(map[string]*Node, len(s.nodes)) + idNodes := make(map[string]*node, len(s.nodes)) for _, node := range s.nodes { if !filter(node) { continue @@ -1091,7 +1126,7 @@ func (s *Server) closestNodes(k int, target nodeID, filter func(*Node) bool) []* idNodes[node.idString()] = node } ids := sel.IDs() - ret := make([]*Node, 0, len(ids)) + ret := make([]*node, 0, len(ids)) for _, id := range ids { ret = append(ret, idNodes[id.String()]) } diff --git a/dht/dht_test.go b/dht/dht_test.go index a2e5d11e..f5e1c63c 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -142,7 +142,7 @@ func TestPing(t *testing.T) { defer srv0.Close() tn, err := srv.Ping(&net.UDPAddr{ IP: []byte{127, 0, 0, 1}, - Port: srv0.LocalAddr().(*net.UDPAddr).Port, + Port: srv0.Addr().(*net.UDPAddr).Port, }) if err != nil { t.Fatal(err) @@ -150,7 +150,7 @@ func TestPing(t *testing.T) { defer tn.Close() ok := make(chan bool) tn.SetResponseHandler(func(msg Msg) { - ok <- msg.ID() == srv0.IDString() + ok <- msg.ID() == srv0.ID() }) if !<-ok { t.FailNow() -- 2.48.1