16 "github.com/anacrolix/log"
17 "github.com/anacrolix/missinggo/v2/iter"
18 qt "github.com/frankban/quicktest"
19 "github.com/stretchr/testify/assert"
20 "github.com/stretchr/testify/require"
22 "github.com/anacrolix/torrent/internal/testutil"
25 // Check that after completing leeching, a leecher transitions to a seeding
26 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
27 func TestHolepunchConnect(t *testing.T) {
29 greetingTempDir, mi := testutil.GreetingTestTorrent()
30 defer os.RemoveAll(greetingTempDir)
32 cfg := TestingConfig(t)
34 cfg.MaxAllocPeerRequestDataPerConn = 4
35 cfg.DataDir = greetingTempDir
38 cfg.AcceptPeerConnections = false
39 // Listening, even without accepting, still means the leecher-leecher completes the dial to the seeder, and so it
40 // won't attempt to holepunch.
42 seeder, err := NewClient(cfg)
43 require.NoError(t, err)
45 defer testutil.ExportStatusWriter(seeder, "s", t)()
46 seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
47 require.NoError(t, err)
49 seederTorrent.VerifyData()
51 cfg = TestingConfig(t)
53 cfg.DataDir = t.TempDir()
54 cfg.AlwaysWantConns = true
55 // This way the leecher leecher will still try to use this peer as a relay, but won't be told
56 // about the seeder via PEX.
57 //cfg.DisablePEX = true
59 leecher, err := NewClient(cfg)
60 require.NoError(t, err)
62 defer testutil.ExportStatusWriter(leecher, "l", t)()
64 cfg = TestingConfig(t)
66 cfg.DataDir = t.TempDir()
67 cfg.MaxAllocPeerRequestDataPerConn = 4
69 cfg.NominalDialTimeout = time.Second
70 //cfg.DisableUTP = true
71 leecherLeecher, _ := NewClient(cfg)
72 require.NoError(t, err)
73 defer leecherLeecher.Close()
74 defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
75 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
76 ret = TorrentSpecFromMetaInfo(mi)
81 require.NoError(t, err)
83 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) {
84 ret = TorrentSpecFromMetaInfo(mi)
88 require.NoError(t, err)
97 qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil)
99 go seederTorrent.AddClientPeer(leecher)
100 waitForConns(seederTorrent)
101 go llg.AddClientPeer(leecher)
103 time.Sleep(time.Second)
105 targetAddr := seeder.ListenAddrs()[0]
106 log.Printf("trying to initiate to %v", targetAddr)
107 initiateConn(outgoingConnOpts{
112 requireRendezvous: true,
113 skipHolepunchRendezvous: false,
114 HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy,
119 c.Check(seeder.dialedSuccessfullyAfterHolepunchConnect, qt.Not(qt.HasLen), 0)
120 c.Check(leecherLeecher.probablyOnlyConnectedDueToHolepunch, qt.Not(qt.HasLen), 0)
122 llClientStats := leecherLeecher.Stats()
123 c.Check(llClientStats.NumPeersUndialableWithoutHolepunch, qt.Not(qt.Equals), 0)
124 c.Check(llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, qt.Not(qt.Equals), 0)
125 c.Check(llClientStats.NumPeersProbablyOnlyConnectedDueToHolepunch, qt.Not(qt.Equals), 0)
128 func waitForConns(t *Torrent) {
139 // Show that dialling TCP will complete before the other side accepts.
140 func TestDialTcpNotAccepting(t *testing.T) {
141 l, err := net.Listen("tcp", "localhost:0")
143 c.Check(err, qt.IsNil)
145 dialedConn, err := net.Dial("tcp", l.Addr().String())
146 c.Assert(err, qt.IsNil)
150 func TestTcpSimultaneousOpen(t *testing.T) {
151 const network = "tcp"
152 ctx := context.Background()
153 makeDialer := func(localPort int, remoteAddr string) func() (net.Conn, error) {
154 dialer := net.Dialer{
155 LocalAddr: &net.TCPAddr{
156 //IP: net.IPv6loopback,
160 return func() (net.Conn, error) {
161 return dialer.DialContext(ctx, network, remoteAddr)
165 // I really hate doing this in unit tests, but we would need to pick apart Dialer to get
166 // perfectly synchronized simultaneous dials.
167 for range iter.N(10) {
168 first, second := randPortPair()
169 t.Logf("ports are %v and %v", first, second)
170 err := testSimultaneousOpen(
172 makeDialer(first, fmt.Sprintf("localhost:%d", second)),
173 makeDialer(second, fmt.Sprintf("localhost:%d", first)),
178 // This proves that the connections are not the same.
179 if errors.Is(err, errMsgNotReceived) {
182 // Could be a timing issue, so try again.
185 // If we weren't able to get a simultaneous dial to occur, then we can't call it a failure.
186 t.Skip("couldn't synchronize dials")
189 func randIntInRange(low, high int) int {
190 return rand.Intn(high-low+1) + low
193 func randDynamicPort() int {
194 return randIntInRange(49152, 65535)
197 func randPortPair() (first int, second int) {
198 first = randDynamicPort()
200 second = randDynamicPort()
207 func writeMsg(conn net.Conn) {
208 conn.Write([]byte(defaultMsg))
209 // Writing must be closed so the reader will get EOF and stop reading.
213 func readMsg(conn net.Conn) error {
214 msgBytes, err := io.ReadAll(conn)
218 msgStr := string(msgBytes)
219 if msgStr != defaultMsg {
220 return fmt.Errorf("read %q", msgStr)
225 var errMsgNotReceived = errors.New("msg not received in time")
227 // Runs two dialers simultaneously, then sends a message on one connection and check it reads from
228 // the other, thereby showing that both dials obtained endpoints to the same connection.
229 func testSimultaneousOpen(
230 cleanup func(func()),
231 firstDialer, secondDialer func() (net.Conn, error),
233 errs := make(chan error)
234 var dialsDone sync.WaitGroup
236 dialsDone.Add(numDials)
237 signal := make(chan struct{})
238 var dialersDone sync.WaitGroup
239 dialersDone.Add(numDials)
241 dialer func() (net.Conn, error),
242 onSignal func(net.Conn),
244 defer dialersDone.Done()
245 conn, err := dialer()
262 func(conn net.Conn) {
269 func(conn net.Conn) {
270 gotMsg := make(chan error, 1)
272 gotMsg <- readMsg(conn)
275 case err := <-gotMsg:
277 case <-time.After(time.Second):
278 errs <- errMsgNotReceived
283 for range iter.N(numDials) {
290 for range iter.N(numDials) {
300 const defaultMsg = "hello"
302 // Show that uTP doesn't implement simultaneous open. When two sockets dial each other, they both
303 // get separate connections. This means that holepunch connect may result in an accept (and dial)
304 // for one or both peers involved.
305 func TestUtpSimultaneousOpen(t *testing.T) {
307 const network = "udp"
308 ctx := context.Background()
309 newUtpSocket := func(addr string) utpSocket {
310 socket, err := NewUtpSocket(
313 func(net.Addr) bool {
318 c.Assert(err, qt.IsNil)
321 first := newUtpSocket("localhost:3000")
323 second := newUtpSocket("localhost:3001")
325 getDial := func(sock utpSocket, addr string) func() (net.Conn, error) {
326 return func() (net.Conn, error) {
327 return sock.DialContext(ctx, network, addr)
330 err := testSimultaneousOpen(
332 getDial(first, "localhost:3001"),
333 getDial(second, "localhost:3000"),
335 c.Assert(err, qt.ErrorIs, errMsgNotReceived)
338 func testDirectDialMsg(c *qt.C, r, w net.Conn) {
341 c.Assert(err, qt.IsNil)
344 // Show that dialling one socket and accepting from the other results in them having ends of the
346 func TestUtpDirectDialMsg(t *testing.T) {
348 const network = "udp"
349 ctx := context.Background()
350 newUtpSocket := func(addr string) utpSocket {
351 socket, err := NewUtpSocket(network, addr, func(net.Addr) bool {
354 c.Assert(err, qt.IsNil)
357 first := newUtpSocket("localhost:0")
359 second := newUtpSocket("localhost:0")
361 writer, err := first.DialContext(ctx, network, second.Addr().String())
362 c.Assert(err, qt.IsNil)
364 reader, err := second.Accept()
366 c.Assert(err, qt.IsNil)
367 testDirectDialMsg(c, reader, writer)