"github.com/anacrolix/torrent/util"
)
-type peerDiscovery struct {
- *peerStream
+// Maintains state for an ongoing Announce operation. An Announce is started
+// by calling Server.Announce.
+type Announce struct {
+ mu sync.Mutex
+ Peers chan PeersValues
+ // Inner chan is set to nil on close.
+ values chan PeersValues
+ stop chan struct{}
triedAddrs *bloom.BloomFilter
pending int
server *Server
announcePortImplied bool
}
-func (pd *peerDiscovery) NumContacted() int {
- pd.mu.Lock()
- defer pd.mu.Unlock()
- return pd.numContacted
+// Returns the number of distinct remote addresses the announce has queried.
+func (me *Announce) NumContacted() int {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ return me.numContacted
}
-func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) {
+// This is kind of the main thing you want to do with DHT. It traverses the
+// graph toward nodes that store peers for the infohash, streaming them to the
+// caller, and announcing the local node to each node if allowed and
+// specified.
+func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announce, error) {
s.mu.Lock()
startAddrs := func() (ret []dHTAddr) {
for _, n := range s.closestGoodNodes(160, infoHash) {
startAddrs = append(startAddrs, newDHTAddr(addr))
}
}
- disc := &peerDiscovery{
- peerStream: &peerStream{
- Values: make(chan peerStreamValue, 100),
- stop: make(chan struct{}),
- values: make(chan peerStreamValue),
- },
+ disc := &Announce{
+ Peers: make(chan PeersValues, 100),
+ stop: make(chan struct{}),
+ values: make(chan PeersValues),
triedAddrs: bloom.NewWithEstimates(1000, 0.5),
server: s,
infoHash: infoHash,
}
// Ferries values from the internal values chan to the public Peers chan until the announce is halted.
go func() {
- defer close(disc.Values)
+ defer close(disc.Peers)
for {
select {
case psv := <-disc.values:
select {
- case disc.Values <- psv:
+ case disc.Peers <- psv:
case <-disc.stop:
return
}
return disc, nil
}
-func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
+func (me *Announce) gotNodeAddr(addr dHTAddr) {
if util.AddrPort(addr) == 0 {
// Not a contactable address.
return
me.contact(addr)
}
-func (me *peerDiscovery) contact(addr dHTAddr) {
+func (me *Announce) contact(addr dHTAddr) {
me.numContacted++
me.triedAddrs.Add([]byte(addr.String()))
if err := me.getPeers(addr); err != nil {
me.pending++
}
-func (me *peerDiscovery) transactionClosed() {
+func (me *Announce) transactionClosed() {
me.pending--
if me.pending == 0 {
me.close()
}
}
-func (me *peerDiscovery) responseNode(node NodeInfo) {
+func (me *Announce) responseNode(node NodeInfo) {
me.gotNodeAddr(node.Addr)
}
-func (me *peerDiscovery) closingCh() chan struct{} {
- return me.peerStream.stop
+func (me *Announce) closingCh() chan struct{} {
+ return me.stop
}
-func (me *peerDiscovery) announcePeer(to dHTAddr, token string) {
+func (me *Announce) announcePeer(to dHTAddr, token string) {
me.server.mu.Lock()
err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
me.server.mu.Unlock()
}
}
-func (me *peerDiscovery) getPeers(addr dHTAddr) error {
+func (me *Announce) getPeers(addr dHTAddr) error {
me.server.mu.Lock()
defer me.server.mu.Unlock()
t, err := me.server.getPeers(addr, me.infoHash)
}
copy(nodeInfo.ID[:], m.ID())
select {
- case me.peerStream.values <- peerStreamValue{
+ case me.values <- PeersValues{
Peers: vs,
NodeInfo: nodeInfo,
}:
- case <-me.peerStream.stop:
+ case <-me.stop:
}
}
return nil
}
-type peerStreamValue struct {
+// Corresponds to the "values" key in a get_peers KRPC response. A list of
+// peers that a node has reported as being in the swarm for a queried info
+// hash.
+type PeersValues struct {
Peers []util.CompactPeer // Peers given in get_peers response.
NodeInfo // The node that gave the response.
}
-// TODO: This was to be the shared publicly accessible part returned by DHT
-// functions that stream peers. Possibly not necessary anymore.
-type peerStream struct {
- mu sync.Mutex
- Values chan peerStreamValue
- // Inner chan is set to nil when on close.
- values chan peerStreamValue
- stop chan struct{}
-}
-
-func (ps *peerStream) Close() {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- ps.close()
+// Stop the announce. Idempotent: the underlying close is guarded by a
+// select on the stop channel, so calling Close more than once is safe.
+func (me *Announce) Close() {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ me.close()
}
-func (ps *peerStream) close() {
+func (ps *Announce) close() {
select {
case <-ps.stop:
default:
+// Package DHT implements a DHT for use with the BitTorrent protocol,
+// described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html.
+//
+// Standard use involves creating a NewServer, and calling Announce on it with
+// the details of your local torrent client and infohash of interest.
package dht
import (
type Server struct {
id string
socket net.PacketConn
- transactions map[transactionKey]*transaction
+ transactions map[transactionKey]*Transaction
transactionIDInt uint64
- nodes map[string]*Node // Keyed by dHTAddr.String().
+ nodes map[string]*node // Keyed by dHTAddr.String().
mu sync.Mutex
closed chan struct{}
passive bool // Don't respond to queries.
ipBlockList *iplist.IPList
- NumConfirmedAnnounces int
+ numConfirmedAnnounces int
}
type dHTAddr interface {
}
type ServerConfig struct {
- Addr string
- Conn net.PacketConn
- Passive bool // Don't respond to queries.
-}
-
-type serverStats struct {
- NumGoodNodes int
- NumNodes int
- NumOutstandingTransactions int
-}
-
-func (s *Server) Stats() (ss serverStats) {
+ Addr string // Listen address. Used if Conn is nil.
+ Conn net.PacketConn
+ // Don't respond to queries from other nodes.
+ Passive bool
+}
+
+type ServerStats struct {
+ // Count of nodes in the node table that responded to our last query or
+ // haven't yet been queried.
+ GoodNodes int
+ // Count of nodes in the node table.
+ Nodes int
+ // Transactions awaiting a response.
+ OutstandingTransactions int
+ // Individual announce_peer requests that got a success response.
+ ConfirmedAnnounces int
+}
+
+// Returns statistics for the server.
+func (s *Server) Stats() (ss ServerStats) {
s.mu.Lock()
defer s.mu.Unlock()
for _, n := range s.nodes {
if n.DefinitelyGood() {
- ss.NumGoodNodes++
+ ss.GoodNodes++
}
}
- ss.NumNodes = len(s.nodes)
- ss.NumOutstandingTransactions = len(s.transactions)
+ ss.Nodes = len(s.nodes)
+ ss.OutstandingTransactions = len(s.transactions)
+ ss.ConfirmedAnnounces = s.numConfirmedAnnounces
return
}
-func (s *Server) LocalAddr() net.Addr {
+// Returns the listen address for the server. Packets arriving to this address
+// are processed by the server (unless aliens are involved).
+func (s *Server) Addr() net.Addr {
return s.socket.LocalAddr()
}
return
}
+// Create a new DHT server.
func NewServer(c *ServerConfig) (s *Server, err error) {
if c == nil {
c = &ServerConfig{}
return
}
+// Returns a description of the Server. Python repr-style.
func (s *Server) String() string {
return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
}
return string(nid.i.Bytes())
}
-type Node struct {
+type node struct {
addr dHTAddr
id nodeID
announceToken string
lastSentQuery time.Time
}
-func (n *Node) idString() string {
+func (n *node) idString() string {
return n.id.String()
}
-func (n *Node) SetIDFromBytes(b []byte) {
+func (n *node) SetIDFromBytes(b []byte) {
n.id.i.SetBytes(b)
n.id.set = true
}
-func (n *Node) SetIDFromString(s string) {
+func (n *node) SetIDFromString(s string) {
n.id.i.SetBytes([]byte(s))
}
-func (n *Node) IDNotSet() bool {
+// Reports whether the node's ID is still unknown.
+// NOTE(review): this checks the numeric value rather than the id.set
+// flag, so an ID that happens to be all zero bytes also reads as unset —
+// confirm this is intended.
+func (n *node) IDNotSet() bool {
 return n.id.i.Int64() == 0
}
-func (n *Node) NodeInfo() (ret NodeInfo) {
+func (n *node) NodeInfo() (ret NodeInfo) {
ret.Addr = n.addr
if n := copy(ret.ID[:], n.idString()); n != 20 {
panic(n)
return
}
-func (n *Node) DefinitelyGood() bool {
+func (n *node) DefinitelyGood() bool {
if len(n.idString()) != 20 {
return false
}
return true
}
+// A wrapper around the unmarshalled KRPC dict that constitutes messages in
+// the DHT. There are various helpers for extracting common data from the
+// message. In normal use, Msg is abstracted away for you, but it can be of
+// interest.
type Msg map[string]interface{}
var _ fmt.Stringer = Msg{}
return
}
-type transaction struct {
+type Transaction struct {
mu sync.Mutex
remoteAddr dHTAddr
t string
userOnResponse func(Msg)
}
-func (t *transaction) SetResponseHandler(f func(Msg)) {
+// Set a function to be called with the response.
+func (t *Transaction) SetResponseHandler(f func(Msg)) {
t.mu.Lock()
defer t.mu.Unlock()
t.userOnResponse = f
t.tryHandleResponse()
}
-func (t *transaction) tryHandleResponse() {
+func (t *Transaction) tryHandleResponse() {
if t.userOnResponse == nil {
return
}
}
}
-func (t *transaction) Key() transactionKey {
+func (t *Transaction) key() transactionKey {
return transactionKey{
t.remoteAddr.String(),
t.t,
return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
}
-func (t *transaction) startTimer() {
+func (t *Transaction) startTimer() {
t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
}
-func (t *transaction) timerCallback() {
+func (t *Transaction) timerCallback() {
t.mu.Lock()
defer t.mu.Unlock()
select {
}
}
-func (t *transaction) sendQuery() error {
+func (t *Transaction) sendQuery() error {
err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
if err != nil {
return err
return nil
}
-func (t *transaction) timeout() {
+func (t *Transaction) timeout() {
go func() {
t.s.mu.Lock()
defer t.s.mu.Unlock()
t.close()
}
-func (t *transaction) close() {
+func (t *Transaction) close() {
if t.closing() {
return
}
}()
}
-func (t *transaction) closing() bool {
+func (t *Transaction) closing() bool {
select {
case <-t.done:
return true
}
}
-func (t *transaction) Close() {
+// Abandon the transaction. Safe to call repeatedly: the underlying
+// close returns early if the transaction is already closing.
+func (t *Transaction) Close() {
 t.mu.Lock()
 defer t.mu.Unlock()
 t.close()
}
-func (t *transaction) handleResponse(m Msg) {
+func (t *Transaction) handleResponse(m Msg) {
t.mu.Lock()
if t.closing() {
t.mu.Unlock()
}
s.id = string(id[:])
}
- s.nodes = make(map[string]*Node, 10000)
+ s.nodes = make(map[string]*node, 10000)
return
}
+// Packets to and from any address matching a range in the list are dropped.
func (s *Server) SetIPBlockList(list *iplist.IPList) {
s.mu.Lock()
defer s.mu.Unlock()
return
}
s.closed = make(chan struct{})
- s.transactions = make(map[transactionKey]*transaction)
+ s.transactions = make(map[transactionKey]*Transaction)
return
}
return s.ipBlockList.Lookup(ip) != nil
}
+// Adds directly to the node table.
func (s *Server) AddNode(ni NodeInfo) {
s.mu.Lock()
defer s.mu.Unlock()
if s.nodes == nil {
- s.nodes = make(map[string]*Node)
+ s.nodes = make(map[string]*node)
}
n := s.getNode(ni.Addr)
if n.IDNotSet() {
}
}
-func (s *Server) nodeByID(id string) *Node {
+func (s *Server) nodeByID(id string) *node {
for _, node := range s.nodes {
if node.idString() == id {
return node
if r == nil {
r = make(map[string]interface{}, 1)
}
- r["id"] = s.IDString()
+ r["id"] = s.ID()
m := map[string]interface{}{
"t": t,
"y": "r",
}
}
-func (s *Server) getNode(addr dHTAddr) (n *Node) {
+func (s *Server) getNode(addr dHTAddr) (n *node) {
addrStr := addr.String()
n = s.nodes[addrStr]
if n == nil {
- n = &Node{
+ n = &node{
addr: addr,
}
if len(s.nodes) < maxNodes {
return
}
-func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *transaction {
+func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction {
return s.transactions[transactionKey{
sourceNode.String(),
transactionID}]
return string(b[:n])
}
-func (s *Server) deleteTransaction(t *transaction) {
- delete(s.transactions, t.Key())
+func (s *Server) deleteTransaction(t *Transaction) {
+ delete(s.transactions, t.key())
}
-func (s *Server) addTransaction(t *transaction) {
- if _, ok := s.transactions[t.Key()]; ok {
+func (s *Server) addTransaction(t *Transaction) {
+ if _, ok := s.transactions[t.key()]; ok {
panic("transaction not unique")
}
- s.transactions[t.Key()] = t
+ s.transactions[t.key()] = t
}
-func (s *Server) IDString() string {
+// Returns the 20-byte server ID. This is the ID used to communicate with the
+// DHT network.
+func (s *Server) ID() string {
if len(s.id) != 20 {
panic("bad node id")
}
return s.id
}
-func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *transaction, err error) {
+func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
tid := s.nextTransactionID()
if a == nil {
a = make(map[string]interface{}, 1)
}
- a["id"] = s.IDString()
+ a["id"] = s.ID()
d := map[string]interface{}{
"t": tid,
"y": "q",
if err != nil {
return
}
- t = &transaction{
+ t = &Transaction{
remoteAddr: node,
t: tid,
response: make(chan Msg, 1),
return
}
+// The size in bytes of a NodeInfo in its compact binary representation.
const CompactNodeInfoLen = 26
type NodeInfo struct {
Addr dHTAddr
}
+// Writes the node info to its compact binary representation in b. See
+// CompactNodeInfoLen.
func (ni *NodeInfo) PutCompact(b []byte) error {
if n := copy(b[:], ni.ID[:]); n != 20 {
panic(n)
return nil
}
-func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) {
+// Sends a ping query to the address given.
+func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.query(newDHTAddr(node), "ping", nil, nil)
logonce.Stderr.Printf("announce_peer response: %s", err)
return
}
- s.NumConfirmedAnnounces++
+ s.numConfirmedAnnounces++
})
return
}
}
// Sends a find_node query to addr. targetID is the node we're looking for.
-func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err error) {
+func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) {
t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
// Scrape peers from the response to put in the server's table before
// handing the response back to the caller.
return
}
-func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err error) {
+func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) {
if len(infoHash) != 20 {
err = fmt.Errorf("infohash has bad length")
return
return err
}
for _, addr := range addrs {
- s.nodes[addr.String()] = &Node{
+ s.nodes[addr.String()] = &node{
addr: newDHTAddr(addr),
}
}
for {
var outstanding sync.WaitGroup
for _, node := range s.nodes {
- var t *transaction
+ var t *Transaction
t, err = s.findNode(node.addr, s.id)
if err != nil {
err = fmt.Errorf("error sending find_node: %s", err)
return
}
+// Returns how many nodes are in the node table.
func (s *Server) NumNodes() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.nodes)
}
+// Exports the current node table.
func (s *Server) Nodes() (nis []NodeInfo) {
s.mu.Lock()
defer s.mu.Unlock()
return
}
+// Stops the server network activity. This is all that's required to clean-up a Server.
func (s *Server) Close() {
s.mu.Lock()
select {
maxDistance.SetBit(&zero, 160, 1)
}
-func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
- return s.closestNodes(k, nodeIDFromString(targetID), func(n *Node) bool { return n.DefinitelyGood() })
+func (s *Server) closestGoodNodes(k int, targetID string) []*node {
+ return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() })
}
-func (s *Server) closestNodes(k int, target nodeID, filter func(*Node) bool) []*Node {
+func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node {
sel := newKClosestNodesSelector(k, target)
- idNodes := make(map[string]*Node, len(s.nodes))
+ idNodes := make(map[string]*node, len(s.nodes))
for _, node := range s.nodes {
if !filter(node) {
continue
idNodes[node.idString()] = node
}
ids := sel.IDs()
- ret := make([]*Node, 0, len(ids))
+ ret := make([]*node, 0, len(ids))
for _, id := range ids {
ret = append(ret, idNodes[id.String()])
}