]> Sergey Matveev's repositories - btrtrc.git/blob - ut-holepunching_test.go
Relax TestTcpSimultaneousOpen
[btrtrc.git] / ut-holepunching_test.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "io"
8         "math/rand"
9         "net"
10         "os"
11         "sync"
12         "testing"
13         "testing/iotest"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo/v2/iter"
18         qt "github.com/frankban/quicktest"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21
22         "github.com/anacrolix/torrent/internal/testutil"
23 )
24
25 // Check that after completing leeching, a leecher transitions to a seeding
26 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
27 func TestHolepunchConnect(t *testing.T) {
28         c := qt.New(t)
29         greetingTempDir, mi := testutil.GreetingTestTorrent()
30         defer os.RemoveAll(greetingTempDir)
31
32         cfg := TestingConfig(t)
33         cfg.Seed = true
34         cfg.MaxAllocPeerRequestDataPerConn = 4
35         cfg.DataDir = greetingTempDir
36         cfg.DisablePEX = true
37         cfg.Debug = true
38         cfg.AcceptPeerConnections = false
39         // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
40         // won't attempt to holepunch.
41         cfg.DisableTCP = true
42         seeder, err := NewClient(cfg)
43         require.NoError(t, err)
44         defer seeder.Close()
45         defer testutil.ExportStatusWriter(seeder, "s", t)()
46         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
47         require.NoError(t, err)
48         assert.True(t, ok)
49         seederTorrent.VerifyData()
50
51         cfg = TestingConfig(t)
52         cfg.Seed = true
53         cfg.DataDir = t.TempDir()
54         cfg.AlwaysWantConns = true
55         // This way the leecher leecher will still try to use this peer as a relay, but won't be told
56         // about the seeder via PEX.
57         //cfg.DisablePEX = true
58         //cfg.Debug = true
59         leecher, err := NewClient(cfg)
60         require.NoError(t, err)
61         defer leecher.Close()
62         defer testutil.ExportStatusWriter(leecher, "l", t)()
63
64         cfg = TestingConfig(t)
65         cfg.Seed = false
66         cfg.DataDir = t.TempDir()
67         cfg.MaxAllocPeerRequestDataPerConn = 4
68         cfg.Debug = true
69         cfg.NominalDialTimeout = time.Second
70         //cfg.DisableUTP = true
71         leecherLeecher, _ := NewClient(cfg)
72         require.NoError(t, err)
73         defer leecherLeecher.Close()
74         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
75         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
76                 ret = TorrentSpecFromMetaInfo(mi)
77                 ret.ChunkSize = 2
78                 return
79         }())
80         _ = leecherGreeting
81         require.NoError(t, err)
82         assert.True(t, ok)
83         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
84                 ret = TorrentSpecFromMetaInfo(mi)
85                 ret.ChunkSize = 3
86                 return
87         }())
88         require.NoError(t, err)
89         assert.True(t, ok)
90
91         var wg sync.WaitGroup
92         wg.Add(1)
93         go func() {
94                 defer wg.Done()
95                 r := llg.NewReader()
96                 defer r.Close()
97                 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
98         }()
99         go seederTorrent.AddClientPeer(leecher)
100         waitForConns(seederTorrent)
101         go llg.AddClientPeer(leecher)
102         waitForConns(llg)
103         time.Sleep(time.Second)
104         llg.cl.lock()
105         targetAddr := seeder.ListenAddrs()[0]
106         log.Printf("trying to initiate to %v", targetAddr)
107         initiateConn(outgoingConnOpts{
108                 peerInfo: PeerInfo{
109                         Addr: targetAddr,
110                 },
111                 t:                       llg,
112                 requireRendezvous:       true,
113                 skipHolepunchRendezvous: false,
114                 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
115         }, true)
116         llg.cl.unlock()
117         wg.Wait()
118
119         c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
120         c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
121
122         llClientStats := leecherLeecher.Stats()
123         c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
124         c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
125         c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
126 }
127
128 func waitForConns(t *Torrent) {
129         t.cl.lock()
130         defer t.cl.unlock()
131         for {
132                 for range t.conns {
133                         return
134                 }
135                 t.cl.event.Wait()
136         }
137 }
138
139 // Show that dialling TCP will complete before the other side accepts.
140 func TestDialTcpNotAccepting(t *testing.T) {
141         l, err := net.Listen("tcp", "localhost:0")
142         c := qt.New(t)
143         c.Check(err, qt.IsNil)
144         defer l.Close()
145         dialedConn, err := net.Dial("tcp", l.Addr().String())
146         c.Assert(err, qt.IsNil)
147         dialedConn.Close()
148 }
149
150 func TestTcpSimultaneousOpen(t *testing.T) {
151         const network = "tcp"
152         ctx := context.Background()
153         makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
154                 dialer := net.Dialer{
155                         LocalAddr: &net.TCPAddr{
156                                 //IP:   net.IPv6loopback,
157                                 Port: localPort,
158                         },
159                 }
160                 return func() (net.Conn, error) {
161                         return dialer.DialContext(ctx, network, remoteAddr)
162                 }
163         }
164         c := qt.New(t)
165         // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
166         // perfectly synchronized simultaneous dials.
167         for range iter.N(10) {
168                 first, second := randPortPair()
169                 t.Logf("ports are %v and %v", first, second)
170                 err := testSimultaneousOpen(
171                         c.Cleanup,
172                         makeDialer(first, fmt.Sprintf("localhost:%d", second)),
173                         makeDialer(second, fmt.Sprintf("localhost:%d", first)),
174                 )
175                 if err == nil {
176                         return
177                 }
178                 // This proves that the connections are not the same.
179                 if errors.Is(err, errMsgNotReceived) {
180                         t.Fatal(err)
181                 }
182                 // Could be a timing issue, so try again.
183                 t.Log(err)
184         }
185         // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
186         t.Skip("couldn't synchronize dials")
187 }
188
189 func randIntInRange(low, high int) int {
190         return rand.Intn(high-low+1) + low
191 }
192
193 func randDynamicPort() int {
194         return randIntInRange(49152, 65535)
195 }
196
197 func randPortPair() (first int, second int) {
198         first = randDynamicPort()
199         for {
200                 second = randDynamicPort()
201                 if second != first {
202                         return
203                 }
204         }
205 }
206
207 func writeMsg(conn net.Conn) {
208         conn.Write([]byte(defaultMsg))
209         // Writing must be closed so the reader will get EOF and stop reading.
210         conn.Close()
211 }
212
213 func readMsg(conn net.Conn) error {
214         msgBytes, err := io.ReadAll(conn)
215         if err != nil {
216                 return err
217         }
218         msgStr := string(msgBytes)
219         if msgStr != defaultMsg {
220                 return fmt.Errorf("read %q", msgStr)
221         }
222         return nil
223 }
224
225 var errMsgNotReceived = errors.New("msg not received in time")
226
227 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
228 // the other, thereby showing that both dials obtained endpoints to the same connection.
229 func testSimultaneousOpen(
230         cleanup func(func()),
231         firstDialer, secondDialer func() (net.Conn, error),
232 ) error {
233         errs := make(chan error)
234         var dialsDone sync.WaitGroup
235         const numDials = 2
236         dialsDone.Add(numDials)
237         signal := make(chan struct{})
238         var dialersDone sync.WaitGroup
239         dialersDone.Add(numDials)
240         doDial := func(
241                 dialer func() (net.Conn, error),
242                 onSignal func(net.Conn),
243         ) {
244                 defer dialersDone.Done()
245                 conn, err := dialer()
246                 dialsDone.Done()
247                 errs <- err
248                 if err != nil {
249                         return
250                 }
251                 cleanup(func() {
252                         conn.Close()
253                 })
254                 <-signal
255                 onSignal(conn)
256                 //if err == nil {
257                 //      conn.Close()
258                 //}
259         }
260         go doDial(
261                 firstDialer,
262                 func(conn net.Conn) {
263                         writeMsg(conn)
264                         errs <- nil
265                 },
266         )
267         go doDial(
268                 secondDialer,
269                 func(conn net.Conn) {
270                         gotMsg := make(chan error, 1)
271                         go func() {
272                                 gotMsg <- readMsg(conn)
273                         }()
274                         select {
275                         case err := <-gotMsg:
276                                 errs <- err
277                         case <-time.After(time.Second):
278                                 errs <- errMsgNotReceived
279                         }
280                 },
281         )
282         dialsDone.Wait()
283         for range iter.N(numDials) {
284                 err := <-errs
285                 if err != nil {
286                         return err
287                 }
288         }
289         close(signal)
290         for range iter.N(numDials) {
291                 err := <-errs
292                 if err != nil {
293                         return err
294                 }
295         }
296         dialersDone.Wait()
297         return nil
298 }
299
300 const defaultMsg = "hello"
301
302 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
303 // get separate connections. This means that holepunch connect may result in an accept (and dial)
304 // for one or both peers involved.
305 func TestUtpSimultaneousOpen(t *testing.T) {
306         c := qt.New(t)
307         const network = "udp"
308         ctx := context.Background()
309         newUtpSocket := func(addr string) utpSocket {
310                 socket, err := NewUtpSocket(
311                         network,
312                         addr,
313                         func(net.Addr) bool {
314                                 return false
315                         },
316                         log.Default,
317                 )
318                 c.Assert(err, qt.IsNil)
319                 return socket
320         }
321         first := newUtpSocket("localhost:3000")
322         defer first.Close()
323         second := newUtpSocket("localhost:3001")
324         defer second.Close()
325         getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
326                 return func() (net.Conn, error) {
327                         return sock.DialContext(ctx, network, addr)
328                 }
329         }
330         err := testSimultaneousOpen(
331                 c.Cleanup,
332                 getDial(first, "localhost:3001"),
333                 getDial(second, "localhost:3000"),
334         )
335         c.Assert(err, qt.ErrorIs, errMsgNotReceived)
336 }
337
338 func testDirectDialMsg(c *qt.C, r, w net.Conn) {
339         go writeMsg(w)
340         err := readMsg(r)
341         c.Assert(err, qt.IsNil)
342 }
343
344 // Show that dialling one socket and accepting from the other results in them having ends of the
345 // same connection.
346 func TestUtpDirectDialMsg(t *testing.T) {
347         c := qt.New(t)
348         const network = "udp"
349         ctx := context.Background()
350         newUtpSocket := func(addr string) utpSocket {
351                 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
352                         return false
353                 }, log.Default)
354                 c.Assert(err, qt.IsNil)
355                 return socket
356         }
357         first := newUtpSocket("localhost:0")
358         defer first.Close()
359         second := newUtpSocket("localhost:0")
360         defer second.Close()
361         writer, err := first.DialContext(ctx, network, second.Addr().String())
362         c.Assert(err, qt.IsNil)
363         defer writer.Close()
364         reader, err := second.Accept()
365         defer reader.Close()
366         c.Assert(err, qt.IsNil)
367         testDirectDialMsg(c, reader, writer)
368 }