]> Sergey Matveev's repositories - btrtrc.git/blob - dht/dht.go
Multiple file metainfo mode support in cmd tools
[btrtrc.git] / dht / dht.go
1 // Package DHT implements a DHT for use with the BitTorrent protocol,
2 // described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html.
3 //
4 // Standard use involves creating a NewServer, and calling Announce on it with
5 // the details of your local torrent client and infohash of interest.
6 package dht
7
8 import (
9         "crypto"
10         _ "crypto/sha1"
11         "encoding/binary"
12         "errors"
13         "fmt"
14         "io"
15         "log"
16         "math/big"
17         "math/rand"
18         "net"
19         "os"
20         "time"
21
22         "github.com/anacrolix/sync"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/iplist"
26         "github.com/anacrolix/torrent/logonce"
27         "github.com/anacrolix/torrent/util"
28 )
29
30 const (
31         maxNodes         = 320
32         queryResendEvery = 5 * time.Second
33 )
34
35 // Uniquely identifies a transaction to us.
36 type transactionKey struct {
37         RemoteAddr string // host:port
38         T          string // The KRPC transaction ID.
39 }
40
41 type Server struct {
42         id               string
43         socket           net.PacketConn
44         transactions     map[transactionKey]*Transaction
45         transactionIDInt uint64
46         nodes            map[string]*node // Keyed by dHTAddr.String().
47         mu               sync.Mutex
48         closed           chan struct{}
49         passive          bool // Don't respond to queries.
50         ipBlockList      *iplist.IPList
51
52         numConfirmedAnnounces int
53         bootstrapNodes        []string
54 }
55
56 type dHTAddr interface {
57         net.Addr
58         UDPAddr() *net.UDPAddr
59 }
60
61 type cachedAddr struct {
62         a net.Addr
63         s string
64 }
65
66 func (ca cachedAddr) Network() string {
67         return ca.a.Network()
68 }
69
70 func (ca cachedAddr) String() string {
71         return ca.s
72 }
73
74 func (ca cachedAddr) UDPAddr() *net.UDPAddr {
75         return ca.a.(*net.UDPAddr)
76 }
77
78 func newDHTAddr(addr net.Addr) dHTAddr {
79         return cachedAddr{addr, addr.String()}
80 }
81
82 type ServerConfig struct {
83         Addr string // Listen address. Used if Conn is nil.
84         Conn net.PacketConn
85         // Don't respond to queries from other nodes.
86         Passive bool
87         // DHT Bootstrap nodes
88         BootstrapNodes []string
89 }
90
91 type ServerStats struct {
92         // Count of nodes in the node table that responded to our last query or
93         // haven't yet been queried.
94         GoodNodes int
95         // Count of nodes in the node table.
96         Nodes int
97         // Transactions awaiting a response.
98         OutstandingTransactions int
99         // Individual announce_peer requests that got a success response.
100         ConfirmedAnnounces int
101 }
102
103 // Returns statistics for the server.
104 func (s *Server) Stats() (ss ServerStats) {
105         s.mu.Lock()
106         defer s.mu.Unlock()
107         for _, n := range s.nodes {
108                 if n.DefinitelyGood() {
109                         ss.GoodNodes++
110                 }
111         }
112         ss.Nodes = len(s.nodes)
113         ss.OutstandingTransactions = len(s.transactions)
114         ss.ConfirmedAnnounces = s.numConfirmedAnnounces
115         return
116 }
117
118 // Returns the listen address for the server. Packets arriving to this address
119 // are processed by the server (unless aliens are involved).
120 func (s *Server) Addr() net.Addr {
121         return s.socket.LocalAddr()
122 }
123
124 func makeSocket(addr string) (socket *net.UDPConn, err error) {
125         addr_, err := net.ResolveUDPAddr("", addr)
126         if err != nil {
127                 return
128         }
129         socket, err = net.ListenUDP("udp", addr_)
130         return
131 }
132
133 // Create a new DHT server.
134 func NewServer(c *ServerConfig) (s *Server, err error) {
135         if c == nil {
136                 c = &ServerConfig{}
137         }
138         s = &Server{}
139         if c.Conn != nil {
140                 s.socket = c.Conn
141         } else {
142                 s.socket, err = makeSocket(c.Addr)
143                 if err != nil {
144                         return
145                 }
146         }
147         s.passive = c.Passive
148         s.bootstrapNodes = c.BootstrapNodes
149         err = s.init()
150         if err != nil {
151                 return
152         }
153         go func() {
154                 err := s.serve()
155                 select {
156                 case <-s.closed:
157                         return
158                 default:
159                 }
160                 if err != nil {
161                         panic(err)
162                 }
163         }()
164         go func() {
165                 err := s.bootstrap()
166                 if err != nil {
167                         log.Printf("error bootstrapping DHT: %s", err)
168                 }
169         }()
170         return
171 }
172
173 // Returns a description of the Server. Python repr-style.
174 func (s *Server) String() string {
175         return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
176 }
177
178 type nodeID struct {
179         i   big.Int
180         set bool
181 }
182
183 func (nid *nodeID) IsUnset() bool {
184         return !nid.set
185 }
186
187 func nodeIDFromString(s string) (ret nodeID) {
188         if s == "" {
189                 return
190         }
191         ret.i.SetBytes([]byte(s))
192         ret.set = true
193         return
194 }
195
196 func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) {
197         if nid0.IsUnset() != nid1.IsUnset() {
198                 ret = maxDistance
199                 return
200         }
201         ret.Xor(&nid0.i, &nid1.i)
202         return
203 }
204
205 func (nid *nodeID) String() string {
206         return string(nid.i.Bytes())
207 }
208
209 type node struct {
210         addr          dHTAddr
211         id            nodeID
212         announceToken string
213
214         lastGotQuery    time.Time
215         lastGotResponse time.Time
216         lastSentQuery   time.Time
217 }
218
219 func (n *node) idString() string {
220         return n.id.String()
221 }
222
223 func (n *node) SetIDFromBytes(b []byte) {
224         n.id.i.SetBytes(b)
225         n.id.set = true
226 }
227
228 func (n *node) SetIDFromString(s string) {
229         n.id.i.SetBytes([]byte(s))
230 }
231
232 func (n *node) IDNotSet() bool {
233         return n.id.i.Int64() == 0
234 }
235
236 func (n *node) NodeInfo() (ret NodeInfo) {
237         ret.Addr = n.addr
238         if n := copy(ret.ID[:], n.idString()); n != 20 {
239                 panic(n)
240         }
241         return
242 }
243
244 func (n *node) DefinitelyGood() bool {
245         if len(n.idString()) != 20 {
246                 return false
247         }
248         // No reason to think ill of them if they've never been queried.
249         if n.lastSentQuery.IsZero() {
250                 return true
251         }
252         // They answered our last query.
253         if n.lastSentQuery.Before(n.lastGotResponse) {
254                 return true
255         }
256         return true
257 }
258
259 // A wrapper around the unmarshalled KRPC dict that constitutes messages in
260 // the DHT. There are various helpers for extracting common data from the
261 // message. In normal use, Msg is abstracted away for you, but it can be of
262 // interest.
263 type Msg map[string]interface{}
264
265 var _ fmt.Stringer = Msg{}
266
267 func (m Msg) String() string {
268         return fmt.Sprintf("%#v", m)
269 }
270
271 func (m Msg) T() (t string) {
272         tif, ok := m["t"]
273         if !ok {
274                 return
275         }
276         t, _ = tif.(string)
277         return
278 }
279
280 func (m Msg) ID() string {
281         defer func() {
282                 recover()
283         }()
284         return m[m["y"].(string)].(map[string]interface{})["id"].(string)
285 }
286
287 // Suggested nodes in a response.
288 func (m Msg) Nodes() (nodes []NodeInfo) {
289         b := func() string {
290                 defer func() {
291                         recover()
292                 }()
293                 return m["r"].(map[string]interface{})["nodes"].(string)
294         }()
295         if len(b)%26 != 0 {
296                 return
297         }
298         for i := 0; i < len(b); i += 26 {
299                 var n NodeInfo
300                 err := n.UnmarshalCompact([]byte(b[i : i+26]))
301                 if err != nil {
302                         continue
303                 }
304                 nodes = append(nodes, n)
305         }
306         return
307 }
308
309 type KRPCError struct {
310         Code int
311         Msg  string
312 }
313
314 func (me KRPCError) Error() string {
315         return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
316 }
317
318 var _ error = KRPCError{}
319
320 func (m Msg) Error() (ret *KRPCError) {
321         if m["y"] != "e" {
322                 return
323         }
324         ret = &KRPCError{}
325         switch e := m["e"].(type) {
326         case []interface{}:
327                 ret.Code = int(e[0].(int64))
328                 ret.Msg = e[1].(string)
329         case string:
330                 ret.Msg = e
331         default:
332                 logonce.Stderr.Printf(`KRPC error "e" value has unexpected type: %T`, e)
333         }
334         return
335 }
336
337 // Returns the token given in response to a get_peers request for future
338 // announce_peer requests to that node.
339 func (m Msg) AnnounceToken() (token string, ok bool) {
340         defer func() { recover() }()
341         token, ok = m["r"].(map[string]interface{})["token"].(string)
342         return
343 }
344
345 type Transaction struct {
346         mu             sync.Mutex
347         remoteAddr     dHTAddr
348         t              string
349         response       chan Msg
350         onResponse     func(Msg) // Called with the server locked.
351         done           chan struct{}
352         queryPacket    []byte
353         timer          *time.Timer
354         s              *Server
355         retries        int
356         lastSend       time.Time
357         userOnResponse func(Msg)
358 }
359
360 // Set a function to be called with the response.
361 func (t *Transaction) SetResponseHandler(f func(Msg)) {
362         t.mu.Lock()
363         defer t.mu.Unlock()
364         t.userOnResponse = f
365         t.tryHandleResponse()
366 }
367
368 func (t *Transaction) tryHandleResponse() {
369         if t.userOnResponse == nil {
370                 return
371         }
372         select {
373         case r := <-t.response:
374                 t.userOnResponse(r)
375                 // Shouldn't be called more than once.
376                 t.userOnResponse = nil
377         default:
378         }
379 }
380
381 func (t *Transaction) key() transactionKey {
382         return transactionKey{
383                 t.remoteAddr.String(),
384                 t.t,
385         }
386 }
387
388 func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
389         return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
390 }
391
392 func (t *Transaction) startTimer() {
393         t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
394 }
395
396 func (t *Transaction) timerCallback() {
397         t.mu.Lock()
398         defer t.mu.Unlock()
399         select {
400         case <-t.done:
401                 return
402         default:
403         }
404         if t.retries == 2 {
405                 t.timeout()
406                 return
407         }
408         t.retries++
409         t.sendQuery()
410         if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
411                 panic("timer should have fired to get here")
412         }
413 }
414
415 func (t *Transaction) sendQuery() error {
416         err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
417         if err != nil {
418                 return err
419         }
420         t.lastSend = time.Now()
421         return nil
422 }
423
424 func (t *Transaction) timeout() {
425         go func() {
426                 t.s.mu.Lock()
427                 defer t.s.mu.Unlock()
428                 t.s.nodeTimedOut(t.remoteAddr)
429         }()
430         t.close()
431 }
432
433 func (t *Transaction) close() {
434         if t.closing() {
435                 return
436         }
437         t.queryPacket = nil
438         close(t.response)
439         t.tryHandleResponse()
440         close(t.done)
441         t.timer.Stop()
442         go func() {
443                 t.s.mu.Lock()
444                 defer t.s.mu.Unlock()
445                 t.s.deleteTransaction(t)
446         }()
447 }
448
449 func (t *Transaction) closing() bool {
450         select {
451         case <-t.done:
452                 return true
453         default:
454                 return false
455         }
456 }
457
458 // Abandon the transaction.
459 func (t *Transaction) Close() {
460         t.mu.Lock()
461         defer t.mu.Unlock()
462         t.close()
463 }
464
465 func (t *Transaction) handleResponse(m Msg) {
466         t.mu.Lock()
467         if t.closing() {
468                 t.mu.Unlock()
469                 return
470         }
471         close(t.done)
472         t.mu.Unlock()
473         if t.onResponse != nil {
474                 t.s.mu.Lock()
475                 t.onResponse(m)
476                 t.s.mu.Unlock()
477         }
478         t.queryPacket = nil
479         select {
480         case t.response <- m:
481         default:
482                 panic("blocked handling response")
483         }
484         close(t.response)
485         t.tryHandleResponse()
486 }
487
488 func (s *Server) setDefaults() (err error) {
489         if s.id == "" {
490                 var id [20]byte
491                 h := crypto.SHA1.New()
492                 ss, err := os.Hostname()
493                 if err != nil {
494                         log.Print(err)
495                 }
496                 ss += s.socket.LocalAddr().String()
497                 h.Write([]byte(ss))
498                 if b := h.Sum(id[:0:20]); len(b) != 20 {
499                         panic(len(b))
500                 }
501                 if len(id) != 20 {
502                         panic(len(id))
503                 }
504                 s.id = string(id[:])
505         }
506         s.nodes = make(map[string]*node, 10000)
507         return
508 }
509
510 // Packets to and from any address matching a range in the list are dropped.
511 func (s *Server) SetIPBlockList(list *iplist.IPList) {
512         s.mu.Lock()
513         defer s.mu.Unlock()
514         s.ipBlockList = list
515 }
516
517 func (s *Server) init() (err error) {
518         err = s.setDefaults()
519         if err != nil {
520                 return
521         }
522         s.closed = make(chan struct{})
523         s.transactions = make(map[transactionKey]*Transaction)
524         return
525 }
526
527 func (s *Server) processPacket(b []byte, addr dHTAddr) {
528         var d Msg
529         err := bencode.Unmarshal(b, &d)
530         if err != nil {
531                 func() {
532                         if se, ok := err.(*bencode.SyntaxError); ok {
533                                 // The message was truncated.
534                                 if int(se.Offset) == len(b) {
535                                         return
536                                 }
537                                 // Some messages seem to drop to nul chars abrubtly.
538                                 if int(se.Offset) < len(b) && b[se.Offset] == 0 {
539                                         return
540                                 }
541                                 // The message isn't bencode from the first.
542                                 if se.Offset == 0 {
543                                         return
544                                 }
545                         }
546                         log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b)
547                 }()
548                 return
549         }
550         s.mu.Lock()
551         defer s.mu.Unlock()
552         if d["y"] == "q" {
553                 s.handleQuery(addr, d)
554                 return
555         }
556         t := s.findResponseTransaction(d.T(), addr)
557         if t == nil {
558                 //log.Printf("unexpected message: %#v", d)
559                 return
560         }
561         node := s.getNode(addr)
562         node.lastGotResponse = time.Now()
563         // TODO: Update node ID as this is an authoritative packet.
564         go t.handleResponse(d)
565         s.deleteTransaction(t)
566 }
567
568 func (s *Server) serve() error {
569         var b [0x10000]byte
570         for {
571                 n, addr, err := s.socket.ReadFrom(b[:])
572                 if err != nil {
573                         return err
574                 }
575                 if n == len(b) {
576                         logonce.Stderr.Printf("received dht packet exceeds buffer size")
577                         continue
578                 }
579                 if s.ipBlocked(util.AddrIP(addr)) {
580                         continue
581                 }
582                 s.processPacket(b[:n], newDHTAddr(addr))
583         }
584 }
585
586 func (s *Server) ipBlocked(ip net.IP) bool {
587         if s.ipBlockList == nil {
588                 return false
589         }
590         return s.ipBlockList.Lookup(ip) != nil
591 }
592
593 // Adds directly to the node table.
594 func (s *Server) AddNode(ni NodeInfo) {
595         s.mu.Lock()
596         defer s.mu.Unlock()
597         if s.nodes == nil {
598                 s.nodes = make(map[string]*node)
599         }
600         n := s.getNode(ni.Addr)
601         if n.IDNotSet() {
602                 n.SetIDFromBytes(ni.ID[:])
603         }
604 }
605
606 func (s *Server) nodeByID(id string) *node {
607         for _, node := range s.nodes {
608                 if node.idString() == id {
609                         return node
610                 }
611         }
612         return nil
613 }
614
615 func (s *Server) handleQuery(source dHTAddr, m Msg) {
616         args := m["a"].(map[string]interface{})
617         node := s.getNode(source)
618         node.SetIDFromString(args["id"].(string))
619         node.lastGotQuery = time.Now()
620         // Don't respond.
621         if s.passive {
622                 return
623         }
624         switch m["q"] {
625         case "ping":
626                 s.reply(source, m["t"].(string), nil)
627         case "get_peers": // TODO: Extract common behaviour with find_node.
628                 targetID := args["info_hash"].(string)
629                 if len(targetID) != 20 {
630                         break
631                 }
632                 var rNodes []NodeInfo
633                 // TODO: Reply with "values" list if we have peers instead.
634                 for _, node := range s.closestGoodNodes(8, targetID) {
635                         rNodes = append(rNodes, node.NodeInfo())
636                 }
637                 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
638                 for i, ni := range rNodes {
639                         err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
640                         if err != nil {
641                                 panic(err)
642                         }
643                 }
644                 s.reply(source, m["t"].(string), map[string]interface{}{
645                         "nodes": string(nodesBytes),
646                         "token": "hi",
647                 })
648         case "find_node": // TODO: Extract common behaviour with get_peers.
649                 targetID := args["target"].(string)
650                 if len(targetID) != 20 {
651                         log.Printf("bad DHT query: %v", m)
652                         return
653                 }
654                 var rNodes []NodeInfo
655                 if node := s.nodeByID(targetID); node != nil {
656                         rNodes = append(rNodes, node.NodeInfo())
657                 } else {
658                         for _, node := range s.closestGoodNodes(8, targetID) {
659                                 rNodes = append(rNodes, node.NodeInfo())
660                         }
661                 }
662                 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
663                 for i, ni := range rNodes {
664                         // TODO: Put IPv6 nodes into the correct dict element.
665                         if ni.Addr.UDPAddr().IP.To4() == nil {
666                                 continue
667                         }
668                         err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
669                         if err != nil {
670                                 log.Printf("error compacting %#v: %s", ni, err)
671                                 continue
672                         }
673                 }
674                 s.reply(source, m["t"].(string), map[string]interface{}{
675                         "nodes": string(nodesBytes),
676                 })
677         case "announce_peer":
678                 // TODO(anacrolix): Implement this lolz.
679                 // log.Print(m)
680         case "vote":
681                 // TODO(anacrolix): Or reject, I don't think I want this.
682         default:
683                 log.Printf("%s: not handling received query: q=%s", s, m["q"])
684                 return
685         }
686 }
687
688 func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) {
689         if r == nil {
690                 r = make(map[string]interface{}, 1)
691         }
692         r["id"] = s.ID()
693         m := map[string]interface{}{
694                 "t": t,
695                 "y": "r",
696                 "r": r,
697         }
698         b, err := bencode.Marshal(m)
699         if err != nil {
700                 panic(err)
701         }
702         err = s.writeToNode(b, addr)
703         if err != nil {
704                 log.Printf("error replying to %s: %s", addr, err)
705         }
706 }
707
708 func (s *Server) getNode(addr dHTAddr) (n *node) {
709         addrStr := addr.String()
710         n = s.nodes[addrStr]
711         if n == nil {
712                 n = &node{
713                         addr: addr,
714                 }
715                 if len(s.nodes) < maxNodes {
716                         s.nodes[addrStr] = n
717                 }
718         }
719         return
720 }
721 func (s *Server) nodeTimedOut(addr dHTAddr) {
722         node, ok := s.nodes[addr.String()]
723         if !ok {
724                 return
725         }
726         if node.DefinitelyGood() {
727                 return
728         }
729         if len(s.nodes) < maxNodes {
730                 return
731         }
732         delete(s.nodes, addr.String())
733 }
734
735 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
736         if list := s.ipBlockList; list != nil {
737                 if r := list.Lookup(util.AddrIP(node.UDPAddr())); r != nil {
738                         err = fmt.Errorf("write to %s blocked: %s", node, r.Description)
739                         return
740                 }
741         }
742         n, err := s.socket.WriteTo(b, node.UDPAddr())
743         if err != nil {
744                 err = fmt.Errorf("error writing %d bytes to %s: %#v", len(b), node, err)
745                 return
746         }
747         if n != len(b) {
748                 err = io.ErrShortWrite
749                 return
750         }
751         return
752 }
753
754 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction {
755         return s.transactions[transactionKey{
756                 sourceNode.String(),
757                 transactionID}]
758 }
759
760 func (s *Server) nextTransactionID() string {
761         var b [binary.MaxVarintLen64]byte
762         n := binary.PutUvarint(b[:], s.transactionIDInt)
763         s.transactionIDInt++
764         return string(b[:n])
765 }
766
767 func (s *Server) deleteTransaction(t *Transaction) {
768         delete(s.transactions, t.key())
769 }
770
771 func (s *Server) addTransaction(t *Transaction) {
772         if _, ok := s.transactions[t.key()]; ok {
773                 panic("transaction not unique")
774         }
775         s.transactions[t.key()] = t
776 }
777
778 // Returns the 20-byte server ID. This is the ID used to communicate with the
779 // DHT network.
780 func (s *Server) ID() string {
781         if len(s.id) != 20 {
782                 panic("bad node id")
783         }
784         return s.id
785 }
786
787 func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
788         tid := s.nextTransactionID()
789         if a == nil {
790                 a = make(map[string]interface{}, 1)
791         }
792         a["id"] = s.ID()
793         d := map[string]interface{}{
794                 "t": tid,
795                 "y": "q",
796                 "q": q,
797                 "a": a,
798         }
799         b, err := bencode.Marshal(d)
800         if err != nil {
801                 return
802         }
803         t = &Transaction{
804                 remoteAddr:  node,
805                 t:           tid,
806                 response:    make(chan Msg, 1),
807                 done:        make(chan struct{}),
808                 queryPacket: b,
809                 s:           s,
810                 onResponse:  onResponse,
811         }
812         err = t.sendQuery()
813         if err != nil {
814                 return
815         }
816         s.getNode(node).lastSentQuery = time.Now()
817         t.startTimer()
818         s.addTransaction(t)
819         return
820 }
821
822 // The size in bytes of a NodeInfo in its compact binary representation.
823 const CompactNodeInfoLen = 26
824
825 type NodeInfo struct {
826         ID   [20]byte
827         Addr dHTAddr
828 }
829
830 // Writes the node info to its compact binary representation in b. See
831 // CompactNodeInfoLen.
832 func (ni *NodeInfo) PutCompact(b []byte) error {
833         if n := copy(b[:], ni.ID[:]); n != 20 {
834                 panic(n)
835         }
836         ip := util.AddrIP(ni.Addr).To4()
837         if len(ip) != 4 {
838                 return errors.New("expected ipv4 address")
839         }
840         if n := copy(b[20:], ip); n != 4 {
841                 panic(n)
842         }
843         binary.BigEndian.PutUint16(b[24:], uint16(util.AddrPort(ni.Addr)))
844         return nil
845 }
846
847 func (cni *NodeInfo) UnmarshalCompact(b []byte) error {
848         if len(b) != 26 {
849                 return errors.New("expected 26 bytes")
850         }
851         util.CopyExact(cni.ID[:], b[:20])
852         cni.Addr = newDHTAddr(&net.UDPAddr{
853                 IP:   net.IPv4(b[20], b[21], b[22], b[23]),
854                 Port: int(binary.BigEndian.Uint16(b[24:26])),
855         })
856         return nil
857 }
858
859 // Sends a ping query to the address given.
860 func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) {
861         s.mu.Lock()
862         defer s.mu.Unlock()
863         return s.query(newDHTAddr(node), "ping", nil, nil)
864 }
865
866 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) (err error) {
867         if port == 0 && !impliedPort {
868                 return errors.New("nothing to announce")
869         }
870         _, err = s.query(node, "announce_peer", map[string]interface{}{
871                 "implied_port": func() int {
872                         if impliedPort {
873                                 return 1
874                         } else {
875                                 return 0
876                         }
877                 }(),
878                 "info_hash": infoHash,
879                 "port":      port,
880                 "token":     token,
881         }, func(m Msg) {
882                 if err := m.Error(); err != nil {
883                         logonce.Stderr.Printf("announce_peer response: %s", err)
884                         return
885                 }
886                 s.numConfirmedAnnounces++
887         })
888         return
889 }
890
891 // Add response nodes to node table.
892 func (s *Server) liftNodes(d Msg) {
893         if d["y"] != "r" {
894                 return
895         }
896         for _, cni := range d.Nodes() {
897                 if util.AddrPort(cni.Addr) == 0 {
898                         // TODO: Why would people even do this?
899                         continue
900                 }
901                 if s.ipBlocked(util.AddrIP(cni.Addr)) {
902                         continue
903                 }
904                 n := s.getNode(cni.Addr)
905                 n.SetIDFromBytes(cni.ID[:])
906         }
907 }
908
909 // Sends a find_node query to addr. targetID is the node we're looking for.
910 func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) {
911         t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
912                 // Scrape peers from the response to put in the server's table before
913                 // handing the response back to the caller.
914                 s.liftNodes(d)
915         })
916         if err != nil {
917                 return
918         }
919         return
920 }
921
922 // In a get_peers response, the addresses of torrent clients involved with the
923 // queried info-hash.
924 func (m Msg) Values() (vs []util.CompactPeer) {
925         r, ok := m["r"]
926         if !ok {
927                 return
928         }
929         rd, ok := r.(map[string]interface{})
930         if !ok {
931                 return
932         }
933         v, ok := rd["values"]
934         if !ok {
935                 return
936         }
937         vl, ok := v.([]interface{})
938         if !ok {
939                 log.Printf("unexpected krpc values type: %T", v)
940                 return
941         }
942         vs = make([]util.CompactPeer, 0, len(vl))
943         for _, i := range vl {
944                 s, ok := i.(string)
945                 if !ok {
946                         panic(i)
947                 }
948                 var cp util.CompactPeer
949                 err := cp.UnmarshalBinary([]byte(s))
950                 if err != nil {
951                         log.Printf("error decoding values list element: %s", err)
952                         continue
953                 }
954                 vs = append(vs, cp)
955         }
956         return
957 }
958
959 func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) {
960         if len(infoHash) != 20 {
961                 err = fmt.Errorf("infohash has bad length")
962                 return
963         }
964         t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
965                 s.liftNodes(m)
966                 at, ok := m.AnnounceToken()
967                 if ok {
968                         s.getNode(addr).announceToken = at
969                 }
970         })
971         return
972 }
973
974 func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) {
975         bootstrapNodes := nodeAddrs
976         if len(bootstrapNodes) == 0 {
977                 bootstrapNodes = []string{
978                         "router.utorrent.com:6881",
979                         "router.bittorrent.com:6881",
980                 }
981         }
982         for _, addrStr := range bootstrapNodes {
983                 if addrStr != "" {
984                         udpAddr, err := net.ResolveUDPAddr("udp4", addrStr)
985                         if err != nil {
986                                 continue
987                         }
988                         addrs = append(addrs, udpAddr)
989                 }
990         }
991         if len(addrs) == 0 {
992                 err = errors.New("nothing resolved")
993         }
994         return
995 }
996
997 func (s *Server) addRootNodes() error {
998         addrs, err := bootstrapAddrs(s.bootstrapNodes)
999         if err != nil {
1000                 return err
1001         }
1002         for _, addr := range addrs {
1003                 s.nodes[addr.String()] = &node{
1004                         addr: newDHTAddr(addr),
1005                 }
1006         }
1007         return nil
1008 }
1009
1010 // Populates the node table.
1011 func (s *Server) bootstrap() (err error) {
1012         s.mu.Lock()
1013         defer s.mu.Unlock()
1014         if len(s.nodes) == 0 {
1015                 err = s.addRootNodes()
1016         }
1017         if err != nil {
1018                 return
1019         }
1020         for {
1021                 var outstanding sync.WaitGroup
1022                 for _, node := range s.nodes {
1023                         var t *Transaction
1024                         t, err = s.findNode(node.addr, s.id)
1025                         if err != nil {
1026                                 err = fmt.Errorf("error sending find_node: %s", err)
1027                                 return
1028                         }
1029                         outstanding.Add(1)
1030                         t.SetResponseHandler(func(Msg) {
1031                                 outstanding.Done()
1032                         })
1033                 }
1034                 noOutstanding := make(chan struct{})
1035                 go func() {
1036                         outstanding.Wait()
1037                         close(noOutstanding)
1038                 }()
1039                 s.mu.Unlock()
1040                 select {
1041                 case <-s.closed:
1042                         s.mu.Lock()
1043                         return
1044                 case <-time.After(15 * time.Second):
1045                 case <-noOutstanding:
1046                 }
1047                 s.mu.Lock()
1048                 // log.Printf("now have %d nodes", len(s.nodes))
1049                 if s.numGoodNodes() >= 160 {
1050                         break
1051                 }
1052         }
1053         return
1054 }
1055
1056 func (s *Server) numGoodNodes() (num int) {
1057         for _, n := range s.nodes {
1058                 if n.DefinitelyGood() {
1059                         num++
1060                 }
1061         }
1062         return
1063 }
1064
1065 // Returns how many nodes are in the node table.
1066 func (s *Server) NumNodes() int {
1067         s.mu.Lock()
1068         defer s.mu.Unlock()
1069         return len(s.nodes)
1070 }
1071
1072 // Exports the current node table.
1073 func (s *Server) Nodes() (nis []NodeInfo) {
1074         s.mu.Lock()
1075         defer s.mu.Unlock()
1076         for _, node := range s.nodes {
1077                 // if !node.Good() {
1078                 //      continue
1079                 // }
1080                 ni := NodeInfo{
1081                         Addr: node.addr,
1082                 }
1083                 if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
1084                         panic(n)
1085                 }
1086                 nis = append(nis, ni)
1087         }
1088         return
1089 }
1090
1091 // Stops the server network activity. This is all that's required to clean-up a Server.
1092 func (s *Server) Close() {
1093         s.mu.Lock()
1094         select {
1095         case <-s.closed:
1096         default:
1097                 close(s.closed)
1098                 s.socket.Close()
1099         }
1100         s.mu.Unlock()
1101 }
1102
1103 var maxDistance big.Int
1104
1105 func init() {
1106         var zero big.Int
1107         maxDistance.SetBit(&zero, 160, 1)
1108 }
1109
1110 func (s *Server) closestGoodNodes(k int, targetID string) []*node {
1111         return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() })
1112 }
1113
1114 func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node {
1115         sel := newKClosestNodesSelector(k, target)
1116         idNodes := make(map[string]*node, len(s.nodes))
1117         for _, node := range s.nodes {
1118                 if !filter(node) {
1119                         continue
1120                 }
1121                 sel.Push(node.id)
1122                 idNodes[node.idString()] = node
1123         }
1124         ids := sel.IDs()
1125         ret := make([]*node, 0, len(ids))
1126         for _, id := range ids {
1127                 ret = append(ret, idNodes[id.String()])
1128         }
1129         return ret
1130 }