From ff835db95500f3353d99751d807cadd7ae77d058 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 7 Dec 2015 03:28:28 +1100 Subject: [PATCH] Actually fix #41 properly; and several big changes bencode: * Handle omitempty on non-trivial types. cmd/dht-ping: * Handle timeouts in ping transactions. dht: * Propagate failed transaction responses properly. * Msg related tests are moved into their own file. * In some places, IPs in binary form are shorted to 4 bytes if IPv4. --- bencode/encode.go | 18 ++-------- cmd/dht-ping/main.go | 23 +++++++----- dht/announce.go | 5 ++- dht/dht_test.go | 70 +----------------------------------- dht/msg.go | 25 +++++++------ dht/msg_test.go | 85 ++++++++++++++++++++++++++++++++++++++++++++ dht/nodeinfo.go | 2 +- dht/server.go | 2 +- dht/transaction.go | 17 ++++----- 9 files changed, 127 insertions(+), 120 deletions(-) create mode 100644 dht/msg_test.go diff --git a/bencode/encode.go b/bencode/encode.go index 17da1def..54250af3 100644 --- a/bencode/encode.go +++ b/bencode/encode.go @@ -7,24 +7,12 @@ import ( "sort" "strconv" "sync" + + "github.com/anacrolix/missinggo" ) func is_empty_value(v reflect.Value) bool { - switch v.Kind() { - case reflect.Array, reflect.Map, reflect.Slice, reflect.String: - return v.Len() == 0 - case reflect.Bool: - return !v.Bool() - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - return v.Int() == 0 - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: - return v.Uint() == 0 - case reflect.Float32, reflect.Float64: - return v.Float() == 0 - case reflect.Interface, reflect.Ptr: - return v.IsNil() - } - return false + return missinggo.IsEmptyValue(v) } type encoder struct { diff --git a/cmd/dht-ping/main.go b/cmd/dht-ping/main.go index bae58431..a1c4b574 100644 --- a/cmd/dht-ping/main.go +++ b/cmd/dht-ping/main.go @@ -13,9 +13,10 @@ import ( ) type pingResponse struct { - addr string - krpc dht.Msg - rtt time.Duration + addr string + krpc dht.Msg + msgOk bool + rtt time.Duration } func main() { @@ -48,12 +49,13 @@ func main() { log.Fatal(err) } start := time.Now() - t.SetResponseHandler(func(addr string) func(dht.Msg) { - return func(resp dht.Msg) { + t.SetResponseHandler(func(addr string) func(dht.Msg, bool) { + return func(resp dht.Msg, ok bool) { pingResponses <- pingResponse{ - addr: addr, - krpc: resp, - rtt: time.Now().Sub(start), + addr: addr, + krpc: resp, + rtt: time.Now().Sub(start), + msgOk: ok, } } }(netloc)) @@ -68,8 +70,11 @@ pingResponses: for _ = range pingStrAddrs { select { case resp := <-pingResponses: + if !resp.msgOk { + break + } responses++ - fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.R.ID, resp.addr), resp.rtt) + fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.SenderID(), resp.addr), resp.rtt) case <-timeoutChan: break pingResponses } diff --git a/dht/announce.go b/dht/announce.go index f87b76f2..a41dbf9d 100644 --- a/dht/announce.go +++ b/dht/announce.go @@ -8,9 +8,8 @@ import ( "github.com/anacrolix/missinggo" "github.com/anacrolix/sync" - "github.com/willf/bloom" - "github.com/anacrolix/torrent/logonce" + "github.com/willf/bloom" ) // Maintains state for an ongoing Announce operation. An Announce is started @@ -158,7 +157,7 @@ func (me *Announce) getPeers(addr dHTAddr) error { if err != nil { return err } - t.SetResponseHandler(func(m Msg) { + t.SetResponseHandler(func(m Msg, ok bool) { // Register suggested nodes closer to the target info-hash. if m.R != nil { me.mu.Lock() diff --git a/dht/dht_test.go b/dht/dht_test.go index b8a8897c..53457c1a 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -10,9 +10,6 @@ import ( "github.com/anacrolix/missinggo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/util" ) func TestSetNilBigInt(t *testing.T) { @@ -109,15 +106,6 @@ func TestClosestNodes(t *testing.T) { } } -func TestUnmarshalGetPeersResponse(t *testing.T) { - var msg Msg - err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg) - require.NoError(t, err) - assert.Len(t, msg.R.Values, 2) - assert.Len(t, msg.R.Nodes, 2) - assert.Nil(t, msg.E) -} - func TestDHTDefaultConfig(t *testing.T) { s, err := NewServer(nil) if err != nil { @@ -146,7 +134,7 @@ func TestPing(t *testing.T) { } defer tn.Close() ok := make(chan bool) - tn.SetResponseHandler(func(msg Msg) { + tn.SetResponseHandler(func(msg Msg, msgOk bool) { ok <- msg.SenderID() == srv0.ID() }) if !<-ok { @@ -206,62 +194,6 @@ func TestServerDefaultNodeIdSecure(t *testing.T) { } } -func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) { - b, err := bencode.Marshal(m) - require.NoError(t, err) - assert.Equal(t, expected, string(b)) - var _m Msg - err = bencode.Unmarshal([]byte(expected), &_m) - assert.NoError(t, err) - assert.EqualValues(t, m, _m) - assert.EqualValues(t, m.R, _m.R) -} - -func TestMarshalUnmarshalMsg(t *testing.T) { - testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e") - testMarshalUnmarshalMsg(t, Msg{ - Y: "q", - Q: "ping", - T: "hi", - }, "d1:q4:ping1:t2:hi1:y1:qe") - testMarshalUnmarshalMsg(t, Msg{ - Y: "e", - T: "42", - E: &KRPCError{Code: 200, Msg: "fuck"}, - }, "d1:eli200e4:fucke1:t2:421:y1:ee") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{}, - }, "d1:rd2:id0:5:token0:e1:t2:\x8c%1:y1:re") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{ - Nodes: CompactIPv4NodeInfo{ - NodeInfo{ - Addr: newDHTAddr(&net.UDPAddr{ - IP: net.IPv4(1, 2, 3, 4), - Port: 0x1234, - }), - }, - }, - }, - }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x1245:token0:e1:t2:\x8c%1:y1:re") - testMarshalUnmarshalMsg(t, Msg{ - Y: "r", - T: "\x8c%", - R: &Return{ - Values: []util.CompactPeer{ - util.CompactPeer{ - IP: net.IPv4(1, 2, 3, 4).To4(), - Port: 0x5678, - }, - }, - }, - }, "d1:rd2:id0:5:token0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re") -} - func TestAnnounceTimeout(t *testing.T) { s, err := NewServer(&ServerConfig{ BootstrapNodes: []string{"1.2.3.4:5"}, diff --git a/dht/msg.go b/dht/msg.go index 3fdaccea..094653bd 100644 --- a/dht/msg.go +++ b/dht/msg.go @@ -19,22 +19,25 @@ import ( // may be correlated with multiple queries to the same node. The transaction ID should be encoded as a short string of binary numbers, typically 2 characters are enough as they cover 2^16 outstanding queries. The other key contained in every KRPC message is "y" with a single character value describing the type of message. The value of the "y" key is one of "q" for query, "r" for response, or "e" for error. // 3 message types: QUERY, RESPONSE, ERROR type Msg struct { - Q string `bencode:"q,omitempty"` // method name of the query (on of 4: "ping", "find_node", "get_peers", "announce_peer") - A *struct { - ID string `bencode:"id"` // ID of the quirying Node - InfoHash string `bencode:"info_hash"` // InfoHash of the torrent - Target string `bencode:"target"` // ID of the node sought - } `bencode:"a,omitempty"` // named arguments sent with a query - T string `bencode:"t"` // required: transaction ID - Y string `bencode:"y"` // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR - R *Return `bencode:"r,omitempty"` // RESPONSE type only - E *KRPCError `bencode:"e,omitempty"` // ERROR type only + Q string `bencode:"q,omitempty"` // Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer") + A *MsgArgs `bencode:"a,omitempty"` // named arguments sent with a query + T string `bencode:"t"` // required: transaction ID + Y string `bencode:"y"` // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR + R *Return `bencode:"r,omitempty"` // RESPONSE type only + E *KRPCError `bencode:"e,omitempty"` // ERROR type only + IP util.CompactPeer `bencode:"ip,omitempty"` +} + +type MsgArgs struct { + ID string `bencode:"id"` // ID of the quirying Node + InfoHash string `bencode:"info_hash"` // InfoHash of the torrent + Target string `bencode:"target"` // ID of the node sought } type Return struct { ID string `bencode:"id"` // ID of the querying node Nodes CompactIPv4NodeInfo `bencode:"nodes,omitempty"` - Token string `bencode:"token"` + Token string `bencode:"token,omitempty"` Values []util.CompactPeer `bencode:"values,omitempty"` } diff --git a/dht/msg_test.go b/dht/msg_test.go new file mode 100644 index 00000000..1f1aeb3e --- /dev/null +++ b/dht/msg_test.go @@ -0,0 +1,85 @@ +package dht + +import ( + "net" + "testing" + + "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) { + b, err := bencode.Marshal(m) + require.NoError(t, err) + assert.Equal(t, expected, string(b)) + var _m Msg + err = bencode.Unmarshal([]byte(expected), &_m) + assert.NoError(t, err) + assert.EqualValues(t, m, _m) + assert.EqualValues(t, m.A, _m.A) + assert.EqualValues(t, m.R, _m.R) +} + +func TestMarshalUnmarshalMsg(t *testing.T) { + testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e") + testMarshalUnmarshalMsg(t, Msg{ + Y: "q", + Q: "ping", + T: "hi", + }, "d1:q4:ping1:t2:hi1:y1:qe") + testMarshalUnmarshalMsg(t, Msg{ + Y: "e", + T: "42", + E: &KRPCError{Code: 200, Msg: "fuck"}, + }, "d1:eli200e4:fucke1:t2:421:y1:ee") + testMarshalUnmarshalMsg(t, Msg{ + Y: "r", + T: "\x8c%", + R: &Return{}, + }, "d1:rd2:id0:e1:t2:\x8c%1:y1:re") + testMarshalUnmarshalMsg(t, Msg{ + Y: "r", + T: "\x8c%", + R: &Return{ + Nodes: CompactIPv4NodeInfo{ + NodeInfo{ + Addr: newDHTAddr(&net.UDPAddr{ + IP: net.IPv4(1, 2, 3, 4).To4(), + Port: 0x1234, + }), + }, + }, + }, + }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x124e1:t2:\x8c%1:y1:re") + testMarshalUnmarshalMsg(t, Msg{ + Y: "r", + T: "\x8c%", + R: &Return{ + Values: []util.CompactPeer{ + util.CompactPeer{ + IP: net.IPv4(1, 2, 3, 4).To4(), + Port: 0x5678, + }, + }, + }, + }, "d1:rd2:id0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re") + testMarshalUnmarshalMsg(t, Msg{ + Y: "r", + T: "\x03", + R: &Return{ + ID: "\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2g", + }, + IP: util.CompactPeer{net.IPv4(124, 168, 180, 8).To4(), 62844}, + }, "d2:ip6:|\xa8\xb4\b\xf5|1:rd2:id20:\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2ge1:t1:\x031:y1:re") +} + +func TestUnmarshalGetPeersResponse(t *testing.T) { + var msg Msg + err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg) + require.NoError(t, err) + assert.Len(t, msg.R.Values, 2) + assert.Len(t, msg.R.Nodes, 2) + assert.Nil(t, msg.E) +} diff --git a/dht/nodeinfo.go b/dht/nodeinfo.go index 49f92321..84887c7e 100644 --- a/dht/nodeinfo.go +++ b/dht/nodeinfo.go @@ -39,7 +39,7 @@ func (cni *NodeInfo) UnmarshalCompactIPv4(b []byte) error { } missinggo.CopyExact(cni.ID[:], b[:20]) cni.Addr = newDHTAddr(&net.UDPAddr{ - IP: net.IPv4(b[20], b[21], b[22], b[23]), + IP: append(make([]byte, 0, 4), b[20:24]...), Port: int(binary.BigEndian.Uint16(b[24:26])), }) return nil diff --git a/dht/server.go b/dht/server.go index d203ed4b..843a389a 100644 --- a/dht/server.go +++ b/dht/server.go @@ -556,7 +556,7 @@ func (s *Server) bootstrap() (err error) { return } outstanding.Add(1) - t.SetResponseHandler(func(Msg) { + t.SetResponseHandler(func(Msg, bool) { outstanding.Done() }) } diff --git a/dht/transaction.go b/dht/transaction.go index 8e3ad0d8..c4c7b3a4 100644 --- a/dht/transaction.go +++ b/dht/transaction.go @@ -5,8 +5,8 @@ import ( "time" ) -// Transaction keeps track of a message exchange between nodes, -// such as a query message and a response message +// Transaction keeps track of a message exchange between nodes, such as a +// query message and a response message. type Transaction struct { mu sync.Mutex remoteAddr dHTAddr @@ -19,12 +19,12 @@ type Transaction struct { s *Server retries int lastSend time.Time - userOnResponse func(Msg) + userOnResponse func(Msg, bool) } // SetResponseHandler sets up a function to be called when query response -// arrives -func (t *Transaction) SetResponseHandler(f func(Msg)) { +// arrives. +func (t *Transaction) SetResponseHandler(f func(Msg, bool)) { t.mu.Lock() defer t.mu.Unlock() t.userOnResponse = f @@ -37,12 +37,7 @@ func (t *Transaction) tryHandleResponse() { } select { case r, ok := <-t.response: - if !ok { - // TODO: I think some assumption is broken. This isn't supposed to - // happen. - break - } - t.userOnResponse(r) + t.userOnResponse(r, ok) // Shouldn't be called more than once. t.userOnResponse = nil default: -- 2.48.1