]> Sergey Matveev's repositories - btrtrc.git/blob - ut-holepunching_test.go
gorond test files
[btrtrc.git] / ut-holepunching_test.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "io"
8         "math/rand"
9         "net"
10         "os"
11         "sync"
12         "testing"
13         "testing/iotest"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo/v2/iter"
18         qt "github.com/frankban/quicktest"
19         "github.com/stretchr/testify/assert"
20         "github.com/stretchr/testify/require"
21
22         "github.com/anacrolix/torrent/internal/testutil"
23 )
24
25 // Check that after completing leeching, a leecher transitions to a seeding
26 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
27 func TestHolepunchConnect(t *testing.T) {
28         c := qt.New(t)
29         greetingTempDir, mi := testutil.GreetingTestTorrent()
30         defer os.RemoveAll(greetingTempDir)
31
32         cfg := TestingConfig(t)
33         cfg.Seed = true
34         cfg.MaxAllocPeerRequestDataPerConn = 4
35         cfg.DataDir = greetingTempDir
36         cfg.DisablePEX = true
37         cfg.Debug = true
38         cfg.AcceptPeerConnections = false
39         // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
40         // won't attempt to holepunch.
41         cfg.DisableTCP = true
42         seeder, err := NewClient(cfg)
43         require.NoError(t, err)
44         defer seeder.Close()
45         defer testutil.ExportStatusWriter(seeder, "s", t)()
46         seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
47         require.NoError(t, err)
48         assert.True(t, ok)
49         seederTorrent.VerifyData()
50
51         cfg = TestingConfig(t)
52         cfg.Seed = true
53         cfg.DataDir = t.TempDir()
54         cfg.AlwaysWantConns = true
55         // This way the leecher leecher will still try to use this peer as a relay, but won't be told
56         // about the seeder via PEX.
57         //cfg.DisablePEX = true
58         //cfg.Debug = true
59         leecher, err := NewClient(cfg)
60         require.NoError(t, err)
61         defer leecher.Close()
62         defer testutil.ExportStatusWriter(leecher, "l", t)()
63
64         cfg = TestingConfig(t)
65         cfg.Seed = false
66         cfg.DataDir = t.TempDir()
67         cfg.MaxAllocPeerRequestDataPerConn = 4
68         cfg.Debug = true
69         cfg.NominalDialTimeout = time.Second
70         //cfg.DisableUTP = true
71         leecherLeecher, _ := NewClient(cfg)
72         require.NoError(t, err)
73         defer leecherLeecher.Close()
74         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
75         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
76                 ret = TorrentSpecFromMetaInfo(mi)
77                 ret.ChunkSize = 2
78                 return
79         }())
80         _ = leecherGreeting
81         require.NoError(t, err)
82         assert.True(t, ok)
83         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
84                 ret = TorrentSpecFromMetaInfo(mi)
85                 ret.ChunkSize = 3
86                 return
87         }())
88         require.NoError(t, err)
89         assert.True(t, ok)
90
91         var wg sync.WaitGroup
92         wg.Add(1)
93         go func() {
94                 defer wg.Done()
95                 r := llg.NewReader()
96                 defer r.Close()
97                 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
98         }()
99         go seederTorrent.AddClientPeer(leecher)
100         waitForConns(seederTorrent)
101         go llg.AddClientPeer(leecher)
102         waitForConns(llg)
103         time.Sleep(time.Second)
104         llg.cl.lock()
105         targetAddr := seeder.ListenAddrs()[0]
106         log.Printf("trying to initiate to %v", targetAddr)
107         initiateConn(outgoingConnOpts{
108                 peerInfo: PeerInfo{
109                         Addr: targetAddr,
110                 },
111                 t:                       llg,
112                 requireRendezvous:       true,
113                 skipHolepunchRendezvous: false,
114                 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
115         }, true)
116         llg.cl.unlock()
117         wg.Wait()
118
119         c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
120         c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
121
122         llClientStats := leecherLeecher.Stats()
123         c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
124         c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
125         c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
126 }
127
128 func waitForConns(t *Torrent) {
129         t.cl.lock()
130         defer t.cl.unlock()
131         for {
132                 for range t.conns {
133                         return
134                 }
135                 t.cl.event.Wait()
136         }
137 }
138
139 // Show that dialling TCP will complete before the other side accepts.
140 func TestDialTcpNotAccepting(t *testing.T) {
141         l, err := net.Listen("tcp", "localhost:0")
142         c := qt.New(t)
143         c.Check(err, qt.IsNil)
144         defer l.Close()
145         dialedConn, err := net.Dial("tcp", l.Addr().String())
146         c.Assert(err, qt.IsNil)
147         dialedConn.Close()
148 }
149
150 func TestTcpSimultaneousOpen(t *testing.T) {
151         const network = "tcp"
152         ctx := context.Background()
153         makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
154                 dialer := net.Dialer{
155                         LocalAddr: &net.TCPAddr{
156                                 Port: localPort,
157                         },
158                 }
159                 return func() (net.Conn, error) {
160                         return dialer.DialContext(ctx, network, remoteAddr)
161                 }
162         }
163         c := qt.New(t)
164         first, second := randPortPair()
165         t.Logf("ports are %v and %v", first, second)
166         err := testSimultaneousOpen(
167                 c.Cleanup,
168                 makeDialer(first, fmt.Sprintf("localhost:%d", second)),
169                 makeDialer(second, fmt.Sprintf("localhost:%d", first)),
170         )
171         c.Assert(err, qt.IsNil)
172 }
173
174 func randIntInRange(low, high int) int {
175         return rand.Intn(high-low+1) + low
176 }
177
178 func randDynamicPort() int {
179         return randIntInRange(49152, 65535)
180 }
181
182 func randPortPair() (first int, second int) {
183         first = randDynamicPort()
184         for {
185                 second = randDynamicPort()
186                 if second != first {
187                         return
188                 }
189         }
190 }
191
192 func writeMsg(conn net.Conn) {
193         conn.Write([]byte(defaultMsg))
194         // Writing must be closed so the reader will get EOF and stop reading.
195         conn.Close()
196 }
197
198 func readMsg(conn net.Conn) error {
199         msgBytes, err := io.ReadAll(conn)
200         if err != nil {
201                 return err
202         }
203         msgStr := string(msgBytes)
204         if msgStr != defaultMsg {
205                 return fmt.Errorf("read %q", msgStr)
206         }
207         return nil
208 }
209
210 var errMsgNotReceived = errors.New("msg not received in time")
211
212 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
213 // the other, thereby showing that both dials obtained endpoints to the same connection.
214 func testSimultaneousOpen(
215         cleanup func(func()),
216         firstDialer, secondDialer func() (net.Conn, error),
217 ) error {
218         errs := make(chan error)
219         var dialsDone sync.WaitGroup
220         const numDials = 2
221         dialsDone.Add(numDials)
222         signal := make(chan struct{})
223         var dialersDone sync.WaitGroup
224         dialersDone.Add(numDials)
225         doDial := func(
226                 dialer func() (net.Conn, error),
227                 onSignal func(net.Conn),
228         ) {
229                 defer dialersDone.Done()
230                 conn, err := dialer()
231                 dialsDone.Done()
232                 errs <- err
233                 if err != nil {
234                         return
235                 }
236                 cleanup(func() {
237                         conn.Close()
238                 })
239                 <-signal
240                 onSignal(conn)
241                 //if err == nil {
242                 //      conn.Close()
243                 //}
244         }
245         go doDial(
246                 firstDialer,
247                 func(conn net.Conn) {
248                         writeMsg(conn)
249                         errs <- nil
250                 },
251         )
252         go doDial(
253                 secondDialer,
254                 func(conn net.Conn) {
255                         gotMsg := make(chan error, 1)
256                         go func() {
257                                 gotMsg <- readMsg(conn)
258                         }()
259                         select {
260                         case err := <-gotMsg:
261                                 errs <- err
262                         case <-time.After(time.Second):
263                                 errs <- errMsgNotReceived
264                         }
265                 },
266         )
267         dialsDone.Wait()
268         for range iter.N(numDials) {
269                 err := <-errs
270                 if err != nil {
271                         return err
272                 }
273         }
274         close(signal)
275         for range iter.N(numDials) {
276                 err := <-errs
277                 if err != nil {
278                         return err
279                 }
280         }
281         dialersDone.Wait()
282         return nil
283 }
284
285 const defaultMsg = "hello"
286
287 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
288 // get separate connections. This means that holepunch connect may result in an accept (and dial)
289 // for one or both peers involved.
290 func TestUtpSimultaneousOpen(t *testing.T) {
291         c := qt.New(t)
292         const network = "udp"
293         ctx := context.Background()
294         newUtpSocket := func(addr string) utpSocket {
295                 socket, err := NewUtpSocket(
296                         network,
297                         addr,
298                         func(net.Addr) bool {
299                                 return false
300                         },
301                         log.Default,
302                 )
303                 c.Assert(err, qt.IsNil)
304                 return socket
305         }
306         first := newUtpSocket("localhost:3000")
307         defer first.Close()
308         second := newUtpSocket("localhost:3001")
309         defer second.Close()
310         getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
311                 return func() (net.Conn, error) {
312                         return sock.DialContext(ctx, network, addr)
313                 }
314         }
315         err := testSimultaneousOpen(
316                 c.Cleanup,
317                 getDial(first, "localhost:3001"),
318                 getDial(second, "localhost:3000"),
319         )
320         c.Assert(err, qt.ErrorIs, errMsgNotReceived)
321 }
322
323 func testDirectDialMsg(c *qt.C, r, w net.Conn) {
324         go writeMsg(w)
325         err := readMsg(r)
326         c.Assert(err, qt.IsNil)
327 }
328
329 // Show that dialling one socket and accepting from the other results in them having ends of the
330 // same connection.
331 func TestUtpDirectDialMsg(t *testing.T) {
332         c := qt.New(t)
333         const network = "udp"
334         ctx := context.Background()
335         newUtpSocket := func(addr string) utpSocket {
336                 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
337                         return false
338                 }, log.Default)
339                 c.Assert(err, qt.IsNil)
340                 return socket
341         }
342         first := newUtpSocket("localhost:0")
343         defer first.Close()
344         second := newUtpSocket("localhost:0")
345         defer second.Close()
346         writer, err := first.DialContext(ctx, network, second.Addr().String())
347         c.Assert(err, qt.IsNil)
348         defer writer.Close()
349         reader, err := second.Accept()
350         defer reader.Close()
351         c.Assert(err, qt.IsNil)
352         testDirectDialMsg(c, reader, writer)
353 }