16 "github.com/anacrolix/log"
17 "github.com/anacrolix/missinggo/v2/iter"
18 qt "github.com/frankban/quicktest"
19 "github.com/stretchr/testify/assert"
20 "github.com/stretchr/testify/require"
21 "golang.org/x/time/rate"
23 "github.com/anacrolix/torrent/internal/testutil"
26 // Check that after completing leeching, a leecher transitions to a seeding
27 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
28 func TestHolepunchConnect(t *testing.T) {
30 greetingTempDir, mi := testutil.GreetingTestTorrent()
31 defer os.RemoveAll(greetingTempDir)
33 cfg := TestingConfig(t)
35 cfg.MaxAllocPeerRequestDataPerConn = 4
36 cfg.DataDir = greetingTempDir
39 cfg.AcceptPeerConnections = false
40 // Listening, even without accepting, still means the leecher-leecher completes the dial to the
41 // seeder, and so it won't attempt to holepunch.
43 // Ensure that responding to holepunch connects don't wait around for the dial limit. We also
44 // have to allow the initial connection to the leecher though, so it can rendezvous for us.
45 cfg.DialRateLimiter = rate.NewLimiter(0, 1)
46 cfg.Logger = cfg.Logger.WithContextText("seeder")
47 seeder, err := NewClient(cfg)
48 require.NoError(t, err)
50 defer testutil.ExportStatusWriter(seeder, "s", t)()
51 seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
52 require.NoError(t, err)
54 seederTorrent.VerifyData()
56 cfg = TestingConfig(t)
58 cfg.DataDir = t.TempDir()
59 cfg.AlwaysWantConns = true
60 cfg.Logger = cfg.Logger.WithContextText("leecher")
61 // This way the leecher leecher will still try to use this peer as a relay, but won't be told
62 // about the seeder via PEX.
63 //cfg.DisablePEX = true
65 leecher, err := NewClient(cfg)
66 require.NoError(t, err)
68 defer testutil.ExportStatusWriter(leecher, "l", t)()
70 cfg = TestingConfig(t)
72 cfg.DataDir = t.TempDir()
73 cfg.MaxAllocPeerRequestDataPerConn = 4
75 cfg.NominalDialTimeout = time.Second
76 cfg.Logger = cfg.Logger.WithContextText("leecher-leecher")
77 //cfg.DisableUTP = true
78 leecherLeecher, _ := NewClient(cfg)
79 require.NoError(t, err)
80 defer leecherLeecher.Close()
81 defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
82 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
83 ret = TorrentSpecFromMetaInfo(mi)
88 require.NoError(t, err)
90 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
91 ret = TorrentSpecFromMetaInfo(mi)
95 require.NoError(t, err)
104 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
106 go seederTorrent.AddClientPeer(leecher)
107 waitForConns(seederTorrent)
108 go llg.AddClientPeer(leecher)
110 time.Sleep(time.Second)
112 targetAddr := seeder.ListenAddrs()[0]
113 log.Printf("trying to initiate to %v", targetAddr)
114 initiateConn(outgoingConnOpts{
119 requireRendezvous: true,
120 skipHolepunchRendezvous: false,
121 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
126 c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
127 c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
129 llClientStats := leecherLeecher.Stats()
130 c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
131 c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
132 c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
135 func waitForConns(t *Torrent) {
146 // Show that dialling TCP will complete before the other side accepts.
147 func TestDialTcpNotAccepting(t *testing.T) {
148 l, err := net.Listen("tcp", "localhost:0")
150 c.Check(err, qt.IsNil)
152 dialedConn, err := net.Dial("tcp", l.Addr().String())
153 c.Assert(err, qt.IsNil)
157 func TestTcpSimultaneousOpen(t *testing.T) {
158 const network = "tcp"
159 ctx := context.Background()
160 makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
161 dialer := net.Dialer{
162 LocalAddr: &net.TCPAddr{
163 //IP: net.IPv6loopback,
167 return func() (net.Conn, error) {
168 return dialer.DialContext(ctx, network, remoteAddr)
172 // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
173 // perfectly synchronized simultaneous dials.
174 for range iter.N(10) {
175 first, second := randPortPair()
176 t.Logf("ports are %v and %v", first, second)
177 err := testSimultaneousOpen(
179 makeDialer(first, fmt.Sprintf("localhost:%d", second)),
180 makeDialer(second, fmt.Sprintf("localhost:%d", first)),
185 // This proves that the connections are not the same.
186 if errors.Is(err, errMsgNotReceived) {
189 // Could be a timing issue, so try again.
192 // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
193 t.Skip("couldn't synchronize dials")
196 func randIntInRange(low, high int) int {
197 return rand.Intn(high-low+1) + low
200 func randDynamicPort() int {
201 return randIntInRange(49152, 65535)
204 func randPortPair() (first int, second int) {
205 first = randDynamicPort()
207 second = randDynamicPort()
214 func writeMsg(conn net.Conn) {
215 conn.Write([]byte(defaultMsg))
216 // Writing must be closed so the reader will get EOF and stop reading.
220 func readMsg(conn net.Conn) error {
221 msgBytes, err := io.ReadAll(conn)
225 msgStr := string(msgBytes)
226 if msgStr != defaultMsg {
227 return fmt.Errorf("read %q", msgStr)
232 var errMsgNotReceived = errors.New("msg not received in time")
234 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
235 // the other, thereby showing that both dials obtained endpoints to the same connection.
236 func testSimultaneousOpen(
237 cleanup func(func()),
238 firstDialer, secondDialer func() (net.Conn, error),
240 errs := make(chan error)
241 var dialsDone sync.WaitGroup
243 dialsDone.Add(numDials)
244 signal := make(chan struct{})
245 var dialersDone sync.WaitGroup
246 dialersDone.Add(numDials)
248 dialer func() (net.Conn, error),
249 onSignal func(net.Conn),
251 defer dialersDone.Done()
252 conn, err := dialer()
269 func(conn net.Conn) {
276 func(conn net.Conn) {
277 gotMsg := make(chan error, 1)
279 gotMsg <- readMsg(conn)
282 case err := <-gotMsg:
284 case <-time.After(time.Second):
285 errs <- errMsgNotReceived
290 for range iter.N(numDials) {
297 for range iter.N(numDials) {
307 const defaultMsg = "hello"
309 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
310 // get separate connections. This means that holepunch connect may result in an accept (and dial)
311 // for one or both peers involved.
312 func TestUtpSimultaneousOpen(t *testing.T) {
315 const network = "udp"
316 ctx := context.Background()
317 newUtpSocket := func(addr string) utpSocket {
318 socket, err := NewUtpSocket(
321 func(net.Addr) bool {
326 c.Assert(err, qt.IsNil)
329 first := newUtpSocket("localhost:0")
331 second := newUtpSocket("localhost:0")
333 getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
334 return func() (net.Conn, error) {
335 return sock.DialContext(ctx, network, addr)
338 t.Logf("first addr is %v. second addr is %v", first.Addr().String(), second.Addr().String())
339 for range iter.N(10) {
340 err := testSimultaneousOpen(
342 getDial(first, second.Addr().String()),
343 getDial(second, first.Addr().String()),
346 t.Fatal("expected utp to fail simultaneous open")
348 if errors.Is(err, errMsgNotReceived) {
351 skipGoUtpDialIssue(t, err)
353 time.Sleep(time.Second)
358 func writeAndReadMsg(r, w net.Conn) error {
363 func skipGoUtpDialIssue(t *testing.T, err error) {
364 if err.Error() == "timed out waiting for ack" {
365 t.Skip("anacrolix go utp implementation has issues. Use anacrolix/go-libutp by enabling CGO.")
369 // Show that dialling one socket and accepting from the other results in them having ends of the
371 func TestUtpDirectDialMsg(t *testing.T) {
374 const network = "udp4"
375 ctx := context.Background()
376 newUtpSocket := func(addr string) utpSocket {
377 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
380 c.Assert(err, qt.IsNil)
383 for range iter.N(10) {
384 err := func() error {
385 first := newUtpSocket("localhost:0")
387 second := newUtpSocket("localhost:0")
389 writer, err := first.DialContext(ctx, network, second.Addr().String())
394 reader, err := second.Accept()
396 c.Assert(err, qt.IsNil)
397 return writeAndReadMsg(reader, writer)
402 skipGoUtpDialIssue(t, err)
404 time.Sleep(time.Second)