1 // Package DHT implements a DHT for use with the BitTorrent protocol,
2 // described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html.
4 // Standard use involves creating a NewServer, and calling Announce on it with
5 // the details of your local torrent client and infohash of interest.
23 "github.com/anacrolix/missinggo"
24 "github.com/anacrolix/sync"
25 "github.com/tylertreat/BoomFilters"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/iplist"
29 "github.com/anacrolix/torrent/logonce"
30 "github.com/anacrolix/torrent/util"
35 queryResendEvery = 5 * time.Second
38 // Uniquely identifies a transaction to us.
39 type transactionKey struct {
40 RemoteAddr string // host:port
41 T string // The KRPC transaction ID.
47 transactions map[transactionKey]*Transaction
48 transactionIDInt uint64
49 nodes map[string]*node // Keyed by dHTAddr.String().
52 passive bool // Don't respond to queries.
53 ipBlockList *iplist.IPList
54 badNodes *boom.BloomFilter
56 numConfirmedAnnounces int
57 bootstrapNodes []string
61 type ServerConfig struct {
62 Addr string // Listen address. Used if Conn is nil.
64 // Don't respond to queries from other nodes.
66 // DHT Bootstrap nodes
67 BootstrapNodes []string
68 // Disable the DHT security extension:
69 // http://www.libtorrent.org/dht_sec.html.
71 // Initial IP blocklist to use. Applied before serving and bootstrapping
73 IPBlocklist *iplist.IPList
74 // Used to secure the server's ID. Defaults to the Conn's LocalAddr().
78 type ServerStats struct {
79 // Count of nodes in the node table that responded to our last query or
80 // haven't yet been queried.
82 // Count of nodes in the node table.
84 // Transactions awaiting a response.
85 OutstandingTransactions int
86 // Individual announce_peer requests that got a success response.
87 ConfirmedAnnounces int
88 // Nodes that have been blocked.
92 // Returns statistics for the server.
93 func (s *Server) Stats() (ss ServerStats) {
96 for _, n := range s.nodes {
97 if n.DefinitelyGood() {
101 ss.Nodes = len(s.nodes)
102 ss.OutstandingTransactions = len(s.transactions)
103 ss.ConfirmedAnnounces = s.numConfirmedAnnounces
104 ss.BadNodes = s.badNodes.Count()
108 // Returns the listen address for the server. Packets arriving to this address
109 // are processed by the server (unless aliens are involved).
110 func (s *Server) Addr() net.Addr {
111 return s.socket.LocalAddr()
114 func makeSocket(addr string) (socket *net.UDPConn, err error) {
115 addr_, err := net.ResolveUDPAddr("", addr)
119 socket, err = net.ListenUDP("udp", addr_)
123 // Create a new DHT server.
124 func NewServer(c *ServerConfig) (s *Server, err error) {
130 ipBlockList: c.IPBlocklist,
131 badNodes: boom.NewBloomFilter(1000, 0.1),
136 s.socket, err = makeSocket(c.Addr)
141 s.passive = c.Passive
142 s.bootstrapNodes = c.BootstrapNodes
164 log.Printf("error bootstrapping DHT: %s", err)
171 // Returns a description of the Server. Python repr-style.
172 func (s *Server) String() string {
173 return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
181 func (nid *nodeID) IsUnset() bool {
185 func nodeIDFromString(s string) (ret nodeID) {
189 ret.i.SetBytes([]byte(s))
194 func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) {
195 if nid0.IsUnset() != nid1.IsUnset() {
199 ret.Xor(&nid0.i, &nid1.i)
203 func (nid *nodeID) ByteString() string {
206 copy(buf[20-len(b):], b)
207 return string(buf[:])
215 lastGotQuery time.Time
216 lastGotResponse time.Time
217 lastSentQuery time.Time
220 func (n *node) IsSecure() bool {
224 return NodeIdSecure(n.id.ByteString(), n.addr.IP())
227 func (n *node) idString() string {
228 return n.id.ByteString()
231 func (n *node) SetIDFromBytes(b []byte) {
239 func (n *node) SetIDFromString(s string) {
240 n.SetIDFromBytes([]byte(s))
243 func (n *node) IDNotSet() bool {
244 return n.id.i.Int64() == 0
247 func (n *node) NodeInfo() (ret NodeInfo) {
249 if n := copy(ret.ID[:], n.idString()); n != 20 {
255 func (n *node) DefinitelyGood() bool {
256 if len(n.idString()) != 20 {
259 // No reason to think ill of them if they've never been queried.
260 if n.lastSentQuery.IsZero() {
263 // They answered our last query.
264 if n.lastSentQuery.Before(n.lastGotResponse) {
270 // A wrapper around the unmarshalled KRPC dict that constitutes messages in
271 // the DHT. There are various helpers for extracting common data from the
272 // message. In normal use, Msg is abstracted away for you, but it can be of
274 type Msg map[string]interface{}
276 var _ fmt.Stringer = Msg{}
278 func (m Msg) String() string {
279 return fmt.Sprintf("%#v", m)
282 func (m Msg) T() (t string) {
291 func (m Msg) ID() string {
295 return m[m["y"].(string)].(map[string]interface{})["id"].(string)
298 // Suggested nodes in a response.
299 func (m Msg) Nodes() (nodes []NodeInfo) {
304 return m["r"].(map[string]interface{})["nodes"].(string)
309 for i := 0; i < len(b); i += 26 {
311 err := n.UnmarshalCompact([]byte(b[i : i+26]))
315 nodes = append(nodes, n)
320 type KRPCError struct {
325 func (me KRPCError) Error() string {
326 return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
329 var _ error = KRPCError{}
331 func (m Msg) Error() (ret *KRPCError) {
336 switch e := m["e"].(type) {
338 ret.Code = int(e[0].(int64))
339 ret.Msg = e[1].(string)
343 logonce.Stderr.Printf(`KRPC error "e" value has unexpected type: %T`, e)
348 // Returns the token given in response to a get_peers request for future
349 // announce_peer requests to that node.
350 func (m Msg) AnnounceToken() (token string, ok bool) {
351 defer func() { recover() }()
352 token, ok = m["r"].(map[string]interface{})["token"].(string)
356 type Transaction struct {
361 onResponse func(Msg) // Called with the server locked.
368 userOnResponse func(Msg)
371 // Set a function to be called with the response.
372 func (t *Transaction) SetResponseHandler(f func(Msg)) {
376 t.tryHandleResponse()
379 func (t *Transaction) tryHandleResponse() {
380 if t.userOnResponse == nil {
384 case r := <-t.response:
386 // Shouldn't be called more than once.
387 t.userOnResponse = nil
392 func (t *Transaction) key() transactionKey {
393 return transactionKey{
394 t.remoteAddr.String(),
399 func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
400 return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
403 func (t *Transaction) startTimer() {
404 t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
407 func (t *Transaction) timerCallback() {
421 if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
422 panic("timer should have fired to get here")
426 func (t *Transaction) sendQuery() error {
427 err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
431 t.lastSend = time.Now()
435 func (t *Transaction) timeout() {
438 defer t.s.mu.Unlock()
439 t.s.nodeTimedOut(t.remoteAddr)
444 func (t *Transaction) close() {
450 t.tryHandleResponse()
455 defer t.s.mu.Unlock()
456 t.s.deleteTransaction(t)
460 func (t *Transaction) closing() bool {
469 // Abandon the transaction.
470 func (t *Transaction) Close() {
476 func (t *Transaction) handleResponse(m Msg) {
484 if t.onResponse != nil {
491 case t.response <- m:
493 panic("blocked handling response")
496 t.tryHandleResponse()
499 func maskForIP(ip net.IP) []byte {
501 case ip.To4() != nil:
502 return []byte{0x03, 0x0f, 0x3f, 0xff}
504 return []byte{0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff}
508 // Generate the CRC used to make or validate secure node ID.
509 func crcIP(ip net.IP, rand uint8) uint32 {
510 if ip4 := ip.To4(); ip4 != nil {
513 // Copy IP so we can make changes. Go sux at this.
514 ip = append(make(net.IP, 0, len(ip)), ip...)
515 mask := maskForIP(ip)
516 for i := range mask {
521 return crc32.Checksum(ip[:len(mask)], crc32.MakeTable(crc32.Castagnoli))
524 // Makes a node ID secure, in-place. The ID is 20 raw bytes.
525 // http://www.libtorrent.org/dht_sec.html
526 func SecureNodeId(id []byte, ip net.IP) {
527 crc := crcIP(ip, id[19])
528 id[0] = byte(crc >> 24 & 0xff)
529 id[1] = byte(crc >> 16 & 0xff)
530 id[2] = byte(crc>>8&0xf8) | id[2]&7
533 // Returns whether the node ID is considered secure. The id is the 20 raw
534 // bytes. http://www.libtorrent.org/dht_sec.html
535 func NodeIdSecure(id string, ip net.IP) bool {
537 panic(fmt.Sprintf("%q", id))
539 if ip4 := ip.To4(); ip4 != nil {
542 crc := crcIP(ip, id[19])
543 if id[0] != byte(crc>>24&0xff) {
546 if id[1] != byte(crc>>16&0xff) {
549 if id[2]&0xf8 != byte(crc>>8&0xf8) {
555 func (s *Server) setDefaults() (err error) {
558 h := crypto.SHA1.New()
559 ss, err := os.Hostname()
563 ss += s.socket.LocalAddr().String()
565 if b := h.Sum(id[:0:20]); len(b) != 20 {
571 publicIP := func() net.IP {
572 if s.config.PublicIP != nil {
573 return s.config.PublicIP
575 return missinggo.AddrIP(s.socket.LocalAddr())
578 SecureNodeId(id[:], publicIP)
581 s.nodes = make(map[string]*node, maxNodes)
585 // Packets to and from any address matching a range in the list are dropped.
586 func (s *Server) SetIPBlockList(list *iplist.IPList) {
592 func (s *Server) IPBlocklist() *iplist.IPList {
596 func (s *Server) init() (err error) {
597 err = s.setDefaults()
601 s.closed = make(chan struct{})
602 s.transactions = make(map[transactionKey]*Transaction)
606 func (s *Server) processPacket(b []byte, addr dHTAddr) {
607 if len(b) < 2 || b[0] != 'd' || b[len(b)-1] != 'e' {
608 // KRPC messages are bencoded dicts.
609 readNotKRPCDict.Add(1)
613 err := bencode.Unmarshal(b, &d)
615 readUnmarshalError.Add(1)
617 if se, ok := err.(*bencode.SyntaxError); ok {
618 // The message was truncated.
619 if int(se.Offset) == len(b) {
622 // Some messages seem to drop to nul chars abrubtly.
623 if int(se.Offset) < len(b) && b[se.Offset] == 0 {
626 // The message isn't bencode from the first.
631 if missinggo.CryHeard() {
632 log.Printf("%s: received bad krpc message from %s: %s: %+q", s, addr, err, b)
641 s.handleQuery(addr, d)
644 t := s.findResponseTransaction(d.T(), addr)
646 //log.Printf("unexpected message: %#v", d)
649 node := s.getNode(addr, d.ID())
650 node.lastGotResponse = time.Now()
651 // TODO: Update node ID as this is an authoritative packet.
652 go t.handleResponse(d)
653 s.deleteTransaction(t)
656 func (s *Server) serve() error {
659 n, addr, err := s.socket.ReadFrom(b[:])
665 logonce.Stderr.Printf("received dht packet exceeds buffer size")
669 blocked := s.ipBlocked(missinggo.AddrIP(addr))
675 s.processPacket(b[:n], newDHTAddr(addr))
679 func (s *Server) ipBlocked(ip net.IP) bool {
680 if s.ipBlockList == nil {
683 return s.ipBlockList.Lookup(ip) != nil
686 // Adds directly to the node table.
687 func (s *Server) AddNode(ni NodeInfo) {
691 s.nodes = make(map[string]*node)
693 s.getNode(ni.Addr, string(ni.ID[:]))
696 func (s *Server) nodeByID(id string) *node {
697 for _, node := range s.nodes {
698 if node.idString() == id {
705 func (s *Server) handleQuery(source dHTAddr, m Msg) {
706 args := m["a"].(map[string]interface{})
707 node := s.getNode(source, m.ID())
708 node.SetIDFromString(args["id"].(string))
709 node.lastGotQuery = time.Now()
716 s.reply(source, m["t"].(string), nil)
717 case "get_peers": // TODO: Extract common behaviour with find_node.
718 targetID := args["info_hash"].(string)
719 if len(targetID) != 20 {
722 var rNodes []NodeInfo
723 // TODO: Reply with "values" list if we have peers instead.
724 for _, node := range s.closestGoodNodes(8, targetID) {
725 rNodes = append(rNodes, node.NodeInfo())
727 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
728 for i, ni := range rNodes {
729 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
734 s.reply(source, m["t"].(string), map[string]interface{}{
735 "nodes": string(nodesBytes),
738 case "find_node": // TODO: Extract common behaviour with get_peers.
739 targetID := args["target"].(string)
740 if len(targetID) != 20 {
741 log.Printf("bad DHT query: %v", m)
744 var rNodes []NodeInfo
745 if node := s.nodeByID(targetID); node != nil {
746 rNodes = append(rNodes, node.NodeInfo())
748 for _, node := range s.closestGoodNodes(8, targetID) {
749 rNodes = append(rNodes, node.NodeInfo())
752 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
753 for i, ni := range rNodes {
754 // TODO: Put IPv6 nodes into the correct dict element.
755 if ni.Addr.UDPAddr().IP.To4() == nil {
758 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
760 log.Printf("error compacting %#v: %s", ni, err)
764 s.reply(source, m["t"].(string), map[string]interface{}{
765 "nodes": string(nodesBytes),
767 case "announce_peer":
768 // TODO(anacrolix): Implement this lolz.
771 // TODO(anacrolix): Or reject, I don't think I want this.
773 log.Printf("%s: not handling received query: q=%s", s, m["q"])
778 func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) {
780 r = make(map[string]interface{}, 1)
783 m := map[string]interface{}{
788 b, err := bencode.Marshal(m)
792 err = s.writeToNode(b, addr)
794 log.Printf("error replying to %s: %s", addr, err)
798 // Returns a node struct for the addr. It is taken from the table or created
799 // and possibly added if required and meets validity constraints.
800 func (s *Server) getNode(addr dHTAddr, id string) (n *node) {
801 addrStr := addr.String()
805 n.SetIDFromString(id)
813 n.SetIDFromString(id)
815 if len(s.nodes) >= maxNodes {
818 if !s.config.NoSecurity && !n.IsSecure() {
821 if s.badNodes.Test([]byte(addrStr)) {
828 func (s *Server) nodeTimedOut(addr dHTAddr) {
829 node, ok := s.nodes[addr.String()]
833 if node.DefinitelyGood() {
836 if len(s.nodes) < maxNodes {
839 delete(s.nodes, addr.String())
842 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
843 if list := s.ipBlockList; list != nil {
844 if r := list.Lookup(missinggo.AddrIP(node.UDPAddr())); r != nil {
845 err = fmt.Errorf("write to %s blocked: %s", node, r.Description)
849 n, err := s.socket.WriteTo(b, node.UDPAddr())
851 err = fmt.Errorf("error writing %d bytes to %s: %#v", len(b), node, err)
855 err = io.ErrShortWrite
861 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction {
862 return s.transactions[transactionKey{
867 func (s *Server) nextTransactionID() string {
868 var b [binary.MaxVarintLen64]byte
869 n := binary.PutUvarint(b[:], s.transactionIDInt)
874 func (s *Server) deleteTransaction(t *Transaction) {
875 delete(s.transactions, t.key())
878 func (s *Server) addTransaction(t *Transaction) {
879 if _, ok := s.transactions[t.key()]; ok {
880 panic("transaction not unique")
882 s.transactions[t.key()] = t
885 // Returns the 20-byte server ID. This is the ID used to communicate with the
887 func (s *Server) ID() string {
894 func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
895 tid := s.nextTransactionID()
897 a = make(map[string]interface{}, 1)
900 d := map[string]interface{}{
906 b, err := bencode.Marshal(d)
913 response: make(chan Msg, 1),
914 done: make(chan struct{}),
917 onResponse: onResponse,
923 s.getNode(node, "").lastSentQuery = time.Now()
929 // The size in bytes of a NodeInfo in its compact binary representation.
930 const CompactNodeInfoLen = 26
932 type NodeInfo struct {
937 // Writes the node info to its compact binary representation in b. See
938 // CompactNodeInfoLen.
939 func (ni *NodeInfo) PutCompact(b []byte) error {
940 if n := copy(b[:], ni.ID[:]); n != 20 {
943 ip := missinggo.AddrIP(ni.Addr).To4()
945 return errors.New("expected ipv4 address")
947 if n := copy(b[20:], ip); n != 4 {
950 binary.BigEndian.PutUint16(b[24:], uint16(missinggo.AddrPort(ni.Addr)))
954 func (cni *NodeInfo) UnmarshalCompact(b []byte) error {
956 return errors.New("expected 26 bytes")
958 missinggo.CopyExact(cni.ID[:], b[:20])
959 cni.Addr = newDHTAddr(&net.UDPAddr{
960 IP: net.IPv4(b[20], b[21], b[22], b[23]),
961 Port: int(binary.BigEndian.Uint16(b[24:26])),
966 // Sends a ping query to the address given.
967 func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) {
970 return s.query(newDHTAddr(node), "ping", nil, nil)
973 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) (err error) {
974 if port == 0 && !impliedPort {
975 return errors.New("nothing to announce")
977 _, err = s.query(node, "announce_peer", map[string]interface{}{
978 "implied_port": func() int {
985 "info_hash": infoHash,
989 if err := m.Error(); err != nil {
990 announceErrors.Add(1)
992 // logonce.Stderr.Printf("announce_peer response: %s", err)
995 s.numConfirmedAnnounces++
1000 // Add response nodes to node table.
1001 func (s *Server) liftNodes(d Msg) {
1005 for _, cni := range d.Nodes() {
1006 if missinggo.AddrPort(cni.Addr) == 0 {
1007 // TODO: Why would people even do this?
1010 if s.ipBlocked(missinggo.AddrIP(cni.Addr)) {
1013 n := s.getNode(cni.Addr, string(cni.ID[:]))
1014 n.SetIDFromBytes(cni.ID[:])
1018 // Sends a find_node query to addr. targetID is the node we're looking for.
1019 func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) {
1020 t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
1021 // Scrape peers from the response to put in the server's table before
1022 // handing the response back to the caller.
1031 // In a get_peers response, the addresses of torrent clients involved with the
1032 // queried info-hash.
1033 func (m Msg) Values() (vs []util.CompactPeer) {
1038 rd, ok := r.(map[string]interface{})
1042 v, ok := rd["values"]
1046 vl, ok := v.([]interface{})
1048 if missinggo.CryHeard() {
1049 log.Printf(`unexpected krpc "values" field: %#v`, v)
1053 vs = make([]util.CompactPeer, 0, len(vl))
1054 for _, i := range vl {
1059 var cp util.CompactPeer
1060 err := cp.UnmarshalBinary([]byte(s))
1062 log.Printf("error decoding values list element: %s", err)
1070 func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) {
1071 if len(infoHash) != 20 {
1072 err = fmt.Errorf("infohash has bad length")
1075 t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
1077 at, ok := m.AnnounceToken()
1079 s.getNode(addr, m.ID()).announceToken = at
1085 func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) {
1086 bootstrapNodes := nodeAddrs
1087 if len(bootstrapNodes) == 0 {
1088 bootstrapNodes = []string{
1089 "router.utorrent.com:6881",
1090 "router.bittorrent.com:6881",
1093 for _, addrStr := range bootstrapNodes {
1094 udpAddr, err := net.ResolveUDPAddr("udp4", addrStr)
1098 addrs = append(addrs, udpAddr)
1100 if len(addrs) == 0 {
1101 err = errors.New("nothing resolved")
1106 // Adds bootstrap nodes directly to table, if there's room. Node ID security
1107 // is bypassed, but the IP blocklist is not.
1108 func (s *Server) addRootNodes() error {
1109 addrs, err := bootstrapAddrs(s.bootstrapNodes)
1113 for _, addr := range addrs {
1114 if len(s.nodes) >= maxNodes {
1117 if s.nodes[addr.String()] != nil {
1120 if s.ipBlocked(addr.IP) {
1121 log.Printf("dht root node is in the blocklist: %s", addr.IP)
1124 s.nodes[addr.String()] = &node{
1125 addr: newDHTAddr(addr),
1131 // Populates the node table.
1132 func (s *Server) bootstrap() (err error) {
1135 if len(s.nodes) == 0 {
1136 err = s.addRootNodes()
1142 var outstanding sync.WaitGroup
1143 for _, node := range s.nodes {
1145 t, err = s.findNode(node.addr, s.id)
1147 err = fmt.Errorf("error sending find_node: %s", err)
1151 t.SetResponseHandler(func(Msg) {
1155 noOutstanding := make(chan struct{})
1158 close(noOutstanding)
1165 case <-time.After(15 * time.Second):
1166 case <-noOutstanding:
1169 // log.Printf("now have %d nodes", len(s.nodes))
1170 if s.numGoodNodes() >= 160 {
1177 func (s *Server) numGoodNodes() (num int) {
1178 for _, n := range s.nodes {
1179 if n.DefinitelyGood() {
1186 // Returns how many nodes are in the node table.
1187 func (s *Server) NumNodes() int {
1193 // Exports the current node table.
1194 func (s *Server) Nodes() (nis []NodeInfo) {
1197 for _, node := range s.nodes {
1198 // if !node.Good() {
1204 if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
1207 nis = append(nis, ni)
1212 // Stops the server network activity. This is all that's required to clean-up a Server.
1213 func (s *Server) Close() {
1224 var maxDistance big.Int
1228 maxDistance.SetBit(&zero, 160, 1)
1231 func (s *Server) closestGoodNodes(k int, targetID string) []*node {
1232 return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() })
1235 func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node {
1236 sel := newKClosestNodesSelector(k, target)
1237 idNodes := make(map[string]*node, len(s.nodes))
1238 for _, node := range s.nodes {
1243 idNodes[node.idString()] = node
1246 ret := make([]*node, 0, len(ids))
1247 for _, id := range ids {
1248 ret = append(ret, idNodes[id.ByteString()])
1253 func (me *Server) badNode(addr dHTAddr) {
1254 me.badNodes.Add([]byte(addr.String()))
1255 delete(me.nodes, addr.String())