From: Matt Joiner Date: Tue, 2 May 2023 03:06:54 +0000 (+1000) Subject: Support multiple ongoing half-open attempts X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0f2604e3e98aae3241d917f02af729a2e5be581b;p=btrtrc.git Support multiple ongoing half-open attempts --- diff --git a/client.go b/client.go index eaf3d677..709bc4c0 100644 --- a/client.go +++ b/client.go @@ -39,6 +39,7 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/check" "github.com/anacrolix/torrent/internal/limiter" + "github.com/anacrolix/torrent/internal/panicif" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" @@ -676,17 +677,30 @@ func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { return c } -func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { - if _, ok := t.halfOpen[addr]; !ok { - panic("invariant broken") +func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) { + path := t.getHalfOpenPath(addr, attemptKey) + if !path.Exists() { + panic("should exist") } - delete(t.halfOpen, addr) + path.Delete() cl.numHalfOpen-- + if check.Enabled { + panicif.NotEqual(cl.numHalfOpen, cl.countHalfOpenFromTorrents()) + } for _, t := range cl.torrents { t.openNewConns() } } +func (cl *Client) countHalfOpenFromTorrents() (count int) { + for _, t := range cl.torrents { + for _, attempts := range t.halfOpen { + count += len(attempts) + } + } + return +} + // Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection // for valid reasons. func (cl *Client) initiateProtocolHandshakes( @@ -852,7 +866,12 @@ type outgoingConnOpts struct { // Called to dial out and run a connection. The addr we're given is already // considered half-open. -func (cl *Client) outgoingConnection(opts outgoingConnOpts, ps PeerSource, trusted bool) { +func (cl *Client) outgoingConnection( + opts outgoingConnOpts, + ps PeerSource, + trusted bool, + attemptKey outgoingConnAttemptKey, +) { cl.dialRateLimiter.Wait(context.Background()) c, err := cl.establishOutgoingConn(opts) if err == nil { @@ -862,7 +881,7 @@ func (cl *Client) outgoingConnection(opts outgoingConnOpts, ps PeerSource, trust defer cl.unlock() // Don't release lock between here and addPeerConn, unless it's for // failure. - cl.noLongerHalfOpen(opts.t, opts.addr.String()) + cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey) if err != nil { if cl.config.Debug { cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err) @@ -1278,8 +1297,6 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { }, conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent), - halfOpen: make(map[string]PeerInfo), - storageOpener: storageClient, maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, diff --git a/torrent.go b/torrent.go index 3dd47cd5..41848e4a 100644 --- a/torrent.go +++ b/torrent.go @@ -38,6 +38,7 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/internal/check" + "github.com/anacrolix/torrent/internal/nestedmaps" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" @@ -105,7 +106,7 @@ type Torrent struct { maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]PeerInfo + halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo // The final ess is not silent here as it's in the plural. utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous @@ -174,6 +175,8 @@ type Torrent struct { requestIndexes []RequestIndex } +type outgoingConnAttemptKey = *PeerInfo + func (t *Torrent) length() int64 { return t._length.Value } @@ -238,8 +241,10 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) { }) // Add half-open peers to the list - for _, peer := range t.halfOpen { - ks = append(ks, peer) + for _, attempts := range t.halfOpen { + for _, peer := range attempts { + ks = append(ks, *peer) + } } // Add active peers to the list @@ -1380,7 +1385,7 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p, false, false) + t.initiateConn(p, false, false, false) initiated++ } return @@ -2362,12 +2367,44 @@ func (t *Torrent) VerifyData() { } } +func (t *Torrent) connectingToPeerAddr(addrStr string) bool { + return len(t.halfOpen[addrStr]) != 0 +} + +func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool { + addrStr := x.String() + for c := range t.conns { + ra := c.RemoteAddr + if ra.String() == addrStr { + return true + } + } + return false +} + +func (t *Torrent) getHalfOpenPath( + addrStr string, + attemptKey outgoingConnAttemptKey, +) nestedmaps.Path[*PeerInfo] { + return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey) +} + +func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { + path := t.getHalfOpenPath(addrStr, attemptKey) + if path.Exists() { + panic("should be unique") + } + path.Set(attemptKey) + t.cl.numHalfOpen++ +} + // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not // sure all the PeerInfo fields are being used. func (t *Torrent) initiateConn( peer PeerInfo, requireRendezvous bool, skipHolepunchRendezvous bool, + ignoreLimits bool, ) { if peer.Id == t.cl.peerID { return @@ -2376,17 +2413,28 @@ func (t *Torrent) initiateConn( return } addr := peer.Addr - if t.addrActive(addr.String()) { + addrStr := addr.String() + if !ignoreLimits { + if t.connectingToPeerAddr(addrStr) { + return + } + } + if t.hasPeerConnForAddr(addr) { return } - t.cl.numHalfOpen++ - t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(outgoingConnOpts{ - t: t, - addr: peer.Addr, - requireRendezvous: requireRendezvous, - skipHolepunchRendezvous: skipHolepunchRendezvous, - }, peer.Source, peer.Trusted) + attemptKey := &peer + t.addHalfOpen(addrStr, attemptKey) + go t.cl.outgoingConnection( + outgoingConnOpts{ + t: t, + addr: peer.Addr, + requireRendezvous: requireRendezvous, + skipHolepunchRendezvous: skipHolepunchRendezvous, + }, + peer.Source, + peer.Trusted, + attemptKey, + ) } // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to @@ -2768,7 +2816,7 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer t.initiateConn(PeerInfo{ Addr: msg.AddrPort, Source: PeerSourceUtHolepunch, - }, false, true) + }, false, true, true) } return nil case utHolepunch.Error: diff --git a/ut-holepunching_test.go b/ut-holepunching_test.go index fe73e8d1..57423629 100644 --- a/ut-holepunching_test.go +++ b/ut-holepunching_test.go @@ -94,7 +94,7 @@ func TestHolepunchConnect(t *testing.T) { log.Printf("trying to initiate to %v", targetAddr) llg.initiateConn(PeerInfo{ Addr: targetAddr, - }, true, false) + }, true, false, false) llg.cl.unlock() wg.Wait() }