"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/dht"
+ "github.com/anacrolix/torrent/dht/krpc"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
continue
}
- ni := dht.NodeInfo{
- Addr: dht.NewAddr(&net.UDPAddr{
+ ni := krpc.NodeInfo{
+ Addr: &net.UDPAddr{
IP: ip,
Port: hmp.Port,
- }),
+ },
}
cl.DHT().AddNode(ni)
}
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/torrent/dht"
+ "github.com/anacrolix/torrent/dht/krpc"
)
var (
defer f.Close()
added := 0
for {
- b := make([]byte, dht.CompactIPv4NodeInfoLen)
+ b := make([]byte, krpc.CompactIPv4NodeInfoLen)
_, err := io.ReadFull(f, b)
if err == io.EOF {
break
if err != nil {
return fmt.Errorf("error reading table file: %s", err)
}
- var ni dht.NodeInfo
+ var ni krpc.NodeInfo
err = ni.UnmarshalCompactIPv4(b)
if err != nil {
return fmt.Errorf("error unmarshaling compact node info: %s", err)
}
defer f.Close()
for _, nodeInfo := range goodNodes {
- var b [dht.CompactIPv4NodeInfoLen]byte
+ var b [krpc.CompactIPv4NodeInfoLen]byte
err := nodeInfo.PutCompact(b[:])
if err != nil {
return fmt.Errorf("error compacting node info: %s", err)
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/dht"
+ "github.com/anacrolix/torrent/dht/krpc"
)
func main() {
type pong struct {
addr string
- krpc dht.Msg
+ krpc krpc.Msg
msgOk bool
rtt time.Duration
}
log.Fatal(err)
}
start := time.Now()
- t.SetResponseHandler(func(addr string) func(dht.Msg, bool) {
- return func(resp dht.Msg, ok bool) {
+ t.SetResponseHandler(func(addr string) func(krpc.Msg, bool) {
+ return func(resp krpc.Msg, ok bool) {
pongChan <- pong{
addr: addr,
krpc: resp,
"os/signal"
"github.com/anacrolix/torrent/dht"
+ "github.com/anacrolix/torrent/dht/krpc"
)
var (
defer f.Close()
added := 0
for {
- b := make([]byte, dht.CompactIPv4NodeInfoLen)
+ b := make([]byte, krpc.CompactIPv4NodeInfoLen)
_, err := io.ReadFull(f, b)
if err == io.EOF {
break
if err != nil {
return fmt.Errorf("error reading table file: %s", err)
}
- var ni dht.NodeInfo
+ var ni krpc.NodeInfo
err = ni.UnmarshalCompactIPv4(b)
if err != nil {
return fmt.Errorf("error unmarshaling compact node info: %s", err)
}
defer f.Close()
for _, nodeInfo := range goodNodes {
- var b [dht.CompactIPv4NodeInfoLen]byte
+ var b [krpc.CompactIPv4NodeInfoLen]byte
err := nodeInfo.PutCompact(b[:])
if err != nil {
return fmt.Errorf("error compacting node info: %s", err)
import "net"
-// Used internally to refer to node network addresses.
+// Used internally to refer to node network addresses. String() is called a
+// lot, and so can be optimized. Network() is not exposed, so that the
+// interface does not satisfy net.Addr, as the underlying type must be passed
+// to any OS-level function that take net.Addr.
type Addr interface {
UDPAddr() *net.UDPAddr
String() string
"github.com/anacrolix/sync"
"github.com/willf/bloom"
+ "github.com/anacrolix/torrent/dht/krpc"
"github.com/anacrolix/torrent/logonce"
)
a.maybeClose()
}
-func (a *Announce) responseNode(node NodeInfo) {
- a.gotNodeAddr(node.Addr)
+func (a *Announce) responseNode(node krpc.NodeInfo) {
+ a.gotNodeAddr(NewAddr(node.Addr))
}
func (a *Announce) closingCh() chan struct{} {
if err != nil {
return err
}
- t.SetResponseHandler(func(m Msg, ok bool) {
+ t.SetResponseHandler(func(m krpc.Msg, ok bool) {
// Register suggested nodes closer to the target info-hash.
if m.R != nil {
a.mu.Lock()
a.mu.Unlock()
if vs := m.R.Values; len(vs) != 0 {
- nodeInfo := NodeInfo{
- Addr: t.remoteAddr,
+ nodeInfo := krpc.NodeInfo{
+ Addr: t.remoteAddr.UDPAddr(),
}
copy(nodeInfo.ID[:], m.SenderID())
select {
// peers that a node has reported as being in the swarm for a queried info
// hash.
type PeersValues struct {
- Peers []Peer // Peers given in get_peers response.
- NodeInfo // The node that gave the response.
+ Peers []Peer // Peers given in get_peers response.
+ krpc.NodeInfo // The node that gave the response.
}
// Stop the announce.
"strconv"
"time"
+ "github.com/anacrolix/torrent/dht/krpc"
"github.com/anacrolix/torrent/iplist"
)
// Used to secure the server's ID. Defaults to the Conn's LocalAddr().
PublicIP net.IP
- OnQuery func(*Msg, net.Addr) bool
+ OnQuery func(*krpc.Msg, net.Addr) bool
}
// ServerStats instance is returned by Server.Stats() and stores Server metrics
return n.id.i.Int64() == 0
}
-func (n *node) NodeInfo() (ret NodeInfo) {
- ret.Addr = n.addr
+func (n *node) NodeInfo() (ret krpc.NodeInfo) {
+ ret.Addr = n.addr.UDPAddr()
if n := copy(ret.ID[:], n.idString()); n != 20 {
panic(n)
}
_ "github.com/anacrolix/envpprof"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "github.com/anacrolix/torrent/dht/krpc"
)
func TestSetNilBigInt(t *testing.T) {
}
func TestMarshalCompactNodeInfo(t *testing.T) {
- cni := NodeInfo{
+ cni := krpc.NodeInfo{
ID: [20]byte{'a', 'b', 'c'},
}
addr, err := net.ResolveUDPAddr("udp4", "1.2.3.4:5")
require.NoError(t, err)
- cni.Addr = NewAddr(addr)
- var b [CompactIPv4NodeInfoLen]byte
+ cni.Addr = addr
+ var b [krpc.CompactIPv4NodeInfoLen]byte
err = cni.PutCompact(b[:])
require.NoError(t, err)
var bb [26]byte
require.NoError(t, err)
defer tn.Close()
ok := make(chan bool)
- tn.SetResponseHandler(func(msg Msg, msgOk bool) {
+ tn.SetResponseHandler(func(msg krpc.Msg, msgOk bool) {
ok <- msg.SenderID() == srv0.ID()
})
if !<-ok {
}
func TestEqualPointers(t *testing.T) {
- assert.EqualValues(t, &Msg{R: &Return{}}, &Msg{R: &Return{}})
+ assert.EqualValues(t, &krpc.Msg{R: &krpc.Return{}}, &krpc.Msg{R: &krpc.Return{}})
}
func TestHook(t *testing.T) {
srv0, err := NewServer(&ServerConfig{
Addr: "127.0.0.1:5679",
BootstrapNodes: []string{"127.0.0.1:5678"},
- OnQuery: func(m *Msg, addr net.Addr) bool {
+ OnQuery: func(m *krpc.Msg, addr net.Addr) bool {
if m.Q == "ping" {
hookCalled <- true
}
assert.NoError(t, err)
defer tn.Close()
// Await response from hooked server
- tn.SetResponseHandler(func(msg Msg, b bool) {
+ tn.SetResponseHandler(func(msg krpc.Msg, b bool) {
t.Log("TestHook: Sender received response from pinged hook server, so normal execution resumed.")
})
// Await signal that hook has been called.
-package dht
+package krpc
import (
"bytes"
err = errors.New("nil addr in node info")
return
}
- buf.Write(ni.Addr.UDPAddr().IP.To4())
- binary.Write(&buf, binary.BigEndian, uint16(ni.Addr.UDPAddr().Port))
+ buf.Write(ni.Addr.IP.To4())
+ binary.Write(&buf, binary.BigEndian, uint16(ni.Addr.Port))
}
return bencode.Marshal(buf.Bytes())
}
-package dht
+package krpc
import (
"fmt"
-package dht
+package krpc
import (
"fmt"
-package dht
+package krpc
import (
"net"
R: &Return{
Nodes: CompactIPv4NodeInfo{
NodeInfo{
- Addr: NewAddr(&net.UDPAddr{
+ Addr: &net.UDPAddr{
IP: net.IPv4(1, 2, 3, 4).To4(),
Port: 0x1234,
- }),
+ },
},
},
},
-package dht
+package krpc
import (
"encoding/binary"
type NodeInfo struct {
ID [20]byte
- Addr Addr
+ Addr *net.UDPAddr
}
// Writes the node info to its compact binary representation in b. See
if n := copy(b[:], ni.ID[:]); n != 20 {
panic(n)
}
- ip := ni.Addr.UDPAddr().IP.To4()
+ ip := ni.Addr.IP.To4()
if len(ip) != 4 {
return errors.New("expected ipv4 address")
}
if n := copy(b[20:], ip); n != 4 {
panic(n)
}
- binary.BigEndian.PutUint16(b[24:], uint16(ni.Addr.UDPAddr().Port))
+ binary.BigEndian.PutUint16(b[24:], uint16(ni.Addr.Port))
return nil
}
return errors.New("expected 26 bytes")
}
missinggo.CopyExact(ni.ID[:], b[:20])
- ni.Addr = NewAddr(&net.UDPAddr{
+ ni.Addr = &net.UDPAddr{
IP: append(make([]byte, 0, 4), b[20:24]...),
Port: int(binary.BigEndian.Uint16(b[24:26])),
- })
+ }
return nil
}
"github.com/tylertreat/BoomFilters"
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/dht/krpc"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/logonce"
)
readNotKRPCDict.Add(1)
return
}
- var d Msg
+ var d krpc.Msg
err := bencode.Unmarshal(b, &d)
if err != nil {
readUnmarshalError.Add(1)
}
// Adds directly to the node table.
-func (s *Server) AddNode(ni NodeInfo) {
+func (s *Server) AddNode(ni krpc.NodeInfo) {
s.mu.Lock()
defer s.mu.Unlock()
if s.nodes == nil {
s.nodes = make(map[string]*node)
}
- s.getNode(ni.Addr, string(ni.ID[:]))
+ s.getNode(NewAddr(ni.Addr), string(ni.ID[:]))
}
func (s *Server) nodeByID(id string) *node {
return nil
}
-func (s *Server) handleQuery(source Addr, m Msg) {
+func (s *Server) handleQuery(source Addr, m krpc.Msg) {
node := s.getNode(source, m.SenderID())
node.lastGotQuery = time.Now()
if s.config.OnQuery != nil {
args := m.A
switch m.Q {
case "ping":
- s.reply(source, m.T, Return{})
+ s.reply(source, m.T, krpc.Return{})
case "get_peers": // TODO: Extract common behaviour with find_node.
targetID := args.InfoHash
if len(targetID) != 20 {
break
}
- var rNodes []NodeInfo
+ var rNodes []krpc.NodeInfo
// TODO: Reply with "values" list if we have peers instead.
for _, node := range s.closestGoodNodes(8, targetID) {
rNodes = append(rNodes, node.NodeInfo())
}
- s.reply(source, m.T, Return{
+ s.reply(source, m.T, krpc.Return{
Nodes: rNodes,
// TODO: Generate this dynamically, and store it for the source.
Token: "hi",
log.Printf("bad DHT query: %v", m)
return
}
- var rNodes []NodeInfo
+ var rNodes []krpc.NodeInfo
if node := s.nodeByID(targetID); node != nil {
rNodes = append(rNodes, node.NodeInfo())
} else {
rNodes = append(rNodes, node.NodeInfo())
}
}
- s.reply(source, m.T, Return{
+ s.reply(source, m.T, krpc.Return{
Nodes: rNodes,
})
case "announce_peer":
}
}
-func (s *Server) reply(addr Addr, t string, r Return) {
+func (s *Server) reply(addr Addr, t string, r krpc.Return) {
r.ID = s.ID()
- m := Msg{
+ m := krpc.Msg{
T: t,
Y: "r",
R: &r,
return s.id
}
-func (s *Server) query(node Addr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
+func (s *Server) query(node Addr, q string, a map[string]interface{}, onResponse func(krpc.Msg)) (t *Transaction, err error) {
tid := s.nextTransactionID()
if a == nil {
a = make(map[string]interface{}, 1)
_t := &Transaction{
remoteAddr: node,
t: tid,
- response: make(chan Msg, 1),
+ response: make(chan krpc.Msg, 1),
done: make(chan struct{}),
queryPacket: b,
s: s,
"info_hash": infoHash,
"port": port,
"token": token,
- }, func(m Msg) {
+ }, func(m krpc.Msg) {
if err := m.Error(); err != nil {
announceErrors.Add(1)
// log.Print(token)
}
// Add response nodes to node table.
-func (s *Server) liftNodes(d Msg) {
+func (s *Server) liftNodes(d krpc.Msg) {
if d.Y != "r" {
return
}
for _, cni := range d.R.Nodes {
- if cni.Addr.UDPAddr().Port == 0 {
+ if cni.Addr.Port == 0 {
// TODO: Why would people even do this?
continue
}
- if s.ipBlocked(cni.Addr.UDPAddr().IP) {
+ if s.ipBlocked(cni.Addr.IP) {
continue
}
- n := s.getNode(cni.Addr, string(cni.ID[:]))
+ n := s.getNode(NewAddr(cni.Addr), string(cni.ID[:]))
n.SetIDFromBytes(cni.ID[:])
}
}
// Sends a find_node query to addr. targetID is the node we're looking for.
func (s *Server) findNode(addr Addr, targetID string) (t *Transaction, err error) {
- t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
+ t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d krpc.Msg) {
// Scrape peers from the response to put in the server's table before
// handing the response back to the caller.
s.liftNodes(d)
return
}
outstanding.Add(1)
- t.SetResponseHandler(func(Msg, bool) {
+ t.SetResponseHandler(func(krpc.Msg, bool) {
outstanding.Done()
})
}
}
// Exports the current node table.
-func (s *Server) Nodes() (nis []NodeInfo) {
+func (s *Server) Nodes() (nis []krpc.NodeInfo) {
s.mu.Lock()
defer s.mu.Unlock()
for _, node := range s.nodes {
// if !node.Good() {
// continue
// }
- ni := NodeInfo{
- Addr: node.addr,
+ ni := krpc.NodeInfo{
+ Addr: node.addr.UDPAddr(),
}
if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
panic(n)
err = fmt.Errorf("infohash has bad length")
return
}
- t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
+ t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m krpc.Msg) {
s.liftNodes(m)
if m.R != nil && m.R.Token != "" {
s.getNode(addr, m.SenderID()).announceToken = m.R.Token
import (
"sync"
"time"
+
+ "github.com/anacrolix/torrent/dht/krpc"
)
// Transaction keeps track of a message exchange between nodes, such as a
mu sync.Mutex
remoteAddr Addr
t string
- response chan Msg
- onResponse func(Msg) // Called with the server locked.
+ response chan krpc.Msg
+ onResponse func(krpc.Msg) // Called with the server locked.
done chan struct{}
queryPacket []byte
timer *time.Timer
s *Server
retries int
lastSend time.Time
- userOnResponse func(Msg, bool)
+ userOnResponse func(krpc.Msg, bool)
}
// SetResponseHandler sets up a function to be called when the query response
// is available.
-func (t *Transaction) SetResponseHandler(f func(Msg, bool)) {
+func (t *Transaction) SetResponseHandler(f func(krpc.Msg, bool)) {
t.mu.Lock()
defer t.mu.Unlock()
t.userOnResponse = f
t.close()
}
-func (t *Transaction) handleResponse(m Msg) {
+func (t *Transaction) handleResponse(m krpc.Msg) {
t.mu.Lock()
if t.closing() {
t.mu.Unlock()