]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Actually fix #41 properly; and several big changes
authorMatt Joiner <anacrolix@gmail.com>
Sun, 6 Dec 2015 16:28:28 +0000 (03:28 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 6 Dec 2015 16:28:28 +0000 (03:28 +1100)
bencode:
 * Handle omitempty on non-trivial types.
cmd/dht-ping:
 * Handle timeouts in ping transactions.
dht:
 * Propagate failed transaction responses properly.
 * Msg related tests are moved into their own file.
 * In some places, IPs in binary form are shorted to 4 bytes if IPv4.

bencode/encode.go
cmd/dht-ping/main.go
dht/announce.go
dht/dht_test.go
dht/msg.go
dht/msg_test.go [new file with mode: 0644]
dht/nodeinfo.go
dht/server.go
dht/transaction.go

index 17da1defaed2a9baf73b94b84f290971dc159c7c..54250af31e3a8ec10d07485b4659d7ba3d423a8b 100644 (file)
@@ -7,24 +7,12 @@ import (
        "sort"
        "strconv"
        "sync"
+
+       "github.com/anacrolix/missinggo"
 )
 
 func is_empty_value(v reflect.Value) bool {
-       switch v.Kind() {
-       case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
-               return v.Len() == 0
-       case reflect.Bool:
-               return !v.Bool()
-       case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-               return v.Int() == 0
-       case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
-               return v.Uint() == 0
-       case reflect.Float32, reflect.Float64:
-               return v.Float() == 0
-       case reflect.Interface, reflect.Ptr:
-               return v.IsNil()
-       }
-       return false
+       return missinggo.IsEmptyValue(v)
 }
 
 type encoder struct {
index bae5843171a81264b858bb4860b525328f05c3a2..a1c4b5749bc14bfa2a5c2340cccef52f071be811 100644 (file)
@@ -13,9 +13,10 @@ import (
 )
 
 type pingResponse struct {
-       addr string
-       krpc dht.Msg
-       rtt  time.Duration
+       addr  string
+       krpc  dht.Msg
+       msgOk bool
+       rtt   time.Duration
 }
 
 func main() {
@@ -48,12 +49,13 @@ func main() {
                                log.Fatal(err)
                        }
                        start := time.Now()
-                       t.SetResponseHandler(func(addr string) func(dht.Msg) {
-                               return func(resp dht.Msg) {
+                       t.SetResponseHandler(func(addr string) func(dht.Msg, bool) {
+                               return func(resp dht.Msg, ok bool) {
                                        pingResponses <- pingResponse{
-                                               addr: addr,
-                                               krpc: resp,
-                                               rtt:  time.Now().Sub(start),
+                                               addr:  addr,
+                                               krpc:  resp,
+                                               rtt:   time.Now().Sub(start),
+                                               msgOk: ok,
                                        }
                                }
                        }(netloc))
@@ -68,8 +70,11 @@ pingResponses:
        for _ = range pingStrAddrs {
                select {
                case resp := <-pingResponses:
+                       if !resp.msgOk {
+                               break
+                       }
                        responses++
-                       fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.R.ID, resp.addr), resp.rtt)
+                       fmt.Printf("%-65s %s\n", fmt.Sprintf("%x (%s):", resp.krpc.SenderID(), resp.addr), resp.rtt)
                case <-timeoutChan:
                        break pingResponses
                }
index f87b76f200f29851f8cf2a40f5c3238ae456fcb6..a41dbf9d4e887f11c1559abc6b9a53ed77deb90f 100644 (file)
@@ -8,9 +8,8 @@ import (
 
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/sync"
-       "github.com/willf/bloom"
-
        "github.com/anacrolix/torrent/logonce"
+       "github.com/willf/bloom"
 )
 
 // Maintains state for an ongoing Announce operation. An Announce is started
@@ -158,7 +157,7 @@ func (me *Announce) getPeers(addr dHTAddr) error {
        if err != nil {
                return err
        }
-       t.SetResponseHandler(func(m Msg) {
+       t.SetResponseHandler(func(m Msg, ok bool) {
                // Register suggested nodes closer to the target info-hash.
                if m.R != nil {
                        me.mu.Lock()
index b8a8897c5b849ec58d8a8b14ed6c17fc452c6029..53457c1a62350e5bfcc177e93b3dea277e92d4e2 100644 (file)
@@ -10,9 +10,6 @@ import (
        "github.com/anacrolix/missinggo"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
-
-       "github.com/anacrolix/torrent/bencode"
-       "github.com/anacrolix/torrent/util"
 )
 
 func TestSetNilBigInt(t *testing.T) {
@@ -109,15 +106,6 @@ func TestClosestNodes(t *testing.T) {
        }
 }
 
-func TestUnmarshalGetPeersResponse(t *testing.T) {
-       var msg Msg
-       err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg)
-       require.NoError(t, err)
-       assert.Len(t, msg.R.Values, 2)
-       assert.Len(t, msg.R.Nodes, 2)
-       assert.Nil(t, msg.E)
-}
-
 func TestDHTDefaultConfig(t *testing.T) {
        s, err := NewServer(nil)
        if err != nil {
@@ -146,7 +134,7 @@ func TestPing(t *testing.T) {
        }
        defer tn.Close()
        ok := make(chan bool)
-       tn.SetResponseHandler(func(msg Msg) {
+       tn.SetResponseHandler(func(msg Msg, msgOk bool) {
                ok <- msg.SenderID() == srv0.ID()
        })
        if !<-ok {
@@ -206,62 +194,6 @@ func TestServerDefaultNodeIdSecure(t *testing.T) {
        }
 }
 
-func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) {
-       b, err := bencode.Marshal(m)
-       require.NoError(t, err)
-       assert.Equal(t, expected, string(b))
-       var _m Msg
-       err = bencode.Unmarshal([]byte(expected), &_m)
-       assert.NoError(t, err)
-       assert.EqualValues(t, m, _m)
-       assert.EqualValues(t, m.R, _m.R)
-}
-
-func TestMarshalUnmarshalMsg(t *testing.T) {
-       testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e")
-       testMarshalUnmarshalMsg(t, Msg{
-               Y: "q",
-               Q: "ping",
-               T: "hi",
-       }, "d1:q4:ping1:t2:hi1:y1:qe")
-       testMarshalUnmarshalMsg(t, Msg{
-               Y: "e",
-               T: "42",
-               E: &KRPCError{Code: 200, Msg: "fuck"},
-       }, "d1:eli200e4:fucke1:t2:421:y1:ee")
-       testMarshalUnmarshalMsg(t, Msg{
-               Y: "r",
-               T: "\x8c%",
-               R: &Return{},
-       }, "d1:rd2:id0:5:token0:e1:t2:\x8c%1:y1:re")
-       testMarshalUnmarshalMsg(t, Msg{
-               Y: "r",
-               T: "\x8c%",
-               R: &Return{
-                       Nodes: CompactIPv4NodeInfo{
-                               NodeInfo{
-                                       Addr: newDHTAddr(&net.UDPAddr{
-                                               IP:   net.IPv4(1, 2, 3, 4),
-                                               Port: 0x1234,
-                                       }),
-                               },
-                       },
-               },
-       }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x1245:token0:e1:t2:\x8c%1:y1:re")
-       testMarshalUnmarshalMsg(t, Msg{
-               Y: "r",
-               T: "\x8c%",
-               R: &Return{
-                       Values: []util.CompactPeer{
-                               util.CompactPeer{
-                                       IP:   net.IPv4(1, 2, 3, 4).To4(),
-                                       Port: 0x5678,
-                               },
-                       },
-               },
-       }, "d1:rd2:id0:5:token0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re")
-}
-
 func TestAnnounceTimeout(t *testing.T) {
        s, err := NewServer(&ServerConfig{
                BootstrapNodes: []string{"1.2.3.4:5"},
index 3fdacceae8601e5b051cbbf32a43f09ecdfb3991..094653bdde01b7d9e804513b321ae87bedc1dc34 100644 (file)
@@ -19,22 +19,25 @@ import (
 // may be correlated with multiple queries to the same node. The transaction ID should be encoded as a short string of binary numbers, typically 2 characters are enough as they cover 2^16 outstanding queries. The other key contained in every KRPC message is "y" with a single character value describing the type of message. The value of the "y" key is one of "q" for query, "r" for response, or "e" for error.
 // 3 message types:  QUERY, RESPONSE, ERROR
 type Msg struct {
-       Q string `bencode:"q,omitempty"` //  method name of the query (on of 4: "ping", "find_node", "get_peers", "announce_peer")
-       A *struct {
-               ID       string `bencode:"id"`        // ID of the quirying Node
-               InfoHash string `bencode:"info_hash"` // InfoHash of the torrent
-               Target   string `bencode:"target"`    // ID of the node sought
-       } `bencode:"a,omitempty"` // named arguments sent with a query
-       T string     `bencode:"t"`           // required: transaction ID
-       Y string     `bencode:"y"`           // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR
-       R *Return    `bencode:"r,omitempty"` // RESPONSE type only
-       E *KRPCError `bencode:"e,omitempty"` // ERROR type only
+       Q  string           `bencode:"q,omitempty"` // Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer")
+       A  *MsgArgs         `bencode:"a,omitempty"` // named arguments sent with a query
+       T  string           `bencode:"t"`           // required: transaction ID
+       Y  string           `bencode:"y"`           // required: type of the message: q for QUERY, r for RESPONSE, e for ERROR
+       R  *Return          `bencode:"r,omitempty"` // RESPONSE type only
+       E  *KRPCError       `bencode:"e,omitempty"` // ERROR type only
+       IP util.CompactPeer `bencode:"ip,omitempty"`
+}
+
+type MsgArgs struct {
+       ID       string `bencode:"id"`        // ID of the quirying Node
+       InfoHash string `bencode:"info_hash"` // InfoHash of the torrent
+       Target   string `bencode:"target"`    // ID of the node sought
 }
 
 type Return struct {
        ID     string              `bencode:"id"` // ID of the querying node
        Nodes  CompactIPv4NodeInfo `bencode:"nodes,omitempty"`
-       Token  string              `bencode:"token"`
+       Token  string              `bencode:"token,omitempty"`
        Values []util.CompactPeer  `bencode:"values,omitempty"`
 }
 
diff --git a/dht/msg_test.go b/dht/msg_test.go
new file mode 100644 (file)
index 0000000..1f1aeb3
--- /dev/null
@@ -0,0 +1,85 @@
+package dht
+
+import (
+       "net"
+       "testing"
+
+       "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/util"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func testMarshalUnmarshalMsg(t *testing.T, m Msg, expected string) {
+       b, err := bencode.Marshal(m)
+       require.NoError(t, err)
+       assert.Equal(t, expected, string(b))
+       var _m Msg
+       err = bencode.Unmarshal([]byte(expected), &_m)
+       assert.NoError(t, err)
+       assert.EqualValues(t, m, _m)
+       assert.EqualValues(t, m.A, _m.A)
+       assert.EqualValues(t, m.R, _m.R)
+}
+
+func TestMarshalUnmarshalMsg(t *testing.T) {
+       testMarshalUnmarshalMsg(t, Msg{}, "d1:t0:1:y0:e")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "q",
+               Q: "ping",
+               T: "hi",
+       }, "d1:q4:ping1:t2:hi1:y1:qe")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "e",
+               T: "42",
+               E: &KRPCError{Code: 200, Msg: "fuck"},
+       }, "d1:eli200e4:fucke1:t2:421:y1:ee")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "r",
+               T: "\x8c%",
+               R: &Return{},
+       }, "d1:rd2:id0:e1:t2:\x8c%1:y1:re")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "r",
+               T: "\x8c%",
+               R: &Return{
+                       Nodes: CompactIPv4NodeInfo{
+                               NodeInfo{
+                                       Addr: newDHTAddr(&net.UDPAddr{
+                                               IP:   net.IPv4(1, 2, 3, 4).To4(),
+                                               Port: 0x1234,
+                                       }),
+                               },
+                       },
+               },
+       }, "d1:rd2:id0:5:nodes26:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x124e1:t2:\x8c%1:y1:re")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "r",
+               T: "\x8c%",
+               R: &Return{
+                       Values: []util.CompactPeer{
+                               util.CompactPeer{
+                                       IP:   net.IPv4(1, 2, 3, 4).To4(),
+                                       Port: 0x5678,
+                               },
+                       },
+               },
+       }, "d1:rd2:id0:6:valuesl6:\x01\x02\x03\x04\x56\x78ee1:t2:\x8c%1:y1:re")
+       testMarshalUnmarshalMsg(t, Msg{
+               Y: "r",
+               T: "\x03",
+               R: &Return{
+                       ID: "\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2g",
+               },
+               IP: util.CompactPeer{net.IPv4(124, 168, 180, 8).To4(), 62844},
+       }, "d2:ip6:|\xa8\xb4\b\xf5|1:rd2:id20:\xeb\xff6isQ\xffJ\xec)ͺ\xab\xf2\xfb\xe3F|\xc2ge1:t1:\x031:y1:re")
+}
+
+func TestUnmarshalGetPeersResponse(t *testing.T) {
+       var msg Msg
+       err := bencode.Unmarshal([]byte("d1:rd6:valuesl6:\x01\x02\x03\x04\x05\x066:\x07\x08\x09\x0a\x0b\x0ce5:nodes52:\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x02\x03\x04\x05\x06\x07\x08\x09\x02\x03\x04\x05\x06\x07ee"), &msg)
+       require.NoError(t, err)
+       assert.Len(t, msg.R.Values, 2)
+       assert.Len(t, msg.R.Nodes, 2)
+       assert.Nil(t, msg.E)
+}
index 49f923218c095007d62e2430a894d0409a152af4..84887c7e33cfe0e082852b3b46e1b372efd562f2 100644 (file)
@@ -39,7 +39,7 @@ func (cni *NodeInfo) UnmarshalCompactIPv4(b []byte) error {
        }
        missinggo.CopyExact(cni.ID[:], b[:20])
        cni.Addr = newDHTAddr(&net.UDPAddr{
-               IP:   net.IPv4(b[20], b[21], b[22], b[23]),
+               IP:   append(make([]byte, 0, 4), b[20:24]...),
                Port: int(binary.BigEndian.Uint16(b[24:26])),
        })
        return nil
index d203ed4b3486215d637d29ab9ae4769d8210eacc..843a389a5a09259646fae78246bb1c4cf773cbc0 100644 (file)
@@ -556,7 +556,7 @@ func (s *Server) bootstrap() (err error) {
                                return
                        }
                        outstanding.Add(1)
-                       t.SetResponseHandler(func(Msg) {
+                       t.SetResponseHandler(func(Msg, bool) {
                                outstanding.Done()
                        })
                }
index 8e3ad0d8bf70c09f41b8f50feddd2b4a731a3911..c4c7b3a451627e340f66651dee7fd039a2ac02f4 100644 (file)
@@ -5,8 +5,8 @@ import (
        "time"
 )
 
-// Transaction keeps track of a message exchange between nodes,
-// such as a query message and a response message
+// Transaction keeps track of a message exchange between nodes, such as a
+// query message and a response message.
 type Transaction struct {
        mu             sync.Mutex
        remoteAddr     dHTAddr
@@ -19,12 +19,12 @@ type Transaction struct {
        s              *Server
        retries        int
        lastSend       time.Time
-       userOnResponse func(Msg)
+       userOnResponse func(Msg, bool)
 }
 
 // SetResponseHandler sets up a function to be called when query response
-// arrives
-func (t *Transaction) SetResponseHandler(f func(Msg)) {
+// arrives.
+func (t *Transaction) SetResponseHandler(f func(Msg, bool)) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.userOnResponse = f
@@ -37,12 +37,7 @@ func (t *Transaction) tryHandleResponse() {
        }
        select {
        case r, ok := <-t.response:
-               if !ok {
-                       // TODO: I think some assumption is broken. This isn't supposed to
-                       // happen.
-                       break
-               }
-               t.userOnResponse(r)
+               t.userOnResponse(r, ok)
                // Shouldn't be called more than once.
                t.userOnResponse = nil
        default: