From b68d7cd08e83ede9af5a43b6b2fd2eda9f58aba2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 14 Dec 2016 11:43:37 +1100 Subject: [PATCH] =?utf8?q?dht/=E2=80=A6=20moved=20to=20github.com/anacroli?= =?utf8?q?x/dht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- client.go | 4 +- client_test.go | 2 +- cmd/dht-get-peers/main.go | 4 +- cmd/dht-ping/main.go | 4 +- cmd/dht-secure-id/main.go | 2 +- cmd/dht-server/main.go | 4 +- config.go | 2 +- dht/addr.go | 33 -- dht/announce.go | 264 ------------ dht/bitcount_test.go | 55 --- dht/closest_nodes.go | 51 --- dht/dht.go | 226 ---------- dht/dht_test.go | 233 ---------- dht/doc.go | 22 - dht/expvar.go | 19 - dht/krpc/compact_node_info.go | 49 --- dht/krpc/error.go | 50 --- dht/krpc/msg.go | 69 --- dht/krpc/msg_test.go | 89 ---- dht/krpc/nodeinfo.go | 46 -- dht/security.go | 106 ----- dht/security_test.go | 64 --- dht/server.go | 772 ---------------------------------- dht/tokens.go | 54 --- dht/tokens_test.go | 54 --- dht/transaction.go | 150 ------- torrent.go | 2 +- 27 files changed, 12 insertions(+), 2418 deletions(-) delete mode 100644 dht/addr.go delete mode 100644 dht/announce.go delete mode 100644 dht/bitcount_test.go delete mode 100644 dht/closest_nodes.go delete mode 100644 dht/dht.go delete mode 100644 dht/dht_test.go delete mode 100644 dht/doc.go delete mode 100644 dht/expvar.go delete mode 100644 dht/krpc/compact_node_info.go delete mode 100644 dht/krpc/error.go delete mode 100644 dht/krpc/msg.go delete mode 100644 dht/krpc/msg_test.go delete mode 100644 dht/krpc/nodeinfo.go delete mode 100644 dht/security.go delete mode 100644 dht/security_test.go delete mode 100644 dht/server.go delete mode 100644 dht/tokens.go delete mode 100644 dht/tokens_test.go delete mode 100644 dht/transaction.go diff --git a/client.go b/client.go index b12dbf69..80cbb32a 100644 --- a/client.go +++ b/client.go @@ -25,8 +25,8 @@ import ( "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/dht" - "github.com/anacrolix/torrent/dht/krpc" + "github.com/anacrolix/dht" + "github.com/anacrolix/dht/krpc" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" diff --git a/client_test.go b/client_test.go index 7aad22f8..55db6b39 100644 --- a/client_test.go +++ b/client_test.go @@ -27,7 +27,7 @@ import ( "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/dht" + "github.com/anacrolix/dht" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" diff --git a/cmd/dht-get-peers/main.go b/cmd/dht-get-peers/main.go index 32e9fe61..1c0a2776 100644 --- a/cmd/dht-get-peers/main.go +++ b/cmd/dht-get-peers/main.go @@ -11,8 +11,8 @@ import ( _ "github.com/anacrolix/envpprof" - "github.com/anacrolix/torrent/dht" - "github.com/anacrolix/torrent/dht/krpc" + "github.com/anacrolix/dht" + "github.com/anacrolix/dht/krpc" ) var ( diff --git a/cmd/dht-ping/main.go b/cmd/dht-ping/main.go index 97edf72d..f85f5760 100644 --- a/cmd/dht-ping/main.go +++ b/cmd/dht-ping/main.go @@ -12,8 +12,8 @@ import ( "github.com/anacrolix/tagflag" "github.com/bradfitz/iter" - "github.com/anacrolix/torrent/dht" - "github.com/anacrolix/torrent/dht/krpc" + "github.com/anacrolix/dht" + "github.com/anacrolix/dht/krpc" ) func main() { diff --git a/cmd/dht-secure-id/main.go b/cmd/dht-secure-id/main.go index cf6c3a08..0b393ea4 100644 --- a/cmd/dht-secure-id/main.go +++ b/cmd/dht-secure-id/main.go @@ -8,7 +8,7 @@ import ( "github.com/docopt/docopt-go" - "github.com/anacrolix/torrent/dht" + "github.com/anacrolix/dht" ) func main() { diff --git a/cmd/dht-server/main.go b/cmd/dht-server/main.go index 0bb03509..c31bcd8e 100644 --- a/cmd/dht-server/main.go +++ b/cmd/dht-server/main.go @@ -8,8 +8,8 @@ import ( "os" "os/signal" - "github.com/anacrolix/torrent/dht" - "github.com/anacrolix/torrent/dht/krpc" + "github.com/anacrolix/dht" + "github.com/anacrolix/dht/krpc" ) var ( diff --git a/config.go b/config.go index 11315b0e..8e60a39e 100644 --- a/config.go +++ b/config.go @@ -3,7 +3,7 @@ package torrent import ( "golang.org/x/time/rate" - "github.com/anacrolix/torrent/dht" + "github.com/anacrolix/dht" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/storage" ) diff --git a/dht/addr.go b/dht/addr.go deleted file mode 100644 index 5121ffc5..00000000 --- a/dht/addr.go +++ /dev/null @@ -1,33 +0,0 @@ -package dht - -import "net" - -// Used internally to refer to node network addresses. String() is called a -// lot, and so can be optimized. Network() is not exposed, so that the -// interface does not satisfy net.Addr, as the underlying type must be passed -// to any OS-level function that take net.Addr. -type Addr interface { - UDPAddr() *net.UDPAddr - String() string -} - -// Speeds up some of the commonly called Addr methods. -type cachedAddr struct { - ua net.UDPAddr - s string -} - -func (ca cachedAddr) String() string { - return ca.s -} - -func (ca cachedAddr) UDPAddr() *net.UDPAddr { - return &ca.ua -} - -func NewAddr(ua *net.UDPAddr) Addr { - return cachedAddr{ - ua: *ua, - s: ua.String(), - } -} diff --git a/dht/announce.go b/dht/announce.go deleted file mode 100644 index 4f8284e7..00000000 --- a/dht/announce.go +++ /dev/null @@ -1,264 +0,0 @@ -package dht - -// get_peers and announce_peers. - -import ( - "log" - "time" - - "github.com/anacrolix/sync" - "github.com/willf/bloom" - - "github.com/anacrolix/torrent/dht/krpc" - "github.com/anacrolix/torrent/logonce" -) - -// Maintains state for an ongoing Announce operation. An Announce is started -// by calling Server.Announce. -type Announce struct { - mu sync.Mutex - Peers chan PeersValues - // Inner chan is set to nil when on close. - values chan PeersValues - stop chan struct{} - triedAddrs *bloom.BloomFilter - // True when contact with all starting addrs has been initiated. This - // prevents a race where the first transaction finishes before the rest - // have been opened, sees no other transactions are pending and ends the - // announce. - contactedStartAddrs bool - // How many transactions are still ongoing. - pending int - server *Server - infoHash string - // Count of (probably) distinct addresses we've sent get_peers requests - // to. - numContacted int - // The torrent port that we're announcing. - announcePort int - // The torrent port should be determined by the receiver in case we're - // being NATed. - announcePortImplied bool -} - -// Returns the number of distinct remote addresses the announce has queried. -func (a *Announce) NumContacted() int { - a.mu.Lock() - defer a.mu.Unlock() - return a.numContacted -} - -// This is kind of the main thing you want to do with DHT. It traverses the -// graph toward nodes that store peers for the infohash, streaming them to the -// caller, and announcing the local node to each node if allowed and -// specified. -func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announce, error) { - s.mu.Lock() - startAddrs := func() (ret []Addr) { - for _, n := range s.closestGoodNodes(160, infoHash) { - ret = append(ret, n.addr) - } - return - }() - s.mu.Unlock() - if len(startAddrs) == 0 && !s.config.NoDefaultBootstrap { - addrs, err := bootstrapAddrs(s.bootstrapNodes) - if err != nil { - return nil, err - } - for _, addr := range addrs { - startAddrs = append(startAddrs, NewAddr(addr)) - } - } - disc := &Announce{ - Peers: make(chan PeersValues, 100), - stop: make(chan struct{}), - values: make(chan PeersValues), - triedAddrs: bloom.NewWithEstimates(1000, 0.5), - server: s, - infoHash: infoHash, - announcePort: port, - announcePortImplied: impliedPort, - } - // Function ferries from values to Values until discovery is halted. - go func() { - defer close(disc.Peers) - for { - select { - case psv := <-disc.values: - select { - case disc.Peers <- psv: - case <-disc.stop: - return - } - case <-disc.stop: - return - } - } - }() - go func() { - disc.mu.Lock() - defer disc.mu.Unlock() - for i, addr := range startAddrs { - if i != 0 { - disc.mu.Unlock() - time.Sleep(time.Millisecond) - disc.mu.Lock() - } - disc.contact(addr) - } - disc.contactedStartAddrs = true - // If we failed to contact any of the starting addrs, no transactions - // will complete triggering a check that there are no pending - // responses. - disc.maybeClose() - }() - return disc, nil -} - -func validNodeAddr(addr Addr) bool { - ua := addr.UDPAddr() - if ua.Port == 0 { - return false - } - if ip4 := ua.IP.To4(); ip4 != nil && ip4[0] == 0 { - return false - } - return true -} - -// TODO: Merge this with maybeGetPeersFromAddr. -func (a *Announce) gotNodeAddr(addr Addr) { - if !validNodeAddr(addr) { - return - } - if a.triedAddrs.Test([]byte(addr.String())) { - return - } - if a.server.ipBlocked(addr.UDPAddr().IP) { - return - } - a.server.mu.Lock() - if a.server.badNodes.Test([]byte(addr.String())) { - a.server.mu.Unlock() - return - } - a.server.mu.Unlock() - a.contact(addr) -} - -// TODO: Merge this with maybeGetPeersFromAddr. -func (a *Announce) contact(addr Addr) { - a.numContacted++ - a.triedAddrs.Add([]byte(addr.String())) - if err := a.getPeers(addr); err != nil { - log.Printf("error sending get_peers request to %s: %#v", addr, err) - return - } - a.pending++ -} - -func (a *Announce) maybeClose() { - if a.contactedStartAddrs && a.pending == 0 { - a.close() - } -} - -func (a *Announce) transactionClosed() { - a.pending-- - a.maybeClose() -} - -func (a *Announce) responseNode(node krpc.NodeInfo) { - a.gotNodeAddr(NewAddr(node.Addr)) -} - -func (a *Announce) closingCh() chan struct{} { - return a.stop -} - -// Announce to a peer, if appropriate. -func (a *Announce) maybeAnnouncePeer(to Addr, token, peerId string) { - a.server.mu.Lock() - defer a.server.mu.Unlock() - if !a.server.config.NoSecurity { - if len(peerId) != 20 { - return - } - if !NodeIdSecure(peerId, to.UDPAddr().IP) { - return - } - } - err := a.server.announcePeer(to, a.infoHash, a.announcePort, token, a.announcePortImplied) - if err != nil { - logonce.Stderr.Printf("error announcing peer: %s", err) - } -} - -func (a *Announce) getPeers(addr Addr) error { - a.server.mu.Lock() - defer a.server.mu.Unlock() - t, err := a.server.getPeers(addr, a.infoHash) - if err != nil { - return err - } - t.SetResponseHandler(func(m krpc.Msg, ok bool) { - // Register suggested nodes closer to the target info-hash. - if m.R != nil { - a.mu.Lock() - for _, n := range m.R.Nodes { - a.responseNode(n) - } - a.mu.Unlock() - - if vs := m.R.Values; len(vs) != 0 { - nodeInfo := krpc.NodeInfo{ - Addr: t.remoteAddr.UDPAddr(), - } - copy(nodeInfo.ID[:], m.SenderID()) - select { - case a.values <- PeersValues{ - Peers: func() (ret []Peer) { - for _, cp := range vs { - ret = append(ret, Peer(cp)) - } - return - }(), - NodeInfo: nodeInfo, - }: - case <-a.stop: - } - } - - a.maybeAnnouncePeer(addr, m.R.Token, m.SenderID()) - } - - a.mu.Lock() - a.transactionClosed() - a.mu.Unlock() - }) - return nil -} - -// Corresponds to the "values" key in a get_peers KRPC response. A list of -// peers that a node has reported as being in the swarm for a queried info -// hash. -type PeersValues struct { - Peers []Peer // Peers given in get_peers response. - krpc.NodeInfo // The node that gave the response. -} - -// Stop the announce. -func (a *Announce) Close() { - a.mu.Lock() - defer a.mu.Unlock() - a.close() -} - -func (a *Announce) close() { - select { - case <-a.stop: - default: - close(a.stop) - } -} diff --git a/dht/bitcount_test.go b/dht/bitcount_test.go deleted file mode 100644 index a21c8c0b..00000000 --- a/dht/bitcount_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package dht - -import ( - "math/big" -) - -// TODO: The bitcounting is a relic of the old and incorrect distance -// calculation. It is still useful in some tests but should eventually be -// replaced with actual distances. - -// How many bits? -func bitCount(n big.Int) int { - var count int = 0 - for _, b := range n.Bytes() { - count += int(bitCounts[b]) - } - return count -} - -// The bit counts for each byte value (0 - 255). -var bitCounts = []int8{ - // Generated by Java BitCount of all values from 0 to 255 - 0, 1, 1, 2, 1, 2, 2, 3, - 1, 2, 2, 3, 2, 3, 3, 4, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 4, 5, 5, 6, 5, 6, 6, 7, - 5, 6, 6, 7, 6, 7, 7, 8, -} diff --git a/dht/closest_nodes.go b/dht/closest_nodes.go deleted file mode 100644 index 657fa917..00000000 --- a/dht/closest_nodes.go +++ /dev/null @@ -1,51 +0,0 @@ -package dht - -import ( - "container/heap" -) - -type nodeMaxHeap struct { - IDs []nodeID - Target nodeID -} - -func (mh nodeMaxHeap) Len() int { return len(mh.IDs) } - -func (mh nodeMaxHeap) Less(i, j int) bool { - m := mh.IDs[i].Distance(&mh.Target) - n := mh.IDs[j].Distance(&mh.Target) - return m.Cmp(&n) > 0 -} - -func (mh *nodeMaxHeap) Pop() (ret interface{}) { - ret, mh.IDs = mh.IDs[len(mh.IDs)-1], mh.IDs[:len(mh.IDs)-1] - return -} -func (mh *nodeMaxHeap) Push(val interface{}) { - mh.IDs = append(mh.IDs, val.(nodeID)) -} -func (mh nodeMaxHeap) Swap(i, j int) { - mh.IDs[i], mh.IDs[j] = mh.IDs[j], mh.IDs[i] -} - -type closestNodesSelector struct { - closest nodeMaxHeap - k int -} - -func (cns *closestNodesSelector) Push(id nodeID) { - heap.Push(&cns.closest, id) - if cns.closest.Len() > cns.k { - heap.Pop(&cns.closest) - } -} - -func (cns *closestNodesSelector) IDs() []nodeID { - return cns.closest.IDs -} - -func newKClosestNodesSelector(k int, targetID nodeID) (ret closestNodesSelector) { - ret.k = k - ret.closest.Target = targetID - return -} diff --git a/dht/dht.go b/dht/dht.go deleted file mode 100644 index edb19395..00000000 --- a/dht/dht.go +++ /dev/null @@ -1,226 +0,0 @@ -package dht - -import ( - _ "crypto/sha1" - "errors" - "math/big" - "math/rand" - "net" - "strconv" - "time" - - "github.com/anacrolix/torrent/dht/krpc" - "github.com/anacrolix/torrent/iplist" - "github.com/anacrolix/torrent/metainfo" -) - -const ( - maxNodes = 320 -) - -var ( - queryResendEvery = 5 * time.Second -) - -var maxDistance big.Int - -func init() { - var zero big.Int - maxDistance.SetBit(&zero, 160, 1) -} - -// Uniquely identifies a transaction to us. -type transactionKey struct { - RemoteAddr string // host:port - T string // The KRPC transaction ID. -} - -// ServerConfig allows to set up a configuration of the `Server` instance -// to be created with NewServer -type ServerConfig struct { - // Listen address. Used if Conn is nil. - Addr string - - // Set NodeId Manually. Caller must ensure that, if NodeId does not - // conform to DHT Security Extensions, that NoSecurity is also set. This - // should be given as a HEX string. - NodeIdHex string - - Conn net.PacketConn - // Don't respond to queries from other nodes. - Passive bool - // DHT Bootstrap nodes - BootstrapNodes []string - // Disable bootstrapping from global servers even if given no BootstrapNodes. - // This creates a solitary node that awaits other nodes; it's only useful if - // you're creating your own DHT and want to avoid accidental crossover, without - // spoofing a bootstrap node and filling your logs with connection errors. - NoDefaultBootstrap bool - - // Disable the DHT security extension: - // http://www.libtorrent.org/dht_sec.html. - NoSecurity bool - // Initial IP blocklist to use. Applied before serving and bootstrapping - // begins. - IPBlocklist iplist.Ranger - // Used to secure the server's ID. Defaults to the Conn's LocalAddr(). - PublicIP net.IP - - // Hook received queries. Return true if you don't want to propagate to - // the default handlers. - OnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) - // Called when a peer successfully announces to us. - OnAnnouncePeer func(infoHash metainfo.Hash, peer Peer) -} - -// ServerStats instance is returned by Server.Stats() and stores Server metrics -type ServerStats struct { - // Count of nodes in the node table that responded to our last query or - // haven't yet been queried. - GoodNodes int - // Count of nodes in the node table. - Nodes int - // Transactions awaiting a response. - OutstandingTransactions int - // Individual announce_peer requests that got a success response. - ConfirmedAnnounces int - // Nodes that have been blocked. - BadNodes uint -} - -func makeSocket(addr string) (socket *net.UDPConn, err error) { - addr_, err := net.ResolveUDPAddr("", addr) - if err != nil { - return - } - socket, err = net.ListenUDP("udp", addr_) - return -} - -type nodeID struct { - i big.Int - set bool -} - -func (nid *nodeID) IsUnset() bool { - return !nid.set -} - -func nodeIDFromString(s string) (ret nodeID) { - if s == "" { - return - } - ret.i.SetBytes([]byte(s)) - ret.set = true - return -} - -func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) { - if nid0.IsUnset() != nid1.IsUnset() { - ret = maxDistance - return - } - ret.Xor(&nid0.i, &nid1.i) - return -} - -func (nid *nodeID) ByteString() string { - var buf [20]byte - b := nid.i.Bytes() - copy(buf[20-len(b):], b) - return string(buf[:]) -} - -type node struct { - addr Addr - id nodeID - announceToken string - - lastGotQuery time.Time - lastGotResponse time.Time - lastSentQuery time.Time -} - -func (n *node) IsSecure() bool { - if n.id.IsUnset() { - return false - } - return NodeIdSecure(n.id.ByteString(), n.addr.UDPAddr().IP) -} - -func (n *node) idString() string { - return n.id.ByteString() -} - -func (n *node) SetIDFromBytes(b []byte) { - if len(b) != 20 { - panic(b) - } - n.id.i.SetBytes(b) - n.id.set = true -} - -func (n *node) SetIDFromString(s string) { - n.SetIDFromBytes([]byte(s)) -} - -func (n *node) IDNotSet() bool { - return n.id.i.Int64() == 0 -} - -func (n *node) NodeInfo() (ret krpc.NodeInfo) { - ret.Addr = n.addr.UDPAddr() - if n := copy(ret.ID[:], n.idString()); n != 20 { - panic(n) - } - return -} - -func (n *node) DefinitelyGood() bool { - if len(n.idString()) != 20 { - return false - } - // No reason to think ill of them if they've never been queried. - if n.lastSentQuery.IsZero() { - return true - } - // They answered our last query. - if n.lastSentQuery.Before(n.lastGotResponse) { - return true - } - return true -} - -func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration { - return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus))) -} - -type Peer struct { - IP net.IP - Port int -} - -func (p *Peer) String() string { - return net.JoinHostPort(p.IP.String(), strconv.FormatInt(int64(p.Port), 10)) -} - -func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) { - bootstrapNodes := nodeAddrs - if len(bootstrapNodes) == 0 { - bootstrapNodes = []string{ - "router.utorrent.com:6881", - "router.bittorrent.com:6881", - } - } - for _, addrStr := range bootstrapNodes { - udpAddr, err := net.ResolveUDPAddr("udp4", addrStr) - if err != nil { - continue - } - addrs = append(addrs, udpAddr) - } - if len(addrs) == 0 { - err = errors.New("nothing resolved") - } - return -} diff --git a/dht/dht_test.go b/dht/dht_test.go deleted file mode 100644 index 1f394fde..00000000 --- a/dht/dht_test.go +++ /dev/null @@ -1,233 +0,0 @@ -package dht - -import ( - "encoding/hex" - "log" - "math/big" - "math/rand" - "net" - "testing" - "time" - - _ "github.com/anacrolix/envpprof" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/anacrolix/torrent/dht/krpc" -) - -func TestSetNilBigInt(t *testing.T) { - i := new(big.Int) - i.SetBytes(make([]byte, 2)) -} - -func TestMarshalCompactNodeInfo(t *testing.T) { - cni := krpc.NodeInfo{ - ID: [20]byte{'a', 'b', 'c'}, - } - addr, err := net.ResolveUDPAddr("udp4", "1.2.3.4:5") - require.NoError(t, err) - cni.Addr = addr - var b [krpc.CompactIPv4NodeInfoLen]byte - err = cni.PutCompact(b[:]) - require.NoError(t, err) - var bb [26]byte - copy(bb[:], []byte("abc")) - copy(bb[20:], []byte("\x01\x02\x03\x04\x00\x05")) - assert.EqualValues(t, bb, b) -} - -func recoverPanicOrDie(t *testing.T, f func()) { - defer func() { - r := recover() - if r == nil { - t.Fatal("expected panic") - } - }() - f() -} - -const zeroID = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - -var testIDs []nodeID - -func init() { - log.SetFlags(log.Flags() | log.Lshortfile) - for _, s := range []string{ - zeroID, - "\x03" + zeroID[1:], - "\x03" + zeroID[1:18] + "\x55\xf0", - "\x55" + zeroID[1:17] + "\xff\x55\x0f", - "\x54" + zeroID[1:18] + "\x50\x0f", - "", - } { - testIDs = append(testIDs, nodeIDFromString(s)) - } -} - -func TestDistances(t *testing.T) { - expectBitcount := func(i big.Int, count int) { - if bitCount(i) != count { - t.Fatalf("expected bitcount of %d: got %d", count, bitCount(i)) - } - } - expectBitcount(testIDs[3].Distance(&testIDs[0]), 4+8+4+4) - expectBitcount(testIDs[3].Distance(&testIDs[1]), 4+8+4+4) - expectBitcount(testIDs[3].Distance(&testIDs[2]), 4+8+8) - for i := 0; i < 5; i++ { - dist := testIDs[i].Distance(&testIDs[5]) - if dist.Cmp(&maxDistance) != 0 { - t.Fatal("expected max distance for comparison with unset node id") - } - } -} - -func TestMaxDistanceString(t *testing.T) { - if string(maxDistance.Bytes()) != "\x01"+zeroID { - t.FailNow() - } -} - -func TestClosestNodes(t *testing.T) { - cn := newKClosestNodesSelector(2, testIDs[3]) - for _, i := range rand.Perm(len(testIDs)) { - cn.Push(testIDs[i]) - } - if len(cn.IDs()) != 2 { - t.FailNow() - } - m := map[string]bool{} - for _, id := range cn.IDs() { - m[id.ByteString()] = true - } - if !m[testIDs[3].ByteString()] || !m[testIDs[4].ByteString()] { - t.FailNow() - } -} - -func TestDHTDefaultConfig(t *testing.T) { - s, err := NewServer(nil) - assert.NoError(t, err) - s.Close() -} - -func TestPing(t *testing.T) { - srv, err := NewServer(&ServerConfig{ - Addr: "127.0.0.1:5680", - NoDefaultBootstrap: true, - }) - require.NoError(t, err) - defer srv.Close() - srv0, err := NewServer(&ServerConfig{ - Addr: "127.0.0.1:5681", - BootstrapNodes: []string{"127.0.0.1:5680"}, - }) - require.NoError(t, err) - defer srv0.Close() - tn, err := srv.Ping(&net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: srv0.Addr().(*net.UDPAddr).Port, - }) - require.NoError(t, err) - defer tn.Close() - ok := make(chan bool) - tn.SetResponseHandler(func(msg krpc.Msg, msgOk bool) { - ok <- msg.SenderID() == srv0.ID() - }) - if !<-ok { - t.FailNow() - } -} - -func TestServerCustomNodeId(t *testing.T) { - customId := "5a3ce1c14e7a08645677bbd1cfe7d8f956d53256" - id, err := hex.DecodeString(customId) - assert.NoError(t, err) - // How to test custom *secure* Id when tester computers will have - // different Ids? Generate custom ids for local IPs and use - // mini-Id? - s, err := NewServer(&ServerConfig{ - NodeIdHex: customId, - NoDefaultBootstrap: true, - }) - require.NoError(t, err) - defer s.Close() - assert.Equal(t, string(id), s.ID()) -} - -func TestAnnounceTimeout(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - s, err := NewServer(&ServerConfig{ - BootstrapNodes: []string{"1.2.3.4:5"}, - }) - require.NoError(t, err) - a, err := s.Announce("12341234123412341234", 0, true) - assert.NoError(t, err) - <-a.Peers - a.Close() - s.Close() -} - -func TestEqualPointers(t *testing.T) { - assert.EqualValues(t, &krpc.Msg{R: &krpc.Return{}}, &krpc.Msg{R: &krpc.Return{}}) -} - -func TestHook(t *testing.T) { - t.Log("TestHook: Starting with Ping intercept/passthrough") - srv, err := NewServer(&ServerConfig{ - Addr: "127.0.0.1:5678", - NoDefaultBootstrap: true, - }) - require.NoError(t, err) - defer srv.Close() - // Establish server with a hook attached to "ping" - hookCalled := make(chan bool) - srv0, err := NewServer(&ServerConfig{ - Addr: "127.0.0.1:5679", - BootstrapNodes: []string{"127.0.0.1:5678"}, - OnQuery: func(m *krpc.Msg, addr net.Addr) bool { - if m.Q == "ping" { - hookCalled <- true - } - return true - }, - }) - require.NoError(t, err) - defer srv0.Close() - // Ping srv0 from srv to trigger hook. Should also receive a response. - t.Log("TestHook: Servers created, hook for ping established. Calling Ping.") - tn, err := srv.Ping(&net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: srv0.Addr().(*net.UDPAddr).Port, - }) - assert.NoError(t, err) - defer tn.Close() - // Await response from hooked server - tn.SetResponseHandler(func(msg krpc.Msg, b bool) { - t.Log("TestHook: Sender received response from pinged hook server, so normal execution resumed.") - }) - // Await signal that hook has been called. - select { - case <-hookCalled: - { - // Success, hook was triggered. Todo: Ensure that "ok" channel - // receives, also, indicating normal handling proceeded also. - t.Log("TestHook: Received ping, hook called and returned to normal execution!") - return - } - case <-time.After(time.Second * 1): - { - t.Error("Failed to see evidence of ping hook being called after 2 seconds.") - } - } -} - -// Check that address resolution doesn't rat out invalid SendTo addr -// arguments. -func TestResolveBadAddr(t *testing.T) { - ua, err := net.ResolveUDPAddr("udp", "0.131.255.145:33085") - require.NoError(t, err) - assert.False(t, validNodeAddr(NewAddr(ua))) -} diff --git a/dht/doc.go b/dht/doc.go deleted file mode 100644 index 3e65bc1d..00000000 --- a/dht/doc.go +++ /dev/null @@ -1,22 +0,0 @@ -// Package dht implements a Distributed Hash Table (DHT) part of -// the BitTorrent protocol, -// as specified by BEP 5: http://www.bittorrent.org/beps/bep_0005.html -// -// BitTorrent uses a "distributed hash table" (DHT) -// for storing peer contact information for "trackerless" torrents. -// In effect, each peer becomes a tracker. -// The protocol is based on Kademila DHT protocol and is implemented over UDP. -// -// Please note the terminology used to avoid confusion. -// A "peer" is a client/server listening on a TCP port that -// implements the BitTorrent protocol. -// A "node" is a client/server listening on a UDP port implementing -// the distributed hash table protocol. -// The DHT is composed of nodes and stores the location of peers. -// BitTorrent clients include a DHT node, which is used to contact other nodes -// in the DHT to get the location of peers to -// download from using the BitTorrent protocol. - -// Standard use involves creating a Server, and calling Announce on it with -// the details of your local torrent client and infohash of interest. -package dht diff --git a/dht/expvar.go b/dht/expvar.go deleted file mode 100644 index cf12ca68..00000000 --- a/dht/expvar.go +++ /dev/null @@ -1,19 +0,0 @@ -package dht - -import ( - "expvar" -) - -var ( - read = expvar.NewInt("dhtRead") - readBlocked = expvar.NewInt("dhtReadBlocked") - readNotKRPCDict = expvar.NewInt("dhtReadNotKRPCDict") - readUnmarshalError = expvar.NewInt("dhtReadUnmarshalError") - readQuery = expvar.NewInt("dhtReadQuery") - readQueryBad = expvar.NewInt("dhtQueryBad") - readAnnouncePeer = expvar.NewInt("dhtReadAnnouncePeer") - announceErrors = expvar.NewInt("dhtAnnounceErrors") - writeErrors = expvar.NewInt("dhtWriteErrors") - writes = expvar.NewInt("dhtWrites") - readInvalidToken = expvar.NewInt("dhtReadInvalidToken") -) diff --git a/dht/krpc/compact_node_info.go b/dht/krpc/compact_node_info.go deleted file mode 100644 index 82f36ff7..00000000 --- a/dht/krpc/compact_node_info.go +++ /dev/null @@ -1,49 +0,0 @@ -package krpc - -import ( - "bytes" - "encoding/binary" - "errors" - "fmt" - - "github.com/anacrolix/torrent/bencode" -) - -type CompactIPv4NodeInfo []NodeInfo - -var _ bencode.Unmarshaler = &CompactIPv4NodeInfo{} - -func (i *CompactIPv4NodeInfo) UnmarshalBencode(_b []byte) (err error) { - var b []byte - err = bencode.Unmarshal(_b, &b) - if err != nil { - return - } - if len(b)%CompactIPv4NodeInfoLen != 0 { - err = fmt.Errorf("bad length: %d", len(b)) - return - } - for k := 0; k < len(b); k += CompactIPv4NodeInfoLen { - var ni NodeInfo - err = ni.UnmarshalCompactIPv4(b[k : k+CompactIPv4NodeInfoLen]) - if err != nil { - return - } - *i = append(*i, ni) - } - return -} - -func (i CompactIPv4NodeInfo) MarshalBencode() (ret []byte, err error) { - var buf bytes.Buffer - for _, ni := range i { - buf.Write(ni.ID[:]) - if ni.Addr == nil { - err = errors.New("nil addr in node info") - return - } - buf.Write(ni.Addr.IP.To4()) - binary.Write(&buf, binary.BigEndian, uint16(ni.Addr.Port)) - } - return bencode.Marshal(buf.Bytes()) -} diff --git a/dht/krpc/error.go b/dht/krpc/error.go deleted file mode 100644 index 3196c94f..00000000 --- a/dht/krpc/error.go +++ /dev/null @@ -1,50 +0,0 @@ -package krpc - -import ( - "fmt" - - "github.com/anacrolix/torrent/bencode" -) - -var ErrorMethodUnknown = Error{ - Code: 204, - Msg: "Method Unknown", -} - -// Represented as a string or list in bencode. -type Error struct { - Code int - Msg string -} - -var ( - _ bencode.Unmarshaler = &Error{} - _ bencode.Marshaler = &Error{} - _ error = Error{} -) - -func (e *Error) UnmarshalBencode(_b []byte) (err error) { - var _v interface{} - err = bencode.Unmarshal(_b, &_v) - if err != nil { - return - } - switch v := _v.(type) { - case []interface{}: - e.Code = int(v[0].(int64)) - e.Msg = v[1].(string) - case string: - e.Msg = v - default: - err = fmt.Errorf(`KRPC error bencode value has unexpected type: %T`, _v) - } - return -} - -func (e Error) MarshalBencode() (ret []byte, err error) { - return bencode.Marshal([]interface{}{e.Code, e.Msg}) -} - -func (e Error) Error() string { - return fmt.Sprintf("KRPC error %d: %s", e.Code, e.Msg) -} diff --git a/dht/krpc/msg.go b/dht/krpc/msg.go deleted file mode 100644 index 9c171497..00000000 --- a/dht/krpc/msg.go +++ /dev/null @@ -1,69 +0,0 @@ -package krpc - -import ( - "fmt" - - "github.com/anacrolix/torrent/util" -) - -// Msg represents messages that nodes in the network send to each other as specified by the protocol. -// They are also refered to as the KRPC messages. -// There are three types of messages: QUERY, RESPONSE, ERROR -// The message is a dictonary that is then -// "bencoded" (serialization & compression format adopted by the BitTorrent) -// and sent via the UDP connection to peers. -// -// A KRPC message is a single dictionary with two keys common to every message and additional keys depending on the type of message. -// Every message has a key "t" with a string value representing a transaction ID. -// This transaction ID is generated by the querying node and is echoed in the response, so responses -// may be correlated with multiple queries to the same node. The transaction ID should be encoded as a short string of binary numbers, typically 2 characters are enough as they cover 2^16 outstanding queries. The other key contained in every KRPC message is "y" with a single character value describing the type of message. The value of the "y" key is one of "q" for query, "r" for response, or "e" for error. -// 3 message types: QUERY, RESPONSE, ERROR -type Msg struct { - Q string `bencode:"q,omitempty"` // Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer") - A *MsgArgs `bencode:"a,omitempty"` // named arguments sent with a query - T string `bencode:"t"` // required: transaction ID - Y string `bencode:"y"` // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR - R *Return `bencode:"r,omitempty"` // RESPONSE type only - E *Error `bencode:"e,omitempty"` // ERROR type only - IP util.CompactPeer `bencode:"ip,omitempty"` -} - -type MsgArgs struct { - ID string `bencode:"id"` // ID of the quirying Node - InfoHash string `bencode:"info_hash"` // InfoHash of the torrent - Target string `bencode:"target"` // ID of the node sought - Token string `bencode:"token"` // Token received from an earlier get_peers query - Port int `bencode:"port"` // Senders torrent port - ImpliedPort int `bencode:"implied_port"` // Use senders apparent DHT port -} - -type Return struct { - ID string `bencode:"id"` // ID of the querying node - Nodes CompactIPv4NodeInfo `bencode:"nodes,omitempty"` // K closest nodes to the requested target - Token string `bencode:"token,omitempty"` // Token for future announce_peer - Values []util.CompactPeer `bencode:"values,omitempty"` // Torrent peers -} - -var _ fmt.Stringer = Msg{} - -func (m Msg) String() string { - return fmt.Sprintf("%#v", m) -} - -// The node ID of the source of this Msg. -func (m Msg) SenderID() string { - switch m.Y { - case "q": - return m.A.ID - case "r": - return m.R.ID - } - return "" -} - -func (m Msg) Error() *Error { - if m.Y != "e" { - return nil - } - return m.E -} diff --git a/dht/krpc/msg_test.go b/dht/krpc/msg_test.go deleted file mode 100644 index 6854d899..00000000 --- a/dht/krpc/msg_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package krpc - -import ( - "net" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/util" -) - -func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) { - b, err := bencode.Marshal(m) - require.NoError(t, err) - assert.Equal(t, expected, string(b)) - var _m Msg - err = bencode.Unmarshal([]byte(expected), &_m) - assert.NoError(t, err) - assert.EqualValues(t, m, _m) - assert.EqualValues(t, m.A, _m.A) - assert.EqualValues(t, m.R, _m.R) -} - -func TestMarshalUnmarshalMsg(t *testing.T) { - testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e") - testMarshalUnmarshalMsg(t, Msg{ - Y: "q", - Q: "ping", - T: "hi", - }, "d1:q4:ping1:t2:hi1:y1:qe") - testMarshalUnmarshalMsg(t, Msg{ - Y: "e", - T: "42", - E: &Error{Code: 200, Msg: "fuck"}, - }, "d1:eli200e4:fucke1:t2:421:y1:ee") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{}, - }, "d1:rd2:id0:e1:t2:\x8c%1:y1:re") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{ - Nodes: CompactIPv4NodeInfo{ - NodeInfo{ - Addr: &net.UDPAddr{ - IP: net.IPv4(1, 2, 3, 4).To4(), - Port: 0x1234, - }, - }, - }, - }, - }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x124e1:t2:\x8c%1:y1:re") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{ - Values: []util.CompactPeer{ - { - IP: net.IPv4(1, 2, 3, 4).To4(), - Port: 0x5678, - }, - }, - }, - }, "d1:rd2:id0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x03", - R: &Return{ - ID: "\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2g", - }, - IP: util.CompactPeer{ - IP: net.IPv4(124, 168, 180, 8).To4(), - Port: 62844, - }, - }, "d2:ip6:|\xa8\xb4\b\xf5|1:rd2:id20:\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2ge1:t1:\x031:y1:re") -} - -func TestUnmarshalGetPeersResponse(t *testing.T) { - var msg Msg - err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg) - require.NoError(t, err) - assert.Len(t, msg.R.Values, 2) - assert.Len(t, msg.R.Nodes, 2) - assert.Nil(t, msg.E) -} diff --git a/dht/krpc/nodeinfo.go b/dht/krpc/nodeinfo.go deleted file mode 100644 index a7e0afa1..00000000 --- a/dht/krpc/nodeinfo.go +++ /dev/null @@ -1,46 +0,0 @@ -package krpc - -import ( - "encoding/binary" - "errors" - "net" - - "github.com/anacrolix/missinggo" -) - -// The size in bytes of a NodeInfo in its compact binary representation. -const CompactIPv4NodeInfoLen = 26 - -type NodeInfo struct { - ID [20]byte - Addr *net.UDPAddr -} - -// Writes the node info to its compact binary representation in b. See -// CompactNodeInfoLen. -func (ni *NodeInfo) PutCompact(b []byte) error { - if n := copy(b[:], ni.ID[:]); n != 20 { - panic(n) - } - ip := ni.Addr.IP.To4() - if len(ip) != 4 { - return errors.New("expected ipv4 address") - } - if n := copy(b[20:], ip); n != 4 { - panic(n) - } - binary.BigEndian.PutUint16(b[24:], uint16(ni.Addr.Port)) - return nil -} - -func (ni *NodeInfo) UnmarshalCompactIPv4(b []byte) error { - if len(b) != CompactIPv4NodeInfoLen { - return errors.New("expected 26 bytes") - } - missinggo.CopyExact(ni.ID[:], b[:20]) - ni.Addr = &net.UDPAddr{ - IP: append(make([]byte, 0, 4), b[20:24]...), - Port: int(binary.BigEndian.Uint16(b[24:26])), - } - return nil -} diff --git a/dht/security.go b/dht/security.go deleted file mode 100644 index 9355b7b5..00000000 --- a/dht/security.go +++ /dev/null @@ -1,106 +0,0 @@ -package dht - -import ( - "fmt" - "hash/crc32" - "net" -) - -func maskForIP(ip net.IP) []byte { - switch { - case ip.To4() != nil: - return []byte{0x03, 0x0f, 0x3f, 0xff} - default: - return []byte{0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff} - } -} - -// Generate the CRC used to make or validate secure node ID. -func crcIP(ip net.IP, rand uint8) uint32 { - if ip4 := ip.To4(); ip4 != nil { - ip = ip4 - } - // Copy IP so we can make changes. Go sux at this. - ip = append(make(net.IP, 0, len(ip)), ip...) - mask := maskForIP(ip) - for i := range mask { - ip[i] &= mask[i] - } - r := rand & 7 - ip[0] |= r << 5 - return crc32.Checksum(ip[:len(mask)], crc32.MakeTable(crc32.Castagnoli)) -} - -// Makes a node ID secure, in-place. The ID is 20 raw bytes. -// http://www.libtorrent.org/dht_sec.html -func SecureNodeId(id []byte, ip net.IP) { - crc := crcIP(ip, id[19]) - id[0] = byte(crc >> 24 & 0xff) - id[1] = byte(crc >> 16 & 0xff) - id[2] = byte(crc>>8&0xf8) | id[2]&7 -} - -// Returns whether the node ID is considered secure. The id is the 20 raw -// bytes. http://www.libtorrent.org/dht_sec.html -func NodeIdSecure(id string, ip net.IP) bool { - if isLocalNetwork(ip) { - return true - } - if len(id) != 20 { - panic(fmt.Sprintf("%q", id)) - } - if ip4 := ip.To4(); ip4 != nil { - ip = ip4 - } - crc := crcIP(ip, id[19]) - if id[0] != byte(crc>>24&0xff) { - return false - } - if id[1] != byte(crc>>16&0xff) { - return false - } - if id[2]&0xf8 != byte(crc>>8&0xf8) { - return false - } - return true -} - -var ( - classA, classB, classC *net.IPNet -) - -func mustParseCIDRIPNet(s string) *net.IPNet { - _, ret, err := net.ParseCIDR(s) - if err != nil { - panic(err) - } - return ret -} - -func init() { - classA = mustParseCIDRIPNet("10.0.0.0/8") - classB = mustParseCIDRIPNet("172.16.0.0/12") - classC = mustParseCIDRIPNet("192.168.0.0/16") -} - -// Per http://www.libtorrent.org/dht_sec.html#enforcement, the IP is -// considered a local network address and should be exempted from node ID -// verification. -func isLocalNetwork(ip net.IP) bool { - if classA.Contains(ip) { - return true - } - if classB.Contains(ip) { - return true - } - if classC.Contains(ip) { - return true - } - if ip.IsLinkLocalUnicast() { - return true - } - if ip.IsLoopback() { - return true - } - return false -} diff --git a/dht/security_test.go b/dht/security_test.go deleted file mode 100644 index 9cd84470..00000000 --- a/dht/security_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package dht - -import ( - "encoding/hex" - "net" - "testing" - - "github.com/anacrolix/missinggo" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDHTSec(t *testing.T) { - for _, case_ := range []struct { - ipStr string - nodeIDHex string - valid bool - }{ - // These 5 are from the spec example. They are all valid. - {"124.31.75.21", "5fbfbff10c5d6a4ec8a88e4c6ab4c28b95eee401", true}, - {"21.75.31.124", "5a3ce9c14e7a08645677bbd1cfe7d8f956d53256", true}, - {"65.23.51.170", "a5d43220bc8f112a3d426c84764f8c2a1150e616", true}, - {"84.124.73.14", "1b0321dd1bb1fe518101ceef99462b947a01ff41", true}, - {"43.213.53.83", "e56f6cbf5b7c4be0237986d5243b87aa6d51305a", true}, - // spec[0] with one of the rand() bytes changed. Valid. - {"124.31.75.21", "5fbfbff10c5d7a4ec8a88e4c6ab4c28b95eee401", true}, - // spec[1] with the 21st leading bit changed. Not Valid. - {"21.75.31.124", "5a3ce1c14e7a08645677bbd1cfe7d8f956d53256", false}, - // spec[2] with the 22nd leading bit changed. Valid. - {"65.23.51.170", "a5d43620bc8f112a3d426c84764f8c2a1150e616", true}, - // spec[3] with the 4th last bit changed. Valid. - {"84.124.73.14", "1b0321dd1bb1fe518101ceef99462b947a01fe01", true}, - // spec[4] with the 3rd last bit changed. Not valid. - {"43.213.53.83", "e56f6cbf5b7c4be0237986d5243b87aa6d51303e", false}, - // Because class A network. - {"10.213.53.83", "e56f6cbf5b7c4be0237986d5243b87aa6d51305a", true}, - // Because not class A, and id[0]&3 does not match. - {"12.213.53.83", "e56f6cbf5b7c4be0237986d5243b87aa6d51305a", false}, - // Because class C. - {"192.168.53.83", "e56f6cbf5b7c4be0237986d5243b87aa6d51305a", true}, - } { - ip := net.ParseIP(case_.ipStr) - id, err := hex.DecodeString(case_.nodeIDHex) - require.NoError(t, err) - secure := NodeIdSecure(string(id), ip) - assert.Equal(t, case_.valid, secure, "%v", case_) - if !secure { - // It's not secure, so secure it in place and then check it again. - SecureNodeId(id, ip) - assert.True(t, NodeIdSecure(string(id), ip), "%v", case_) - } - } -} - -func TestServerDefaultNodeIdSecure(t *testing.T) { - s, err := NewServer(&ServerConfig{ - NoDefaultBootstrap: true, - }) - require.NoError(t, err) - defer s.Close() - if !NodeIdSecure(s.ID(), missinggo.AddrIP(s.Addr())) { - t.Fatal("not secure") - } -} diff --git a/dht/server.go b/dht/server.go deleted file mode 100644 index e541e45c..00000000 --- a/dht/server.go +++ /dev/null @@ -1,772 +0,0 @@ -package dht - -import ( - "crypto" - "crypto/rand" - "encoding/binary" - "encoding/hex" - "errors" - "fmt" - "io" - "log" - "net" - "os" - "sync" - "time" - - "github.com/anacrolix/missinggo" - "github.com/tylertreat/BoomFilters" - - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/dht/krpc" - "github.com/anacrolix/torrent/iplist" - "github.com/anacrolix/torrent/logonce" - "github.com/anacrolix/torrent/metainfo" -) - -// A Server defines parameters for a DHT node server that is able to -// send queries, and respond to the ones from the network. -// Each node has a globally unique identifier known as the "node ID." -// Node IDs are chosen at random from the same 160-bit space -// as BitTorrent infohashes and define the behaviour of the node. -// Zero valued Server does not have a valid ID and thus -// is unable to function properly. Use `NewServer(nil)` -// to initialize a default node. -type Server struct { - id string - socket net.PacketConn - transactions map[transactionKey]*Transaction - transactionIDInt uint64 - nodes map[string]*node // Keyed by dHTAddr.String(). - mu sync.Mutex - closed missinggo.Event - ipBlockList iplist.Ranger - badNodes *boom.BloomFilter - tokenServer tokenServer - - numConfirmedAnnounces int - bootstrapNodes []string - config ServerConfig -} - -// Stats returns statistics for the server. -func (s *Server) Stats() (ss ServerStats) { - s.mu.Lock() - defer s.mu.Unlock() - for _, n := range s.nodes { - if n.DefinitelyGood() { - ss.GoodNodes++ - } - } - ss.Nodes = len(s.nodes) - ss.OutstandingTransactions = len(s.transactions) - ss.ConfirmedAnnounces = s.numConfirmedAnnounces - ss.BadNodes = s.badNodes.Count() - return -} - -// Addr returns the listen address for the server. Packets arriving to this address -// are processed by the server (unless aliens are involved). -func (s *Server) Addr() net.Addr { - return s.socket.LocalAddr() -} - -// NewServer initializes a new DHT node server. -func NewServer(c *ServerConfig) (s *Server, err error) { - if c == nil { - c = &ServerConfig{} - } - s = &Server{ - config: *c, - ipBlockList: c.IPBlocklist, - badNodes: boom.NewBloomFilter(1000, 0.1), - tokenServer: tokenServer{ - maxIntervalDelta: 2, - interval: 5 * time.Minute, - secret: make([]byte, 20), - }, - } - rand.Read(s.tokenServer.secret) - if c.Conn != nil { - s.socket = c.Conn - } else { - s.socket, err = makeSocket(c.Addr) - if err != nil { - return - } - } - s.bootstrapNodes = c.BootstrapNodes - if c.NodeIdHex != "" { - var rawID []byte - rawID, err = hex.DecodeString(c.NodeIdHex) - if err != nil { - return - } - s.id = string(rawID) - } - err = s.init() - if err != nil { - return - } - go func() { - err := s.serve() - s.mu.Lock() - defer s.mu.Unlock() - if s.closed.IsSet() { - return - } - if err != nil { - panic(err) - } - }() - go func() { - err := s.bootstrap() - if err != nil { - s.mu.Lock() - if !s.closed.IsSet() { - log.Printf("error bootstrapping DHT: %s", err) - } - s.mu.Unlock() - } - }() - return -} - -// Returns a description of the Server. Python repr-style. -func (s *Server) String() string { - return fmt.Sprintf("dht server on %s", s.socket.LocalAddr()) -} - -// Packets to and from any address matching a range in the list are dropped. -func (s *Server) SetIPBlockList(list iplist.Ranger) { - s.mu.Lock() - defer s.mu.Unlock() - s.ipBlockList = list -} - -func (s *Server) IPBlocklist() iplist.Ranger { - return s.ipBlockList -} - -func (s *Server) init() (err error) { - err = s.setDefaults() - if err != nil { - return - } - s.transactions = make(map[transactionKey]*Transaction) - return -} - -func (s *Server) processPacket(b []byte, addr Addr) { - if len(b) < 2 || b[0] != 'd' || b[len(b)-1] != 'e' { - // KRPC messages are bencoded dicts. - readNotKRPCDict.Add(1) - return - } - var d krpc.Msg - err := bencode.Unmarshal(b, &d) - if err != nil { - readUnmarshalError.Add(1) - func() { - if se, ok := err.(*bencode.SyntaxError); ok { - // The message was truncated. - if int(se.Offset) == len(b) { - return - } - // Some messages seem to drop to nul chars abrubtly. - if int(se.Offset) < len(b) && b[se.Offset] == 0 { - return - } - // The message isn't bencode from the first. - if se.Offset == 0 { - return - } - } - // if missinggo.CryHeard() { - // log.Printf("%s: received bad krpc message from %s: %s: %+q", s, addr, err, b) - // } - }() - return - } - s.mu.Lock() - defer s.mu.Unlock() - if s.closed.IsSet() { - return - } - if d.Y == "q" { - readQuery.Add(1) - s.handleQuery(addr, d) - return - } - t := s.findResponseTransaction(d.T, addr) - if t == nil { - //log.Printf("unexpected message: %#v", d) - return - } - node := s.getNode(addr, d.SenderID()) - node.lastGotResponse = time.Now() - // TODO: Update node ID as this is an authoritative packet. - go t.handleResponse(d) - s.deleteTransaction(t) -} - -func (s *Server) serve() error { - var b [0x10000]byte - for { - n, addr, err := s.socket.ReadFrom(b[:]) - if err != nil { - return err - } - read.Add(1) - if n == len(b) { - logonce.Stderr.Printf("received dht packet exceeds buffer size") - continue - } - s.mu.Lock() - blocked := s.ipBlocked(missinggo.AddrIP(addr)) - s.mu.Unlock() - if blocked { - readBlocked.Add(1) - continue - } - s.processPacket(b[:n], NewAddr(addr.(*net.UDPAddr))) - } -} - -func (s *Server) ipBlocked(ip net.IP) (blocked bool) { - if s.ipBlockList == nil { - return - } - _, blocked = s.ipBlockList.Lookup(ip) - return -} - -// Adds directly to the node table. -func (s *Server) AddNode(ni krpc.NodeInfo) { - s.mu.Lock() - defer s.mu.Unlock() - if s.nodes == nil { - s.nodes = make(map[string]*node) - } - s.getNode(NewAddr(ni.Addr), string(ni.ID[:])) -} - -func (s *Server) nodeByID(id string) *node { - for _, node := range s.nodes { - if node.idString() == id { - return node - } - } - return nil -} - -// TODO: Probably should write error messages back to senders if something is -// wrong. -func (s *Server) handleQuery(source Addr, m krpc.Msg) { - node := s.getNode(source, m.SenderID()) - node.lastGotQuery = time.Now() - if s.config.OnQuery != nil { - propagate := s.config.OnQuery(&m, source.UDPAddr()) - if !propagate { - return - } - } - // Don't respond. - if s.config.Passive { - return - } - args := m.A - switch m.Q { - case "ping": - s.reply(source, m.T, krpc.Return{}) - case "get_peers": // TODO: Extract common behaviour with find_node. - targetID := args.InfoHash - if len(targetID) != 20 { - break - } - var rNodes []krpc.NodeInfo - // TODO: Reply with "values" list if we have peers instead. - for _, node := range s.closestGoodNodes(8, targetID) { - rNodes = append(rNodes, node.NodeInfo()) - } - s.reply(source, m.T, krpc.Return{ - Nodes: rNodes, - Token: s.createToken(source), - }) - case "find_node": // TODO: Extract common behaviour with get_peers. - targetID := args.Target - if len(targetID) != 20 { - log.Printf("bad DHT query: %v", m) - return - } - var rNodes []krpc.NodeInfo - if node := s.nodeByID(targetID); node != nil { - rNodes = append(rNodes, node.NodeInfo()) - } else { - // This will probably cause a crash for IPv6, but meh. - for _, node := range s.closestGoodNodes(8, targetID) { - rNodes = append(rNodes, node.NodeInfo()) - } - } - s.reply(source, m.T, krpc.Return{ - Nodes: rNodes, - }) - case "announce_peer": - readAnnouncePeer.Add(1) - if !s.validToken(args.Token, source) { - readInvalidToken.Add(1) - return - } - if len(args.InfoHash) != 20 { - readQueryBad.Add(1) - return - } - if h := s.config.OnAnnouncePeer; h != nil { - var ih metainfo.Hash - copy(ih[:], args.InfoHash) - p := Peer{ - IP: source.UDPAddr().IP, - Port: args.Port, - } - if args.ImpliedPort != 0 { - p.Port = source.UDPAddr().Port - } - go h(ih, p) - } - default: - s.sendError(source, m.T, krpc.ErrorMethodUnknown) - } -} - -func (s *Server) sendError(addr Addr, t string, e krpc.Error) { - m := krpc.Msg{ - T: t, - Y: "e", - E: &e, - } - b, err := bencode.Marshal(m) - if err != nil { - panic(err) - } - err = s.writeToNode(b, addr) - if err != nil { - log.Printf("error replying to %s: %s", addr, err) - } -} - -func (s *Server) reply(addr Addr, t string, r krpc.Return) { - r.ID = s.ID() - m := krpc.Msg{ - T: t, - Y: "r", - R: &r, - } - b, err := bencode.Marshal(m) - if err != nil { - panic(err) - } - err = s.writeToNode(b, addr) - if err != nil { - log.Printf("error replying to %s: %s", addr, err) - } -} - -// Returns a node struct for the addr. It is taken from the table or created -// and possibly added if required and meets validity constraints. -func (s *Server) getNode(addr Addr, id string) (n *node) { - addrStr := addr.String() - n = s.nodes[addrStr] - if n != nil { - if id != "" { - n.SetIDFromString(id) - } - return - } - n = &node{ - addr: addr, - } - if len(id) == 20 { - n.SetIDFromString(id) - } - if len(s.nodes) >= maxNodes { - return - } - // Exclude insecure nodes from the node table. - if !s.config.NoSecurity && !n.IsSecure() { - return - } - if s.badNodes.Test([]byte(addrStr)) { - return - } - s.nodes[addrStr] = n - return -} - -func (s *Server) nodeTimedOut(addr Addr) { - node, ok := s.nodes[addr.String()] - if !ok { - return - } - if node.DefinitelyGood() { - return - } - if len(s.nodes) < maxNodes { - return - } - delete(s.nodes, addr.String()) -} - -func (s *Server) writeToNode(b []byte, node Addr) (err error) { - if list := s.ipBlockList; list != nil { - if r, ok := list.Lookup(missinggo.AddrIP(node.UDPAddr())); ok { - err = fmt.Errorf("write to %s blocked: %s", node, r.Description) - return - } - } - n, err := s.socket.WriteTo(b, node.UDPAddr()) - writes.Add(1) - if err != nil { - writeErrors.Add(1) - err = fmt.Errorf("error writing %d bytes to %s: %s", len(b), node, err) - return - } - if n != len(b) { - err = io.ErrShortWrite - return - } - return -} - -func (s *Server) findResponseTransaction(transactionID string, sourceNode Addr) *Transaction { - return s.transactions[transactionKey{ - sourceNode.String(), - transactionID}] -} - -func (s *Server) nextTransactionID() string { - var b [binary.MaxVarintLen64]byte - n := binary.PutUvarint(b[:], s.transactionIDInt) - s.transactionIDInt++ - return string(b[:n]) -} - -func (s *Server) deleteTransaction(t *Transaction) { - delete(s.transactions, t.key()) -} - -func (s *Server) addTransaction(t *Transaction) { - if _, ok := s.transactions[t.key()]; ok { - panic("transaction not unique") - } - s.transactions[t.key()] = t -} - -// ID returns the 20-byte server ID. This is the ID used to communicate with the -// DHT network. -func (s *Server) ID() string { - if len(s.id) != 20 { - panic("bad node id") - } - return s.id -} - -func (s *Server) createToken(addr Addr) string { - return s.tokenServer.CreateToken(addr) -} - -func (s *Server) validToken(token string, addr Addr) bool { - return s.tokenServer.ValidToken(token, addr) -} - -func (s *Server) query(node Addr, q string, a map[string]interface{}, onResponse func(krpc.Msg)) (t *Transaction, err error) { - tid := s.nextTransactionID() - if a == nil { - a = make(map[string]interface{}, 1) - } - a["id"] = s.ID() - d := map[string]interface{}{ - "t": tid, - "y": "q", - "q": q, - "a": a, - } - // BEP 43. Outgoing queries from uncontactiable nodes should contain - // "ro":1 in the top level dictionary. - if s.config.Passive { - d["ro"] = 1 - } - b, err := bencode.Marshal(d) - if err != nil { - return - } - _t := &Transaction{ - remoteAddr: node, - t: tid, - response: make(chan krpc.Msg, 1), - done: make(chan struct{}), - queryPacket: b, - s: s, - onResponse: onResponse, - } - err = _t.sendQuery() - if err != nil { - return - } - s.getNode(node, "").lastSentQuery = time.Now() - _t.mu.Lock() - _t.startTimer() - _t.mu.Unlock() - s.addTransaction(_t) - t = _t - return -} - -// Sends a ping query to the address given. -func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) { - s.mu.Lock() - defer s.mu.Unlock() - return s.query(NewAddr(node), "ping", nil, nil) -} - -func (s *Server) announcePeer(node Addr, infoHash string, port int, token string, impliedPort bool) (err error) { - if port == 0 && !impliedPort { - return errors.New("nothing to announce") - } - _, err = s.query(node, "announce_peer", map[string]interface{}{ - "implied_port": func() int { - if impliedPort { - return 1 - } else { - return 0 - } - }(), - "info_hash": infoHash, - "port": port, - "token": token, - }, func(m krpc.Msg) { - if err := m.Error(); err != nil { - announceErrors.Add(1) - // log.Print(token) - // logonce.Stderr.Printf("announce_peer response: %s", err) - return - } - s.numConfirmedAnnounces++ - }) - return -} - -// Add response nodes to node table. -func (s *Server) liftNodes(d krpc.Msg) { - if d.Y != "r" { - return - } - for _, cni := range d.R.Nodes { - if cni.Addr.Port == 0 { - // TODO: Why would people even do this? - continue - } - if s.ipBlocked(cni.Addr.IP) { - continue - } - n := s.getNode(NewAddr(cni.Addr), string(cni.ID[:])) - n.SetIDFromBytes(cni.ID[:]) - } -} - -// Sends a find_node query to addr. targetID is the node we're looking for. -func (s *Server) findNode(addr Addr, targetID string) (t *Transaction, err error) { - t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d krpc.Msg) { - // Scrape peers from the response to put in the server's table before - // handing the response back to the caller. - s.liftNodes(d) - }) - if err != nil { - return - } - return -} - -// Adds bootstrap nodes directly to table, if there's room. Node ID security -// is bypassed, but the IP blocklist is not. -func (s *Server) addRootNodes() error { - addrs, err := bootstrapAddrs(s.bootstrapNodes) - if err != nil { - return err - } - for _, addr := range addrs { - if len(s.nodes) >= maxNodes { - break - } - if s.nodes[addr.String()] != nil { - continue - } - if s.ipBlocked(addr.IP) { - log.Printf("dht root node is in the blocklist: %s", addr.IP) - continue - } - s.nodes[addr.String()] = &node{ - addr: NewAddr(addr), - } - } - return nil -} - -// Populates the node table. -func (s *Server) bootstrap() (err error) { - s.mu.Lock() - defer s.mu.Unlock() - if len(s.nodes) == 0 && !s.config.NoDefaultBootstrap { - err = s.addRootNodes() - } - if err != nil { - return - } - for { - var outstanding sync.WaitGroup - for _, node := range s.nodes { - var t *Transaction - t, err = s.findNode(node.addr, s.id) - if err != nil { - err = fmt.Errorf("error sending find_node: %s", err) - return - } - outstanding.Add(1) - t.SetResponseHandler(func(krpc.Msg, bool) { - outstanding.Done() - }) - } - noOutstanding := make(chan struct{}) - go func() { - outstanding.Wait() - close(noOutstanding) - }() - s.mu.Unlock() - select { - case <-s.closed.LockedChan(&s.mu): - s.mu.Lock() - return - case <-time.After(15 * time.Second): - case <-noOutstanding: - } - s.mu.Lock() - // log.Printf("now have %d nodes", len(s.nodes)) - if s.numGoodNodes() >= 160 { - break - } - } - return -} - -func (s *Server) numGoodNodes() (num int) { - for _, n := range s.nodes { - if n.DefinitelyGood() { - num++ - } - } - return -} - -// Returns how many nodes are in the node table. -func (s *Server) NumNodes() int { - s.mu.Lock() - defer s.mu.Unlock() - return len(s.nodes) -} - -// Exports the current node table. -func (s *Server) Nodes() (nis []krpc.NodeInfo) { - s.mu.Lock() - defer s.mu.Unlock() - for _, node := range s.nodes { - // if !node.Good() { - // continue - // } - ni := krpc.NodeInfo{ - Addr: node.addr.UDPAddr(), - } - if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 { - panic(n) - } - nis = append(nis, ni) - } - return -} - -// Stops the server network activity. This is all that's required to clean-up a Server. -func (s *Server) Close() { - s.mu.Lock() - defer s.mu.Unlock() - s.closed.Set() - s.socket.Close() -} - -func (s *Server) setDefaults() (err error) { - if s.id == "" { - var id [20]byte - h := crypto.SHA1.New() - ss, err := os.Hostname() - if err != nil { - log.Print(err) - } - ss += s.socket.LocalAddr().String() - h.Write([]byte(ss)) - if b := h.Sum(id[:0:20]); len(b) != 20 { - panic(len(b)) - } - if len(id) != 20 { - panic(len(id)) - } - publicIP := func() net.IP { - if s.config.PublicIP != nil { - return s.config.PublicIP - } else { - return missinggo.AddrIP(s.socket.LocalAddr()) - } - }() - SecureNodeId(id[:], publicIP) - s.id = string(id[:]) - } - s.nodes = make(map[string]*node, maxNodes) - return -} - -func (s *Server) getPeers(addr Addr, infoHash string) (t *Transaction, err error) { - if len(infoHash) != 20 { - err = fmt.Errorf("infohash has bad length") - return - } - t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m krpc.Msg) { - s.liftNodes(m) - if m.R != nil && m.R.Token != "" { - s.getNode(addr, m.SenderID()).announceToken = m.R.Token - } - }) - return -} - -func (s *Server) closestGoodNodes(k int, targetID string) []*node { - return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() }) -} - -func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node { - sel := newKClosestNodesSelector(k, target) - idNodes := make(map[string]*node, len(s.nodes)) - for _, node := range s.nodes { - if !filter(node) { - continue - } - sel.Push(node.id) - idNodes[node.idString()] = node - } - ids := sel.IDs() - ret := make([]*node, 0, len(ids)) - for _, id := range ids { - ret = append(ret, idNodes[id.ByteString()]) - } - return ret -} - -func (s *Server) badNode(addr Addr) { - s.badNodes.Add([]byte(addr.String())) - delete(s.nodes, addr.String()) -} diff --git a/dht/tokens.go b/dht/tokens.go deleted file mode 100644 index 9ecb6e3e..00000000 --- a/dht/tokens.go +++ /dev/null @@ -1,54 +0,0 @@ -package dht - -import ( - "crypto/sha1" - "encoding/binary" - "time" - - "github.com/bradfitz/iter" -) - -// Manages creation and validation of tokens issued to querying nodes. -type tokenServer struct { - secret []byte - interval time.Duration - maxIntervalDelta int - timeNow func() time.Time -} - -func (me tokenServer) CreateToken(addr Addr) string { - return me.createToken(addr, me.getTimeNow()) -} - -func (me tokenServer) createToken(addr Addr, t time.Time) string { - h := sha1.New() - ip := addr.UDPAddr().IP.To16() - if len(ip) != 16 { - panic(ip) - } - h.Write(ip) - ti := t.UnixNano() / int64(me.interval) - var b [8]byte - binary.BigEndian.PutUint64(b[:], uint64(ti)) - h.Write(b[:]) - h.Write(me.secret) - return string(h.Sum(nil)) -} - -func (me *tokenServer) ValidToken(token string, addr Addr) bool { - t := me.getTimeNow() - for range iter.N(me.maxIntervalDelta + 1) { - if me.createToken(addr, t) == token { - return true - } - t = t.Add(-me.interval) - } - return false -} - -func (me *tokenServer) getTimeNow() time.Time { - if me.timeNow == nil { - return time.Now() - } - return me.timeNow() -} diff --git a/dht/tokens_test.go b/dht/tokens_test.go deleted file mode 100644 index ae19e663..00000000 --- a/dht/tokens_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package dht - -import ( - "net" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestTokenServer(t *testing.T) { - addr1 := NewAddr(&net.UDPAddr{ - IP: []byte{1, 2, 3, 4}, - }) - addr2 := NewAddr(&net.UDPAddr{ - IP: []byte{1, 2, 3, 3}, - }) - ts := tokenServer{ - secret: []byte("42"), - interval: 5 * time.Minute, - maxIntervalDelta: 2, - } - tok := ts.CreateToken(addr1) - assert.Len(t, tok, 20) - assert.True(t, ts.ValidToken(tok, addr1)) - assert.False(t, ts.ValidToken(tok[1:], addr1)) - assert.False(t, ts.ValidToken(tok, addr2)) - func() { - ts0 := ts - ts0.secret = nil - assert.False(t, ts0.ValidToken(tok, addr1)) - }() - now := time.Now() - setTime := func(t time.Time) { - ts.timeNow = func() time.Time { - return t - } - } - setTime(now) - tok = ts.CreateToken(addr1) - assert.True(t, ts.ValidToken(tok, addr1)) - setTime(time.Time{}) - assert.False(t, ts.ValidToken(tok, addr1)) - setTime(now.Add(-5 * time.Minute)) - assert.False(t, ts.ValidToken(tok, addr1)) - setTime(now) - assert.True(t, ts.ValidToken(tok, addr1)) - setTime(now.Add(5 * time.Minute)) - assert.True(t, ts.ValidToken(tok, addr1)) - setTime(now.Add(2 * 5 * time.Minute)) - assert.True(t, ts.ValidToken(tok, addr1)) - setTime(now.Add(3 * 5 * time.Minute)) - assert.False(t, ts.ValidToken(tok, addr1)) -} diff --git a/dht/transaction.go b/dht/transaction.go deleted file mode 100644 index 3698a262..00000000 --- a/dht/transaction.go +++ /dev/null @@ -1,150 +0,0 @@ -package dht - -import ( - "sync" - "time" - - "github.com/anacrolix/torrent/dht/krpc" -) - -// Transaction keeps track of a message exchange between nodes, such as a -// query message and a response message. -type Transaction struct { - mu sync.Mutex - remoteAddr Addr - t string - response chan krpc.Msg - onResponse func(krpc.Msg) // Called with the server locked. - done chan struct{} - queryPacket []byte - timer *time.Timer - s *Server - retries int - lastSend time.Time - userOnResponse func(krpc.Msg, bool) -} - -// SetResponseHandler sets up a function to be called when the query response -// is available. -func (t *Transaction) SetResponseHandler(f func(krpc.Msg, bool)) { - t.mu.Lock() - defer t.mu.Unlock() - t.userOnResponse = f - t.tryHandleResponse() -} - -func (t *Transaction) tryHandleResponse() { - if t.userOnResponse == nil { - return - } - select { - case r, ok := <-t.response: - t.userOnResponse(r, ok) - // Shouldn't be called more than once. - t.userOnResponse = nil - default: - } -} - -func (t *Transaction) key() transactionKey { - return transactionKey{ - t.remoteAddr.String(), - t.t, - } -} - -func (t *Transaction) startTimer() { - t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback) -} - -func (t *Transaction) timerCallback() { - t.mu.Lock() - defer t.mu.Unlock() - select { - case <-t.done: - return - default: - } - if t.retries == 2 { - t.timeout() - return - } - t.retries++ - t.sendQuery() - if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) { - panic("timer should have fired to get here") - } -} - -func (t *Transaction) sendQuery() error { - err := t.s.writeToNode(t.queryPacket, t.remoteAddr) - if err != nil { - return err - } - t.lastSend = time.Now() - return nil -} - -func (t *Transaction) timeout() { - go func() { - t.s.mu.Lock() - defer t.s.mu.Unlock() - t.s.nodeTimedOut(t.remoteAddr) - }() - t.close() -} - -func (t *Transaction) close() { - if t.closing() { - return - } - t.queryPacket = nil - close(t.response) - t.tryHandleResponse() - close(t.done) - t.timer.Stop() - go func() { - t.s.mu.Lock() - defer t.s.mu.Unlock() - t.s.deleteTransaction(t) - }() -} - -func (t *Transaction) closing() bool { - select { - case <-t.done: - return true - default: - return false - } -} - -// Close (abandon) the transaction. -func (t *Transaction) Close() { - t.mu.Lock() - defer t.mu.Unlock() - t.close() -} - -func (t *Transaction) handleResponse(m krpc.Msg) { - t.mu.Lock() - if t.closing() { - t.mu.Unlock() - return - } - close(t.done) - t.mu.Unlock() - if t.onResponse != nil { - t.s.mu.Lock() - t.onResponse(m) - t.s.mu.Unlock() - } - t.queryPacket = nil - select { - case t.response <- m: - default: - panic("blocked handling response") - } - close(t.response) - t.tryHandleResponse() -} diff --git a/torrent.go b/torrent.go index 2b9e4531..5f5f77fb 100644 --- a/torrent.go +++ b/torrent.go @@ -23,7 +23,7 @@ import ( "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/dht" + "github.com/anacrolix/dht" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" -- 2.48.1