]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Don't reconnect before sending requests with current conn ID
authorMatt Joiner <anacrolix@gmail.com>
Mon, 20 Feb 2023 04:49:55 +0000 (15:49 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 20 Feb 2023 04:49:55 +0000 (15:49 +1100)
The tracker udp://tracker.torrent.eu.org:451/announce gives `error response: "Connection ID missmatch.\x00"` every 2 minutes when under heavy use. I suspect that reconnect requests are sent just after the connection ID is confirmed as not stale, but before it used for a request, and the server rejects the request after processing the reconnect first.

It might also just be that that tracker server implementation is lazy and marks everything stale on regular boundaries.

go.mod
tracker/udp/client.go
tracker/udp/udp_test.go

diff --git a/go.mod b/go.mod
index 69e2ed28491c0ff03d8c453f43dd085dd4ab78c5..64e1d63a4d1f779b3b243c9a64736e6c69c33a7a 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/anacrolix/torrent
 
-go 1.18
+go 1.20
 
 require (
        crawshaw.io/sqlite v0.3.3-0.20220618202545-d1964889ea3c
index 42f0d14e6a99e6a4a226a25adef43d91c4469b14..d570b1a0ee0c4e007667e9ffc4349a9d6ca5024a 100644 (file)
@@ -19,8 +19,11 @@ type Client struct {
        mu           sync.Mutex
        connId       ConnectionId
        connIdIssued time.Time
-       Dispatcher   *Dispatcher
-       Writer       io.Writer
+
+       shouldReconnectOverride func() bool
+
+       Dispatcher *Dispatcher
+       Writer     io.Writer
 }
 
 func (cl *Client) Announce(
@@ -81,14 +84,26 @@ func (cl *Client) Scrape(
        return
 }
 
+func (cl *Client) shouldReconnectDefault() bool {
+       return cl.connIdIssued.IsZero() || time.Since(cl.connIdIssued) >= time.Minute
+}
+
+func (cl *Client) shouldReconnect() bool {
+       if cl.shouldReconnectOverride != nil {
+               return cl.shouldReconnectOverride()
+       }
+       return cl.shouldReconnectDefault()
+}
+
 func (cl *Client) connect(ctx context.Context) (err error) {
-       // We could get fancier here and use RWMutex, and even fire off the connection asynchronously
-       // and provide a grace period while it resolves.
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       if !cl.connIdIssued.IsZero() && time.Since(cl.connIdIssued) < time.Minute {
+       if !cl.shouldReconnect() {
                return nil
        }
+       return cl.doConnectRoundTrip(ctx)
+}
+
+// This just does the connect request and updates local state if it succeeds.
+func (cl *Client) doConnectRoundTrip(ctx context.Context) (err error) {
        respBody, _, err := cl.request(ctx, ActionConnect, nil)
        if err != nil {
                return err
@@ -100,6 +115,7 @@ func (cl *Client) connect(ctx context.Context) (err error) {
        }
        cl.connId = connResp.ConnectionId
        cl.connIdIssued = time.Now()
+       //log.Printf("conn id set to %x", cl.connId)
        return
 }
 
@@ -116,25 +132,45 @@ func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id Conne
        return
 }
 
-func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
-       var buf bytes.Buffer
-       for n := 0; ; n++ {
-               var connId ConnectionId
+func (cl *Client) writeRequest(
+       ctx context.Context, action Action, body []byte, tId TransactionId, buf *bytes.Buffer,
+) (
+       err error,
+) {
+       var connId ConnectionId
+       if action == ActionConnect {
+               connId = ConnectRequestConnectionId
+       } else {
+               // We lock here while establishing a connection ID, and then ensuring that the request is
+               // written before allowing the connection ID to change again. This is to ensure the server
+               // doesn't assign us another ID before we've sent this request. Note that this doesn't allow
+               // for us to return if the context is cancelled while we wait to obtain a new ID.
+               cl.mu.Lock()
+               defer cl.mu.Unlock()
                connId, err = cl.connIdForRequest(ctx, action)
                if err != nil {
                        return
                }
-               buf.Reset()
-               err = Write(&buf, RequestHeader{
-                       ConnectionId:  connId,
-                       Action:        action,
-                       TransactionId: tId,
-               })
-               if err != nil {
-                       panic(err)
-               }
-               buf.Write(body)
-               _, err = cl.Writer.Write(buf.Bytes())
+       }
+       buf.Reset()
+       err = Write(buf, RequestHeader{
+               ConnectionId:  connId,
+               Action:        action,
+               TransactionId: tId,
+       })
+       if err != nil {
+               panic(err)
+       }
+       buf.Write(body)
+       _, err = cl.Writer.Write(buf.Bytes())
+       //log.Printf("sent request with conn id %x", connId)
+       return
+}
+
+func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
+       var buf bytes.Buffer
+       for n := 0; ; n++ {
+               err = cl.writeRequest(ctx, action, body, tId, &buf)
                if err != nil {
                        return
                }
index cbb41778bd4368d3b87d3a97eb577c4c865d9cb1..51f58e4c2c1cfa533d49be9d963408d91f17194e 100644 (file)
@@ -2,12 +2,18 @@ package udp
 
 import (
        "bytes"
+       "context"
+       "crypto/rand"
        "encoding/binary"
        "io"
        "net"
+       "sync"
        "testing"
+       "time"
 
        "github.com/anacrolix/dht/v2/krpc"
+       _ "github.com/anacrolix/envpprof"
+       "github.com/anacrolix/missinggo/v2/iter"
        qt "github.com/frankban/quicktest"
        "github.com/stretchr/testify/require"
 )
@@ -91,3 +97,41 @@ func TestConnClientLogDispatchUnknownTransactionId(t *testing.T) {
        _, err = pc.WriteTo(make([]byte, 30), &ccAddr)
        c.Assert(err, qt.IsNil)
 }
+
+func TestConnectionIdMismatch(t *testing.T) {
+       t.Skip("Server host returns consistent connection ID in limited tests and so isn't effective.")
+       cl, err := NewConnClient(NewConnClientOpts{
+               // This host seems to return `Connection ID missmatch.\x00` every 2 minutes or so under
+               // heavy use.
+               Host: "tracker.torrent.eu.org:451",
+               //Host:    "tracker.opentrackr.org:1337",
+               Network: "udp",
+       })
+       c := qt.New(t)
+       c.Assert(err, qt.IsNil)
+       defer cl.Close()
+       ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+       defer cancel()
+       // Force every request to use a different connection ID. It's racey, but we want to get a
+       // different ID issued before a request can be sent with an old ID.
+       cl.Client.shouldReconnectOverride = func() bool { return true }
+       started := time.Now()
+       var wg sync.WaitGroup
+       for range iter.N(2) {
+               ar := AnnounceRequest{
+                       NumWant: -1,
+                       Event:   2,
+               }
+               rand.Read(ar.InfoHash[:])
+               rand.Read(ar.PeerId[:])
+               //spew.Dump(ar)
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       _, _, err := cl.Announce(ctx, ar, Options{})
+                       // I'm looking for `error response: "Connection ID missmatch.\x00"`.
+                       t.Logf("announce error after %v: %v", time.Since(started), err)
+               }()
+       }
+       wg.Wait()
+}