X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=torrent.go;h=6385a3fc026f9a2474ddc029908ef236ab30713a;hb=HEAD;hp=dbfbea18ef2a5d8998bd283605724e5eefd325e6;hpb=590d1ac2656497c248ae24dffd146871663f6f4e;p=btrtrc.git

diff --git a/torrent.go b/torrent.go
index dbfbea18..6385a3fc 100644
--- a/torrent.go
+++ b/torrent.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/netip"
 	"net/url"
 	"sort"
@@ -21,6 +22,7 @@ import (
 	"github.com/anacrolix/chansync/events"
 	"github.com/anacrolix/dht/v2"
 	. "github.com/anacrolix/generics"
+	g "github.com/anacrolix/generics"
 	"github.com/anacrolix/log"
 	"github.com/anacrolix/missinggo/perf"
 	"github.com/anacrolix/missinggo/slices"
@@ -29,18 +31,21 @@ import (
 	"github.com/anacrolix/missinggo/v2/pubsub"
 	"github.com/anacrolix/multiless"
 	"github.com/anacrolix/sync"
-	request_strategy "github.com/anacrolix/torrent/request-strategy"
-	typedRoaring "github.com/anacrolix/torrent/typed-roaring"
-	"github.com/davecgh/go-spew/spew"
 	"github.com/pion/datachannel"
+	"golang.org/x/exp/maps"
 
 	"github.com/anacrolix/torrent/bencode"
 	"github.com/anacrolix/torrent/common"
+	"github.com/anacrolix/torrent/internal/check"
+	"github.com/anacrolix/torrent/internal/nestedmaps"
 	"github.com/anacrolix/torrent/metainfo"
 	pp "github.com/anacrolix/torrent/peer_protocol"
+	utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
+	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/anacrolix/torrent/segments"
 	"github.com/anacrolix/torrent/storage"
 	"github.com/anacrolix/torrent/tracker"
+	typedRoaring "github.com/anacrolix/torrent/typed-roaring"
 	"github.com/anacrolix/torrent/webseed"
 	"github.com/anacrolix/torrent/webtorrent"
 )
@@ -60,8 +65,12 @@ type Torrent struct {
 	userOnWriteChunkErr func(error)
 
 	closed   chansync.SetOnce
+	onClose  []func()
 	infoHash metainfo.Hash
 	pieces   []Piece
+
+	// The order pieces are requested if there's no stronger reason like availability or priority.
+	pieceRequestOrder []int
 	// Values are the piece indices that changed.
 	pieceStateChanges pubsub.PubSub[PieceStateChange]
 	// The size of chunks to request from peers over the wire. This is
@@ -70,7 +79,7 @@ type Torrent struct {
 	chunkPool sync.Pool
 	// Total length of the torrent in bytes. Stored because it's not O(1) to
 	// get this from the info dict.
-	length *int64
+	_length Option[int64]
 
 	// The storage to open when the info dict becomes available.
 	storageOpener *storage.Client
@@ -96,14 +105,14 @@ type Torrent struct {
 	maxEstablishedConns int
 	// Set of addrs to which we're attempting to connect. Connections are
 	// half-open until all handshakes are completed.
-	halfOpen map[string]PeerInfo
+	halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
 
 	// Reserve of peers to connect to. A peer can be both here and in the
 	// active connections if were told about the peer after connecting with
 	// them. That encourages us to reconnect to peers that are well known in
 	// the swarm.
 	peers prioritizedPeers
-	// Whether we want to know to know more peers.
+	// Whether we want to know more peers.
 	wantPeersEvent missinggo.Event
 	// An announcer for each tracker URL.
 	trackerAnnouncers map[string]torrentTrackerAnnouncer
@@ -143,7 +152,7 @@ type Torrent struct {
 
 	connsWithAllPieces map[*Peer]struct{}
 
-	requestState []requestState
+	requestState map[RequestIndex]requestState
 	// Chunks we've written to since the corresponding piece was last checked.
 	dirtyChunks typedRoaring.Bitmap[RequestIndex]
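The halfOpen field above moves from a single PeerInfo per address to a two-level map keyed first by address string and then by attempt key (an alias for *PeerInfo, introduced below), so several outgoing attempts to the same address can be tracked and counted individually, as KnownSwarm and numHalfOpenAttempts do later in this diff. A minimal self-contained sketch of that bookkeeping, using a hypothetical peerInfo stand-in rather than the library's PeerInfo:

package main

import "fmt"

// peerInfo is a hypothetical stand-in; in the diff the inner key type is
// outgoingConnAttemptKey = *PeerInfo.
type peerInfo struct{ addr string }

func main() {
	halfOpen := make(map[string]map[*peerInfo]*peerInfo)
	add := func(attempt *peerInfo) {
		if halfOpen[attempt.addr] == nil {
			halfOpen[attempt.addr] = make(map[*peerInfo]*peerInfo)
		}
		halfOpen[attempt.addr][attempt] = attempt
	}
	add(&peerInfo{addr: "1.2.3.4:6881"})
	add(&peerInfo{addr: "1.2.3.4:6881"}) // a second attempt to the same address now coexists
	total := 0
	for _, attempts := range halfOpen {
		total += len(attempts) // mirrors numHalfOpenAttempts later in this diff
	}
	fmt.Println(total) // 2
}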
@@ -163,6 +172,12 @@ type Torrent struct {
 	requestIndexes []RequestIndex
 }
 
+type outgoingConnAttemptKey = *PeerInfo
+
+func (t *Torrent) length() int64 {
+	return t._length.Value
+}
+
 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
 	// This could be done with roaring.BitSliceIndexing.
 	t.iterPeers(func(peer *Peer) {
@@ -223,8 +238,10 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
 	})
 
 	// Add half-open peers to the list
-	for _, peer := range t.halfOpen {
-		ks = append(ks, peer)
+	for _, attempts := range t.halfOpen {
+		for _, peer := range attempts {
+			ks = append(ks, *peer)
+		}
 	}
 
 	// Add active peers to the list
@@ -386,11 +403,6 @@ func (t *Torrent) makePieces() {
 		beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
 		endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
 		piece.files = files[beginFile:endFile]
-		piece.undirtiedChunksIter = undirtiedChunksIter{
-			TorrentDirtyChunks: &t.dirtyChunks,
-			StartRequestIndex:  piece.requestIndexOffset(),
-			EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
-		}
 	}
 }
 
@@ -421,7 +433,7 @@ func (t *Torrent) cacheLength() {
 	for _, f := range t.info.UpvertedFiles() {
 		l += f.Length
 	}
-	t.length = &l
+	t._length = Some(l)
 }
 
 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
@@ -459,8 +471,8 @@ func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrder
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+	t.pieceRequestOrder = rand.Perm(t.numPieces())
 	t.initPieceRequestOrder()
-	MakeSliceWithLength(&t.requestState, t.numChunks())
 	MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
 	for i := range t.pieces {
 		p := &t.pieces[i]
@@ -480,6 +492,7 @@ func (t *Torrent) onSetInfo() {
 	t.cl.event.Broadcast()
 	close(t.gotMetainfoC)
 	t.updateWantPeersEvent()
+	t.requestState = make(map[RequestIndex]requestState)
 	t.tryCreateMorePieceHashers()
 	t.iterPeers(func(p *Peer) {
 		p.onGotInfo(t.info)
@@ -530,7 +543,7 @@ func (t *Torrent) setMetadataSize(size int) (err error) {
 		return
 	}
 	if uint32(size) > maxMetadataSize {
-		return errors.New("bad size")
+		return log.WithLevel(log.Warning, errors.New("bad size"))
 	}
 	if len(t.metadataBytes) == size {
 		return
@@ -751,23 +764,40 @@ func (t *Torrent) writeStatus(w io.Writer) {
 
 	fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
 
-	spew.NewDefaultConfig()
-	spew.Fdump(w, t.statsLocked())
-
-	peers := t.peersAsSlice()
-	sort.Slice(peers, func(_i, _j int) bool {
-		i := peers[_i]
-		j := peers[_j]
-		if less, ok := multiless.New().EagerSameLess(
-			i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
-		).LessOk(); ok {
-			return less
-		}
-		return worseConn(i, j)
+	dumpStats(w, t.statsLocked())
+
+	fmt.Fprintf(w, "webseeds:\n")
+	t.writePeerStatuses(w, maps.Values(t.webSeeds))
+
+	peerConns := maps.Keys(t.conns)
+	// Peers without priorities first, then those with. I'm undecided about how to order peers
+	// without priorities.
+	sort.Slice(peerConns, func(li, ri int) bool {
+		l := peerConns[li]
+		r := peerConns[ri]
+		ml := multiless.New()
+		lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
+		rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
+		ml = ml.Bool(lpp.Ok, rpp.Ok)
+		ml = ml.Uint32(rpp.Value, lpp.Value)
+		return ml.Less()
 	})
-	for i, c := range peers {
-		fmt.Fprintf(w, "%2d. ", i+1)
-		c.writeStatus(w, t)
+
+	fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
+	t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
+		return &pc.Peer
+	}))
+}
+
+func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
+	var buf bytes.Buffer
+	for _, c := range peers {
+		fmt.Fprintf(w, "- ")
+		buf.Reset()
+		c.writeStatus(&buf)
+		w.Write(bytes.TrimRight(
+			bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
+			" "))
 	}
 }
 
@@ -800,7 +830,10 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
 	}
 }
 
-// Get bytes left
+// Returns a count of bytes that are not complete in storage, and not pending being written to
+// storage. This value is from the perspective of the download manager, and may not agree with the
+// actual state in storage. If you want to read data synchronously you should use a Reader. See
+// https://github.com/anacrolix/torrent/issues/828.
 func (t *Torrent) BytesMissing() (n int64) {
 	t.cl.rLock()
 	n = t.bytesMissingLocked()
@@ -849,7 +882,7 @@ func (t *Torrent) usualPieceSize() int {
 }
 
 func (t *Torrent) numPieces() pieceIndex {
-	return pieceIndex(t.info.NumPieces())
+	return t.info.NumPieces()
 }
 
 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
@@ -861,6 +894,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 		err = errors.New("already closed")
 		return
 	}
+	for _, f := range t.onClose {
+		f()
+	}
 	if t.storage != nil {
 		wg.Add(1)
 		go func() {
@@ -881,27 +917,31 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
 	if t.storage != nil {
 		t.deletePieceRequestOrder()
 	}
+	t.assertAllPiecesRelativeAvailabilityZero()
+	t.pex.Reset()
+	t.cl.event.Broadcast()
+	t.pieceStateChanges.Close()
+	t.updateWantPeersEvent()
+	return
+}
+
+func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
 	for i := range t.pieces {
 		p := t.piece(i)
 		if p.relativeAvailability != 0 {
 			panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
 		}
 	}
-	t.pex.Reset()
-	t.cl.event.Broadcast()
-	t.pieceStateChanges.Close()
-	t.updateWantPeersEvent()
-	return
 }
 
 func (t *Torrent) requestOffset(r Request) int64 {
-	return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+	return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
 }
 
 // Return the request that would include the given offset into the torrent data. Returns !ok if
 // there is no such request.
 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
-	return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+	return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
 }
 
 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
@@ -949,7 +989,7 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
 		return 0
 	}
 	if piece == t.numPieces()-1 {
-		ret := pp.Integer(*t.length % t.info.PieceLength)
+		ret := pp.Integer(t.length() % t.info.PieceLength)
 		if ret != 0 {
 			return ret
 		}
@@ -1020,7 +1060,7 @@ func (t *Torrent) havePiece(index pieceIndex) bool {
 func (t *Torrent) maybeDropMutuallyCompletePeer(
 	// I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
 	// okay?
-	p *Peer,
+	p *PeerConn,
 ) {
 	if !t.cl.config.DropMutuallyCompletePeers {
 		return
 	}
@@ -1034,7 +1074,7 @@
 	if p.useful() {
 		return
 	}
-	t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
+	p.logger.Levelf(log.Debug, "is mutually complete; dropping")
 	p.drop()
 }
 
@@ -1073,17 +1113,26 @@ func getPeerConnSlice(cap int) []*PeerConn {
 	}
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
-// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
-// established connections for more than a minute. This is O(n log n). If there was a way to not
-// consider the position of a conn relative to the total number, it could be reduced to O(n).
-func (t *Torrent) worstBadConn() (ret *PeerConn) {
-	wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
-	defer peerConnSlices.Put(wcs.conns)
-	wcs.initKeys()
+// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
+// this is a frequent occurrence.
+func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
+	sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
+	f(sl)
+	peerConnSlices.Put(sl)
+}
+
+func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
+	wcs := worseConnSlice{conns: sl}
+	wcs.initKeys(opts)
 	heap.Init(&wcs)
 	for wcs.Len() != 0 {
 		c := heap.Pop(&wcs).(*PeerConn)
+		if opts.incomingIsBad && !c.outgoing {
+			return c
+		}
+		if opts.outgoingIsBad && c.outgoing {
+			return c
+		}
 		if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
 			return c
 		}
@@ -1099,6 +1148,17 @@
 	return nil
 }
 
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
+// established connections for more than a minute. This is O(n log n). If there was a way to not
+// consider the position of a conn relative to the total number, it could be reduced to O(n).
+func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
+	t.withUnclosedConns(func(ucs []*PeerConn) {
+		ret = t.worstBadConnFromSlice(opts, ucs)
+	})
+	return
+}
+
 type PieceStateChange struct {
 	Index int
 	PieceState
@@ -1225,7 +1285,7 @@ func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
 
 // Returns the range of pieces [begin, end) that contains the extent of bytes.
 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
-	if off >= *t.length {
+	if off >= t.length() {
 		return
 	}
 	if off < 0 {
@@ -1286,6 +1346,15 @@ func (t *Torrent) numReceivedConns() (ret int) {
 	return
 }
 
+func (t *Torrent) numOutgoingConns() (ret int) {
+	for c := range t.conns {
+		if c.outgoing {
+			ret++
+		}
+	}
+	return
+}
+
 func (t *Torrent) maxHalfOpen() int {
 	// Note that if we somehow exceed the maximum established conns, we want
 	// the negative value to have an effect.
@@ -1293,13 +1362,16 @@
 	extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
 	// We want to allow some experimentation with new peers, and to try to
 	// upset an oversupply of received connections.
-	return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
+	return int(min(
+		max(5, extraIncoming)+establishedHeadroom,
+		int64(t.cl.config.HalfOpenConnsPerTorrent),
+	))
 }
 
 func (t *Torrent) openNewConns() (initiated int) {
 	defer t.updateWantPeersEvent()
 	for t.peers.Len() != 0 {
-		if !t.wantConns() {
+		if !t.wantOutgoingConns() {
 			return
 		}
 		if len(t.halfOpen) >= t.maxHalfOpen() {
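The reformatted return in maxHalfOpen above computes the same clamp as before: at least 5 half-open slots, plus headroom, capped by the per-torrent config. A small sketch of the arithmetic; the definition of establishedHeadroom falls outside the hunk, so the headroom parameter below is an assumption, and min/max are the Go 1.21 builtins:

package main

import "fmt"

// Hypothetical stand-ins for the Torrent fields involved.
func maxHalfOpen(establishedHeadroom, received, maxEstablished, perTorrentLimit int64) int {
	extraIncoming := received - maxEstablished/2
	return int(min(
		max(5, extraIncoming)+establishedHeadroom,
		perTorrentLimit,
	))
}

func main() {
	// 30 of the conns were received against a 50-conn cap, with assumed headroom 10:
	// min(max(5, 30-25)+10, 25) = min(15, 25) = 15.
	fmt.Println(maxHalfOpen(10, 30, 50, 25)) // 15
}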
@@ -1312,7 +1384,15 @@
 			return
 		}
 		p := t.peers.PopMax()
-		t.initiateConn(p)
+		opts := outgoingConnOpts{
+			peerInfo:                 p,
+			t:                        t,
+			requireRendezvous:        false,
+			skipHolepunchRendezvous:  false,
+			receivedHolepunchConnect: false,
+			HeaderObfuscationPolicy:  t.cl.config.HeaderObfuscationPolicy,
+		}
+		initiateConn(opts, false)
 		initiated++
 	}
 	return
@@ -1442,7 +1522,7 @@ func (t *Torrent) bytesCompleted() int64 {
 	if !t.haveInfo() {
 		return 0
 	}
-	return *t.length - t.bytesLeft()
+	return t.length() - t.bytesLeft()
 }
 
 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
@@ -1490,7 +1570,7 @@ func (t *Torrent) decPeerPieceAvailability(p *Peer) {
 }
 
 func (t *Torrent) assertPendingRequests() {
-	if !check {
+	if !check.Enabled {
 		return
 	}
 	// var actual pendingRequests
@@ -1525,7 +1605,7 @@ func (t *Torrent) wantPeers() bool {
 	if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
 		return false
 	}
-	return t.wantConns()
+	return t.wantOutgoingConns()
 }
 
 func (t *Torrent) updateWantPeersEvent() {
@@ -1567,18 +1647,23 @@ func (t *Torrent) onWebRtcConn(
 		DataChannelContext: dcc,
 	}
 	peerRemoteAddr := netConn.RemoteAddr()
+	//t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
 	if t.cl.badPeerAddr(peerRemoteAddr) {
 		return
 	}
+	localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
 	pc, err := t.cl.initiateProtocolHandshakes(
 		context.Background(),
 		netConn,
 		t,
-		dcc.LocalOffered, false,
-		netConn.RemoteAddr(),
-		webrtcNetwork,
-		fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
+		newConnectionOpts{
+			outgoing:        dcc.LocalOffered,
+			remoteAddr:      peerRemoteAddr,
+			localPublicAddr: localAddrIpPort,
+			network:         webrtcNetwork,
+			connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
+		},
 	)
 	if err != nil {
 		t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
@@ -1594,14 +1679,14 @@
 	defer t.cl.unlock()
 	err = t.cl.runHandshookConn(pc, t)
 	if err != nil {
-		t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
+		t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
 	}
 }
 
 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
 	err := t.cl.runHandshookConn(pc, t)
 	if err != nil || logAll {
-		t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+		t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
 	}
 }
 
@@ -1610,11 +1695,10 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
 }
 
 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
-	wtc, release := t.cl.websocketTrackers.Get(u.String())
-	go func() {
-		<-t.closed.Done()
-		release()
-	}()
+	wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
+	// This needs to run before the Torrent is dropped from the Client, to prevent a new
+	// webtorrent.TrackerClient being created for the same info hash before the old one is
+	// cleaned up.
+	t.onClose = append(t.onClose, release)
 	wst := websocketTrackerStatus{u, wtc}
 	go func() {
 		err := wtc.Announce(tracker.Started, t.infoHash)
@@ -1634,10 +1718,9 @@ func (t *Torrent) startScrapingTracker(_url string) {
 	}
 	u, err := url.Parse(_url)
 	if err != nil {
-		// URLs with a leading '*' appear to be a uTorrent convention to
-		// disable trackers.
+		// URLs with a leading '*' appear to be a uTorrent convention to disable trackers.
 		if _url[0] != '*' {
-			log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
+			t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err)
 		}
 		return
 	}
@@ -1707,7 +1790,7 @@ func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceR
 		Event: event,
 		NumWant: func() int32 {
 			if t.wantPeers() && len(t.cl.dialers) > 0 {
-				return -1
+				return 200 // Windows has a UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
 			} else {
 				return 0
 			}
@@ -1794,7 +1877,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
 		// We're also announcing ourselves as a listener, so we don't just want peer addresses.
 		// TODO: We can include the announce_peer step depending on whether we can receive
 		// inbound connections. We should probably only announce once every 15 mins too.
-		if !t.wantConns() {
+		if !t.wantAnyConns() {
 			goto wait
 		}
 		// TODO: Determine if there's a listener on the port we're announcing.
@@ -1912,9 +1995,16 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
 		}
 	}
 	if len(t.conns) >= t.maxEstablishedConns {
-		c := t.worstBadConn()
+		numOutgoing := t.numOutgoingConns()
+		numIncoming := len(t.conns) - numOutgoing
+		c := t.worstBadConn(worseConnLensOpts{
+			// We've already established that we have too many connections at this point, so we just
+			// need to match what kind we have too many of vs. what we're trying to add now.
+			incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
+			outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
+		})
 		if c == nil {
-			return errors.New("don't want conns")
+			return errors.New("don't want conn")
 		}
 		c.close()
 		t.deletePeerConn(c)
@@ -1923,13 +2013,15 @@
 		panic(len(t.conns))
 	}
 	t.conns[c] = struct{}{}
+	t.cl.event.Broadcast()
+	// We'll never receive the "p" extended handshake parameter.
 	if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
-		t.pex.Add(c) // as no further extended handshake expected
+		t.pex.Add(c)
 	}
 	return nil
 }
 
-func (t *Torrent) wantConns() bool {
+func (t *Torrent) newConnsAllowed() bool {
 	if !t.networkingEnabled.Bool() {
 		return false
 	}
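The addPeerConn hunk above only evicts a connection of whichever direction is oversupplied, and only when the connection being added is of the other direction. A sketch of that gate extracted into a pure function for illustration (the function name and parameters are hypothetical):

package main

import "fmt"

// Mirrors the worseConnLensOpts computed in addPeerConn: mark a direction as
// "bad" only if we hold at least two more conns of that direction than the
// other, and the new conn would rebalance it.
func evictionOpts(numConns, numOutgoing int, newConnOutgoing bool) (incomingIsBad, outgoingIsBad bool) {
	numIncoming := numConns - numOutgoing
	incomingIsBad = (numIncoming-numOutgoing > 1) && newConnOutgoing
	outgoingIsBad = (numOutgoing-numIncoming > 1) && !newConnOutgoing
	return
}

func main() {
	fmt.Println(evictionOpts(50, 10, true))  // true false: too many incoming, adding outgoing
	fmt.Println(evictionOpts(50, 40, false)) // false true: too many outgoing, adding incoming
}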
@@ -1939,7 +2031,48 @@
 	if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
 		return false
 	}
-	return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
+	return true
+}
+
+func (t *Torrent) wantAnyConns() bool {
+	if !t.networkingEnabled.Bool() {
+		return false
+	}
+	if t.closed.IsSet() {
+		return false
+	}
+	if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
+		return false
+	}
+	return len(t.conns) < t.maxEstablishedConns
+}
+
+func (t *Torrent) wantOutgoingConns() bool {
+	if !t.newConnsAllowed() {
+		return false
+	}
+	if len(t.conns) < t.maxEstablishedConns {
+		return true
+	}
+	numIncomingConns := len(t.conns) - t.numOutgoingConns()
+	return t.worstBadConn(worseConnLensOpts{
+		incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
+		outgoingIsBad: false,
+	}) != nil
+}
+
+func (t *Torrent) wantIncomingConns() bool {
+	if !t.newConnsAllowed() {
+		return false
+	}
+	if len(t.conns) < t.maxEstablishedConns {
+		return true
+	}
+	numIncomingConns := len(t.conns) - t.numOutgoingConns()
+	return t.worstBadConn(worseConnLensOpts{
+		incomingIsBad: false,
+		outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
+	}) != nil
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
@@ -1952,7 +2085,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 			return true
 		}),
 	}
-	wcs.initKeys()
+	wcs.initKeys(worseConnLensOpts{})
 	heap.Init(&wcs)
 	for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
 		t.dropConnection(heap.Pop(&wcs).(*PeerConn))
@@ -2003,10 +2136,14 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 			c._stats.incrementPiecesDirtiedGood()
 		}
 		t.clearPieceTouchers(piece)
+		hasDirty := p.hasDirtyChunks()
 		t.cl.unlock()
+		if hasDirty {
+			p.Flush() // You can be synchronous here!
+		}
 		err := p.Storage().MarkComplete()
 		if err != nil {
-			t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
+			t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
 		}
 		t.cl.lock()
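The pieceHashed hunk above flushes dirty chunks to storage before MarkComplete runs, outside the client lock. A toy model of why that ordering matters, with hypothetical stand-in types rather than the library's storage API: if buffered writes have not reached storage, the completion marker would describe data that is not yet durable.

package main

import "fmt"

type piece struct {
	buffered []byte // chunks written since the last hash check
	stored   []byte
	complete bool
}

func (p *piece) Flush() {
	p.stored = append(p.stored, p.buffered...)
	p.buffered = nil
}

func (p *piece) MarkComplete() error {
	if len(p.buffered) != 0 {
		return fmt.Errorf("marking complete with %d unflushed bytes", len(p.buffered))
	}
	p.complete = true
	return nil
}

func main() {
	p := piece{buffered: []byte("chunk")}
	p.Flush()                     // hasDirty was true, so flush synchronously first
	fmt.Println(p.MarkComplete()) // <nil>
}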
@@ -2051,7 +2188,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
 		if len(bannableTouchers) >= 1 {
 			c := bannableTouchers[0]
 			if len(bannableTouchers) != 1 {
-				t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+				t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
 			} else {
 				// Turns out it's still useful to ban peers like this because if there's only a
 				// single peer for a piece, and we never progress that piece to completion, we
@@ -2082,7 +2219,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) {
 	t.piece(piece).readerCond.Broadcast()
 	for conn := range t.conns {
 		conn.have(piece)
-		t.maybeDropMutuallyCompletePeer(&conn.Peer)
+		t.maybeDropMutuallyCompletePeer(conn)
 	}
 }
 
@@ -2114,7 +2251,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
 }
 
 func (t *Torrent) tryCreateMorePieceHashers() {
-	for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
+	for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
 	}
 }
 
@@ -2153,7 +2290,7 @@ func (t *Torrent) dropBannedPeers() {
 	t.iterPeers(func(p *Peer) {
 		remoteIp := p.remoteIp()
 		if remoteIp == nil {
-			if p.bannableAddr.Ok() {
+			if p.bannableAddr.Ok {
 				t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
 			}
 			return
@@ -2236,8 +2373,45 @@ func (t *Torrent) VerifyData() {
 	}
 }
 
-// Start the process of connecting to the given peer for the given torrent if appropriate.
-func (t *Torrent) initiateConn(peer PeerInfo) {
+func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
+	return len(t.halfOpen[addrStr]) != 0
+}
+
+func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
+	addrStr := x.String()
+	for c := range t.conns {
+		ra := c.RemoteAddr
+		if ra.String() == addrStr {
+			return true
+		}
+	}
+	return false
+}
+
+func (t *Torrent) getHalfOpenPath(
+	addrStr string,
+	attemptKey outgoingConnAttemptKey,
+) nestedmaps.Path[*PeerInfo] {
+	return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
+}
+
+func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
+	path := t.getHalfOpenPath(addrStr, attemptKey)
+	if path.Exists() {
+		panic("should be unique")
+	}
+	path.Set(attemptKey)
+	t.cl.numHalfOpen++
+}
+
+// Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
+// sure all the PeerInfo fields are being used.
+func initiateConn(
+	opts outgoingConnOpts,
+	ignoreLimits bool,
+) {
+	t := opts.t
+	peer := opts.peerInfo
 	if peer.Id == t.cl.peerID {
 		return
 	}
@@ -2245,12 +2419,21 @@
 		return
 	}
 	addr := peer.Addr
-	if t.addrActive(addr.String()) {
+	addrStr := addr.String()
+	if !ignoreLimits {
+		if t.connectingToPeerAddr(addrStr) {
+			return
+		}
+	}
+	if t.hasPeerConnForAddr(addr) {
 		return
 	}
-	t.cl.numHalfOpen++
-	t.halfOpen[addr.String()] = peer
-	go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
+	attemptKey := &peer
+	t.addHalfOpen(addrStr, attemptKey)
+	go t.cl.outgoingConnection(
+		opts,
+		attemptKey,
+	)
}
 
 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
@@ -2271,7 +2454,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int {
 // connection.
 func (t *Torrent) allStats(f func(*ConnStats)) {
 	f(&t.stats)
-	f(&t.cl.stats)
+	f(&t.cl.connStats)
 }
 
 func (t *Torrent) hashingPiece(i pieceIndex) bool {
@@ -2409,7 +2592,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
 			},
 		},
 		activeRequests: make(map[Request]webseed.Request, maxRequests),
-		maxRequests:    maxRequests,
 	}
 	ws.peer.initRequestState()
 	for _, opt := range opts {
@@ -2467,8 +2649,7 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
 	}
 	// TODO: This is a check that an old invariant holds. It can be removed after some testing.
 	//delete(t.pendingRequests, r)
-	var zeroRequestState requestState
-	if t.requestState[r] != zeroRequestState {
+	if _, ok := t.requestState[r]; ok {
 		panic("expected request state to be gone")
 	}
 	return p
 }
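With requestState changed from a slice to a map earlier in this diff, the cancelRequest hunk above switches from comparing against a zero value to the comma-ok form. Indexing a missing map key yields the zero value, so a zero-value comparison cannot distinguish "absent" from "present but zero"; comma-ok tests presence directly. A minimal sketch:

package main

import (
	"fmt"
	"time"
)

type requestState struct {
	peer *struct{}
	when time.Time
}

func main() {
	m := map[uint32]requestState{}
	_, ok := m[42]
	fmt.Println(ok) // false: key absent
	m[42] = requestState{when: time.Now()}
	_, ok = m[42]
	fmt.Println(ok) // true: key present, regardless of field values
}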
@@ -2508,7 +2689,217 @@ func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
 	return pieceIndex(ri / t.chunksPerRegularPiece())
 }
 
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+	reuseIter *typedRoaring.Iterator[RequestIndex],
+	piece pieceIndex,
+	f func(RequestIndex),
+) {
+	reuseIter.Initialize(&t.dirtyChunks)
+	pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+	iterBitmapUnsetInRange(
+		reuseIter,
+		pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+		f,
+	)
+}
+
 type requestState struct {
 	peer *Peer
 	when time.Time
 }
+
+// Returns an error if a received chunk is out of bounds in some way.
+func (t *Torrent) checkValidReceiveChunk(r Request) error {
+	if !t.haveInfo() {
+		return errors.New("torrent missing info")
+	}
+	if int(r.Index) >= t.numPieces() {
+		return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
+	}
+	pieceLength := t.pieceLength(pieceIndex(r.Index))
+	if r.Begin >= pieceLength {
+		return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
+	}
+	// We could check chunk lengths here, but chunk request size is not changed often, and tricky
+	// for peers to manipulate as they need to send potentially large buffers to begin with. There
+	// should be considerable checks elsewhere for this case due to the network overhead. We should
+	// catch most of the overflow manipulation stuff by checking index and begin above.
+	return nil
+}
+
+func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
+	for pc := range t.conns {
+		dialAddr, err := pc.remoteDialAddrPort()
+		if err != nil {
+			continue
+		}
+		if dialAddr != target {
+			continue
+		}
+		ret = append(ret, pc)
+	}
+	return
+}
+
+func wrapUtHolepunchMsgForPeerConn(
+	recipient *PeerConn,
+	msg utHolepunch.Msg,
+) pp.Message {
+	extendedPayload, err := msg.MarshalBinary()
+	if err != nil {
+		panic(err)
+	}
+	return pp.Message{
+		Type:            pp.Extended,
+		ExtendedID:      MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
+		ExtendedPayload: extendedPayload,
+	}
+}
+
+func sendUtHolepunchMsg(
+	pc *PeerConn,
+	msgType utHolepunch.MsgType,
+	addrPort netip.AddrPort,
+	errCode utHolepunch.ErrCode,
+) {
+	holepunchMsg := utHolepunch.Msg{
+		MsgType:  msgType,
+		AddrPort: addrPort,
+		ErrCode:  errCode,
+	}
+	incHolepunchMessagesSent(holepunchMsg)
+	ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
+	pc.write(ppMsg)
+}
+
+func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
+	torrent.Add(
+		fmt.Sprintf(
+			"holepunch %v %v messages %v",
+			msg.MsgType,
+			addrPortProtocolStr(msg.AddrPort),
+			verb,
+		),
+		1,
+	)
+}
+
+func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
+	incHolepunchMessages(msg, "received")
+}
+
+func incHolepunchMessagesSent(msg utHolepunch.Msg) {
+	incHolepunchMessages(msg, "sent")
+}
+
+func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
+	incHolepunchMessagesReceived(msg)
+	switch msg.MsgType {
+	case utHolepunch.Rendezvous:
+		t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
+		sendMsg := sendUtHolepunchMsg
+		senderAddrPort, err := sender.remoteDialAddrPort()
+		if err != nil {
+			sender.logger.Levelf(
+				log.Warning,
+				"error getting ut_holepunch rendezvous sender's dial address: %v",
+				err,
+			)
+			// There's no better error code. The sender's address itself is invalid. I don't see
+			// this error message being appropriate anywhere else anyway.
+			sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
+		}
+		targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
+		if len(targets) == 0 {
+			sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
+			return nil
+		}
+		for _, pc := range targets {
+			if !pc.supportsExtension(utHolepunch.ExtensionName) {
+				sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
+				continue
+			}
+			sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
+			sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
+		}
+		return nil
+	case utHolepunch.Connect:
+		holepunchAddr := msg.AddrPort
+		t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
+		if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
+			setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
+			if g.MapContains(t.cl.accepted, holepunchAddr) {
+				setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
+			}
+		}
+		opts := outgoingConnOpts{
+			peerInfo: PeerInfo{
+				Addr:         msg.AddrPort,
+				Source:       PeerSourceUtHolepunch,
+				PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
+			},
+			t: t,
+			// Don't attempt to start our own rendezvous if we fail to connect.
+			skipHolepunchRendezvous:  true,
+			receivedHolepunchConnect: true,
+			// Assume that the other end initiated the rendezvous, and will use our preferred
+			// encryption. So we will act normally.
+			HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
+		}
+		initiateConn(opts, true)
+		return nil
+	case utHolepunch.Error:
+		torrent.Add("holepunch error messages received", 1)
+		t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
+		return nil
+	default:
+		return fmt.Errorf("unhandled msg type %v", msg.MsgType)
+	}
+}
+
+func addrPortProtocolStr(addrPort netip.AddrPort) string {
+	addr := addrPort.Addr()
+	switch {
+	case addr.Is4():
+		return "ipv4"
+	case addr.Is6():
+		return "ipv6"
+	default:
+		panic(addrPort)
+	}
+}
+
+func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
+	rzsSent := 0
+	for pc := range t.conns {
+		if !pc.supportsExtension(utHolepunch.ExtensionName) {
+			continue
+		}
+		if pc.supportsExtension(pp.ExtensionNamePex) {
+			if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
+				continue
+			}
+		}
+		t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
+		sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
+		rzsSent++
+	}
+	if rzsSent == 0 {
+		return errors.New("no eligible relays")
+	}
+	return nil
+}
+
+func (t *Torrent) numHalfOpenAttempts() (num int) {
+	for _, attempts := range t.halfOpen {
+		num += len(attempts)
+	}
+	return
+}
+
+func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
+	cl := t.cl
+	cl.rLock()
+	defer cl.rUnlock()
+	return t.dialTimeout()
+}
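For reference, the addrPortProtocolStr switch added above can be exercised directly with net/netip; a minimal usage sketch of the same classification (the standalone function name here is hypothetical):

package main

import (
	"fmt"
	"net/netip"
)

// Classify an AddrPort the way the holepunch expvar counters do.
func protocolStr(addrPort netip.AddrPort) string {
	addr := addrPort.Addr()
	switch {
	case addr.Is4():
		return "ipv4"
	case addr.Is6():
		return "ipv6"
	default:
		panic(addrPort)
	}
}

func main() {
	fmt.Println(protocolStr(netip.MustParseAddrPort("1.2.3.4:6881")))    // ipv4
	fmt.Println(protocolStr(netip.MustParseAddrPort("[2001:db8::1]:1"))) // ipv6
}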