]> Sergey Matveev's repositories - btrtrc.git/blob - ut-holepunching_test.go
Ignore dial rate limits for holepunch connects
[btrtrc.git] / ut-holepunching_test.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "io"
8         "math/rand"
9         "net"
10         "os"
11         "sync"
12         "testing"
13         "testing/iotest"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo/v2/iter"
18         qt "github.com/frankban/quicktest"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21         "golang.org/x/time/rate"
22
23         "github.com/anacrolix/torrent/internal/testutil"
24 )
25
26 // Check that after completing leeching, a leecher transitions to a seeding
27 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
28 func TestHolepunchConnect(t *testing.T) {
29         c := qt.New(t)
30         greetingTempDir, mi := testutil.GreetingTestTorrent()
31         defer os.RemoveAll(greetingTempDir)
32
33         cfg := TestingConfig(t)
34         cfg.Seed = true
35         cfg.MaxAllocPeerRequestDataPerConn = 4
36         cfg.DataDir = greetingTempDir
37         cfg.DisablePEX = true
38         cfg.Debug = true
39         cfg.AcceptPeerConnections = false
40         // Listening, even without accepting, still means the leecher-leecher completes the dial to the
41         // seeder, and so it won't attempt to holepunch.
42         cfg.DisableTCP = true
43         // Ensure that responding to holepunch connects don't wait around for the dial limit. We also
44         // have to allow the initial connection to the leecher though, so it can rendezvous for us.
45         cfg.DialRateLimiter = rate.NewLimiter(0, 1)
46         seeder, err := NewClient(cfg)
47         require.NoError(t, err)
48         defer seeder.Close()
49         defer testutil.ExportStatusWriter(seeder, "s", t)()
50         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
51         require.NoError(t, err)
52         assert.True(t, ok)
53         seederTorrent.VerifyData()
54
55         cfg = TestingConfig(t)
56         cfg.Seed = true
57         cfg.DataDir = t.TempDir()
58         cfg.AlwaysWantConns = true
59         // This way the leecher leecher will still try to use this peer as a relay, but won't be told
60         // about the seeder via PEX.
61         //cfg.DisablePEX = true
62         //cfg.Debug = true
63         leecher, err := NewClient(cfg)
64         require.NoError(t, err)
65         defer leecher.Close()
66         defer testutil.ExportStatusWriter(leecher, "l", t)()
67
68         cfg = TestingConfig(t)
69         cfg.Seed = false
70         cfg.DataDir = t.TempDir()
71         cfg.MaxAllocPeerRequestDataPerConn = 4
72         //cfg.Debug = true
73         cfg.NominalDialTimeout = time.Second
74         //cfg.DisableUTP = true
75         leecherLeecher, _ := NewClient(cfg)
76         require.NoError(t, err)
77         defer leecherLeecher.Close()
78         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
79         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
80                 ret = TorrentSpecFromMetaInfo(mi)
81                 ret.ChunkSize = 2
82                 return
83         }())
84         _ = leecherGreeting
85         require.NoError(t, err)
86         assert.True(t, ok)
87         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
88                 ret = TorrentSpecFromMetaInfo(mi)
89                 ret.ChunkSize = 3
90                 return
91         }())
92         require.NoError(t, err)
93         assert.True(t, ok)
94
95         var wg sync.WaitGroup
96         wg.Add(1)
97         go func() {
98                 defer wg.Done()
99                 r := llg.NewReader()
100                 defer r.Close()
101                 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
102         }()
103         go seederTorrent.AddClientPeer(leecher)
104         waitForConns(seederTorrent)
105         go llg.AddClientPeer(leecher)
106         waitForConns(llg)
107         time.Sleep(time.Second)
108         llg.cl.lock()
109         targetAddr := seeder.ListenAddrs()[0]
110         log.Printf("trying to initiate to %v", targetAddr)
111         initiateConn(outgoingConnOpts{
112                 peerInfo: PeerInfo{
113                         Addr: targetAddr,
114                 },
115                 t:                       llg,
116                 requireRendezvous:       true,
117                 skipHolepunchRendezvous: false,
118                 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
119         }, true)
120         llg.cl.unlock()
121         wg.Wait()
122
123         c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
124         c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
125
126         llClientStats := leecherLeecher.Stats()
127         c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
128         c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
129         c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
130 }
131
132 func waitForConns(t *Torrent) {
133         t.cl.lock()
134         defer t.cl.unlock()
135         for {
136                 for range t.conns {
137                         return
138                 }
139                 t.cl.event.Wait()
140         }
141 }
142
143 // Show that dialling TCP will complete before the other side accepts.
144 func TestDialTcpNotAccepting(t *testing.T) {
145         l, err := net.Listen("tcp", "localhost:0")
146         c := qt.New(t)
147         c.Check(err, qt.IsNil)
148         defer l.Close()
149         dialedConn, err := net.Dial("tcp", l.Addr().String())
150         c.Assert(err, qt.IsNil)
151         dialedConn.Close()
152 }
153
154 func TestTcpSimultaneousOpen(t *testing.T) {
155         const network = "tcp"
156         ctx := context.Background()
157         makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
158                 dialer := net.Dialer{
159                         LocalAddr: &net.TCPAddr{
160                                 //IP:   net.IPv6loopback,
161                                 Port: localPort,
162                         },
163                 }
164                 return func() (net.Conn, error) {
165                         return dialer.DialContext(ctx, network, remoteAddr)
166                 }
167         }
168         c := qt.New(t)
169         // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
170         // perfectly synchronized simultaneous dials.
171         for range iter.N(10) {
172                 first, second := randPortPair()
173                 t.Logf("ports are %v and %v", first, second)
174                 err := testSimultaneousOpen(
175                         c.Cleanup,
176                         makeDialer(first, fmt.Sprintf("localhost:%d", second)),
177                         makeDialer(second, fmt.Sprintf("localhost:%d", first)),
178                 )
179                 if err == nil {
180                         return
181                 }
182                 // This proves that the connections are not the same.
183                 if errors.Is(err, errMsgNotReceived) {
184                         t.Fatal(err)
185                 }
186                 // Could be a timing issue, so try again.
187                 t.Log(err)
188         }
189         // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
190         t.Skip("couldn't synchronize dials")
191 }
192
193 func randIntInRange(low, high int) int {
194         return rand.Intn(high-low+1) + low
195 }
196
197 func randDynamicPort() int {
198         return randIntInRange(49152, 65535)
199 }
200
201 func randPortPair() (first int, second int) {
202         first = randDynamicPort()
203         for {
204                 second = randDynamicPort()
205                 if second != first {
206                         return
207                 }
208         }
209 }
210
211 func writeMsg(conn net.Conn) {
212         conn.Write([]byte(defaultMsg))
213         // Writing must be closed so the reader will get EOF and stop reading.
214         conn.Close()
215 }
216
217 func readMsg(conn net.Conn) error {
218         msgBytes, err := io.ReadAll(conn)
219         if err != nil {
220                 return err
221         }
222         msgStr := string(msgBytes)
223         if msgStr != defaultMsg {
224                 return fmt.Errorf("read %q", msgStr)
225         }
226         return nil
227 }
228
229 var errMsgNotReceived = errors.New("msg not received in time")
230
231 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
232 // the other, thereby showing that both dials obtained endpoints to the same connection.
233 func testSimultaneousOpen(
234         cleanup func(func()),
235         firstDialer, secondDialer func() (net.Conn, error),
236 ) error {
237         errs := make(chan error)
238         var dialsDone sync.WaitGroup
239         const numDials = 2
240         dialsDone.Add(numDials)
241         signal := make(chan struct{})
242         var dialersDone sync.WaitGroup
243         dialersDone.Add(numDials)
244         doDial := func(
245                 dialer func() (net.Conn, error),
246                 onSignal func(net.Conn),
247         ) {
248                 defer dialersDone.Done()
249                 conn, err := dialer()
250                 dialsDone.Done()
251                 errs <- err
252                 if err != nil {
253                         return
254                 }
255                 cleanup(func() {
256                         conn.Close()
257                 })
258                 <-signal
259                 onSignal(conn)
260                 //if err == nil {
261                 //      conn.Close()
262                 //}
263         }
264         go doDial(
265                 firstDialer,
266                 func(conn net.Conn) {
267                         writeMsg(conn)
268                         errs <- nil
269                 },
270         )
271         go doDial(
272                 secondDialer,
273                 func(conn net.Conn) {
274                         gotMsg := make(chan error, 1)
275                         go func() {
276                                 gotMsg <- readMsg(conn)
277                         }()
278                         select {
279                         case err := <-gotMsg:
280                                 errs <- err
281                         case <-time.After(time.Second):
282                                 errs <- errMsgNotReceived
283                         }
284                 },
285         )
286         dialsDone.Wait()
287         for range iter.N(numDials) {
288                 err := <-errs
289                 if err != nil {
290                         return err
291                 }
292         }
293         close(signal)
294         for range iter.N(numDials) {
295                 err := <-errs
296                 if err != nil {
297                         return err
298                 }
299         }
300         dialersDone.Wait()
301         return nil
302 }
303
304 const defaultMsg = "hello"
305
306 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
307 // get separate connections. This means that holepunch connect may result in an accept (and dial)
308 // for one or both peers involved.
309 func TestUtpSimultaneousOpen(t *testing.T) {
310         c := qt.New(t)
311         const network = "udp"
312         ctx := context.Background()
313         newUtpSocket := func(addr string) utpSocket {
314                 socket, err := NewUtpSocket(
315                         network,
316                         addr,
317                         func(net.Addr) bool {
318                                 return false
319                         },
320                         log.Default,
321                 )
322                 c.Assert(err, qt.IsNil)
323                 return socket
324         }
325         first := newUtpSocket("localhost:3000")
326         defer first.Close()
327         second := newUtpSocket("localhost:3001")
328         defer second.Close()
329         getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
330                 return func() (net.Conn, error) {
331                         return sock.DialContext(ctx, network, addr)
332                 }
333         }
334         err := testSimultaneousOpen(
335                 c.Cleanup,
336                 getDial(first, "localhost:3001"),
337                 getDial(second, "localhost:3000"),
338         )
339         c.Assert(err, qt.ErrorIs, errMsgNotReceived)
340 }
341
342 func testDirectDialMsg(c *qt.C, r, w net.Conn) {
343         go writeMsg(w)
344         err := readMsg(r)
345         c.Assert(err, qt.IsNil)
346 }
347
348 // Show that dialling one socket and accepting from the other results in them having ends of the
349 // same connection.
350 func TestUtpDirectDialMsg(t *testing.T) {
351         c := qt.New(t)
352         const network = "udp"
353         ctx := context.Background()
354         newUtpSocket := func(addr string) utpSocket {
355                 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
356                         return false
357                 }, log.Default)
358                 c.Assert(err, qt.IsNil)
359                 return socket
360         }
361         first := newUtpSocket("localhost:0")
362         defer first.Close()
363         second := newUtpSocket("localhost:0")
364         defer second.Close()
365         writer, err := first.DialContext(ctx, network, second.Addr().String())
366         c.Assert(err, qt.IsNil)
367         defer writer.Close()
368         reader, err := second.Accept()
369         defer reader.Close()
370         c.Assert(err, qt.IsNil)
371         testDirectDialMsg(c, reader, writer)
372 }