mu sync.Mutex
connId ConnectionId
connIdIssued time.Time
- Dispatcher *Dispatcher
- Writer io.Writer
+
+ shouldReconnectOverride func() bool
+
+ Dispatcher *Dispatcher
+ Writer io.Writer
}
func (cl *Client) Announce(
return
}
+func (cl *Client) shouldReconnectDefault() bool {
+ return cl.connIdIssued.IsZero() || time.Since(cl.connIdIssued) >= time.Minute
+}
+
+func (cl *Client) shouldReconnect() bool {
+ if cl.shouldReconnectOverride != nil {
+ return cl.shouldReconnectOverride()
+ }
+ return cl.shouldReconnectDefault()
+}
+
func (cl *Client) connect(ctx context.Context) (err error) {
- // We could get fancier here and use RWMutex, and even fire off the connection asynchronously
- // and provide a grace period while it resolves.
- cl.mu.Lock()
- defer cl.mu.Unlock()
- if !cl.connIdIssued.IsZero() && time.Since(cl.connIdIssued) < time.Minute {
+ if !cl.shouldReconnect() {
return nil
}
+ return cl.doConnectRoundTrip(ctx)
+}
+
+// This just does the connect request and updates local state if it succeeds.
+func (cl *Client) doConnectRoundTrip(ctx context.Context) (err error) {
respBody, _, err := cl.request(ctx, ActionConnect, nil)
if err != nil {
return err
}
cl.connId = connResp.ConnectionId
cl.connIdIssued = time.Now()
+ //log.Printf("conn id set to %x", cl.connId)
return
}
return
}
-func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
- var buf bytes.Buffer
- for n := 0; ; n++ {
- var connId ConnectionId
+func (cl *Client) writeRequest(
+ ctx context.Context, action Action, body []byte, tId TransactionId, buf *bytes.Buffer,
+) (
+ err error,
+) {
+ var connId ConnectionId
+ if action == ActionConnect {
+ connId = ConnectRequestConnectionId
+ } else {
+ // We lock here while establishing a connection ID, and then ensuring that the request is
+ // written before allowing the connection ID to change again. This is to ensure the server
+ // doesn't assign us another ID before we've sent this request. Note that this doesn't allow
+ // for us to return if the context is cancelled while we wait to obtain a new ID.
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
connId, err = cl.connIdForRequest(ctx, action)
if err != nil {
return
}
- buf.Reset()
- err = Write(&buf, RequestHeader{
- ConnectionId: connId,
- Action: action,
- TransactionId: tId,
- })
- if err != nil {
- panic(err)
- }
- buf.Write(body)
- _, err = cl.Writer.Write(buf.Bytes())
+ }
+ buf.Reset()
+ err = Write(buf, RequestHeader{
+ ConnectionId: connId,
+ Action: action,
+ TransactionId: tId,
+ })
+ if err != nil {
+ panic(err)
+ }
+ buf.Write(body)
+ _, err = cl.Writer.Write(buf.Bytes())
+ //log.Printf("sent request with conn id %x", connId)
+ return
+}
+
+func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
+ var buf bytes.Buffer
+ for n := 0; ; n++ {
+ err = cl.writeRequest(ctx, action, body, tId, &buf)
if err != nil {
return
}
import (
"bytes"
+ "context"
+ "crypto/rand"
"encoding/binary"
"io"
"net"
+ "sync"
"testing"
+ "time"
"github.com/anacrolix/dht/v2/krpc"
+ _ "github.com/anacrolix/envpprof"
+ "github.com/anacrolix/missinggo/v2/iter"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/require"
)
_, err = pc.WriteTo(make([]byte, 30), &ccAddr)
c.Assert(err, qt.IsNil)
}
+
+func TestConnectionIdMismatch(t *testing.T) {
+ t.Skip("Server host returns consistent connection ID in limited tests and so isn't effective.")
+ cl, err := NewConnClient(NewConnClientOpts{
+ // This host seems to return `Connection ID missmatch.\x00` every 2 minutes or so under
+ // heavy use.
+ Host: "tracker.torrent.eu.org:451",
+ //Host: "tracker.opentrackr.org:1337",
+ Network: "udp",
+ })
+ c := qt.New(t)
+ c.Assert(err, qt.IsNil)
+ defer cl.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+ // Force every request to use a different connection ID. It's racey, but we want to get a
+ // different ID issued before a request can be sent with an old ID.
+ cl.Client.shouldReconnectOverride = func() bool { return true }
+ started := time.Now()
+ var wg sync.WaitGroup
+ for range iter.N(2) {
+ ar := AnnounceRequest{
+ NumWant: -1,
+ Event: 2,
+ }
+ rand.Read(ar.InfoHash[:])
+ rand.Read(ar.PeerId[:])
+ //spew.Dump(ar)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, _, err := cl.Announce(ctx, ar, Options{})
+ // I'm looking for `error response: "Connection ID missmatch.\x00"`.
+ t.Logf("announce error after %v: %v", time.Since(started), err)
+ }()
+ }
+ wg.Wait()
+}