From a11739a667be8c5bfaad2cf73417eba0e9adab2c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 11 May 2023 13:03:54 +1000 Subject: [PATCH] Attempt holepunch after initial dial fails --- client.go | 269 +++++++++++++------------------------- dial-pool.go | 43 ++++++ peer_protocol/extended.go | 10 +- torrent.go | 103 +++++++-------- ut-holepunching.go | 10 -- ut-holepunching_test.go | 45 ++++--- 6 files changed, 224 insertions(+), 256 deletions(-) create mode 100644 dial-pool.go diff --git a/client.go b/client.go index 4c2d9123..95c21aab 100644 --- a/client.go +++ b/client.go @@ -19,8 +19,6 @@ import ( "strconv" "time" - "github.com/anacrolix/torrent/internal/panicif" - "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -621,51 +619,14 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) { // Returns a connection over UTP or TCP, whichever is first to connect. func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) { - { - t := perf.NewTimer(perf.CallerName(0)) - defer func() { - if res.Conn == nil { - t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err())) - } else { - t.Mark("returned conn over " + res.Dialer.DialerNetwork()) - } - }() + pool := dialPool{ + addr: addr, } - ctx, cancel := context.WithCancel(ctx) - // As soon as we return one connection, cancel the others. - defer cancel() - left := 0 - resCh := make(chan DialResult, left) + defer pool.startDrainer() for _, _s := range dialers { - left++ - s := _s - go func() { - resCh <- DialResult{ - dialFromSocket(ctx, s, addr), - s, - } - }() + pool.add(ctx, _s) } - // Wait for a successful connection. - func() { - defer perf.ScopeTimer()() - for ; left > 0 && res.Conn == nil; left-- { - res = <-resCh - } - }() - // There are still uncompleted dials. - go func() { - for ; left > 0; left-- { - conn := (<-resCh).Conn - if conn != nil { - conn.Close() - } - } - }() - if res.Conn != nil { - go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) - } - return res + return pool.getFirst() } func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { @@ -732,119 +693,18 @@ func (cl *Client) initiateProtocolHandshakes( return } -func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error { - for { - switch { - case rz.gotConnect.IsSet(): - return nil - case len(rz.relays) == 0: - return errors.New("all relays failed") - case ctx.Err() != nil: - return context.Cause(ctx) - } - relayCond := rz.relayCond.Signaled() - cl.unlock() - select { - case <-rz.gotConnect.Done(): - case <-relayCond: - case <-ctx.Done(): - } - cl.lock() - } -} - -// Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) initiateRendezvousConnect( - t *Torrent, holepunchAddr netip.AddrPort, -) (ok bool, err error) { - cl.lock() - defer cl.unlock() - rz, err := t.startHolepunchRendezvous(holepunchAddr) - if err != nil { - return - } - if rz == nil { - return - } - ok = true - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - err = cl.waitForRendezvousConnect(ctx, rz) - delete(t.utHolepunchRendezvous, holepunchAddr) - if err != nil { - err = fmt.Errorf("waiting for rendezvous connect signal: %w", err) - } - return -} - -// Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) establishOutgoingConnEx( - opts outgoingConnOpts, +func doProtocolHandshakeOnDialResult( + t *Torrent, obfuscatedHeader bool, + addr PeerRemoteAddr, + dr DialResult, ) ( - _ *PeerConn, err error, + c *PeerConn, err error, ) { - t := opts.t - addr := opts.addr - holepunchAddr, err := addrPortFromPeerRemoteAddr(addr) - var sentRendezvous bool - if err == nil { - if !opts.skipHolepunchRendezvous { - sentRendezvous, err = cl.initiateRendezvousConnect(t, holepunchAddr) - if err != nil { - err = fmt.Errorf("initiating rendezvous connect: %w", err) - } - } - } - gotHolepunchConnect := (err == nil && sentRendezvous) || opts.receivedHolepunchConnect - if opts.requireRendezvous && !sentRendezvous { - return nil, err - } - if err != nil { - t.logger.Print(err) - } - dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration { - cl.rLock() - defer cl.rUnlock() - return t.dialTimeout() - }()) - defer cancel() - dr := cl.dialFirst(dialCtx, addr.String()) + cl := t.cl nc := dr.Conn - cl.lock() - if gotHolepunchConnect && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { - g.MakeMapIfNilAndSet( - &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, - holepunchAddr, - struct{}{}, - ) - } - cl.unlock() - if nc == nil { - if !sentRendezvous && !gotHolepunchConnect { - cl.lock() - g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) - cl.unlock() - } - if dialCtx.Err() != nil { - return nil, fmt.Errorf("dialing: %w", dialCtx.Err()) - } - return nil, errors.New("dial failed") - } - if gotHolepunchConnect { - panicif.False(holepunchAddr.IsValid()) - cl.lock() - if g.MapContains(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, holepunchAddr) { - g.MakeMapIfNilAndSet( - &cl.dialableOnlyAfterHolepunch, - holepunchAddr, - struct{}{}, - ) - } - cl.unlock() - } addrIpPort, _ := tryIpPortFromNetAddr(addr) - c, err := cl.initiateProtocolHandshakes( + c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ outgoing: true, @@ -860,69 +720,128 @@ func (cl *Client) establishOutgoingConnEx( return c, err } -// Returns nil connection and nil error if no connection could be established -// for valid reasons. -func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) { +// Returns nil connection and nil error if no connection could be established for valid reasons. +func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) { torrent.Add("establish outgoing connection", 1) - obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred - c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst) + addr := opts.peerInfo.Addr + dialPool := dialPool{ + resCh: make(chan DialResult), + addr: addr.String(), + } + defer dialPool.startDrainer() + dialTimeout := opts.t.getDialTimeoutUnlocked() + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + for _, d := range cl.dialers { + dialPool.add(ctx, d) + } + } + holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr) + if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect { + g.MakeMapIfNilAndSet( + &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, + holepunchAddr, + struct{}{}, + ) + } + headerObfuscationPolicy := opts.HeaderObfuscationPolicy + obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred + firstDialResult := dialPool.getFirst() + if firstDialResult.Conn == nil { + // No dialers worked. Try to initiate a holepunching rendezvous. + if holepunchAddrErr == nil { + if !opts.receivedHolepunchConnect { + cl.lock() + g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) + cl.unlock() + } + opts.t.startHolepunchRendezvous(holepunchAddr) + } + err = fmt.Errorf("all initial dials failed") + return + } + if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{}) + } + c, err = doProtocolHandshakeOnDialResult( + opts.t, + obfuscatedHeaderFirst, + addr, + firstDialResult, + ) if err == nil { torrent.Add("initiated conn with preferred header obfuscation", 1) return } - // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err) - if cl.config.HeaderObfuscationPolicy.RequirePreferred { - // We should have just tried with the preferred header obfuscation. If it was required, - // there's nothing else to try. + // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try. + if headerObfuscationPolicy.RequirePreferred { + return + } + // Reuse the dialer that returned already but failed to handshake. + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + dialPool.add(ctx, firstDialResult.Dialer) + } + secondDialResult := dialPool.getFirst() + if secondDialResult.Conn == nil { return } - // Try again with encryption if we didn't earlier, or without if we did. - c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst) + c, err = doProtocolHandshakeOnDialResult( + opts.t, + !obfuscatedHeaderFirst, + addr, + secondDialResult, + ) if err == nil { torrent.Add("initiated conn with fallback header obfuscation", 1) + return } - // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err) return } type outgoingConnOpts struct { - t *Torrent - addr PeerRemoteAddr + peerInfo PeerInfo + t *Torrent // Don't attempt to connect unless a connect message is received after initiating a rendezvous. requireRendezvous bool // Don't send rendezvous requests to eligible relays. skipHolepunchRendezvous bool // Outgoing connection attempt is in response to holepunch connect message. receivedHolepunchConnect bool + HeaderObfuscationPolicy HeaderObfuscationPolicy } // Called to dial out and run a connection. The addr we're given is already // considered half-open. func (cl *Client) outgoingConnection( opts outgoingConnOpts, - ps PeerSource, - trusted bool, attemptKey outgoingConnAttemptKey, ) { cl.dialRateLimiter.Wait(context.Background()) - c, err := cl.establishOutgoingConn(opts) + c, err := cl.dialAndCompleteHandshake(opts) if err == nil { c.conn.SetWriteDeadline(time.Time{}) } cl.lock() defer cl.unlock() - // Don't release lock between here and addPeerConn, unless it's for - // failure. - cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey) + // Don't release lock between here and addPeerConn, unless it's for failure. + cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey) if err != nil { if cl.config.Debug { - cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err) + cl.logger.Levelf( + log.Debug, + "error establishing outgoing connection to %v: %v", + opts.peerInfo.Addr, + err, + ) } return } defer c.close() - c.Discovery = ps - c.trusted = trusted + c.Discovery = opts.peerInfo.Source + c.trusted = opts.peerInfo.Trusted opts.t.runHandshookConnLoggingErr(c) } diff --git a/dial-pool.go b/dial-pool.go new file mode 100644 index 00000000..c0c233e4 --- /dev/null +++ b/dial-pool.go @@ -0,0 +1,43 @@ +package torrent + +import ( + "context" +) + +type dialPool struct { + resCh chan DialResult + addr string + left int +} + +func (me *dialPool) getFirst() (res DialResult) { + for me.left > 0 && res.Conn == nil { + res = <-me.resCh + me.left-- + } + return +} + +func (me *dialPool) add(ctx context.Context, dialer Dialer) { + me.left++ + go func() { + me.resCh <- DialResult{ + dialFromSocket(ctx, dialer, me.addr), + dialer, + } + }() +} + +func (me *dialPool) startDrainer() { + go me.drainAndCloseRemainingDials() +} + +func (me *dialPool) drainAndCloseRemainingDials() { + for me.left > 0 { + conn := (<-me.resCh).Conn + me.left-- + if conn != nil { + conn.Close() + } + } +} diff --git a/peer_protocol/extended.go b/peer_protocol/extended.go index e6d935f7..8bc51816 100644 --- a/peer_protocol/extended.go +++ b/peer_protocol/extended.go @@ -7,10 +7,12 @@ import ( // http://www.bittorrent.org/beps/bep_0010.html type ( ExtendedHandshakeMessage struct { - M map[ExtensionName]ExtensionNumber `bencode:"m"` - V string `bencode:"v,omitempty"` - Reqq int `bencode:"reqq,omitempty"` - Encryption bool `bencode:"e,omitempty"` + M map[ExtensionName]ExtensionNumber `bencode:"m"` + V string `bencode:"v,omitempty"` + Reqq int `bencode:"reqq,omitempty"` + // The only mention of this I can find is in https://www.bittorrent.org/beps/bep_0011.html + // for bit 0x01. + Encryption bool `bencode:"e"` // BEP 9 MetadataSize int `bencode:"metadata_size,omitempty"` // The local client port. It would be redundant for the receiving side of diff --git a/torrent.go b/torrent.go index 5d78f390..eeac172d 100644 --- a/torrent.go +++ b/torrent.go @@ -106,8 +106,6 @@ type Torrent struct { // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo - // The final ess is not silent here as it's in the plural. - utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -1383,7 +1381,15 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p, false, false, false, false) + opts := outgoingConnOpts{ + peerInfo: p, + t: t, + requireRendezvous: false, + skipHolepunchRendezvous: false, + receivedHolepunchConnect: false, + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, false) initiated++ } return @@ -2398,13 +2404,12 @@ func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not // sure all the PeerInfo fields are being used. -func (t *Torrent) initiateConn( - peer PeerInfo, - requireRendezvous bool, - skipHolepunchRendezvous bool, +func initiateConn( + opts outgoingConnOpts, ignoreLimits bool, - receivedHolepunchConnect bool, ) { + t := opts.t + peer := opts.peerInfo if peer.Id == t.cl.peerID { return } @@ -2424,15 +2429,7 @@ func (t *Torrent) initiateConn( attemptKey := &peer t.addHalfOpen(addrStr, attemptKey) go t.cl.outgoingConnection( - outgoingConnOpts{ - t: t, - addr: peer.Addr, - requireRendezvous: requireRendezvous, - skipHolepunchRendezvous: skipHolepunchRendezvous, - receivedHolepunchConnect: receivedHolepunchConnect, - }, - peer.Source, - peer.Trusted, + opts, attemptKey, ) } @@ -2804,40 +2801,32 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer } return nil case utHolepunch.Connect: - t.logger.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort) - rz, ok := t.utHolepunchRendezvous[msg.AddrPort] - if ok { - delete(rz.relays, sender) - rz.gotConnect.Set() - rz.relayCond.Broadcast() - } else { - // If the rendezvous was removed because we timed out or already got a connect signal, - // it doesn't hurt to try again. - t.initiateConn(PeerInfo{ - Addr: msg.AddrPort, - Source: PeerSourceUtHolepunch, - }, false, true, true, true) - } + opts := outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(), + }, + t: t, + // Don't attempt to start our own rendezvous if we fail to connect. + skipHolepunchRendezvous: true, + receivedHolepunchConnect: true, + // Assume that the other end initiated the rendezvous, and will use our preferred + // encryption. So we will act normally. + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, true) return nil case utHolepunch.Error: - rz, ok := t.utHolepunchRendezvous[msg.AddrPort] - if ok { - delete(rz.relays, sender) - rz.relayCond.Broadcast() - } - t.logger.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode) return nil default: return fmt.Errorf("unhandled msg type %v", msg.MsgType) } } -func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) { - if MapContains(t.utHolepunchRendezvous, addrPort) { - err = errors.New("rendezvous already exists") - return - } - g.InitNew(&rz) +func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) error { + rzsSent := 0 for pc := range t.conns { if !pc.supportsExtension(utHolepunch.ExtensionName) { continue @@ -2847,17 +2836,14 @@ func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolep continue } } + t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort) sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) - MakeMapIfNilAndSet(&rz.relays, pc, struct{}{}) + rzsSent++ } - if len(rz.relays) == 0 { - err = fmt.Errorf("no eligible relays") - return - } - if !MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz) { - panic("expected to fail earlier if rendezvous already exists") + if rzsSent == 0 { + return errors.New("no eligible relays") } - return + return nil } func (t *Torrent) numHalfOpenAttempts() (num int) { @@ -2866,3 +2852,18 @@ func (t *Torrent) numHalfOpenAttempts() (num int) { } return } + +func (t *Torrent) getDialTimeoutUnlocked() time.Duration { + cl := t.cl + cl.rLock() + defer cl.rUnlock() + return t.dialTimeout() +} + +func (t *Torrent) startHolepunchRendezvousForPeerRemoteAddr(addr PeerRemoteAddr) error { + addrPort, err := addrPortFromPeerRemoteAddr(addr) + if err != nil { + return err + } + return t.startHolepunchRendezvous(addrPort) +} diff --git a/ut-holepunching.go b/ut-holepunching.go index 08f0ac79..10cbafc7 100644 --- a/ut-holepunching.go +++ b/ut-holepunching.go @@ -1,11 +1 @@ package torrent - -import ( - "github.com/anacrolix/chansync" -) - -type utHolepunchRendezvous struct { - relays map[*PeerConn]struct{} - gotConnect chansync.SetOnce - relayCond chansync.BroadcastCond -} diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go index a45eb8e5..93ff3a4e 100644 --- a/ut-holepunching_test.go +++ b/ut-holepunching_test.go @@ -54,7 +54,8 @@ func TestHolepunchConnect(t *testing.T) { cfg.Seed = false cfg.DataDir = t.TempDir() cfg.MaxAllocPeerRequestDataPerConn = 4 - //cfg.Debug = true + cfg.Debug = true + cfg.NominalDialTimeout = time.Second //cfg.DisableUTP = true leecherLeecher, _ := NewClient(cfg) require.NoError(t, err) @@ -88,26 +89,29 @@ func TestHolepunchConnect(t *testing.T) { waitForConns(seederTorrent) go llg.AddClientPeer(leecher) waitForConns(llg) - //time.Sleep(time.Second) + time.Sleep(time.Second) llg.cl.lock() - targetAddr := seeder.ListenAddrs()[1] + targetAddr := seeder.ListenAddrs()[0] log.Printf("trying to initiate to %v", targetAddr) - llg.initiateConn(PeerInfo{ - Addr: targetAddr, - }, true, false, false, false) + initiateConn(outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: targetAddr, + }, + t: llg, + requireRendezvous: true, + skipHolepunchRendezvous: false, + HeaderObfuscationPolicy: llg.cl.config.HeaderObfuscationPolicy, + }, true) llg.cl.unlock() wg.Wait() - // These checks would require that the leecher leecher first attempt to connect without - // holepunching. - //llClientStats := leecherLeecher.Stats() - //c := qt.New(t) - //c.Check(llClientStats.NumPeersDialedRequiringHolepunch, qt.Not(qt.Equals), 0) - //c.Check( - // llClientStats.NumPeersDialedRequiringHolepunch, - // qt.Equals, - // llClientStats.NumPeersUndiableWithoutHolepunch, - //) + llClientStats := leecherLeecher.Stats() + c.Check(llClientStats.NumPeersDialableOnlyAfterHolepunch, qt.Not(qt.Equals), 0) + c.Check( + llClientStats.NumPeersDialableOnlyAfterHolepunch, + qt.Equals, + llClientStats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect, + ) } func waitForConns(t *Torrent) { @@ -120,3 +124,12 @@ func waitForConns(t *Torrent) { t.cl.event.Wait() } } + +func TestDialTcpNotAccepting(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + c := qt.New(t) + c.Check(err, qt.IsNil) + defer l.Close() + _, err = net.Dial("tcp", l.Addr().String()) + c.Assert(err, qt.IsNotNil) +} -- 2.44.0