From 63982c3c809c837dde36b9e5d54ef4c576ed5891 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 20 Feb 2023 15:49:55 +1100 Subject: [PATCH] Don't reconnect before sending requests with current conn ID The tracker udp://tracker.torrent.eu.org:451/announce gives `error response: "Connection ID missmatch.\x00"` every 2 minutes when under heavy use. I suspect that reconnect requests are sent just after the connection ID is confirmed as not stale, but before it used for a request, and the server rejects the request after processing the reconnect first. It might also just be that that tracker server implementation is lazy and marks everything stale on regular boundaries. --- go.mod | 2 +- tracker/udp/client.go | 80 +++++++++++++++++++++++++++++------------ tracker/udp/udp_test.go | 44 +++++++++++++++++++++++ 3 files changed, 103 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 69e2ed28..64e1d63a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/anacrolix/torrent -go 1.18 +go 1.20 require ( crawshaw.io/sqlite v0.3.3-0.20220618202545-d1964889ea3c diff --git a/tracker/udp/client.go b/tracker/udp/client.go index 42f0d14e..d570b1a0 100644 --- a/tracker/udp/client.go +++ b/tracker/udp/client.go @@ -19,8 +19,11 @@ type Client struct { mu sync.Mutex connId ConnectionId connIdIssued time.Time - Dispatcher *Dispatcher - Writer io.Writer + + shouldReconnectOverride func() bool + + Dispatcher *Dispatcher + Writer io.Writer } func (cl *Client) Announce( @@ -81,14 +84,26 @@ func (cl *Client) Scrape( return } +func (cl *Client) shouldReconnectDefault() bool { + return cl.connIdIssued.IsZero() || time.Since(cl.connIdIssued) >= time.Minute +} + +func (cl *Client) shouldReconnect() bool { + if cl.shouldReconnectOverride != nil { + return cl.shouldReconnectOverride() + } + return cl.shouldReconnectDefault() +} + func (cl *Client) connect(ctx context.Context) (err error) { - // We could get fancier here and use RWMutex, and even fire off the connection asynchronously - // and provide a grace period while it resolves. - cl.mu.Lock() - defer cl.mu.Unlock() - if !cl.connIdIssued.IsZero() && time.Since(cl.connIdIssued) < time.Minute { + if !cl.shouldReconnect() { return nil } + return cl.doConnectRoundTrip(ctx) +} + +// This just does the connect request and updates local state if it succeeds. +func (cl *Client) doConnectRoundTrip(ctx context.Context) (err error) { respBody, _, err := cl.request(ctx, ActionConnect, nil) if err != nil { return err @@ -100,6 +115,7 @@ func (cl *Client) connect(ctx context.Context) (err error) { } cl.connId = connResp.ConnectionId cl.connIdIssued = time.Now() + //log.Printf("conn id set to %x", cl.connId) return } @@ -116,25 +132,45 @@ func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id Conne return } -func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) { - var buf bytes.Buffer - for n := 0; ; n++ { - var connId ConnectionId +func (cl *Client) writeRequest( + ctx context.Context, action Action, body []byte, tId TransactionId, buf *bytes.Buffer, +) ( + err error, +) { + var connId ConnectionId + if action == ActionConnect { + connId = ConnectRequestConnectionId + } else { + // We lock here while establishing a connection ID, and then ensuring that the request is + // written before allowing the connection ID to change again. This is to ensure the server + // doesn't assign us another ID before we've sent this request. Note that this doesn't allow + // for us to return if the context is cancelled while we wait to obtain a new ID. + cl.mu.Lock() + defer cl.mu.Unlock() connId, err = cl.connIdForRequest(ctx, action) if err != nil { return } - buf.Reset() - err = Write(&buf, RequestHeader{ - ConnectionId: connId, - Action: action, - TransactionId: tId, - }) - if err != nil { - panic(err) - } - buf.Write(body) - _, err = cl.Writer.Write(buf.Bytes()) + } + buf.Reset() + err = Write(buf, RequestHeader{ + ConnectionId: connId, + Action: action, + TransactionId: tId, + }) + if err != nil { + panic(err) + } + buf.Write(body) + _, err = cl.Writer.Write(buf.Bytes()) + //log.Printf("sent request with conn id %x", connId) + return +} + +func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) { + var buf bytes.Buffer + for n := 0; ; n++ { + err = cl.writeRequest(ctx, action, body, tId, &buf) if err != nil { return } diff --git a/tracker/udp/udp_test.go b/tracker/udp/udp_test.go index cbb41778..51f58e4c 100644 --- a/tracker/udp/udp_test.go +++ b/tracker/udp/udp_test.go @@ -2,12 +2,18 @@ package udp import ( "bytes" + "context" + "crypto/rand" "encoding/binary" "io" "net" + "sync" "testing" + "time" "github.com/anacrolix/dht/v2/krpc" + _ "github.com/anacrolix/envpprof" + "github.com/anacrolix/missinggo/v2/iter" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/require" ) @@ -91,3 +97,41 @@ func TestConnClientLogDispatchUnknownTransactionId(t *testing.T) { _, err = pc.WriteTo(make([]byte, 30), &ccAddr) c.Assert(err, qt.IsNil) } + +func TestConnectionIdMismatch(t *testing.T) { + t.Skip("Server host returns consistent connection ID in limited tests and so isn't effective.") + cl, err := NewConnClient(NewConnClientOpts{ + // This host seems to return `Connection ID missmatch.\x00` every 2 minutes or so under + // heavy use. + Host: "tracker.torrent.eu.org:451", + //Host: "tracker.opentrackr.org:1337", + Network: "udp", + }) + c := qt.New(t) + c.Assert(err, qt.IsNil) + defer cl.Close() + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + // Force every request to use a different connection ID. It's racey, but we want to get a + // different ID issued before a request can be sent with an old ID. + cl.Client.shouldReconnectOverride = func() bool { return true } + started := time.Now() + var wg sync.WaitGroup + for range iter.N(2) { + ar := AnnounceRequest{ + NumWant: -1, + Event: 2, + } + rand.Read(ar.InfoHash[:]) + rand.Read(ar.PeerId[:]) + //spew.Dump(ar) + wg.Add(1) + go func() { + defer wg.Done() + _, _, err := cl.Announce(ctx, ar, Options{}) + // I'm looking for `error response: "Connection ID missmatch.\x00"`. + t.Logf("announce error after %v: %v", time.Since(started), err) + }() + } + wg.Wait() +} -- 2.44.0