]> Sergey Matveev's repositories - btrtrc.git/blob - ut-holepunching_test.go
Add sqlite storage build conditions to sqlite storage closed test
[btrtrc.git] / ut-holepunching_test.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "io"
8         "math/rand"
9         "net"
10         "os"
11         "sync"
12         "testing"
13         "testing/iotest"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo/v2/iter"
18         qt "github.com/frankban/quicktest"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21         "golang.org/x/time/rate"
22
23         "github.com/anacrolix/torrent/internal/testutil"
24 )
25
26 // Check that after completing leeching, a leecher transitions to a seeding
27 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
28 func TestHolepunchConnect(t *testing.T) {
29         c := qt.New(t)
30         greetingTempDir, mi := testutil.GreetingTestTorrent()
31         defer os.RemoveAll(greetingTempDir)
32
33         cfg := TestingConfig(t)
34         cfg.Seed = true
35         cfg.MaxAllocPeerRequestDataPerConn = 4
36         cfg.DataDir = greetingTempDir
37         cfg.DisablePEX = true
38         cfg.Debug = true
39         cfg.AcceptPeerConnections = false
40         // Listening, even without accepting, still means the leecher-leecher completes the dial to the
41         // seeder, and so it won't attempt to holepunch.
42         cfg.DisableTCP = true
43         // Ensure that responding to holepunch connects don't wait around for the dial limit. We also
44         // have to allow the initial connection to the leecher though, so it can rendezvous for us.
45         cfg.DialRateLimiter = rate.NewLimiter(0, 1)
46         cfg.Logger = cfg.Logger.WithContextText("seeder")
47         seeder, err := NewClient(cfg)
48         require.NoError(t, err)
49         defer seeder.Close()
50         defer testutil.ExportStatusWriter(seeder, "s", t)()
51         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
52         require.NoError(t, err)
53         assert.True(t, ok)
54         seederTorrent.VerifyData()
55
56         cfg = TestingConfig(t)
57         cfg.Seed = true
58         cfg.DataDir = t.TempDir()
59         cfg.AlwaysWantConns = true
60         cfg.Logger = cfg.Logger.WithContextText("leecher")
61         // This way the leecher leecher will still try to use this peer as a relay, but won't be told
62         // about the seeder via PEX.
63         //cfg.DisablePEX = true
64         cfg.Debug = true
65         leecher, err := NewClient(cfg)
66         require.NoError(t, err)
67         defer leecher.Close()
68         defer testutil.ExportStatusWriter(leecher, "l", t)()
69
70         cfg = TestingConfig(t)
71         cfg.Seed = false
72         cfg.DataDir = t.TempDir()
73         cfg.MaxAllocPeerRequestDataPerConn = 4
74         cfg.Debug = true
75         cfg.NominalDialTimeout = time.Second
76         cfg.Logger = cfg.Logger.WithContextText("leecher-leecher")
77         //cfg.DisableUTP = true
78         leecherLeecher, _ := NewClient(cfg)
79         require.NoError(t, err)
80         defer leecherLeecher.Close()
81         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
82         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
83                 ret = TorrentSpecFromMetaInfo(mi)
84                 ret.ChunkSize = 2
85                 return
86         }())
87         _ = leecherGreeting
88         require.NoError(t, err)
89         assert.True(t, ok)
90         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
91                 ret = TorrentSpecFromMetaInfo(mi)
92                 ret.ChunkSize = 3
93                 return
94         }())
95         require.NoError(t, err)
96         assert.True(t, ok)
97
98         var wg sync.WaitGroup
99         wg.Add(1)
100         go func() {
101                 defer wg.Done()
102                 r := llg.NewReader()
103                 defer r.Close()
104                 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
105         }()
106         go seederTorrent.AddClientPeer(leecher)
107         waitForConns(seederTorrent)
108         go llg.AddClientPeer(leecher)
109         waitForConns(llg)
110         time.Sleep(time.Second)
111         llg.cl.lock()
112         targetAddr := seeder.ListenAddrs()[0]
113         log.Printf("trying to initiate to %v", targetAddr)
114         initiateConn(outgoingConnOpts{
115                 peerInfo: PeerInfo{
116                         Addr: targetAddr,
117                 },
118                 t:                       llg,
119                 requireRendezvous:       true,
120                 skipHolepunchRendezvous: false,
121                 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
122         }, true)
123         llg.cl.unlock()
124         wg.Wait()
125
126         c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
127         c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
128
129         llClientStats := leecherLeecher.Stats()
130         c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
131         c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
132         c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
133 }
134
135 func waitForConns(t *Torrent) {
136         t.cl.lock()
137         defer t.cl.unlock()
138         for {
139                 for range t.conns {
140                         return
141                 }
142                 t.cl.event.Wait()
143         }
144 }
145
146 // Show that dialling TCP will complete before the other side accepts.
147 func TestDialTcpNotAccepting(t *testing.T) {
148         l, err := net.Listen("tcp", "localhost:0")
149         c := qt.New(t)
150         c.Check(err, qt.IsNil)
151         defer l.Close()
152         dialedConn, err := net.Dial("tcp", l.Addr().String())
153         c.Assert(err, qt.IsNil)
154         dialedConn.Close()
155 }
156
157 func TestTcpSimultaneousOpen(t *testing.T) {
158         const network = "tcp"
159         ctx := context.Background()
160         makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
161                 dialer := net.Dialer{
162                         LocalAddr: &net.TCPAddr{
163                                 //IP:   net.IPv6loopback,
164                                 Port: localPort,
165                         },
166                 }
167                 return func() (net.Conn, error) {
168                         return dialer.DialContext(ctx, network, remoteAddr)
169                 }
170         }
171         c := qt.New(t)
172         // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
173         // perfectly synchronized simultaneous dials.
174         for range iter.N(10) {
175                 first, second := randPortPair()
176                 t.Logf("ports are %v and %v", first, second)
177                 err := testSimultaneousOpen(
178                         c.Cleanup,
179                         makeDialer(first, fmt.Sprintf("localhost:%d", second)),
180                         makeDialer(second, fmt.Sprintf("localhost:%d", first)),
181                 )
182                 if err == nil {
183                         return
184                 }
185                 // This proves that the connections are not the same.
186                 if errors.Is(err, errMsgNotReceived) {
187                         t.Fatal(err)
188                 }
189                 // Could be a timing issue, so try again.
190                 t.Log(err)
191         }
192         // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
193         t.Skip("couldn't synchronize dials")
194 }
195
196 func randIntInRange(low, high int) int {
197         return rand.Intn(high-low+1) + low
198 }
199
200 func randDynamicPort() int {
201         return randIntInRange(49152, 65535)
202 }
203
204 func randPortPair() (first int, second int) {
205         first = randDynamicPort()
206         for {
207                 second = randDynamicPort()
208                 if second != first {
209                         return
210                 }
211         }
212 }
213
214 func writeMsg(conn net.Conn) {
215         conn.Write([]byte(defaultMsg))
216         // Writing must be closed so the reader will get EOF and stop reading.
217         conn.Close()
218 }
219
220 func readMsg(conn net.Conn) error {
221         msgBytes, err := io.ReadAll(conn)
222         if err != nil {
223                 return err
224         }
225         msgStr := string(msgBytes)
226         if msgStr != defaultMsg {
227                 return fmt.Errorf("read %q", msgStr)
228         }
229         return nil
230 }
231
232 var errMsgNotReceived = errors.New("msg not received in time")
233
234 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
235 // the other, thereby showing that both dials obtained endpoints to the same connection.
236 func testSimultaneousOpen(
237         cleanup func(func()),
238         firstDialer, secondDialer func() (net.Conn, error),
239 ) error {
240         errs := make(chan error)
241         var dialsDone sync.WaitGroup
242         const numDials = 2
243         dialsDone.Add(numDials)
244         signal := make(chan struct{})
245         var dialersDone sync.WaitGroup
246         dialersDone.Add(numDials)
247         doDial := func(
248                 dialer func() (net.Conn, error),
249                 onSignal func(net.Conn),
250         ) {
251                 defer dialersDone.Done()
252                 conn, err := dialer()
253                 dialsDone.Done()
254                 errs <- err
255                 if err != nil {
256                         return
257                 }
258                 cleanup(func() {
259                         conn.Close()
260                 })
261                 <-signal
262                 onSignal(conn)
263                 //if err == nil {
264                 //      conn.Close()
265                 //}
266         }
267         go doDial(
268                 firstDialer,
269                 func(conn net.Conn) {
270                         writeMsg(conn)
271                         errs <- nil
272                 },
273         )
274         go doDial(
275                 secondDialer,
276                 func(conn net.Conn) {
277                         gotMsg := make(chan error, 1)
278                         go func() {
279                                 gotMsg <- readMsg(conn)
280                         }()
281                         select {
282                         case err := <-gotMsg:
283                                 errs <- err
284                         case <-time.After(time.Second):
285                                 errs <- errMsgNotReceived
286                         }
287                 },
288         )
289         dialsDone.Wait()
290         for range iter.N(numDials) {
291                 err := <-errs
292                 if err != nil {
293                         return err
294                 }
295         }
296         close(signal)
297         for range iter.N(numDials) {
298                 err := <-errs
299                 if err != nil {
300                         return err
301                 }
302         }
303         dialersDone.Wait()
304         return nil
305 }
306
307 const defaultMsg = "hello"
308
309 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
310 // get separate connections. This means that holepunch connect may result in an accept (and dial)
311 // for one or both peers involved.
312 func TestUtpSimultaneousOpen(t *testing.T) {
313         t.Parallel()
314         c := qt.New(t)
315         const network = "udp"
316         ctx := context.Background()
317         newUtpSocket := func(addr string) utpSocket {
318                 socket, err := NewUtpSocket(
319                         network,
320                         addr,
321                         func(net.Addr) bool {
322                                 return false
323                         },
324                         log.Default,
325                 )
326                 c.Assert(err, qt.IsNil)
327                 return socket
328         }
329         first := newUtpSocket("localhost:0")
330         defer first.Close()
331         second := newUtpSocket("localhost:0")
332         defer second.Close()
333         getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
334                 return func() (net.Conn, error) {
335                         return sock.DialContext(ctx, network, addr)
336                 }
337         }
338         t.Logf("first addr is %v. second addr is %v", first.Addr().String(), second.Addr().String())
339         for range iter.N(10) {
340                 err := testSimultaneousOpen(
341                         c.Cleanup,
342                         getDial(first, second.Addr().String()),
343                         getDial(second, first.Addr().String()),
344                 )
345                 if err == nil {
346                         t.Fatal("expected utp to fail simultaneous open")
347                 }
348                 if errors.Is(err, errMsgNotReceived) {
349                         return
350                 }
351                 skipGoUtpDialIssue(t, err)
352                 t.Log(err)
353                 time.Sleep(time.Second)
354         }
355         t.FailNow()
356 }
357
358 func writeAndReadMsg(r, w net.Conn) error {
359         go writeMsg(w)
360         return readMsg(r)
361 }
362
363 func skipGoUtpDialIssue(t *testing.T, err error) {
364         if err.Error() == "timed out waiting for ack" {
365                 t.Skip("anacrolix go utp implementation has issues. Use anacrolix/go-libutp by enabling CGO.")
366         }
367 }
368
369 // Show that dialling one socket and accepting from the other results in them having ends of the
370 // same connection.
371 func TestUtpDirectDialMsg(t *testing.T) {
372         t.Parallel()
373         c := qt.New(t)
374         const network = "udp4"
375         ctx := context.Background()
376         newUtpSocket := func(addr string) utpSocket {
377                 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
378                         return false
379                 }, log.Default)
380                 c.Assert(err, qt.IsNil)
381                 return socket
382         }
383         for range iter.N(10) {
384                 err := func() error {
385                         first := newUtpSocket("localhost:0")
386                         defer first.Close()
387                         second := newUtpSocket("localhost:0")
388                         defer second.Close()
389                         writer, err := first.DialContext(ctx, network, second.Addr().String())
390                         if err != nil {
391                                 return err
392                         }
393                         defer writer.Close()
394                         reader, err := second.Accept()
395                         defer reader.Close()
396                         c.Assert(err, qt.IsNil)
397                         return writeAndReadMsg(reader, writer)
398                 }()
399                 if err == nil {
400                         return
401                 }
402                 skipGoUtpDialIssue(t, err)
403                 t.Log(err)
404                 time.Sleep(time.Second)
405         }
406         t.FailNow()
407 }