"sort"
"strconv"
"sync"
+
+ "github.com/anacrolix/missinggo"
)
func is_empty_value(v reflect.Value) bool {
- switch v.Kind() {
- case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
- return v.Len() == 0
- case reflect.Bool:
- return !v.Bool()
- case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
- return v.Int() == 0
- case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
- return v.Uint() == 0
- case reflect.Float32, reflect.Float64:
- return v.Float() == 0
- case reflect.Interface, reflect.Ptr:
- return v.IsNil()
- }
- return false
+ return missinggo.IsEmptyValue(v)
}
type encoder struct {
)
type pingResponse struct {
- addr string
- krpc dht.Msg
- rtt time.Duration
+ addr string
+ krpc dht.Msg
+ msgOk bool
+ rtt time.Duration
}
func main() {
log.Fatal(err)
}
start := time.Now()
- t.SetResponseHandler(func(addr string) func(dht.Msg) {
- return func(resp dht.Msg) {
+ t.SetResponseHandler(func(addr string) func(dht.Msg, bool) {
+ return func(resp dht.Msg, ok bool) {
pingResponses <- pingResponse{
- addr: addr,
- krpc: resp,
- rtt: time.Now().Sub(start),
+ addr: addr,
+ krpc: resp,
+ rtt: time.Now().Sub(start),
+ msgOk: ok,
}
}
}(netloc))
for _ = range pingStrAddrs {
select {
case resp := <-pingResponses:
+ if !resp.msgOk {
+ break
+ }
responses++
- fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.R.ID, resp.addr), resp.rtt)
+ fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.SenderID(), resp.addr), resp.rtt)
case <-timeoutChan:
break pingResponses
}
"github.com/anacrolix/missinggo"
"github.com/anacrolix/sync"
- "github.com/willf/bloom"
-
"github.com/anacrolix/torrent/logonce"
+ "github.com/willf/bloom"
)
// Maintains state for an ongoing Announce operation. An Announce is started
if err != nil {
return err
}
- t.SetResponseHandler(func(m Msg) {
+ t.SetResponseHandler(func(m Msg, ok bool) {
// Register suggested nodes closer to the target info-hash.
if m.R != nil {
me.mu.Lock()
"github.com/anacrolix/missinggo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
-
- "github.com/anacrolix/torrent/bencode"
- "github.com/anacrolix/torrent/util"
)
func TestSetNilBigInt(t *testing.T) {
}
}
-func TestUnmarshalGetPeersResponse(t *testing.T) {
- var msg Msg
- err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg)
- require.NoError(t, err)
- assert.Len(t, msg.R.Values, 2)
- assert.Len(t, msg.R.Nodes, 2)
- assert.Nil(t, msg.E)
-}
-
func TestDHTDefaultConfig(t *testing.T) {
s, err := NewServer(nil)
if err != nil {
}
defer tn.Close()
ok := make(chan bool)
- tn.SetResponseHandler(func(msg Msg) {
+ tn.SetResponseHandler(func(msg Msg, msgOk bool) {
ok <- msg.SenderID() == srv0.ID()
})
if !<-ok {
}
}
-func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) {
- b, err := bencode.Marshal(m)
- require.NoError(t, err)
- assert.Equal(t, expected, string(b))
- var _m Msg
- err = bencode.Unmarshal([]byte(expected), &_m)
- assert.NoError(t, err)
- assert.EqualValues(t, m, _m)
- assert.EqualValues(t, m.R, _m.R)
-}
-
-func TestMarshalUnmarshalMsg(t *testing.T) {
- testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e")
- testMarshalUnmarshalMsg(t, Msg{
- Y: "q",
- Q: "ping",
- T: "hi",
- }, "d1:q4:ping1:t2:hi1:y1:qe")
- testMarshalUnmarshalMsg(t, Msg{
- Y: "e",
- T: "42",
- E: &KRPCError{Code: 200, Msg: "fuck"},
- }, "d1:eli200e4:fucke1:t2:421:y1:ee")
- testMarshalUnmarshalMsg(t, Msg{
- Y: "r",
- T: "\x8c%",
- R: &Return{},
- }, "d1:rd2:id0:5:token0:e1:t2:\x8c%1:y1:re")
- testMarshalUnmarshalMsg(t, Msg{
- Y: "r",
- T: "\x8c%",
- R: &Return{
- Nodes: CompactIPv4NodeInfo{
- NodeInfo{
- Addr: newDHTAddr(&net.UDPAddr{
- IP: net.IPv4(1, 2, 3, 4),
- Port: 0x1234,
- }),
- },
- },
- },
- }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x1245:token0:e1:t2:\x8c%1:y1:re")
- testMarshalUnmarshalMsg(t, Msg{
- Y: "r",
- T: "\x8c%",
- R: &Return{
- Values: []util.CompactPeer{
- util.CompactPeer{
- IP: net.IPv4(1, 2, 3, 4).To4(),
- Port: 0x5678,
- },
- },
- },
- }, "d1:rd2:id0:5:token0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re")
-}
-
func TestAnnounceTimeout(t *testing.T) {
s, err := NewServer(&ServerConfig{
BootstrapNodes: []string{"1.2.3.4:5"},
// may be correlated with multiple queries to the same node. The transaction ID should be encoded as a short string of binary numbers, typically 2 characters are enough as they cover 2^16 outstanding queries. The other key contained in every KRPC message is "y" with a single character value describing the type of message. The value of the "y" key is one of "q" for query, "r" for response, or "e" for error.
// 3 message types: QUERY, RESPONSE, ERROR
type Msg struct {
- Q string `bencode:"q,omitempty"` // method name of the query (on of 4: "ping", "find_node", "get_peers", "announce_peer")
- A *struct {
- ID string `bencode:"id"` // ID of the quirying Node
- InfoHash string `bencode:"info_hash"` // InfoHash of the torrent
- Target string `bencode:"target"` // ID of the node sought
- } `bencode:"a,omitempty"` // named arguments sent with a query
- T string `bencode:"t"` // required: transaction ID
- Y string `bencode:"y"` // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR
- R *Return `bencode:"r,omitempty"` // RESPONSE type only
- E *KRPCError `bencode:"e,omitempty"` // ERROR type only
+ Q string `bencode:"q,omitempty"` // Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer")
+ A *MsgArgs `bencode:"a,omitempty"` // named arguments sent with a query
+ T string `bencode:"t"` // required: transaction ID
+ Y string `bencode:"y"` // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR
+ R *Return `bencode:"r,omitempty"` // RESPONSE type only
+ E *KRPCError `bencode:"e,omitempty"` // ERROR type only
+ IP util.CompactPeer `bencode:"ip,omitempty"`
+}
+
+type MsgArgs struct {
+ ID string `bencode:"id"` // ID of the quirying Node
+ InfoHash string `bencode:"info_hash"` // InfoHash of the torrent
+ Target string `bencode:"target"` // ID of the node sought
}
type Return struct {
ID string `bencode:"id"` // ID of the querying node
Nodes CompactIPv4NodeInfo `bencode:"nodes,omitempty"`
- Token string `bencode:"token"`
+ Token string `bencode:"token,omitempty"`
Values []util.CompactPeer `bencode:"values,omitempty"`
}
--- /dev/null
+package dht
+
+import (
+ "net"
+ "testing"
+
+ "github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/util"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) {
+ b, err := bencode.Marshal(m)
+ require.NoError(t, err)
+ assert.Equal(t, expected, string(b))
+ var _m Msg
+ err = bencode.Unmarshal([]byte(expected), &_m)
+ assert.NoError(t, err)
+ assert.EqualValues(t, m, _m)
+ assert.EqualValues(t, m.A, _m.A)
+ assert.EqualValues(t, m.R, _m.R)
+}
+
+func TestMarshalUnmarshalMsg(t *testing.T) {
+ testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "q",
+ Q: "ping",
+ T: "hi",
+ }, "d1:q4:ping1:t2:hi1:y1:qe")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "e",
+ T: "42",
+ E: &KRPCError{Code: 200, Msg: "fuck"},
+ }, "d1:eli200e4:fucke1:t2:421:y1:ee")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "r",
+ T: "\x8c%",
+ R: &Return{},
+ }, "d1:rd2:id0:e1:t2:\x8c%1:y1:re")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "r",
+ T: "\x8c%",
+ R: &Return{
+ Nodes: CompactIPv4NodeInfo{
+ NodeInfo{
+ Addr: newDHTAddr(&net.UDPAddr{
+ IP: net.IPv4(1, 2, 3, 4).To4(),
+ Port: 0x1234,
+ }),
+ },
+ },
+ },
+ }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x124e1:t2:\x8c%1:y1:re")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "r",
+ T: "\x8c%",
+ R: &Return{
+ Values: []util.CompactPeer{
+ util.CompactPeer{
+ IP: net.IPv4(1, 2, 3, 4).To4(),
+ Port: 0x5678,
+ },
+ },
+ },
+ }, "d1:rd2:id0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re")
+ testMarshalUnmarshalMsg(t, Msg{
+ Y: "r",
+ T: "\x03",
+ R: &Return{
+ ID: "\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2g",
+ },
+ IP: util.CompactPeer{net.IPv4(124, 168, 180, 8).To4(), 62844},
+ }, "d2:ip6:|\xa8\xb4\b\xf5|1:rd2:id20:\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2ge1:t1:\x031:y1:re")
+}
+
+func TestUnmarshalGetPeersResponse(t *testing.T) {
+ var msg Msg
+ err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg)
+ require.NoError(t, err)
+ assert.Len(t, msg.R.Values, 2)
+ assert.Len(t, msg.R.Nodes, 2)
+ assert.Nil(t, msg.E)
+}
}
missinggo.CopyExact(cni.ID[:], b[:20])
cni.Addr = newDHTAddr(&net.UDPAddr{
- IP: net.IPv4(b[20], b[21], b[22], b[23]),
+ IP: append(make([]byte, 0, 4), b[20:24]...),
Port: int(binary.BigEndian.Uint16(b[24:26])),
})
return nil
return
}
outstanding.Add(1)
- t.SetResponseHandler(func(Msg) {
+ t.SetResponseHandler(func(Msg, bool) {
outstanding.Done()
})
}
"time"
)
-// Transaction keeps track of a message exchange between nodes,
-// such as a query message and a response message
+// Transaction keeps track of a message exchange between nodes, such as a
+// query message and a response message.
type Transaction struct {
mu sync.Mutex
remoteAddr dHTAddr
s *Server
retries int
lastSend time.Time
- userOnResponse func(Msg)
+ userOnResponse func(Msg, bool)
}
// SetResponseHandler sets up a function to be called when query response
-// arrives
-func (t *Transaction) SetResponseHandler(f func(Msg)) {
+// arrives.
+func (t *Transaction) SetResponseHandler(f func(Msg, bool)) {
t.mu.Lock()
defer t.mu.Unlock()
t.userOnResponse = f
}
select {
case r, ok := <-t.response:
- if !ok {
- // TODO: I think some assumption is broken. This isn't supposed to
- // happen.
- break
- }
- t.userOnResponse(r)
+ t.userOnResponse(r, ok)
// Shouldn't be called more than once.
t.userOnResponse = nil
default: