1 // Package DHT implements a DHT for use with the BitTorrent protocol,
2 // described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html.
4 // Standard use involves creating a NewServer, and calling Announce on it with
5 // the details of your local torrent client and infohash of interest.
24 "github.com/anacrolix/missinggo"
25 "github.com/anacrolix/sync"
26 "github.com/tylertreat/BoomFilters"
28 "github.com/anacrolix/torrent/bencode"
29 "github.com/anacrolix/torrent/iplist"
30 "github.com/anacrolix/torrent/logonce"
38 queryResendEvery = 5 * time.Second
41 // Uniquely identifies a transaction to us.
42 type transactionKey struct {
43 RemoteAddr string // host:port
44 T string // The KRPC transaction ID.
50 transactions map[transactionKey]*Transaction
51 transactionIDInt uint64
52 nodes map[string]*node // Keyed by dHTAddr.String().
55 ipBlockList iplist.Ranger
56 badNodes *boom.BloomFilter
58 numConfirmedAnnounces int
59 bootstrapNodes []string
63 type ServerConfig struct {
64 Addr string // Listen address. Used if Conn is nil.
66 // Don't respond to queries from other nodes.
68 // DHT Bootstrap nodes
69 BootstrapNodes []string
70 // Disable the DHT security extension:
71 // http://www.libtorrent.org/dht_sec.html.
73 // Initial IP blocklist to use. Applied before serving and bootstrapping
75 IPBlocklist iplist.Ranger
76 // Used to secure the server's ID. Defaults to the Conn's LocalAddr().
80 type ServerStats struct {
81 // Count of nodes in the node table that responded to our last query or
82 // haven't yet been queried.
84 // Count of nodes in the node table.
86 // Transactions awaiting a response.
87 OutstandingTransactions int
88 // Individual announce_peer requests that got a success response.
89 ConfirmedAnnounces int
90 // Nodes that have been blocked.
94 // Returns statistics for the server.
95 func (s *Server) Stats() (ss ServerStats) {
98 for _, n := range s.nodes {
99 if n.DefinitelyGood() {
103 ss.Nodes = len(s.nodes)
104 ss.OutstandingTransactions = len(s.transactions)
105 ss.ConfirmedAnnounces = s.numConfirmedAnnounces
106 ss.BadNodes = s.badNodes.Count()
110 // Returns the listen address for the server. Packets arriving to this address
111 // are processed by the server (unless aliens are involved).
112 func (s *Server) Addr() net.Addr {
113 return s.socket.LocalAddr()
116 func makeSocket(addr string) (socket *net.UDPConn, err error) {
117 addr_, err := net.ResolveUDPAddr("", addr)
121 socket, err = net.ListenUDP("udp", addr_)
125 // Create a new DHT server.
126 func NewServer(c *ServerConfig) (s *Server, err error) {
132 ipBlockList: c.IPBlocklist,
133 badNodes: boom.NewBloomFilter(1000, 0.1),
138 s.socket, err = makeSocket(c.Addr)
143 s.bootstrapNodes = c.BootstrapNodes
165 log.Printf("error bootstrapping DHT: %s", err)
172 // Returns a description of the Server. Python repr-style.
173 func (s *Server) String() string {
174 return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
182 func (nid *nodeID) IsUnset() bool {
186 func nodeIDFromString(s string) (ret nodeID) {
190 ret.i.SetBytes([]byte(s))
195 func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) {
196 if nid0.IsUnset() != nid1.IsUnset() {
200 ret.Xor(&nid0.i, &nid1.i)
204 func (nid *nodeID) ByteString() string {
207 copy(buf[20-len(b):], b)
208 return string(buf[:])
216 lastGotQuery time.Time
217 lastGotResponse time.Time
218 lastSentQuery time.Time
221 func (n *node) IsSecure() bool {
225 return NodeIdSecure(n.id.ByteString(), n.addr.IP())
228 func (n *node) idString() string {
229 return n.id.ByteString()
232 func (n *node) SetIDFromBytes(b []byte) {
240 func (n *node) SetIDFromString(s string) {
241 n.SetIDFromBytes([]byte(s))
244 func (n *node) IDNotSet() bool {
245 return n.id.i.Int64() == 0
248 func (n *node) NodeInfo() (ret NodeInfo) {
250 if n := copy(ret.ID[:], n.idString()); n != 20 {
256 func (n *node) DefinitelyGood() bool {
257 if len(n.idString()) != 20 {
260 // No reason to think ill of them if they've never been queried.
261 if n.lastSentQuery.IsZero() {
264 // They answered our last query.
265 if n.lastSentQuery.Before(n.lastGotResponse) {
271 type Transaction struct {
276 onResponse func(Msg) // Called with the server locked.
283 userOnResponse func(Msg)
286 // Set a function to be called with the response.
287 func (t *Transaction) SetResponseHandler(f func(Msg)) {
291 t.tryHandleResponse()
294 func (t *Transaction) tryHandleResponse() {
295 if t.userOnResponse == nil {
299 case r := <-t.response:
301 // Shouldn't be called more than once.
302 t.userOnResponse = nil
307 func (t *Transaction) key() transactionKey {
308 return transactionKey{
309 t.remoteAddr.String(),
314 func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
315 return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
318 func (t *Transaction) startTimer() {
319 t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
322 func (t *Transaction) timerCallback() {
336 if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
337 panic("timer should have fired to get here")
341 func (t *Transaction) sendQuery() error {
342 err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
346 t.lastSend = time.Now()
350 func (t *Transaction) timeout() {
353 defer t.s.mu.Unlock()
354 t.s.nodeTimedOut(t.remoteAddr)
359 func (t *Transaction) close() {
365 t.tryHandleResponse()
370 defer t.s.mu.Unlock()
371 t.s.deleteTransaction(t)
375 func (t *Transaction) closing() bool {
384 // Abandon the transaction.
385 func (t *Transaction) Close() {
391 func (t *Transaction) handleResponse(m Msg) {
399 if t.onResponse != nil {
406 case t.response <- m:
408 panic("blocked handling response")
411 t.tryHandleResponse()
414 func maskForIP(ip net.IP) []byte {
416 case ip.To4() != nil:
417 return []byte{0x03, 0x0f, 0x3f, 0xff}
419 return []byte{0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff}
423 // Generate the CRC used to make or validate secure node ID.
424 func crcIP(ip net.IP, rand uint8) uint32 {
425 if ip4 := ip.To4(); ip4 != nil {
428 // Copy IP so we can make changes. Go sux at this.
429 ip = append(make(net.IP, 0, len(ip)), ip...)
430 mask := maskForIP(ip)
431 for i := range mask {
436 return crc32.Checksum(ip[:len(mask)], crc32.MakeTable(crc32.Castagnoli))
439 // Makes a node ID secure, in-place. The ID is 20 raw bytes.
440 // http://www.libtorrent.org/dht_sec.html
441 func SecureNodeId(id []byte, ip net.IP) {
442 crc := crcIP(ip, id[19])
443 id[0] = byte(crc >> 24 & 0xff)
444 id[1] = byte(crc >> 16 & 0xff)
445 id[2] = byte(crc>>8&0xf8) | id[2]&7
448 // Returns whether the node ID is considered secure. The id is the 20 raw
449 // bytes. http://www.libtorrent.org/dht_sec.html
450 func NodeIdSecure(id string, ip net.IP) bool {
452 panic(fmt.Sprintf("%q", id))
454 if ip4 := ip.To4(); ip4 != nil {
457 crc := crcIP(ip, id[19])
458 if id[0] != byte(crc>>24&0xff) {
461 if id[1] != byte(crc>>16&0xff) {
464 if id[2]&0xf8 != byte(crc>>8&0xf8) {
470 func (s *Server) setDefaults() (err error) {
473 h := crypto.SHA1.New()
474 ss, err := os.Hostname()
478 ss += s.socket.LocalAddr().String()
480 if b := h.Sum(id[:0:20]); len(b) != 20 {
486 publicIP := func() net.IP {
487 if s.config.PublicIP != nil {
488 return s.config.PublicIP
490 return missinggo.AddrIP(s.socket.LocalAddr())
493 SecureNodeId(id[:], publicIP)
496 s.nodes = make(map[string]*node, maxNodes)
500 // Packets to and from any address matching a range in the list are dropped.
501 func (s *Server) SetIPBlockList(list iplist.Ranger) {
507 func (s *Server) IPBlocklist() iplist.Ranger {
511 func (s *Server) init() (err error) {
512 err = s.setDefaults()
516 s.closed = make(chan struct{})
517 s.transactions = make(map[transactionKey]*Transaction)
521 func (s *Server) processPacket(b []byte, addr dHTAddr) {
522 if len(b) < 2 || b[0] != 'd' || b[len(b)-1] != 'e' {
523 // KRPC messages are bencoded dicts.
524 readNotKRPCDict.Add(1)
528 err := bencode.Unmarshal(b, &d)
530 readUnmarshalError.Add(1)
532 if se, ok := err.(*bencode.SyntaxError); ok {
533 // The message was truncated.
534 if int(se.Offset) == len(b) {
537 // Some messages seem to drop to nul chars abrubtly.
538 if int(se.Offset) < len(b) && b[se.Offset] == 0 {
541 // The message isn't bencode from the first.
546 if missinggo.CryHeard() {
547 log.Printf("%s: received bad krpc message from %s: %s: %+q", s, addr, err, b)
556 s.handleQuery(addr, d)
559 t := s.findResponseTransaction(d.T, addr)
561 //log.Printf("unexpected message: %#v", d)
564 node := s.getNode(addr, d.SenderID())
565 node.lastGotResponse = time.Now()
566 // TODO: Update node ID as this is an authoritative packet.
567 go t.handleResponse(d)
568 s.deleteTransaction(t)
571 func (s *Server) serve() error {
574 n, addr, err := s.socket.ReadFrom(b[:])
580 logonce.Stderr.Printf("received dht packet exceeds buffer size")
584 blocked := s.ipBlocked(missinggo.AddrIP(addr))
590 s.processPacket(b[:n], newDHTAddr(addr))
594 func (s *Server) ipBlocked(ip net.IP) (blocked bool) {
595 if s.ipBlockList == nil {
598 _, blocked = s.ipBlockList.Lookup(ip)
602 // Adds directly to the node table.
603 func (s *Server) AddNode(ni NodeInfo) {
607 s.nodes = make(map[string]*node)
609 s.getNode(ni.Addr, string(ni.ID[:]))
612 func (s *Server) nodeByID(id string) *node {
613 for _, node := range s.nodes {
614 if node.idString() == id {
621 func (s *Server) handleQuery(source dHTAddr, m Msg) {
622 node := s.getNode(source, m.SenderID())
623 node.lastGotQuery = time.Now()
625 if s.config.Passive {
631 s.reply(source, m.T, Return{})
632 case "get_peers": // TODO: Extract common behaviour with find_node.
633 targetID := args.InfoHash
634 if len(targetID) != 20 {
637 var rNodes []NodeInfo
638 // TODO: Reply with "values" list if we have peers instead.
639 for _, node := range s.closestGoodNodes(8, targetID) {
640 rNodes = append(rNodes, node.NodeInfo())
642 s.reply(source, m.T, Return{
644 // TODO: Generate this dynamically, and store it for the source.
647 case "find_node": // TODO: Extract common behaviour with get_peers.
648 targetID := args.Target
649 if len(targetID) != 20 {
650 log.Printf("bad DHT query: %v", m)
653 var rNodes []NodeInfo
654 if node := s.nodeByID(targetID); node != nil {
655 rNodes = append(rNodes, node.NodeInfo())
657 // This will probably cause a crash for IPv6, but meh.
658 for _, node := range s.closestGoodNodes(8, targetID) {
659 rNodes = append(rNodes, node.NodeInfo())
662 s.reply(source, m.T, Return{
665 case "announce_peer":
666 // TODO(anacrolix): Implement this lolz.
669 // TODO(anacrolix): Or reject, I don't think I want this.
671 log.Printf("%s: not handling received query: q=%s", s, m.Q)
676 func (s *Server) reply(addr dHTAddr, t string, r Return) {
683 b, err := bencode.Marshal(m)
687 err = s.writeToNode(b, addr)
689 log.Printf("error replying to %s: %s", addr, err)
693 // Returns a node struct for the addr. It is taken from the table or created
694 // and possibly added if required and meets validity constraints.
695 func (s *Server) getNode(addr dHTAddr, id string) (n *node) {
696 addrStr := addr.String()
700 n.SetIDFromString(id)
708 n.SetIDFromString(id)
710 if len(s.nodes) >= maxNodes {
713 if !s.config.NoSecurity && !n.IsSecure() {
716 if s.badNodes.Test([]byte(addrStr)) {
723 func (s *Server) nodeTimedOut(addr dHTAddr) {
724 node, ok := s.nodes[addr.String()]
728 if node.DefinitelyGood() {
731 if len(s.nodes) < maxNodes {
734 delete(s.nodes, addr.String())
737 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
738 if list := s.ipBlockList; list != nil {
739 if r, ok := list.Lookup(missinggo.AddrIP(node.UDPAddr())); ok {
740 err = fmt.Errorf("write to %s blocked: %s", node, r.Description)
744 n, err := s.socket.WriteTo(b, node.UDPAddr())
746 err = fmt.Errorf("error writing %d bytes to %s: %#v", len(b), node, err)
750 err = io.ErrShortWrite
756 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction {
757 return s.transactions[transactionKey{
762 func (s *Server) nextTransactionID() string {
763 var b [binary.MaxVarintLen64]byte
764 n := binary.PutUvarint(b[:], s.transactionIDInt)
769 func (s *Server) deleteTransaction(t *Transaction) {
770 delete(s.transactions, t.key())
773 func (s *Server) addTransaction(t *Transaction) {
774 if _, ok := s.transactions[t.key()]; ok {
775 panic("transaction not unique")
777 s.transactions[t.key()] = t
780 // Returns the 20-byte server ID. This is the ID used to communicate with the
782 func (s *Server) ID() string {
789 func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
790 tid := s.nextTransactionID()
792 a = make(map[string]interface{}, 1)
795 d := map[string]interface{}{
801 // BEP 43. Outgoing queries from uncontactiable nodes should contain
802 // "ro":1 in the top level dictionary.
803 if s.config.Passive {
806 b, err := bencode.Marshal(d)
813 response: make(chan Msg, 1),
814 done: make(chan struct{}),
817 onResponse: onResponse,
823 s.getNode(node, "").lastSentQuery = time.Now()
829 // The size in bytes of a NodeInfo in its compact binary representation.
830 const CompactIPv4NodeInfoLen = 26
832 type NodeInfo struct {
837 // Writes the node info to its compact binary representation in b. See
838 // CompactNodeInfoLen.
839 func (ni *NodeInfo) PutCompact(b []byte) error {
840 if n := copy(b[:], ni.ID[:]); n != 20 {
843 ip := missinggo.AddrIP(ni.Addr).To4()
845 return errors.New("expected ipv4 address")
847 if n := copy(b[20:], ip); n != 4 {
850 binary.BigEndian.PutUint16(b[24:], uint16(missinggo.AddrPort(ni.Addr)))
854 func (cni *NodeInfo) UnmarshalCompactIPv4(b []byte) error {
856 return errors.New("expected 26 bytes")
858 missinggo.CopyExact(cni.ID[:], b[:20])
859 cni.Addr = newDHTAddr(&net.UDPAddr{
860 IP: net.IPv4(b[20], b[21], b[22], b[23]),
861 Port: int(binary.BigEndian.Uint16(b[24:26])),
866 // Sends a ping query to the address given.
867 func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) {
870 return s.query(newDHTAddr(node), "ping", nil, nil)
873 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) (err error) {
874 if port == 0 && !impliedPort {
875 return errors.New("nothing to announce")
877 _, err = s.query(node, "announce_peer", map[string]interface{}{
878 "implied_port": func() int {
885 "info_hash": infoHash,
889 if err := m.Error(); err != nil {
890 announceErrors.Add(1)
892 // logonce.Stderr.Printf("announce_peer response: %s", err)
895 s.numConfirmedAnnounces++
900 // Add response nodes to node table.
901 func (s *Server) liftNodes(d Msg) {
905 for _, cni := range d.R.Nodes {
906 if missinggo.AddrPort(cni.Addr) == 0 {
907 // TODO: Why would people even do this?
910 if s.ipBlocked(missinggo.AddrIP(cni.Addr)) {
913 n := s.getNode(cni.Addr, string(cni.ID[:]))
914 n.SetIDFromBytes(cni.ID[:])
918 // Sends a find_node query to addr. targetID is the node we're looking for.
919 func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) {
920 t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
921 // Scrape peers from the response to put in the server's table before
922 // handing the response back to the caller.
936 func (me *Peer) String() string {
937 return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
940 func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) {
941 if len(infoHash) != 20 {
942 err = fmt.Errorf("infohash has bad length")
945 t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
947 s.getNode(addr, m.SenderID()).announceToken = m.R.Token
952 func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) {
953 bootstrapNodes := nodeAddrs
954 if len(bootstrapNodes) == 0 {
955 bootstrapNodes = []string{
956 "router.utorrent.com:6881",
957 "router.bittorrent.com:6881",
960 for _, addrStr := range bootstrapNodes {
961 udpAddr, err := net.ResolveUDPAddr("udp4", addrStr)
965 addrs = append(addrs, udpAddr)
968 err = errors.New("nothing resolved")
973 // Adds bootstrap nodes directly to table, if there's room. Node ID security
974 // is bypassed, but the IP blocklist is not.
975 func (s *Server) addRootNodes() error {
976 addrs, err := bootstrapAddrs(s.bootstrapNodes)
980 for _, addr := range addrs {
981 if len(s.nodes) >= maxNodes {
984 if s.nodes[addr.String()] != nil {
987 if s.ipBlocked(addr.IP) {
988 log.Printf("dht root node is in the blocklist: %s", addr.IP)
991 s.nodes[addr.String()] = &node{
992 addr: newDHTAddr(addr),
998 // Populates the node table.
999 func (s *Server) bootstrap() (err error) {
1002 if len(s.nodes) == 0 {
1003 err = s.addRootNodes()
1009 var outstanding sync.WaitGroup
1010 for _, node := range s.nodes {
1012 t, err = s.findNode(node.addr, s.id)
1014 err = fmt.Errorf("error sending find_node: %s", err)
1018 t.SetResponseHandler(func(Msg) {
1022 noOutstanding := make(chan struct{})
1025 close(noOutstanding)
1032 case <-time.After(15 * time.Second):
1033 case <-noOutstanding:
1036 // log.Printf("now have %d nodes", len(s.nodes))
1037 if s.numGoodNodes() >= 160 {
1044 func (s *Server) numGoodNodes() (num int) {
1045 for _, n := range s.nodes {
1046 if n.DefinitelyGood() {
1053 // Returns how many nodes are in the node table.
1054 func (s *Server) NumNodes() int {
1060 // Exports the current node table.
1061 func (s *Server) Nodes() (nis []NodeInfo) {
1064 for _, node := range s.nodes {
1065 // if !node.Good() {
1071 if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
1074 nis = append(nis, ni)
1079 // Stops the server network activity. This is all that's required to clean-up a Server.
1080 func (s *Server) Close() {
1091 var maxDistance big.Int
1095 maxDistance.SetBit(&zero, 160, 1)
1098 func (s *Server) closestGoodNodes(k int, targetID string) []*node {
1099 return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() })
1102 func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node {
1103 sel := newKClosestNodesSelector(k, target)
1104 idNodes := make(map[string]*node, len(s.nodes))
1105 for _, node := range s.nodes {
1110 idNodes[node.idString()] = node
1113 ret := make([]*node, 0, len(ids))
1114 for _, id := range ids {
1115 ret = append(ret, idNodes[id.ByteString()])
1120 func (me *Server) badNode(addr dHTAddr) {
1121 me.badNodes.Add([]byte(addr.String()))
1122 delete(me.nodes, addr.String())