NOTES.md | 6 ++++++ client.go | 99 +++++++++++++++++++++++++++++++++++++++++++++++------ go.mod | 2 +- go.sum | 2 -- peer_protocol/ut-holepunch/err-code.go | 27 +++++++++++++++++++++++++++ peer_protocol/ut-holepunch/ut-holepunch.go | 8 -------- peerconn.go | 5 +++-- torrent.go | 97 ++++++++++++++++++++++++++++++++++++++++++----------- torrent_test.go | 6 +++--- ut-holepunching.go | 11 +++++++++++ ut-holepunching_test.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/NOTES.md b/NOTES.md index 37afb8c5c8202c64ac8d7c364c46a716f61ee81e..4b1f4fff4afce1a9ed8e1e3f25b895724f8dd6dd 100644 --- a/NOTES.md +++ b/NOTES.md @@ -11,3 +11,9 @@ * [A South American paper on peer-selection strategies for uploading](https://arxiv.org/pdf/1402.2187.pdf) Has some useful overviews of piece-selection. + +### Hole-punching + +Holepunching is tracked in Torrent, rather than in Client because if we send a rendezvous message, and subsequently receive a connect message, we do not know if a peer sent a rendezvous message to our relay and we're receiving the connect message for their rendezvous or ours. Relays are not required to respond to rendezvous, so we can't enforce a timeout. If we don't know if who sent the rendezvous that triggered a connect, then we don't know what infohash to use in the handshake. Once we send a rendezvous, and never receive a reply, we would have to always perform handshakes with our original infohash, or always copy the infohash the remote sends. Handling connects by always being the passive side in the handshake won't work since the other side might use the same behaviour and neither will initiate. + +If we only perform rendezvous through relays for the same torrent as the relay, then all the handshake can be done actively for all connect messages. All connect messages received from a peer can only be for the same torrent for which we are connected to the peer. \ No newline at end of file diff --git a/client.go b/client.go index 75960923d7e02b35a917348d375c4abe9c29de91..c2e01ee56ace962f1815859a69414bd082ab1414 100644 --- a/client.go +++ b/client.go @@ -19,8 +19,6 @@ "strconv" "strings" "time" - utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" - "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -45,6 +43,7 @@ "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" @@ -715,8 +714,77 @@ err = cl.initiateHandshakes(c, t) return } +func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error { + for { + switch { + case rz.gotConnect.IsSet(): + return nil + case len(rz.relays) == 0: + return errors.New("all relays failed") + case ctx.Err() != nil: + return context.Cause(ctx) + } + relayCond := rz.relayCond.Signaled() + cl.unlock() + select { + case <-rz.gotConnect.Done(): + case <-relayCond: + case <-ctx.Done(): + } + cl.lock() + } +} + // Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) { +func (cl *Client) initiateRendezvousConnect( + t *Torrent, addr PeerRemoteAddr, +) (ok bool, err error) { + holepunchAddr, err := addrPortFromPeerRemoteAddr(addr) + if err != nil { + return + } + cl.lock() + defer cl.unlock() + rz, err := t.startHolepunchRendezvous(holepunchAddr) + if err != nil { + return + } + if rz == nil { + return + } + ok = true + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err = cl.waitForRendezvousConnect(ctx, rz) + delete(t.utHolepunchRendezvous, holepunchAddr) + if err != nil { + err = fmt.Errorf("waiting for rendezvous connect signal: %w", err) + } + return +} + +// Returns nil connection and nil error if no connection could be established for valid reasons. +func (cl *Client) establishOutgoingConnEx( + opts outgoingConnOpts, + obfuscatedHeader bool, +) ( + _ *PeerConn, err error, +) { + t := opts.t + addr := opts.addr + var rzOk bool + if !opts.skipHolepunchRendezvous { + rzOk, err = cl.initiateRendezvousConnect(t, addr) + if err != nil { + err = fmt.Errorf("initiating rendezvous connect: %w", err) + } + } + if opts.requireRendezvous && !rzOk { + return nil, err + } + if err != nil { + log.Print(err) + } dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration { cl.rLock() defer cl.rUnlock() @@ -750,10 +818,10 @@ } // Returns nil connection and nil error if no connection could be established // for valid reasons. -func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) { +func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) { torrent.Add("establish outgoing connection", 1) obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred - c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst) + c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst) if err == nil { torrent.Add("initiated conn with preferred header obfuscation", 1) return @@ -765,7 +833,7 @@ // there's nothing else to try. return } // Try again with encryption if we didn't earlier, or without if we did. - c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst) + c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst) if err == nil { torrent.Add("initiated conn with fallback header obfuscation", 1) } @@ -773,11 +841,20 @@ // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err) return } +type outgoingConnOpts struct { + t *Torrent + addr PeerRemoteAddr + // Don't attempt to connect unless a connect message is received after initiating a rendezvous. + requireRendezvous bool + // Don't send rendezvous requests to eligible relays. + skipHolepunchRendezvous bool +} + // Called to dial out and run a connection. The addr we're given is already // considered half-open. -func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) { +func (cl *Client) outgoingConnection(opts outgoingConnOpts, ps PeerSource, trusted bool) { cl.dialRateLimiter.Wait(context.Background()) - c, err := cl.establishOutgoingConn(t, addr) + c, err := cl.establishOutgoingConn(opts) if err == nil { c.conn.SetWriteDeadline(time.Time{}) } @@ -785,17 +862,17 @@ cl.lock() defer cl.unlock() // Don't release lock between here and addPeerConn, unless it's for // failure. - cl.noLongerHalfOpen(t, addr.String()) + cl.noLongerHalfOpen(opts.t, opts.addr.String()) if err != nil { if cl.config.Debug { - cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", addr, err) + cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err) } return } defer c.close() c.Discovery = ps c.trusted = trusted - t.runHandshookConnLoggingErr(c) + opts.t.runHandshookConnLoggingErr(c) } // The port number for incoming peer connections. 0 if the client isn't listening. diff --git a/go.mod b/go.mod index cf5700a18b5243d9c981076946343b8aca24538f..9ea01b969420d297be873fe12aee712285d1078f 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ go.opentelemetry.io/otel v1.8.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0 go.opentelemetry.io/otel/sdk v1.8.0 go.opentelemetry.io/otel/trace v1.8.0 + golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d golang.org/x/time v0.0.0-20220609170525-579cf78fd858 ) @@ -98,7 +99,6 @@ go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 // indirect go.opentelemetry.io/proto/otlp v0.18.0 // indirect golang.org/x/crypto v0.5.0 // indirect - golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 4e9c7ec466a58915f1bd66ea079866dde6cb8c8c..3dbd985b1a5f13c3f7eaa81335de002af2514320 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,6 @@ github.com/anacrolix/envpprof v1.2.1 h1:25TJe6t/i0AfzzldiGFKCpD+s+dk8lONBcacJZB2rdE= github.com/anacrolix/envpprof v1.2.1/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4= github.com/anacrolix/fuse v0.2.0 h1:pc+To78kI2d/WUjIyrsdqeJQAesuwpGxlI3h1nAv3Do= github.com/anacrolix/fuse v0.2.0/go.mod h1:Kfu02xBwnySDpH3N23BmrP3MDfwAQGRLUCj6XyeOvBQ= -github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 h1:k4/h2B1gGF+PJGyGHxs8nmHHt1pzWXZWBj6jn4OBlRc= -github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68 h1:fyXlBfnlFzZSFckJ8QLb2lfmWfY++4RiUnae7ZMuv0A= github.com/anacrolix/generics v0.0.0-20230428105757-683593396d68/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= github.com/anacrolix/go-libutp v1.2.0 h1:sjxoB+/ARiKUR7IK/6wLWyADIBqGmu1fm0xo+8Yy7u0= diff --git a/peer_protocol/ut-holepunch/err-code.go b/peer_protocol/ut-holepunch/err-code.go new file mode 100644 index 0000000000000000000000000000000000000000..42b1db1adf3f31d8abbb707918d3de20d92e9c2a --- /dev/null +++ b/peer_protocol/ut-holepunch/err-code.go @@ -0,0 +1,27 @@ +package utHolepunch + +type ErrCode uint32 + +var _ error = ErrCode(0) + +const ( + NoSuchPeer ErrCode = iota + 1 + NotConnected + NoSupport + NoSelf +) + +func (ec ErrCode) Error() string { + switch ec { + case NoSuchPeer: + return "target endpoint is invalid" + case NotConnected: + return "the relaying peer is not connected to the target peer" + case NoSupport: + return "the target peer does not support the holepunch extension" + case NoSelf: + return "the target endpoint belongs to the relaying peer" + default: + panic(ec) + } +} diff --git a/peer_protocol/ut-holepunch/ut-holepunch.go b/peer_protocol/ut-holepunch/ut-holepunch.go index f3ff0c1908dd8cbf082266cdedccb9c2b4588b37..1436c43f22cb14201f5a6213e4473d8b34600f2e 100644 --- a/peer_protocol/ut-holepunch/ut-holepunch.go +++ b/peer_protocol/ut-holepunch/ut-holepunch.go @@ -17,7 +17,6 @@ ErrCode ErrCode } MsgType byte AddrType byte - ErrCode uint32 ) const ( @@ -29,13 +28,6 @@ const ( Ipv4 AddrType = iota Ipv6 AddrType = iota -) - -const ( - NoSuchPeer ErrCode = iota + 1 - NotConnected - NoSupport - NoSelf ) func (m *Msg) UnmarshalBinary(b []byte) error { diff --git a/peerconn.go b/peerconn.go index c45d87ff6d06e527f7f82a7c006014352c497d30..df792ae3053cfa6cdfa9153776a23d38d83ee5cc 100644 --- a/peerconn.go +++ b/peerconn.go @@ -14,8 +14,6 @@ "strconv" "strings" "time" - utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" - "github.com/RoaringBitmap/roaring" . "github.com/anacrolix/generics" "github.com/anacrolix/log" @@ -28,6 +26,7 @@ "github.com/anacrolix/torrent/internal/alloclim" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) // Maintains the state of a BitTorrent-protocol based connection with a peer. @@ -891,6 +890,8 @@ if err != nil { err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err) return } + err = c.t.handleReceivedUtHolepunchMsg(msg, c) + return default: return fmt.Errorf("unexpected extended message ID: %v", id) } diff --git a/torrent.go b/torrent.go index 285f5cc804cb07e159aed181c42aee2eff97c4e6..978619cc7dd9794a404fefd77005eba2e29cc08c 100644 --- a/torrent.go +++ b/torrent.go @@ -17,8 +17,6 @@ "text/tabwriter" "time" "unsafe" - utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" - "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" @@ -41,6 +39,7 @@ "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/storage" @@ -106,6 +105,8 @@ maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. halfOpen map[string]PeerInfo + // The final ess is not silent here as it's in the plural. + utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -1378,7 +1379,7 @@ if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns { return } p := t.peers.PopMax() - t.initiateConn(p) + t.initiateConn(p, false, false) initiated++ } return @@ -2361,7 +2362,11 @@ } // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not // sure all the PeerInfo fields are being used. -func (t *Torrent) initiateConn(peer PeerInfo) { +func (t *Torrent) initiateConn( + peer PeerInfo, + requireRendezvous bool, + skipHolepunchRendezvous bool, +) { if peer.Id == t.cl.peerID { return } @@ -2374,7 +2379,12 @@ return } t.cl.numHalfOpen++ t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) + go t.cl.outgoingConnection(outgoingConnOpts{ + t: t, + addr: peer.Addr, + requireRendezvous: requireRendezvous, + skipHolepunchRendezvous: skipHolepunchRendezvous, + }, peer.Source, peer.Trusted) } // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to @@ -2701,21 +2711,24 @@ ExtendedPayload: extendedPayload, } } +func sendUtHolepunchMsg( + pc *PeerConn, + msgType utHolepunch.MsgType, + addrPort netip.AddrPort, + errCode utHolepunch.ErrCode, +) { + pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode)) +} + func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error { switch msg.MsgType { case utHolepunch.Rendezvous: - sendMsg := func( - pc *PeerConn, - msgType utHolepunch.MsgType, - addrPort netip.AddrPort, - errCode utHolepunch.ErrCode, - ) { - pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode)) - } + log.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) + sendMsg := sendUtHolepunchMsg targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort) if len(targets) == 0 { sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected) - break + return nil } for _, pc := range targets { if !pc.supportsExtension(utHolepunch.ExtensionName) { @@ -2725,14 +2738,60 @@ } sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0) sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0) } + return nil case utHolepunch.Connect: - t.initiateConn(PeerInfo{ - Addr: msg.AddrPort, - Source: PeerSourceUtHolepunch, - }) + log.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort) + rz, ok := t.utHolepunchRendezvous[msg.AddrPort] + if ok { + delete(rz.relays, sender) + rz.gotConnect.Set() + rz.relayCond.Broadcast() + } else { + // If the rendezvous was removed because we timed out or already got a connect signal, + // it doesn't hurt to try again. + t.initiateConn(PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + }, false, true) + } + return nil case utHolepunch.Error: - + rz, ok := t.utHolepunchRendezvous[msg.AddrPort] + if ok { + delete(rz.relays, sender) + rz.relayCond.Broadcast() + } + log.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + return nil default: return fmt.Errorf("unhandled msg type %v", msg.MsgType) } } + +func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) { + if MapContains(t.utHolepunchRendezvous, addrPort) { + err = errors.New("rendezvous already exists") + return + } + g.InitNew(&rz) + for pc := range t.conns { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + continue + } + if pc.supportsExtension(pp.ExtensionNamePex) { + if !g.MapContains(pc.pex.remoteLiveConns, addrPort) { + continue + } + } + sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) + MakeMapIfNilAndSet(&rz.relays, pc, struct{}{}) + } + if len(rz.relays) == 0 { + err = fmt.Errorf("no eligible relays") + return + } + if !MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz) { + panic("expected to fail earlier if rendezvous already exists") + } + return +} diff --git a/torrent_test.go b/torrent_test.go index d2c7ba72057ac0af9d0f22eb94a30894c4cfb9f6..b62be706f642663e3016bfe03652e3e08937b2cc 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -9,7 +9,7 @@ "path/filepath" "sync" "testing" - "github.com/anacrolix/generics" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" @@ -232,12 +232,12 @@ logger: log.Default, gotMetainfoC: make(chan struct{}), } tt.setChunkSize(2) - generics.MakeMapIfNil(&tt.conns) + g.MakeMapIfNil(&tt.conns) pc := PeerConn{} pc.t = &tt pc.peerImpl = &pc pc.initRequestState() - generics.InitNew(&pc.callbacks) + g.InitNew(&pc.callbacks) tt.conns[&pc] = struct{}{} err = pc.peerSentHave(0) c.Assert(err, qt.IsNil) diff --git a/ut-holepunching.go b/ut-holepunching.go new file mode 100644 index 0000000000000000000000000000000000000000..08f0ac79e45c7818896f65306f1575f5c70d0033 --- /dev/null +++ b/ut-holepunching.go @@ -0,0 +1,11 @@ +package torrent + +import ( + "github.com/anacrolix/chansync" +) + +type utHolepunchRendezvous struct { + relays map[*PeerConn]struct{} + gotConnect chansync.SetOnce + relayCond chansync.BroadcastCond +} diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1b4b7d08c29aff12af0a0c5aff318b0dfe60be55 --- /dev/null +++ b/ut-holepunching_test.go @@ -0,0 +1,105 @@ +package torrent + +import ( + "os" + "sync" + "testing" + "testing/iotest" + + "github.com/anacrolix/torrent/internal/testutil" + qt "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Check that after completing leeching, a leecher transitions to a seeding +// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher. +func TestHolepunchConnect(t *testing.T) { + greetingTempDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingTempDir) + + cfg := TestingConfig(t) + cfg.Seed = true + cfg.MaxAllocPeerRequestDataPerConn = 4 + cfg.DataDir = greetingTempDir + cfg.DisablePEX = true + //cfg.Debug = true + cfg.AcceptPeerConnections = false + seeder, err := NewClient(cfg) + require.NoError(t, err) + defer seeder.Close() + defer testutil.ExportStatusWriter(seeder, "s", t)() + seederTorrent, ok, err := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + require.NoError(t, err) + assert.True(t, ok) + seederTorrent.VerifyData() + + cfg = TestingConfig(t) + cfg.Seed = true + cfg.DataDir = t.TempDir() + cfg.AlwaysWantConns = true + // This way the leecher leecher will still try to use this peer as a relay, but won't be told + // about the seeder via PEX. + //cfg.DisablePEX = true + //cfg.Debug = true + leecher, err := NewClient(cfg) + require.NoError(t, err) + defer leecher.Close() + defer testutil.ExportStatusWriter(leecher, "l", t)() + + cfg = TestingConfig(t) + cfg.Seed = false + cfg.DataDir = t.TempDir() + cfg.MaxAllocPeerRequestDataPerConn = 4 + cfg.Debug = true + //cfg.DisableUTP = true + leecherLeecher, _ := NewClient(cfg) + require.NoError(t, err) + defer leecherLeecher.Close() + defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)() + leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + return + }()) + _ = leecherGreeting + require.NoError(t, err) + assert.True(t, ok) + llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 3 + return + }()) + require.NoError(t, err) + assert.True(t, ok) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + r := llg.NewReader() + defer r.Close() + qt.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), qt.IsNil) + }() + go seederTorrent.AddClientPeer(leecher) + waitForConns(seederTorrent) + go llg.AddClientPeer(leecher) + waitForConns(llg) + llg.cl.lock() + llg.initiateConn(PeerInfo{ + Addr: seeder.ListenAddrs()[0], + }, true, false) + llg.cl.unlock() + wg.Wait() +} + +func waitForConns(t *Torrent) { + t.cl.lock() + defer t.cl.unlock() + for { + for range t.conns { + return + } + t.cl.event.Wait() + } +}