X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=torrent.go;h=6385a3fc026f9a2474ddc029908ef236ab30713a;hb=HEAD;hp=3e74df6394d160eec83a4fc31015c03938c20476;hpb=c47e6b1f605143e1f0f8d2172c2b5dfadd8fff71;p=btrtrc.git diff --git a/torrent.go b/torrent.go index 3e74df63..6385a3fc 100644 --- a/torrent.go +++ b/torrent.go @@ -8,6 +8,8 @@ import ( "errors" "fmt" "io" + "math/rand" + "net/netip" "net/url" "sort" "strings" @@ -19,25 +21,31 @@ import ( "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" + . "github.com/anacrolix/generics" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" - "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - request_strategy "github.com/anacrolix/torrent/request-strategy" - "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" + "golang.org/x/exp/maps" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/common" + "github.com/anacrolix/torrent/internal/check" + "github.com/anacrolix/torrent/internal/nestedmaps" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" "github.com/anacrolix/torrent/webseed" "github.com/anacrolix/torrent/webtorrent" ) @@ -57,17 +65,21 @@ type Torrent struct { userOnWriteChunkErr func(error) closed chansync.SetOnce + onClose []func() infoHash metainfo.Hash pieces []Piece + + // The order pieces are requested if there's no stronger reason like availability or priority. + pieceRequestOrder []int // Values are the piece indices that changed. - pieceStateChanges *pubsub.PubSub + pieceStateChanges pubsub.PubSub[PieceStateChange] // The size of chunks to request from peers over the wire. This is // normally 16KiB by convention these days. chunkSize pp.Integer chunkPool sync.Pool // Total length of the torrent in bytes. Stored because it's not O(1) to // get this from the info dict. - length *int64 + _length Option[int64] // The storage to open when the info dict becomes available. storageOpener *storage.Client @@ -84,6 +96,8 @@ type Torrent struct { fileIndex segments.Index files *[]*File + _chunksPerRegularPiece chunkIndexType + webSeeds map[string]*Peer // Active peer connections, running message stream loops. TODO: Make this // open (not-closed) connections only. @@ -91,14 +105,14 @@ type Torrent struct { maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]PeerInfo + halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with // them. That encourages us to reconnect to peers that are well known in // the swarm. peers prioritizedPeers - // Whether we want to know to know more peers. + // Whether we want to know more peers. wantPeersEvent missinggo.Event // An announcer for each tracker URL. 
trackerAnnouncers map[string]torrentTrackerAnnouncer @@ -137,16 +151,31 @@ type Torrent struct { initialPieceCheckDisabled bool connsWithAllPieces map[*Peer]struct{} - // Count of each request across active connections. - pendingRequests map[RequestIndex]*Peer - lastRequested map[RequestIndex]time.Time + + requestState map[RequestIndex]requestState // Chunks we've written to since the corresponding piece was last checked. - dirtyChunks roaring.Bitmap + dirtyChunks typedRoaring.Bitmap[RequestIndex] pex pexState // Is On when all pieces are complete. Complete chansync.Flag + + // Torrent sources in use keyed by the source string. + activeSources sync.Map + sourcesLogger log.Logger + + smartBanCache smartBanCache + + // Large allocations reused between request state updates. + requestPieceStates []request_strategy.PieceRequestOrderState + requestIndexes []RequestIndex +} + +type outgoingConnAttemptKey = *PeerInfo + +func (t *Torrent) length() int64 { + return t._length.Value } func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) { @@ -209,8 +238,10 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) { }) // Add half-open peers to the list - for _, peer := range t.halfOpen { - ks = append(ks, peer) + for _, attempts := range t.halfOpen { + for _, peer := range attempts { + ks = append(ks, *peer) + } } // Add active peers to the list @@ -372,11 +403,6 @@ func (t *Torrent) makePieces() { beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files) endFile := pieceEndFileIndex(piece.torrentEndOffset(), files) piece.files = files[beginFile:endFile] - piece.undirtiedChunksIter = undirtiedChunksIter{ - TorrentDirtyChunks: &t.dirtyChunks, - StartRequestIndex: piece.requestIndexOffset(), - EndRequestIndex: piece.requestIndexOffset() + piece.numChunks(), - } } } @@ -407,7 +433,7 @@ func (t *Torrent) cacheLength() { for _, f := range t.info.UpvertedFiles() { l += f.Length } - t.length = &l + t._length = Some(l) } // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure @@ -426,6 +452,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { t.nameMu.Lock() t.info = info t.nameMu.Unlock() + t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) t.updateComplete() t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles())) t.displayName = "" // Save a few bytes lol. @@ -444,7 +471,9 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder // This seems to be all the follow-up tasks after info is set, that can't fail. 
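The pieceRequestOrder field added above is filled in onSetInfo below with rand.Perm. A minimal standalone sketch of how such a permutation acts as a request-order tiebreaker; numPieces and the comparator are hypothetical, not part of this change:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const numPieces = 8           // hypothetical torrent
	order := rand.Perm(numPieces) // order[piece] = stable random rank
	// Only consulted when nothing stronger (priority, availability) decides.
	less := func(a, b int) bool { return order[a] < order[b] }
	fmt.Println(order, less(0, 1))
}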
func (t *Torrent) onSetInfo() { + t.pieceRequestOrder = rand.Perm(t.numPieces()) t.initPieceRequestOrder() + MakeSliceWithLength(&t.requestPieceStates, t.numPieces()) for i := range t.pieces { p := &t.pieces[i] // Need to add relativeAvailability before updating piece completion, as that may result in conns @@ -454,17 +483,16 @@ func (t *Torrent) onSetInfo() { } p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i) t.addRequestOrderPiece(i) - t.updatePieceCompletion(pieceIndex(i)) + t.updatePieceCompletion(i) if !t.initialPieceCheckDisabled && !p.storageCompletionOk { // t.logger.Printf("piece %s completion unknown, queueing check", p) - t.queuePieceCheck(pieceIndex(i)) + t.queuePieceCheck(i) } } t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests = make(map[RequestIndex]*Peer) - t.lastRequested = make(map[RequestIndex]time.Time) + t.requestState = make(map[RequestIndex]requestState) t.tryCreateMorePieceHashers() t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) @@ -515,7 +543,7 @@ func (t *Torrent) setMetadataSize(size int) (err error) { return } if uint32(size) > maxMetadataSize { - return errors.New("bad size") + return log.WithLevel(log.Warning, errors.New("bad size")) } if len(t.metadataBytes) == size { return @@ -535,7 +563,7 @@ func (t *Torrent) name() string { t.nameMu.RLock() defer t.nameMu.RUnlock() if t.haveInfo() { - return t.info.Name + return t.info.BestName() } if t.displayName != "" { return t.displayName @@ -736,23 +764,40 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces) - spew.NewDefaultConfig() - spew.Fdump(w, t.statsLocked()) - - peers := t.peersAsSlice() - sort.Slice(peers, func(_i, _j int) bool { - i := peers[_i] - j := peers[_j] - if less, ok := multiless.New().EagerSameLess( - i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(), - ).LessOk(); ok { - return less - } - return worseConn(i, j) + dumpStats(w, t.statsLocked()) + + fmt.Fprintf(w, "webseeds:\n") + t.writePeerStatuses(w, maps.Values(t.webSeeds)) + + peerConns := maps.Keys(t.conns) + // Peers without priorities first, then those with. I'm undecided about how to order peers + // without priorities. + sort.Slice(peerConns, func(li, ri int) bool { + l := peerConns[li] + r := peerConns[ri] + ml := multiless.New() + lpp := g.ResultFromTuple(l.peerPriority()).ToOption() + rpp := g.ResultFromTuple(r.peerPriority()).ToOption() + ml = ml.Bool(lpp.Ok, rpp.Ok) + ml = ml.Uint32(rpp.Value, lpp.Value) + return ml.Less() }) - for i, c := range peers { - fmt.Fprintf(w, "%2d. ", i+1) - c.writeStatus(w, t) + + fmt.Fprintf(w, "%v peer conns:\n", len(peerConns)) + t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer { + return &pc.Peer + })) +} + +func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) { + var buf bytes.Buffer + for _, c := range peers { + fmt.Fprintf(w, "- ") + buf.Reset() + c.writeStatus(&buf) + w.Write(bytes.TrimRight( + bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), + " ")) } } @@ -785,7 +830,10 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { } } -// Get bytes left +// Returns a count of bytes that are not complete in storage, and not pending being written to +// storage. This value is from the perspective of the download manager, and may not agree with the +// actual state in storage. If you want read data synchronously you should use a Reader. See +// https://github.com/anacrolix/torrent/issues/828. 
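A hedged caller-side sketch of the distinction drawn in the comment above, using only exported API (Torrent.BytesMissing, Torrent.NewReader); the helper below is an assumption for illustration, not part of this change:

// Assumes: import ("io"; "github.com/anacrolix/torrent")
func readPrefix(t *torrent.Torrent, n int64) ([]byte, error) {
	_ = t.BytesMissing() // download manager's view; may disagree with storage
	r := t.NewReader()   // a Reader waits for the data it actually needs
	defer r.Close()
	buf := make([]byte, n)
	_, err := io.ReadFull(r, buf)
	return buf, err
}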
func (t *Torrent) BytesMissing() (n int64) { t.cl.rLock() n = t.bytesMissingLocked() @@ -834,7 +882,7 @@ func (t *Torrent) usualPieceSize() int { } func (t *Torrent) numPieces() pieceIndex { - return pieceIndex(t.info.NumPieces()) + return t.info.NumPieces() } func (t *Torrent) numPiecesCompleted() (num pieceIndex) { @@ -842,7 +890,13 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) { } func (t *Torrent) close(wg *sync.WaitGroup) (err error) { - t.closed.Set() + if !t.closed.Set() { + err = errors.New("already closed") + return + } + for _, f := range t.onClose { + f() + } if t.storage != nil { wg.Add(1) go func() { @@ -863,27 +917,31 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { if t.storage != nil { t.deletePieceRequestOrder() } + t.assertAllPiecesRelativeAvailabilityZero() + t.pex.Reset() + t.cl.event.Broadcast() + t.pieceStateChanges.Close() + t.updateWantPeersEvent() + return +} + +func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() { for i := range t.pieces { p := t.piece(i) if p.relativeAvailability != 0 { panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability)) } } - t.pex.Reset() - t.cl.event.Broadcast() - t.pieceStateChanges.Close() - t.updateWantPeersEvent() - return } func (t *Torrent) requestOffset(r Request) int64 { - return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r) + return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r) } // Return the request that would include the given offset into the torrent data. Returns !ok if // there is no such request. func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) { - return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off) + return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off) } func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) { @@ -908,15 +966,15 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType { return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) } -func (t *Torrent) chunksPerRegularPiece() uint32 { - return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) +func (t *Torrent) chunksPerRegularPiece() chunkIndexType { + return t._chunksPerRegularPiece } -func (t *Torrent) numRequests() RequestIndex { +func (t *Torrent) numChunks() RequestIndex { if t.numPieces() == 0 { return 0 } - return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1) + return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1) } func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { @@ -931,7 +989,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { return 0 } if piece == t.numPieces()-1 { - ret := pp.Integer(*t.length % t.info.PieceLength) + ret := pp.Integer(t.length() % t.info.PieceLength) if ret != 0 { return ret } @@ -939,7 +997,20 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { return pp.Integer(t.info.PieceLength) } -func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) { +func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter { + return &blockCheckingWriter{ + cache: &t.smartBanCache, + requestIndex: t.pieceRequestIndexOffset(piece), + chunkSize: t.chunkSize.Int(), + } +} + +func (t *Torrent) hashPiece(piece pieceIndex) ( + ret metainfo.Hash, + // These are peers that sent us blocks that differ from what we hash here. 
+ differingPeers map[bannableAddr]struct{}, + err error, +) { p := t.piece(piece) p.waitNoPendingWrites() storagePiece := t.pieces[piece].Storage() @@ -955,13 +1026,18 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) { hash := pieceHash.New() const logPieceContents = false + smartBanWriter := t.smartBanBlockCheckingWriter(piece) + writers := []io.Writer{hash, smartBanWriter} + var examineBuf bytes.Buffer if logPieceContents { - var examineBuf bytes.Buffer - _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf)) - log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) - } else { - _, err = storagePiece.WriteTo(hash) + writers = append(writers, &examineBuf) + } + _, err = storagePiece.WriteTo(io.MultiWriter(writers...)) + if logPieceContents { + t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err) } + smartBanWriter.Flush() + differingPeers = smartBanWriter.badPeers missinggo.CopyExact(&ret, hash.Sum(nil)) return } @@ -984,7 +1060,7 @@ func (t *Torrent) havePiece(index pieceIndex) bool { func (t *Torrent) maybeDropMutuallyCompletePeer( // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's // okay? - p *Peer, + p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { return @@ -998,7 +1074,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( if p.useful() { return } - t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p) + p.logger.Levelf(log.Debug, "is mutually complete; dropping") p.drop() } @@ -1037,17 +1113,26 @@ func getPeerConnSlice(cap int) []*PeerConn { } } -// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad -// connection is one that usually sends us unwanted pieces, or has been in the worse half of the -// established connections for more than a minute. This is O(n log n). If there was a way to not -// consider the position of a conn relative to the total number, it could be reduced to O(n). -func (t *Torrent) worstBadConn() (ret *PeerConn) { - wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))} - defer peerConnSlices.Put(wcs.conns) - wcs.initKeys() +// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as +// this is a frequent occurrence. +func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) { + sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns))) + f(sl) + peerConnSlices.Put(sl) +} + +func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn { + wcs := worseConnSlice{conns: sl} + wcs.initKeys(opts) heap.Init(&wcs) for wcs.Len() != 0 { c := heap.Pop(&wcs).(*PeerConn) + if opts.incomingIsBad && !c.outgoing { + return c + } + if opts.outgoingIsBad && c.outgoing { + return c + } if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() { return c } @@ -1063,6 +1148,17 @@ func (t *Torrent) worstBadConn() (ret *PeerConn) { return nil } +// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad +// connection is one that usually sends us unwanted pieces, or has been in the worse half of the +// established connections for more than a minute. This is O(n log n). If there was a way to not +// consider the position of a conn relative to the total number, it could be reduced to O(n). 
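A generic sketch of the selection described above, kept separate from the real worseConnSlice/heap machinery: rank candidates worst-first, then return the first one that also meets a badness test, which is the O(n log n) shape the comment refers to. The names here are illustrative only.

// Assumes: import "sort"
// worse reports whether a is a worse connection than b; isBad applies the
// extra drop criteria (wasted chunks, staleness, worse half for a minute).
func worstBad[C any](conns []C, worse func(a, b C) bool, isBad func(C) bool) (c C, ok bool) {
	sort.Slice(conns, func(i, j int) bool { return worse(conns[i], conns[j]) })
	for _, cand := range conns { // worst first
		if isBad(cand) {
			return cand, true
		}
	}
	return // nothing bad enough to drop
}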
+func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) { + t.withUnclosedConns(func(ucs []*PeerConn) { + ret = t.worstBadConnFromSlice(opts, ucs) + }) + return +} + type PieceStateChange struct { Index int PieceState @@ -1144,7 +1240,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { if !c.peerHasPiece(piece) { return } - if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) { + if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) { return } c.updateRequests(reason) @@ -1189,7 +1285,7 @@ func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) { // Returns the range of pieces [begin, end) that contains the extent of bytes. func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) { - if off >= *t.length { + if off >= t.length() { return } if off < 0 { @@ -1228,7 +1324,7 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { } func (t *Torrent) pendRequest(req RequestIndex) { - t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece()) + t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece()) } func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) { @@ -1250,6 +1346,15 @@ func (t *Torrent) numReceivedConns() (ret int) { return } +func (t *Torrent) numOutgoingConns() (ret int) { + for c := range t.conns { + if c.outgoing { + ret++ + } + } + return +} + func (t *Torrent) maxHalfOpen() int { // Note that if we somehow exceed the maximum established conns, we want // the negative value to have an effect. @@ -1257,13 +1362,16 @@ func (t *Torrent) maxHalfOpen() int { extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2) // We want to allow some experimentation with new peers, and to try to // upset an oversupply of received connections. 
- return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent))) + return int(min( + max(5, extraIncoming)+establishedHeadroom, + int64(t.cl.config.HalfOpenConnsPerTorrent), + )) } func (t *Torrent) openNewConns() (initiated int) { defer t.updateWantPeersEvent() for t.peers.Len() != 0 { - if !t.wantConns() { + if !t.wantOutgoingConns() { return } if len(t.halfOpen) >= t.maxHalfOpen() { @@ -1276,7 +1384,15 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p) + opts := outgoingConnOpts{ + peerInfo: p, + t: t, + requireRendezvous: false, + skipHolepunchRendezvous: false, + receivedHolepunchConnect: false, + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, false) initiated++ } return @@ -1302,7 +1418,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } if changed { - log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger) + log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger) t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion") } return changed @@ -1406,7 +1522,7 @@ func (t *Torrent) bytesCompleted() int64 { if !t.haveInfo() { return 0 } - return *t.length - t.bytesLeft() + return t.length() - t.bytesLeft() } func (t *Torrent) SetInfoBytes(b []byte) (err error) { @@ -1432,13 +1548,7 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) { } } torrent.Add("deleted connections", 1) - if !c.deleteAllRequests().IsEmpty() { - t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests("Torrent.deletePeerConn") - } - }) - } + c.deleteAllRequests("Torrent.deletePeerConn") t.assertPendingRequests() if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 { panic(t.connsWithAllPieces) @@ -1460,12 +1570,12 @@ func (t *Torrent) decPeerPieceAvailability(p *Peer) { } func (t *Torrent) assertPendingRequests() { - if !check { + if !check.Enabled { return } // var actual pendingRequests // if t.haveInfo() { - // actual.m = make([]int, t.numRequests()) + // actual.m = make([]int, t.numChunks()) // } // t.iterPeers(func(p *Peer) { // p.requestState.Requests.Iterate(func(x uint32) bool { @@ -1495,7 +1605,7 @@ func (t *Torrent) wantPeers() bool { if t.peers.Len() > t.cl.config.TorrentPeersLowWater { return false } - return t.wantConns() + return t.wantOutgoingConns() } func (t *Torrent) updateWantPeersEvent() { @@ -1537,18 +1647,23 @@ func (t *Torrent) onWebRtcConn( DataChannelContext: dcc, } peerRemoteAddr := netConn.RemoteAddr() + //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr) if t.cl.badPeerAddr(peerRemoteAddr) { return } + localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, t, - dcc.LocalOffered, false, - netConn.RemoteAddr(), - webrtcNetwork, - fmt.Sprintf("webrtc offer_id %x", dcc.OfferId), + newConnectionOpts{ + outgoing: dcc.LocalOffered, + remoteAddr: peerRemoteAddr, + localPublicAddr: localAddrIpPort, + network: webrtcNetwork, + connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + }, ) if err != nil { t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err) @@ -1564,14 +1679,14 @@ func (t *Torrent) onWebRtcConn( defer 
t.cl.unlock() err = t.cl.runHandshookConn(pc, t) if err != nil { - t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err) + t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err) } } func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) { err := t.cl.runHandshookConn(pc, t) if err != nil || logAll { - t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err) + t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err) } } @@ -1580,11 +1695,10 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) { } func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer { - wtc, release := t.cl.websocketTrackers.Get(u.String()) - go func() { - <-t.closed.Done() - release() - }() + wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash) + // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for + // the same info hash before the old one is cleaned up. + t.onClose = append(t.onClose, release) wst := websocketTrackerStatus{u, wtc} go func() { err := wtc.Announce(tracker.Started, t.infoHash) @@ -1604,10 +1718,9 @@ func (t *Torrent) startScrapingTracker(_url string) { } u, err := url.Parse(_url) if err != nil { - // URLs with a leading '*' appear to be a uTorrent convention to - // disable trackers. + // URLs with a leading '*' appear to be a uTorrent convention to disable trackers. if _url[0] != '*' { - log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger) + t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err) } return } @@ -1677,7 +1790,7 @@ func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceR Event: event, NumWant: func() int32 { if t.wantPeers() && len(t.cl.dialers) > 0 { - return -1 + return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764 } else { return 0 } @@ -1764,7 +1877,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) { // We're also announcing ourselves as a listener, so we don't just want peer addresses. // TODO: We can include the announce_peer step depending on whether we can receive // inbound connections. We should probably only announce once every 15 mins too. - if !t.wantConns() { + if !t.wantAnyConns() { goto wait } // TODO: Determine if there's a listener on the port we're announcing. @@ -1882,9 +1995,16 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } } if len(t.conns) >= t.maxEstablishedConns { - c := t.worstBadConn() + numOutgoing := t.numOutgoingConns() + numIncoming := len(t.conns) - numOutgoing + c := t.worstBadConn(worseConnLensOpts{ + // We've already established that we have too many connections at this point, so we just + // need to match what kind we have too many of vs. what we're trying to add now. + incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing, + outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing, + }) if c == nil { - return errors.New("don't want conns") + return errors.New("don't want conn") } c.close() t.deletePeerConn(c) @@ -1893,13 +2013,15 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { panic(len(t.conns)) } t.conns[c] = struct{}{} + t.cl.event.Broadcast() + // We'll never receive the "p" extended handshake parameter. 
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { - t.pex.Add(c) // as no further extended handshake expected + t.pex.Add(c) } return nil } -func (t *Torrent) wantConns() bool { +func (t *Torrent) newConnsAllowed() bool { if !t.networkingEnabled.Bool() { return false } @@ -1909,7 +2031,48 @@ func (t *Torrent) wantConns() bool { if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { return false } - return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil + return true +} + +func (t *Torrent) wantAnyConns() bool { + if !t.networkingEnabled.Bool() { + return false + } + if t.closed.IsSet() { + return false + } + if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { + return false + } + return len(t.conns) < t.maxEstablishedConns +} + +func (t *Torrent) wantOutgoingConns() bool { + if !t.newConnsAllowed() { + return false + } + if len(t.conns) < t.maxEstablishedConns { + return true + } + numIncomingConns := len(t.conns) - t.numOutgoingConns() + return t.worstBadConn(worseConnLensOpts{ + incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1, + outgoingIsBad: false, + }) != nil +} + +func (t *Torrent) wantIncomingConns() bool { + if !t.newConnsAllowed() { + return false + } + if len(t.conns) < t.maxEstablishedConns { + return true + } + numIncomingConns := len(t.conns) - t.numOutgoingConns() + return t.worstBadConn(worseConnLensOpts{ + incomingIsBad: false, + outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1, + }) != nil } func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { @@ -1922,7 +2085,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { return true }), } - wcs.initKeys() + wcs.initKeys(worseConnLensOpts{}) heap.Init(&wcs) for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { t.dropConnection(heap.Pop(&wcs).(*PeerConn)) @@ -1932,7 +2095,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { } func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { - t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug)) + t.logger.LazyLog(log.Debug, func() log.Msg { + return log.Fstr("hashed piece %d (passed=%t)", piece, passed) + }) p := t.piece(piece) p.numVerifies++ t.cl.event.Broadcast() @@ -1947,7 +2112,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } else { log.Fmsg( "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers), - ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger) + ).AddValues(t, p).LogLevel( + + log.Debug, t.logger) + pieceHashedNotCorrect.Add(1) } } @@ -1968,10 +2136,14 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { c._stats.incrementPiecesDirtiedGood() } t.clearPieceTouchers(piece) + hasDirty := p.hasDirtyChunks() t.cl.unlock() + if hasDirty { + p.Flush() // You can be synchronous here! 
+ } err := p.Storage().MarkComplete() if err != nil { - t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err) + t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err) } t.cl.lock() @@ -2015,8 +2187,16 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] - t.cl.banPeerIP(c.remoteIp()) - c.drop() + if len(bannableTouchers) != 1 { + t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) + } else { + // Turns out it's still useful to ban peers like this because if there's only a + // single peer for a piece, and we never progress that piece to completion, we + // will never smart-ban them. Discovered in + // https://github.com/anacrolix/torrent/issues/715. + t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece) + c.ban() + } } } t.onIncompletePiece(piece) @@ -2026,7 +2206,9 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { - for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ { + start := t.pieceRequestIndexOffset(piece) + end := start + t.pieceNumChunks(piece) + for ri := start; ri < end; ri++ { t.cancelRequest(ri) } } @@ -2037,7 +2219,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.piece(piece).readerCond.Broadcast() for conn := range t.conns { conn.have(piece) - t.maybeDropMutuallyCompletePeer(&conn.Peer) + t.maybeDropMutuallyCompletePeer(conn) } } @@ -2069,7 +2251,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { } func (t *Torrent) tryCreateMorePieceHashers() { - for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() { + for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { } } @@ -2104,9 +2286,32 @@ func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) { return } +func (t *Torrent) dropBannedPeers() { + t.iterPeers(func(p *Peer) { + remoteIp := p.remoteIp() + if remoteIp == nil { + if p.bannableAddr.Ok { + t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p) + } + return + } + netipAddr := netip.MustParseAddr(remoteIp.String()) + if Some(netipAddr) != p.bannableAddr { + t.logger.WithDefaultLevel(log.Debug).Printf( + "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]", + p, remoteIp, p.bannableAddr) + } + if _, ok := t.cl.badPeerIPs[netipAddr]; ok { + // Should this be a close? 
+ p.drop() + t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr) + } + }) +} + func (t *Torrent) pieceHasher(index pieceIndex) { p := t.piece(index) - sum, copyErr := t.hashPiece(index) + sum, failedPeers, copyErr := t.hashPiece(index) correct := sum == *p.hash switch copyErr { case nil, io.EOF: @@ -2116,6 +2321,16 @@ func (t *Torrent) pieceHasher(index pieceIndex) { t.storageLock.RUnlock() t.cl.lock() defer t.cl.unlock() + if correct { + for peer := range failedPeers { + t.cl.banPeerIP(peer.AsSlice()) + t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index) + } + t.dropBannedPeers() + for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ { + t.smartBanCache.ForgetBlock(ri) + } + } p.hashing = false t.pieceHashed(index, correct, copyErr) t.updatePiecePriority(index, "Torrent.pieceHasher") @@ -2158,8 +2373,45 @@ func (t *Torrent) VerifyData() { } } -// Start the process of connecting to the given peer for the given torrent if appropriate. -func (t *Torrent) initiateConn(peer PeerInfo) { +func (t *Torrent) connectingToPeerAddr(addrStr string) bool { + return len(t.halfOpen[addrStr]) != 0 +} + +func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool { + addrStr := x.String() + for c := range t.conns { + ra := c.RemoteAddr + if ra.String() == addrStr { + return true + } + } + return false +} + +func (t *Torrent) getHalfOpenPath( + addrStr string, + attemptKey outgoingConnAttemptKey, +) nestedmaps.Path[*PeerInfo] { + return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey) +} + +func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { + path := t.getHalfOpenPath(addrStr, attemptKey) + if path.Exists() { + panic("should be unique") + } + path.Set(attemptKey) + t.cl.numHalfOpen++ +} + +// Start the process of connecting to the given peer for the given torrent if appropriate. I'm not +// sure all the PeerInfo fields are being used. +func initiateConn( + opts outgoingConnOpts, + ignoreLimits bool, +) { + t := opts.t + peer := opts.peerInfo if peer.Id == t.cl.peerID { return } @@ -2167,12 +2419,21 @@ func (t *Torrent) initiateConn(peer PeerInfo) { return } addr := peer.Addr - if t.addrActive(addr.String()) { + addrStr := addr.String() + if !ignoreLimits { + if t.connectingToPeerAddr(addrStr) { + return + } + } + if t.hasPeerConnForAddr(addr) { return } - t.cl.numHalfOpen++ - t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) + attemptKey := &peer + t.addHalfOpen(addrStr, attemptKey) + go t.cl.outgoingConnection( + opts, + attemptKey, + ) } // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to @@ -2193,7 +2454,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int { // connection. func (t *Torrent) allStats(f func(*ConnStats)) { f(&t.stats) - f(&t.cl.stats) + f(&t.cl.connStats) } func (t *Torrent) hashingPiece(i pieceIndex) bool { @@ -2275,7 +2536,24 @@ func (t *Torrent) callbacks() *Callbacks { return &t.cl.config.Callbacks } -func (t *Torrent) addWebSeed(url string) { +type AddWebSeedsOpt func(*webseed.Client) + +// Sets the WebSeed trailing path escaper for a webseed.Client. 
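A hedged usage sketch of the option introduced here, assuming webseed.PathEscaper receives the file path components and returns the escaped trailing path; the mirror URL and escaping policy are illustrative only:

// Assumes: import ("net/url"; "strings")
t.AddWebSeeds(
	[]string{"https://mirror.example.com/data/"},
	torrent.WebSeedPathEscaper(func(pathComps []string) string {
		for i, c := range pathComps {
			pathComps[i] = url.PathEscape(c) // escape components, keep '/' separators
		}
		return strings.Join(pathComps, "/")
	}),
)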
+func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt { + return func(c *webseed.Client) { + c.PathEscaper = custom + } +} + +func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) { + t.cl.lock() + defer t.cl.unlock() + for _, u := range urls { + t.addWebSeed(u, opts...) + } +} + +func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) { if t.cl.config.DisableWebseeds { return } @@ -2299,11 +2577,12 @@ func (t *Torrent) addWebSeed(url string) { // requests mark more often, so recomputation is probably sooner than with regular peer // conns. ~4x maxRequests would be about right. PeerMaxRequests: 128, - RemoteAddr: remoteAddrFromUrl(url), - callbacks: t.callbacks(), + // TODO: Set ban prefix? + RemoteAddr: remoteAddrFromUrl(url), + callbacks: t.callbacks(), }, client: webseed.Client{ - HttpClient: t.cl.webseedHttpClient, + HttpClient: t.cl.httpClient, Url: url, ResponseBodyWrapper: func(r io.Reader) io.Reader { return &rateLimitedReader{ @@ -2313,7 +2592,10 @@ func (t *Torrent) addWebSeed(url string) { }, }, activeRequests: make(map[Request]webseed.Request, maxRequests), - maxRequests: maxRequests, + } + ws.peer.initRequestState() + for _, opt := range opts { + opt(&ws.client) } ws.peer.initUpdateRequestsTimer() ws.requesterCond.L = t.cl.locker() @@ -2341,15 +2623,15 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) { } func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request { - index := ri / t.chunksPerRegularPiece() + index := t.pieceIndexOfRequestIndex(ri) return Request{ pp.Integer(index), - t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()), + t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()), } } func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex { - return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize) + return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize) } func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex { @@ -2361,16 +2643,20 @@ func (t *Torrent) updateComplete() { } func (t *Torrent) cancelRequest(r RequestIndex) *Peer { - p := t.pendingRequests[r] + p := t.requestingPeer(r) if p != nil { p.cancel(r) } - delete(t.pendingRequests, r) + // TODO: This is a check that an old invariant holds. It can be removed after some testing. + //delete(t.pendingRequests, r) + if _, ok := t.requestState[r]; ok { + panic("expected request state to be gone") + } return p } func (t *Torrent) requestingPeer(r RequestIndex) *Peer { - return t.pendingRequests[r] + return t.requestState[r].peer } func (t *Torrent) addConnWithAllPieces(p *Peer) { @@ -2398,3 +2684,222 @@ func (t *Torrent) hasStorageCap() bool { _, ok := (*f)() return ok } + +func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex { + return pieceIndex(ri / t.chunksPerRegularPiece()) +} + +func (t *Torrent) iterUndirtiedRequestIndexesInPiece( + reuseIter *typedRoaring.Iterator[RequestIndex], + piece pieceIndex, + f func(RequestIndex), +) { + reuseIter.Initialize(&t.dirtyChunks) + pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece) + iterBitmapUnsetInRange( + reuseIter, + pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece), + f, + ) +} + +type requestState struct { + peer *Peer + when time.Time +} + +// Returns an error if a received chunk is out of bounds in someway. 
+func (t *Torrent) checkValidReceiveChunk(r Request) error { + if !t.haveInfo() { + return errors.New("torrent missing info") + } + if int(r.Index) >= t.numPieces() { + return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces()) + } + pieceLength := t.pieceLength(pieceIndex(r.Index)) + if r.Begin >= pieceLength { + return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength) + } + // We could check chunk lengths here, but chunk request size is not changed often, and tricky + // for peers to manipulate as they need to send potentially large buffers to begin with. There + // should be considerable checks elsewhere for this case due to the network overhead. We should + // catch most of the overflow manipulation stuff by checking index and begin above. + return nil +} + +func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) { + for pc := range t.conns { + dialAddr, err := pc.remoteDialAddrPort() + if err != nil { + continue + } + if dialAddr != target { + continue + } + ret = append(ret, pc) + } + return +} + +func wrapUtHolepunchMsgForPeerConn( + recipient *PeerConn, + msg utHolepunch.Msg, +) pp.Message { + extendedPayload, err := msg.MarshalBinary() + if err != nil { + panic(err) + } + return pp.Message{ + Type: pp.Extended, + ExtendedID: MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName), + ExtendedPayload: extendedPayload, + } +} + +func sendUtHolepunchMsg( + pc *PeerConn, + msgType utHolepunch.MsgType, + addrPort netip.AddrPort, + errCode utHolepunch.ErrCode, +) { + holepunchMsg := utHolepunch.Msg{ + MsgType: msgType, + AddrPort: addrPort, + ErrCode: errCode, + } + incHolepunchMessagesSent(holepunchMsg) + ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg) + pc.write(ppMsg) +} + +func incHolepunchMessages(msg utHolepunch.Msg, verb string) { + torrent.Add( + fmt.Sprintf( + "holepunch %v %v messages %v", + msg.MsgType, + addrPortProtocolStr(msg.AddrPort), + verb, + ), + 1, + ) +} + +func incHolepunchMessagesReceived(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "received") +} + +func incHolepunchMessagesSent(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "sent") +} + +func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error { + incHolepunchMessagesReceived(msg) + switch msg.MsgType { + case utHolepunch.Rendezvous: + t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) + sendMsg := sendUtHolepunchMsg + senderAddrPort, err := sender.remoteDialAddrPort() + if err != nil { + sender.logger.Levelf( + log.Warning, + "error getting ut_holepunch rendezvous sender's dial address: %v", + err, + ) + // There's no better error code. The sender's address itself is invalid. I don't see + // this error message being appropriate anywhere else anyway. 
+ sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer) + } + targets := t.peerConnsWithDialAddrPort(msg.AddrPort) + if len(targets) == 0 { + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected) + return nil + } + for _, pc := range targets { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport) + continue + } + sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0) + sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0) + } + return nil + case utHolepunch.Connect: + holepunchAddr := msg.AddrPort + t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender) + if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr) + if g.MapContains(t.cl.accepted, holepunchAddr) { + setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } + } + opts := outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(), + }, + t: t, + // Don't attempt to start our own rendezvous if we fail to connect. + skipHolepunchRendezvous: true, + receivedHolepunchConnect: true, + // Assume that the other end initiated the rendezvous, and will use our preferred + // encryption. So we will act normally. + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, true) + return nil + case utHolepunch.Error: + torrent.Add("holepunch error messages received", 1) + t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + return nil + default: + return fmt.Errorf("unhandled msg type %v", msg.MsgType) + } +} + +func addrPortProtocolStr(addrPort netip.AddrPort) string { + addr := addrPort.Addr() + switch { + case addr.Is4(): + return "ipv4" + case addr.Is6(): + return "ipv6" + default: + panic(addrPort) + } +} + +func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error { + rzsSent := 0 + for pc := range t.conns { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + continue + } + if pc.supportsExtension(pp.ExtensionNamePex) { + if !g.MapContains(pc.pex.remoteLiveConns, addrPort) { + continue + } + } + t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort) + sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) + rzsSent++ + } + if rzsSent == 0 { + return errors.New("no eligible relays") + } + return nil +} + +func (t *Torrent) numHalfOpenAttempts() (num int) { + for _, attempts := range t.halfOpen { + num += len(attempts) + } + return +} + +func (t *Torrent) getDialTimeoutUnlocked() time.Duration { + cl := t.cl + cl.rLock() + defer cl.rUnlock() + return t.dialTimeout() +}
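The request-index arithmetic threaded through this change (numChunks, pieceIndexOfRequestIndex, requestIndexFromRequest) is plain chunk numbering with a fixed chunk count per regular piece. A standalone sketch with hypothetical sizes:

package main

import "fmt"

func main() {
	const (
		pieceLength = 256 << 10 // hypothetical piece size
		chunkSize   = 16 << 10  // conventional 16 KiB chunk
	)
	chunksPerRegularPiece := (pieceLength + chunkSize - 1) / chunkSize // 16

	// Forward, as in requestIndexFromRequest: (piece, begin) -> flat index.
	piece, begin := 3, 2*chunkSize
	ri := piece*chunksPerRegularPiece + begin/chunkSize // 50

	// Reverse, as in pieceIndexOfRequestIndex and pendRequest.
	fmt.Println(ri/chunksPerRegularPiece, ri%chunksPerRegularPiece) // 3 2
}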