.github/workflows/go.yml | 1 + atomic-count.go | 6 ++++-- client-peerconn_test.go | 5 ++--- client-stats.go | 2 +- client.go | 263 ++++++++++++++++++++++++++++++++--------------------- cmd/torrent/main.go | 15 +-------------- cmd/torrent2/main.go | 64 +++++++++++++++++++++++++++++++++++++++++------------ config.go | 42 +++++++++++++++++++++++++++++++++--------- conn-stats.go | 8 ++++++-- deferrwl.go | 16 +++++++++------- deferrwl_test.go | 1 + env.go | 27 +++++++++++++++++++++++++++ errors.go | 8 ++++++++ fmt.go | 14 -------------- fs/cmd/torrentfs/main.go | 2 ++ fs/file_handle.go | 2 ++ fs/filenode.go | 2 ++ fs/stream-sintel_test.go | 12 ++++++++++-- fs/torrentfs.go | 2 ++ fs/torrentfs_test.go | 2 ++ go.mod | 11 ++++++----- go.sum | 20 ++++++++++---------- internal/request-strategy/ajwerner-btree.go | 5 +++++ internal/request-strategy/order.go | 29 +++++++++++++---------------- internal/request-strategy/piece-request-order.go | 12 +++++++++++- internal/request-strategy/tidwall-btree.go | 5 +++++ issue97_test.go | 6 ++---- justfile | 4 ++-- math.go | 5 +++++ metainfo/info.go | 2 -- misc.go | 10 ++++++---- misc_test.go | 18 ------------------ mmap-span/mmap-span.go | 33 ++++++++++++--------------------- peer-impl.go | 3 ++- peer.go | 97 ++++++++++++++++++++++++++--------------------------- peerconn.go | 84 ++++++++++++++++++++++++++++++++++++++++++++--------- peerconn_test.go | 18 ++++++++++-------- piece.go | 49 +++++++++++++++++++++++++++++++++++++------------ rate.go | 24 +++++++++++++++++++----- ratelimitreader.go | 18 +++++++++++++++++- request-strategy-impls.go | 3 ++- request-strategy-impls_test.go | 2 +- requesting.go | 46 +++++++++++++++++++++++++++++++--------------- segments/index.go | 73 +++++++++++++++++++++-------------------------------- segments/segments.go | 75 +++++++++++++++++++++-------------------------------- segments/segments_test.go | 75 +++++++++++++++++++++++++++++++++++++++++------------ smartban.go | 1 + sources.go | 13 ++++++++----- 
spec.go | 4 ---- storage/file-client.go | 26 +++++++++++++++++++++++++- storage/file-io-classic.go | 32 ++++++++++++++++++++++++++++++++ storage/file-io-common.go | 35 +++++++++++++++++++++++++++++++++++ storage/file-io-mmap.go | 220 +++++++++++++++++++++++++++++++++++++++++++++++++++++ storage/file-io.go | 25 +++++++++++++++++++++++++ storage/file-misc.go | 16 ++++++++-------- storage/file-piece.go | 207 +++++++++++++++++++++++++++++++++-------------------- storage/file-torrent-io.go | 85 ++++++++++++++++++++++++----------------------------- storage/file-torrent.go | 49 ++++++++++++++++++++++++++----------------------- storage/interface.go | 5 ++++- storage/mmap.go | 34 ++++++++++++++++------------------ storage/sys_unix.go | 24 ++++++++++++++++++++++++ storage/sys_windows.go | 17 +++++++++++++++++ storage/wrappers.go | 11 ++++++++--- t.go | 19 +++++++++++-------- test_test.go | 7 ++++--- tests/issue-930/server.go | 1 - time_test.go | 1 + torrent-piece-request-order.go | 1 + torrent-stats.go | 17 +++++++++++++++-- torrent.go | 448 +++++++++++++++++++++++++++++++++++++---------------- torrent_test.go | 15 ++++++++------- webseed-peer.go | 146 +++++++++++++++++++++++++++++++++++------------------ webseed-request.go | 8 +++++--- webseed-requesting.go | 436 +++++++++++++++++++++++++++++++++++------------------ webseed/client.go | 129 ++++++++++++++++++++++++++++++++++------------------- webseed/request.go | 2 +- zero-reader.go | 1 + diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index aec41f05e2df78ac36c0ea58954b868d59531077..1cced5fabee8004c825ebdd93f206c1a82bfbb27 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -4,6 +4,7 @@ on: [push, pull_request] jobs: test: + timeout-minutes: 20 runs-on: ${{ matrix.os }} strategy: matrix: diff --git a/atomic-count.go b/atomic-count.go index 415f427a6d5f2496f9f0b47d635e6b1063022a2b..6a18b8ff3b7bf9d745f2cd3eb642fbb1c7427e6f 100644 --- a/atomic-count.go +++ b/atomic-count.go @@ -31,9 +31,11 
@@ } // TODO: Can this use more generics to speed it up? Should we be checking the field types? func copyCountFields[T any](src *T) (dst T) { + srcValue := reflect.ValueOf(src).Elem() + dstValue := reflect.ValueOf(&dst).Elem() for i := 0; i < reflect.TypeFor[T]().NumField(); i++ { - n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64() - reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n) + n := srcValue.Field(i).Addr().Interface().(*Count).Int64() + dstValue.Field(i).Addr().Interface().(*Count).Add(n) } return } diff --git a/client-peerconn_test.go b/client-peerconn_test.go index fa1cff08074c24085d0070dc91fb68516cdff610..372b4af4dfbd60aa51e8252cd90046932bae573c 100644 --- a/client-peerconn_test.go +++ b/client-peerconn_test.go @@ -1,6 +1,7 @@ package torrent import ( + "cmp" "io" "os" "testing" @@ -90,9 +91,7 @@ cfg := TestingConfig(t) cfg.Seed = true // Some test instances don't like this being on, even when there's no cache involved. cfg.DropMutuallyCompletePeers = false - if ps.SeederUploadRateLimiter != nil { - cfg.UploadRateLimiter = ps.SeederUploadRateLimiter - } + cfg.UploadRateLimiter = cmp.Or(ps.SeederUploadRateLimiter, cfg.UploadRateLimiter) cfg.DataDir = greetingTempDir if ps.ConfigureSeeder.Config != nil { ps.ConfigureSeeder.Config(cfg) diff --git a/client-stats.go b/client-stats.go index fb080dbb2bf87530e845d17e809aa287041a8d75..48711e781dfb983c48c52ab87b5a634b23af1c40 100644 --- a/client-stats.go +++ b/client-stats.go @@ -39,7 +39,7 @@ NumPeersProbablyOnlyConnectedDueToHolepunch int } func (cl *Client) statsLocked() (stats ClientStats) { - stats.ConnStats = copyCountFields(&cl.connStats) + stats.AllConnStats = cl.connStats.Copy() stats.TorrentStatCounters = copyCountFields(&cl.counters) for t := range cl.torrents { stats.TorrentGauges.Add(t.gauges()) diff --git a/client.go b/client.go index de598591141ecc97c7f27aff5e3e8091f6e9da3a..99494b2ce5a4fb969c0de5443d0dc64c89d604e2 100644 --- a/client.go +++ 
b/client.go @@ -16,10 +16,12 @@ "math" "net" "net/http" "net/netip" + "runtime" "slices" "strconv" "time" + "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -51,19 +53,21 @@ infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "github.com/anacrolix/torrent/webtorrent" ) -const webseedRequestUpdateTimerInterval = time.Second +const webseedRequestUpdateTimerInterval = 5 * time.Second // Clients contain zero or more Torrents. A Client manages a blocklist, the // TCP/UDP protocol ports, and DHT as desired. type Client struct { // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of // fields. See #262. - connStats ConnStats + connStats AllConnStats counters TorrentStatCounters - _mu lockWithDeferreds - event sync.Cond - closed chansync.SetOnce + _mu lockWithDeferreds + // Used in constrained situations when the lock is held. + roaringIntIterator roaring.IntIterator + event sync.Cond + closed chansync.SetOnce config *ClientConfig logger log.Logger @@ -85,7 +89,8 @@ badPeerIPs map[netip.Addr]struct{} // All Torrents once. torrents map[*Torrent]struct{} // All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the - // info has been obtained, there's no knowing if an infohash belongs to v1 or v2. + // info has been obtained, there's no knowing if an infohash belongs to v1 or v2. TODO: Make + // this a weak pointer. torrentsByShortHash map[InfoHash]*Torrent // Piece request orderings grouped by storage. Value is value type because all fields are @@ -99,7 +104,8 @@ websocketTrackers websocketTrackers numWebSeedRequests map[webseedHostKeyHandle]int activeAnnounceLimiter limiter.Instance - httpClient *http.Client + // TODO: Move this onto ClientConfig. 
+ httpClient *http.Client clientHolepunchAddrSets @@ -107,7 +113,11 @@ defaultLocalLtepProtocolMap LocalLtepProtocolMap upnpMappings []*upnpMapping - webseedRequestTimer *time.Timer + webseedRequestTimer *time.Timer + webseedUpdateReason updateRequestReason + activeWebseedRequests map[webseedUniqueRequestKey]*webseedRequest + + activePieceHashers int } type ipStr string @@ -240,6 +250,7 @@ // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil. func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg + cfg.setRateLimiterBursts() g.MakeMap(&cl.dopplegangerAddrs) g.MakeMap(&cl.torrentsByShortHash) g.MakeMap(&cl.torrents) @@ -259,6 +270,7 @@ // to other uses of HTTP from the client. MaxConnsPerHost: 10, } } + cfg.MetainfoSourcesClient = cmp.Or(cfg.MetainfoSourcesClient, cl.httpClient) cl.defaultLocalLtepProtocolMap = makeBuiltinLtepProtocols(!cfg.DisablePEX) g.MakeMap(&cl.numWebSeedRequests) } @@ -270,42 +282,83 @@ if cfg == nil { cfg = NewDefaultClientConfig() cfg.ListenPort = 0 } - cfg.setRateLimiterBursts() cl = &Client{} cl.init(cfg) - go cl.acceptLimitClearer() - cl.initLogger() - //cl.logger.Levelf(log.Critical, "test after init") + // Belongs after infallible init defer func() { if err != nil { cl.Close() cl = nil } }() + // Infallible init. Belongs in separate function. + { + go cl.acceptLimitClearer() + cl.initLogger() + //cl.logger.Levelf(log.Critical, "test after init") - storageImpl := cfg.DefaultStorage - if storageImpl == nil { - // We'd use mmap by default but HFS+ doesn't support sparse files. - storageImplCloser := storage.NewFile(cfg.DataDir) - cl.onClose = append(cl.onClose, func() { - if err := storageImplCloser.Close(); err != nil { - cl.logger.Printf("error closing default storage: %s", err) + storageImpl := cfg.DefaultStorage + if storageImpl == nil { + // We'd use mmap by default but HFS+ doesn't support sparse files. 
+ storageImplCloser := storage.NewFile(cfg.DataDir) + cl.onClose = append(cl.onClose, func() { + if err := storageImplCloser.Close(); err != nil { + cl.logger.Printf("error closing default storage: %s", err) + } + }) + storageImpl = storageImplCloser + } + cl.defaultStorage = storage.NewClient(storageImpl) + + if cfg.PeerID != "" { + missinggo.CopyExact(&cl.peerID, cfg.PeerID) + } else { + o := copy(cl.peerID[:], cfg.Bep20) + _, err = rand.Read(cl.peerID[o:]) + if err != nil { + panic("error generating peer id") } - }) - storageImpl = storageImplCloser - } - cl.defaultStorage = storage.NewClient(storageImpl) + } - if cfg.PeerID != "" { - missinggo.CopyExact(&cl.peerID, cfg.PeerID) - } else { - o := copy(cl.peerID[:], cfg.Bep20) - _, err = rand.Read(cl.peerID[o:]) - if err != nil { - panic("error generating peer id") + cl.websocketTrackers = websocketTrackers{ + PeerId: cl.peerID, + Logger: cl.logger.WithNames("websocketTrackers"), + GetAnnounceRequest: func( + event tracker.AnnounceEvent, infoHash [20]byte, + ) ( + tracker.AnnounceRequest, error, + ) { + cl.lock() + defer cl.unlock() + t, ok := cl.torrentsByShortHash[infoHash] + if !ok { + return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") + } + return t.announceRequest(event, infoHash), nil + }, + Proxy: cl.config.HTTPProxy, + WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, + ICEServers: cl.ICEServers(), + DialContext: cl.config.TrackerDialContext, + callbacks: &cl.config.Callbacks, + OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { + cl.lock() + defer cl.unlock() + t, ok := cl.torrentsByShortHash[dcc.InfoHash] + if !ok { + cl.logger.WithDefaultLevel(log.Warning).Printf( + "got webrtc conn for unloaded torrent with infohash %x", + dcc.InfoHash, + ) + dc.Close() + return + } + go t.onWebRtcConn(dc, dcc) + }, } + + cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc) } - 
builtinListenNetworks := cl.listenNetworks() sockets, err := listenAll( builtinListenNetworks, @@ -352,45 +405,6 @@ } } } - cl.websocketTrackers = websocketTrackers{ - PeerId: cl.peerID, - Logger: cl.logger.WithNames("websocketTrackers"), - GetAnnounceRequest: func( - event tracker.AnnounceEvent, infoHash [20]byte, - ) ( - tracker.AnnounceRequest, error, - ) { - cl.lock() - defer cl.unlock() - t, ok := cl.torrentsByShortHash[infoHash] - if !ok { - return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") - } - return t.announceRequest(event, infoHash), nil - }, - Proxy: cl.config.HTTPProxy, - WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, - ICEServers: cl.ICEServers(), - DialContext: cl.config.TrackerDialContext, - callbacks: &cl.config.Callbacks, - OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { - cl.lock() - defer cl.unlock() - t, ok := cl.torrentsByShortHash[dcc.InfoHash] - if !ok { - cl.logger.WithDefaultLevel(log.Warning).Printf( - "got webrtc conn for unloaded torrent with infohash %x", - dcc.InfoHash, - ) - dc.Close() - return - } - go t.onWebRtcConn(dc, dcc) - }, - } - - cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc) - err = cl.checkConfig() return } @@ -499,19 +513,21 @@ } // Stops the client. All connections to peers are closed and all activity will come to a halt. func (cl *Client) Close() (errs []error) { + // Close atomically, allow systems to break early if we're contended on the Client lock. + cl.closed.Set() + cl.webseedRequestTimer.Stop() var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() for t := range cl.torrents { - err := t.close(&closeGroup) - if err != nil { - errs = append(errs, err) - } + cl.dropTorrent(t, &closeGroup) } + // Can we not modify cl.torrents as we delete from it? 
+ panicif.NotZero(len(cl.torrents)) + panicif.NotZero(len(cl.torrentsByShortHash)) cl.clearPortMappings() for i := range cl.onClose { cl.onClose[len(cl.onClose)-1-i]() } - cl.closed.Set() cl.unlock() cl.event.Broadcast() closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock() @@ -1289,7 +1305,7 @@ } piece := d.Piece switch d.Type { case pp.DataMetadataExtensionMsgType: - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead })) if !c.requestedMetadataPiece(piece) { return fmt.Errorf("got unexpected piece %d", piece) } @@ -1422,6 +1438,7 @@ if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize } t.setChunkSize(opts.ChunkSize) + cl.torrents[t] = struct{}{} return } @@ -1492,7 +1509,6 @@ go t.dhtAnnouncer(s) } }) cl.torrentsByShortHash[infoHash] = t - cl.torrents[t] = struct{}{} t.setInfoBytesLocked(opts.InfoBytes) cl.clearAcceptLimits() t.updateWantPeersEvent() @@ -1505,14 +1521,19 @@ type AddTorrentOpts struct { InfoHash infohash.T InfoHashV2 g.Option[infohash_v2.T] Storage storage.ClientImpl - ChunkSize pp.Integer - InfoBytes []byte + // Only applied for new torrents (check Client.AddTorrent* method bool return value). If 0, the + // default chunk size is used (16 KiB in current modern BitTorrent clients). + ChunkSize pp.Integer + InfoBytes []byte // Don't hash data if piece completion is missing. This is useful for very large torrents that // are dropped in place from an external source and trigger a lot of initial piece checks. DisableInitialPieceCheck bool // Require pieces to be checked as soon as info is available. This is because we have no way to // schedule an initial check only, and don't want to race against use of Torrent.Complete. IgnoreUnverifiedPieceCompletion bool + // Whether to initially allow data download or upload + DisallowDataUpload bool + DisallowDataDownload bool } // Add or merge a torrent spec. 
Returns new if the torrent wasn't already in the client. See also @@ -1520,11 +1541,8 @@ // Torrent.MergeSpec. func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) { t, new = cl.AddTorrentOpt(spec.AddTorrentOpts) modSpec := *spec - if new { - // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing - // it. - modSpec.ChunkSize = 0 - } + // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing it. + modSpec.ChunkSize = 0 err = t.MergeSpec(&modSpec) if err != nil && new { t.Drop() @@ -1533,8 +1551,9 @@ return } // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set. -// spec.DisallowDataDownload/Upload will be read and applied // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored. +// Many fields in the AddTorrentOpts field in TorrentSpec are ignored because the Torrent is already +// added. func (t *Torrent) MergeSpec(spec *TorrentSpec) error { if spec.DisplayName != "" { t.SetDisplayName(spec.DisplayName) @@ -1566,18 +1585,11 @@ panic("chunk size cannot be changed for existing Torrent") } t.addTrackers(spec.Trackers) t.maybeNewConns() - t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload) - t.dataUploadDisallowed = spec.DisallowDataUpload return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...) 
} -func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) { - t.eachShortInfohash(func(short [20]byte) { - delete(cl.torrentsByShortHash, short) - }) - err = t.close(wg) - delete(cl.torrents, t) - return +func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) { + t.close(wg) } func (cl *Client) allTorrentsCompleted() bool { @@ -1614,6 +1626,7 @@ return cl.torrentsAsSlice() } func (cl *Client) torrentsAsSlice() (ret []*Torrent) { + ret = make([]*Torrent, 0, len(cl.torrents)) for t := range cl.torrents { ret = append(ret, t) } @@ -1702,6 +1715,7 @@ panic(opts.remoteAddr) } c = &PeerConn{ Peer: Peer{ + cl: cl, outgoing: opts.outgoing, choking: true, peerChoking: true, @@ -1728,10 +1742,7 @@ c.legacyPeerImpl = c c.peerImpl = c c.setPeerLoggers(cl.logger, cl.slogger) c.setRW(connStatsReadWriter{nc, c}) - c.r = &rateLimitedReader{ - l: cl.config.DownloadRateLimiter, - r: c.r, - } + c.r = cl.newDownloadRateLimitedReader(c.r) c.logger.Levelf( log.Debug, "inited with remoteAddr %v network %v outgoing %t", @@ -1743,6 +1754,17 @@ } return } +func (cl *Client) newDownloadRateLimitedReader(r io.Reader) io.Reader { + if cl.config.DownloadRateLimiter == nil { + return r + } + // Why if the limit is Inf? Because it can be dynamically adjusted. + return rateLimitedReader{ + l: cl.config.DownloadRateLimiter, + r: r, + } +} + func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) { cl.lock() defer cl.unlock() @@ -1913,9 +1935,9 @@ return ICEServers } // Returns connection-level aggregate connStats at the Client level. See the comment on -// TorrentStats.ConnStats. +// TorrentStats.ConnStats. You probably want Client.Stats() instead. 
func (cl *Client) ConnStats() ConnStats { - return cl.connStats.Copy() + return cl.connStats.ConnStats.Copy() } func (cl *Client) Stats() ClientStats { @@ -1926,12 +1948,12 @@ } func (cl *Client) underWebSeedHttpRequestLimit(key webseedHostKeyHandle) bool { panicif.Zero(key) - return cl.numWebSeedRequests[key] < defaultRequestsPerWebseedHost + return cl.numWebSeedRequests[key] < webseedHostRequestConcurrency } // Check for bad arrangements. This is a candidate for an error state check method. func (cl *Client) checkConfig() error { - if cl.config.DownloadRateLimiter.Limit() == 0 { + if EffectiveDownloadRateLimit(cl.config.DownloadRateLimiter) == 0 { if len(cl.dialers) != 0 { return errors.New("download rate limit is zero, but dialers are set") } @@ -1941,3 +1963,40 @@ } } return nil } + +var maxActivePieceHashers = initIntFromEnv("TORRENT_MAX_ACTIVE_PIECE_HASHERS", runtime.NumCPU(), 0) + +func (cl *Client) maxActivePieceHashers() int { + return maxActivePieceHashers +} + +func (cl *Client) belowMaxActivePieceHashers() bool { + return cl.activePieceHashers < cl.maxActivePieceHashers() +} + +func (cl *Client) canStartPieceHashers() bool { + return cl.belowMaxActivePieceHashers() +} + +func (cl *Client) startPieceHashers() { + if !cl.canStartPieceHashers() { + return + } + ts := make([]*Torrent, 0, len(cl.torrents)) + for t := range cl.torrents { + if !t.considerStartingHashers() { + continue + } + ts = append(ts, t) + } + // Sort largest torrents first, as those are preferred by webseeds, and will cause less thrashing. 
+ slices.SortFunc(ts, func(a, b *Torrent) int { + return -cmp.Compare(a.length(), b.length()) + }) + for _, t := range ts { + t.startPieceHashers() + if !cl.canStartPieceHashers() { + break + } + } +} diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index c5804490ebc70ad26540c5d29c3eff738b0a1f2a..087a0aa2e4b4b3a1ef46b286ee1ba31104340aa5 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -9,17 +9,15 @@ "io" stdLog "log" "net/http" "os" - "time" "github.com/anacrolix/bargle" "github.com/anacrolix/envpprof" app "github.com/anacrolix/gostdapp" - "github.com/anacrolix/log" xprometheus "github.com/anacrolix/missinggo/v2/prometheus" + "github.com/davecgh/go-spew/spew" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/otel/sdk/trace" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/version" @@ -29,17 +27,6 @@ func init() { stdLog.SetFlags(stdLog.Flags() | stdLog.Lshortfile) prometheus.MustRegister(xprometheus.NewExpvarCollector()) http.Handle("/metrics", promhttp.Handler()) -} - -func shutdownTracerProvider(ctx context.Context, tp *trace.TracerProvider) { - started := time.Now() - err := tp.Shutdown(ctx) - elapsed := time.Since(started) - logger := log.Default.Slogger() - logger.Debug("shutting down tracer provider", "took", elapsed) - if err != nil && ctx.Err() == nil { - log.Default.Slogger().Error("error shutting down tracer provider", "err", err) - } } func main() { diff --git a/cmd/torrent2/main.go b/cmd/torrent2/main.go index 223c3fba189efed5c07d667d5e6c099ff36aca4e..ff26dea588af7a737a02fc460104d5d159104a7c 100644 --- a/cmd/torrent2/main.go +++ b/cmd/torrent2/main.go @@ -4,20 +4,18 @@ // here. 
package main import ( + "errors" "fmt" "io" "os" + "github.com/anacrolix/bargle/v2" "github.com/anacrolix/log" "github.com/anacrolix/torrent/merkle" "github.com/anacrolix/torrent/metainfo" ) -type argError struct { - err error -} - func assertOk(err error) { if err != nil { panic(err) @@ -29,12 +27,37 @@ panic(str) } func main() { - args := os.Args[1:] - map[string]func(){ + err := mainErr() + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func mainErr() error { + p := bargle.NewParser() + defer p.DoHelpIfHelping() + runMap := func(m map[string]func()) { + for key, value := range m { + if p.Parse(bargle.Keyword(key)) { + value() + return + } + } + p.Fail() + } + parseFileName := func() (ret string) { + if p.Parse(bargle.Positional("file", bargle.BuiltinUnmarshaler(&ret))) { + return + } + p.SetError(errors.New("file not specified")) + panic(p.Fail()) + } + runMap(map[string]func(){ "metainfo": func() { - map[string]func(){ + runMap(map[string]func(){ "validate-v2": func() { - mi, err := metainfo.LoadFromFile(args[2]) + mi, err := metainfo.LoadFromFile(parseFileName()) assertOk(err) info, err := mi.UnmarshalInfo() assertOk(err) @@ -45,26 +68,37 @@ err = metainfo.ValidatePieceLayers(mi.PieceLayers, &info.FileTree, info.PieceLength) assertOk(err) }, "pprint": func() { - mi, err := metainfo.LoadFromFile(args[2]) + mi, err := metainfo.LoadFromFile(parseFileName()) assertOk(err) info, err := mi.UnmarshalInfo() assertOk(err) + fmt.Printf("name: %q\n", info.Name) + fmt.Printf("# files:\n") files := info.UpvertedFiles() pieceIndex := 0 for _, f := range files { numPieces := int((f.Length + info.PieceLength - 1) / info.PieceLength) endIndex := pieceIndex + numPieces + hash := "no v2 pieces root" + for a := range f.PiecesRoot.Iter() { + hash = a.HexString() + } fmt.Printf( - "%x: %q: pieces (%v-%v)\n", - f.PiecesRoot.Unwrap(), - f.BestPath(), + "%s: %v: pieces (%v-%v)\n", + hash, + func() any { + if info.IsDir() { + return fmt.Sprintf("%q", 
f.BestPath()) + } + return "(single file torrent)" + }(), pieceIndex, endIndex-1, ) pieceIndex = endIndex } }, - }[args[1]]() + }) }, "merkle": func() { h := merkle.NewHash() @@ -75,5 +109,7 @@ panic(err) } fmt.Printf("%x\n", h.Sum(nil)) }, - }[args[0]]() + }) + p.FailIfArgsRemain() + return p.Err() } diff --git a/config.go b/config.go index 05da2bf4d9a652ac9073742194445d78aebf7bf7..3424d7bc037af10a48e036e5dc179afa545f4800 100644 --- a/config.go +++ b/config.go @@ -12,10 +12,13 @@ "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" + "github.com/pion/webrtc/v4" "golang.org/x/time/rate" "github.com/anacrolix/torrent/iplist" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/mse" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/version" @@ -46,12 +49,13 @@ // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) } -// Probably not safe to modify this after it's given to a Client. +// Probably not safe to modify this after it's given to a Client, or to pass it to multiple Clients. type ClientConfig struct { ClientTrackerConfig ClientDhtConfig + MetainfoSourcesConfig - // Store torrent file data in this directory unless .DefaultStorage is + // Store torrent file data in this directory unless DefaultStorage is // specified. DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"` // The address to listen for new uTP and TCP BitTorrent protocol connections. DHT shares a UDP @@ -79,6 +83,8 @@ // Limiter's burst must be bigger than the largest Read performed on the underlying // rate-limiting io.Reader minus one. This is likely to be the larger of the main read loop // buffer (~4096), and the requested chunk size (~16KiB, see TorrentSpec.ChunkSize). If limit is // not Inf, and burst is left at 0, the implementation will choose a suitable burst. 
+ // + // If the field is nil, no rate limiting is applied. And it can't be adjusted dynamically. DownloadRateLimiter *rate.Limiter // Maximum unverified bytes across all torrents. Not used if zero. MaxUnverifiedBytes int64 @@ -230,7 +236,6 @@ KeepAliveTimeout: time.Minute, MaxAllocPeerRequestDataPerConn: 1 << 20, ListenHost: func(string) string { return "" }, UploadRateLimiter: unlimited, - DownloadRateLimiter: unlimited, DisableAcceptRateLimiting: true, DropMutuallyCompletePeers: true, HeaderObfuscationPolicy: HeaderObfuscationPolicy{ @@ -251,6 +256,9 @@ cc.DhtStartingNodes = func(network string) dht.StartingNodesGetter { return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) } } cc.PeriodicallyAnnounceTorrentsToDht = true + cc.MetainfoSourcesMerger = func(t *Torrent, info *metainfo.MetaInfo) error { + return t.MergeSpec(TorrentSpecFromMetaInfo(info)) + } return cc } @@ -261,10 +269,26 @@ } func (cfg *ClientConfig) setRateLimiterBursts() { // What about chunk size? - setRateLimiterBurstIfZero(cfg.UploadRateLimiter, cfg.MaxAllocPeerRequestDataPerConn) - setRateLimiterBurstIfZero( - cfg.DownloadRateLimiter, - min( - int(cfg.DownloadRateLimiter.Limit()), - defaultDownloadRateLimiterBurst)) + if cfg.UploadRateLimiter.Burst() == 0 { + cfg.UploadRateLimiter.SetBurst(cfg.MaxAllocPeerRequestDataPerConn) + } + setDefaultDownloadRateLimiterBurstIfZero(cfg.DownloadRateLimiter) +} + +// Returns the download rate.Limit handling the special nil case. +func EffectiveDownloadRateLimit(l *rate.Limiter) rate.Limit { + if l == nil { + return rate.Inf + } + return l.Limit() +} + +type MetainfoSourcesConfig struct { + // Used for torrent metainfo sources only. Falls back to the http.Client created to wrap + // WebTransport. + MetainfoSourcesClient *http.Client + // If a sources successfully fetches metainfo, this function is called to apply the metainfo. t + // is provided to prevent a race as the fetcher for the source was bound to it. 
Returning an + // error will kill the respective sourcer. + MetainfoSourcesMerger func(t *Torrent, info *metainfo.MetaInfo) error } diff --git a/conn-stats.go b/conn-stats.go index 12e00287686a1f3c64728c5437504a2f9a392d12..cb51a1b0cc67fcca934791e0dd1f17674c73e8f2 100644 --- a/conn-stats.go +++ b/conn-stats.go @@ -55,12 +55,16 @@ cs.ChunksRead.Add(1) cs.BytesReadData.Add(size) } -func (cs *ConnStats) incrementPiecesDirtiedGood() { +func (cs *ConnStats) incrementPiecesDirtiedGood() bool { cs.PiecesDirtiedGood.Add(1) + // This method is used as an iterator and should never return early. + return true } -func (cs *ConnStats) incrementPiecesDirtiedBad() { +func (cs *ConnStats) incrementPiecesDirtiedBad() bool { cs.PiecesDirtiedBad.Add(1) + // This method is used as an iterator and should never return early. + return true } func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) { diff --git a/deferrwl.go b/deferrwl.go index 0af2e19675b2e67a3c579cb742c93be7e939d9e9..e3aa93b87d5892279393e12bd712058aba5c6187 100644 --- a/deferrwl.go +++ b/deferrwl.go @@ -5,6 +5,7 @@ "fmt" "reflect" g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/sync" ) @@ -16,15 +17,18 @@ internal sync.RWMutex unlockActions []func() uniqueActions map[any]struct{} // Currently unlocking, defers should not occur? 
- unlocking bool + allowDefers bool } func (me *lockWithDeferreds) Lock() { me.internal.Lock() + panicif.True(me.allowDefers) + me.allowDefers = true } func (me *lockWithDeferreds) Unlock() { - me.unlocking = true + panicif.False(me.allowDefers) + me.allowDefers = false startLen := len(me.unlockActions) var i int for i = 0; i < len(me.unlockActions); i++ { @@ -34,8 +38,7 @@ if i != len(me.unlockActions) { panic(fmt.Sprintf("num deferred changed while running: %v -> %v", startLen, len(me.unlockActions))) } me.unlockActions = me.unlockActions[:0] - clear(me.uniqueActions) - me.unlocking = false + me.uniqueActions = nil me.internal.Unlock() } @@ -49,19 +52,18 @@ } // Not allowed after unlock has started. func (me *lockWithDeferreds) Defer(action func()) { - if me.unlocking { - panic("defer called while unlocking") - } me.deferInner(action) } // Already guarded. func (me *lockWithDeferreds) deferInner(action func()) { + panicif.False(me.allowDefers) me.unlockActions = append(me.unlockActions, action) } // Protected from looping by once filter. 
func (me *lockWithDeferreds) deferOnceInner(key any, action func()) { + panicif.False(me.allowDefers) g.MakeMapIfNil(&me.uniqueActions) if g.MapContains(me.uniqueActions, key) { return diff --git a/deferrwl_test.go b/deferrwl_test.go index 8d08ede65220f1c67f0bbbc9318cf0f5e54238dc..55297384e1d0c42c0c89603748bc9f397251ab17 100644 --- a/deferrwl_test.go +++ b/deferrwl_test.go @@ -9,6 +9,7 @@ func TestUniqueDeferOnce(t *testing.T) { var p1, p2 Piece var mu lockWithDeferreds + mu.Lock() mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange) mu.DeferUniqueUnaryFunc(&p1, p1.publishStateChange) qt.Assert(t, qt.HasLen(mu.unlockActions, 1)) diff --git a/env.go b/env.go new file mode 100644 index 0000000000000000000000000000000000000000..996e1a8fde750fe3bf2f05d5bdccce821e694e08 --- /dev/null +++ b/env.go @@ -0,0 +1,27 @@ +package torrent + +import ( + "os" + "strconv" + + "github.com/anacrolix/missinggo/v2/panicif" + "golang.org/x/exp/constraints" +) + +func initIntFromEnv[T constraints.Signed](key string, defaultValue T, bitSize int) T { + return strconvFromEnv(key, defaultValue, bitSize, strconv.ParseInt) +} + +func initUIntFromEnv[T constraints.Unsigned](key string, defaultValue T, bitSize int) T { + return strconvFromEnv(key, defaultValue, bitSize, strconv.ParseUint) +} + +func strconvFromEnv[T, U constraints.Integer](key string, defaultValue T, bitSize int, conv func(s string, base, bitSize int) (U, error)) T { + s := os.Getenv(key) + if s == "" { + return defaultValue + } + i64, err := conv(s, 10, bitSize) + panicif.Err(err) + return T(i64) +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..064301396cc856a424ad02b330c249720df4b484 --- /dev/null +++ b/errors.go @@ -0,0 +1,8 @@ +package torrent + +// I don't trust errors.New with allocations, and I know I can use unique.Handle if I get desperate. 
+type stringError string + +func (e stringError) Error() string { + return string(e) +} diff --git a/fmt.go b/fmt.go deleted file mode 100644 index 7249f507a5d6417e0c1a952707ec5a0206d13b05..0000000000000000000000000000000000000000 --- a/fmt.go +++ /dev/null @@ -1,14 +0,0 @@ -package torrent - -import ( - "fmt" - "strings" -) - -func formatMap[K comparable, V any](m map[K]V) string { - var sb strings.Builder - for k, v := range m { - fmt.Fprintf(&sb, "%v: %v\n", k, v) - } - return strings.TrimSuffix(sb.String(), "\n") -} diff --git a/fs/cmd/torrentfs/main.go b/fs/cmd/torrentfs/main.go index d35f5c2360bfb267df13fceea025ab9fbba8800f..5bc50adec29e1d5a1f8aa6e21d2e5027fdb8cf60 100644 --- a/fs/cmd/torrentfs/main.go +++ b/fs/cmd/torrentfs/main.go @@ -1,3 +1,5 @@ +//go:build !windows + // Mounts a FUSE filesystem backed by torrents and magnet links. package main diff --git a/fs/file_handle.go b/fs/file_handle.go index ce5ded0b488205f54251e8631fbb72b0a6c9f1e2..9a64bb57cd5c140d712201501a1083773b911189 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -1,3 +1,5 @@ +//go:build !windows + package torrentfs import ( diff --git a/fs/filenode.go b/fs/filenode.go index 28a433e18d086d29f97941ff507370d8afa37d69..7054ee7b8fd003f536a69bc227b202541ce3a246 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,3 +1,5 @@ +//go:build !windows + package torrentfs import ( diff --git a/fs/stream-sintel_test.go b/fs/stream-sintel_test.go index e78ec8a347b96d4ab0593f4992237fa8df729df5..133215234ce1de7a4ded8463590e9d87519211b7 100644 --- a/fs/stream-sintel_test.go +++ b/fs/stream-sintel_test.go @@ -1,3 +1,5 @@ +//go:build !windows + package torrentfs_test import ( @@ -9,6 +11,7 @@ "fmt" "io" "io/fs" "os" + "os/signal" "path/filepath" "testing" "time" @@ -44,6 +47,9 @@ return to.Close() } func TestStreamSintelMagnet(t *testing.T) { + ctx := t.Context() + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) + defer cancel() fileHashes := map[string]string{ "poster.jpg": 
"f9223791908131c505d7bdafa7a8aaf5", "Sintel.mp4": "083e808d56aa7b146f513b3458658292", @@ -124,11 +130,13 @@ }, } h := md5.New() go func() { - <-t.Context().Done() - t.Log("testing context done") + <-ctx.Done() f.Close() }() _, err = f.WriteTo(io.MultiWriter(h, &w)) + if ctx.Err() != nil { + t.Fatal(ctx.Err()) + } panicif.Err(err) err = f.Close() panicif.Err(err) diff --git a/fs/torrentfs.go b/fs/torrentfs.go index f011d19ba8f5139715c89dadcaaabb4a1986f858..1003c010255d5d8b7c93ca52f259d49771ac2c8c 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -1,3 +1,5 @@ +//go:build !windows + package torrentfs import ( diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 80bd02e7e08ba00fc1378cf0b81fc513c2e727a3..3630d6c6ef87dc777457e721c25df3fdf73fef95 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -1,3 +1,5 @@ +//go:build !windows + package torrentfs import ( diff --git a/go.mod b/go.mod index 63395488f99e99b9045a1f9152d01bda492957b3..22fdf1bbd88e72b2ebf2715a52c99d35fe96cf71 100644 --- a/go.mod +++ b/go.mod @@ -7,16 +7,17 @@ github.com/RoaringBitmap/roaring v1.2.3 github.com/ajwerner/btree v0.0.0-20211221152037-f427b3e689c0 github.com/alexflint/go-arg v1.4.3 github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d - github.com/anacrolix/chansync v0.6.0 + github.com/anacrolix/bargle/v2 v2.0.0 + github.com/anacrolix/chansync v0.6.1-0.20250805140455-89f141559964 github.com/anacrolix/dht/v2 v2.22.2-0.20250623060212-d7b7d8a52b01 github.com/anacrolix/envpprof v1.3.0 github.com/anacrolix/fuse v0.3.2-0.20250603105216-aeb550c91d7a - github.com/anacrolix/generics v0.0.3-0.20250526144502-593be7092deb + github.com/anacrolix/generics v0.0.4-0.20250708073025-68393b391647 github.com/anacrolix/go-libutp v1.3.2 - github.com/anacrolix/gostdapp v0.1.0 + github.com/anacrolix/gostdapp v0.2.0 github.com/anacrolix/log v0.16.1-0.20250526073428-5cb74e15092b github.com/anacrolix/missinggo v1.3.0 - github.com/anacrolix/missinggo/v2 
v2.8.1-0.20250626123431-aa4691b19d56 + github.com/anacrolix/missinggo/v2 v2.10.0 github.com/anacrolix/multiless v0.4.0 github.com/anacrolix/possum/go v0.3.2 github.com/anacrolix/squirrel v0.6.4 @@ -64,7 +65,7 @@ github.com/alexflint/go-scalar v1.1.0 // indirect github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e // indirect github.com/anacrolix/missinggo/perf v1.0.0 // indirect github.com/anacrolix/mmsg v1.0.1 // indirect - github.com/anacrolix/stm v0.4.1-0.20221221005312-96d17df0e496 // indirect + github.com/anacrolix/stm v0.5.0 // indirect github.com/benbjohnson/immutable v0.4.1-0.20221220213129-8932b999621d // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.2 // indirect diff --git a/go.sum b/go.sum index d2b2d57fce23d835eedb929790e647ed5ac8ac34..d67f8b9acc6aef2d3140d1605537fc6a9c6e9f9b 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e h1:A0Ty9UeyBDIo29ZMnk0AvPqWDIa4HVvCaJqWNlCrMXA= github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e/go.mod h1:4YFqy+788tLJWtin2jNliYVJi+8aDejG9zcu/2/pONw= github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d h1:ypNOsIwvdumNRlqWj/hsnLs5TyQWQOylwi+T9Qs454A= github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ= -github.com/anacrolix/chansync v0.6.0 h1:/aQVvZ1yLRhmqEYrr9dC92JwzNBQ/SNnFi4uk+fTkQY= -github.com/anacrolix/chansync v0.6.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= +github.com/anacrolix/chansync v0.6.1-0.20250805140455-89f141559964 h1:VC5O4NsAg9An6Eda9aHwtjDNFtvf9yMBcV3Di3LijbM= +github.com/anacrolix/chansync v0.6.1-0.20250805140455-89f141559964/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/dht/v2 v2.22.2-0.20250623060212-d7b7d8a52b01 h1:guAizoaLxE4K4nHysq5GuLJAZoHs1FJI4Dr0kKqFdz0= github.com/anacrolix/dht/v2 v2.22.2-0.20250623060212-d7b7d8a52b01/go.mod 
h1:seXRz6HLw8zEnxlysf9ye2eQbrKUmch6PyOHpe/Nb/U= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= @@ -80,12 +80,12 @@ github.com/anacrolix/envpprof v1.3.0/go.mod h1:7QIG4CaX1uexQ3tqd5+BRa/9e2D02Wcertl6Yh0jCB0= github.com/anacrolix/fuse v0.3.2-0.20250603105216-aeb550c91d7a h1:bP+SxvpLAWXgpRvDKmB+d8n4XEWYHH5czGlcZ5Kw66Y= github.com/anacrolix/fuse v0.3.2-0.20250603105216-aeb550c91d7a/go.mod h1:vN3X/6E+uHNjg5F8Oy9FD9I+pYxeDWeB8mNjIoxL5ds= github.com/anacrolix/generics v0.0.0-20230113004304-d6428d516633/go.mod h1:ff2rHB/joTV03aMSSn/AZNnaIpUw0h3njetGsaXcMy8= -github.com/anacrolix/generics v0.0.3-0.20250526144502-593be7092deb h1:0GzqbT+KzmrpXsqEp6O3t6qfydTQuqvgo3nTJEC1EGA= -github.com/anacrolix/generics v0.0.3-0.20250526144502-593be7092deb/go.mod h1:MN3ve08Z3zSV/rTuX/ouI4lNdlfTxgdafQJiLzyNRB8= +github.com/anacrolix/generics v0.0.4-0.20250708073025-68393b391647 h1:dDTY2j+pjY0EnF0TIuAxees1FeFpnFVE2dr7BxfWe24= +github.com/anacrolix/generics v0.0.4-0.20250708073025-68393b391647/go.mod h1:MN3ve08Z3zSV/rTuX/ouI4lNdlfTxgdafQJiLzyNRB8= github.com/anacrolix/go-libutp v1.3.2 h1:WswiaxTIogchbkzNgGHuHRfbrYLpv4o290mlvcx+++M= github.com/anacrolix/go-libutp v1.3.2/go.mod h1:fCUiEnXJSe3jsPG554A200Qv+45ZzIIyGEvE56SHmyA= -github.com/anacrolix/gostdapp v0.1.0 h1:sZC+gSLhA7Hdalak5rPCkhO0YSEl0tt/lsovxh6qka4= -github.com/anacrolix/gostdapp v0.1.0/go.mod h1:2pstbgWcpBCY3rFUldM0NbDCrP86vWsh61wj8yY517E= +github.com/anacrolix/gostdapp v0.2.0 h1:UNuF8oKKFIa2tzcXLTiVStCYSUfRith81EskV05gIfk= +github.com/anacrolix/gostdapp v0.2.0/go.mod h1:2pstbgWcpBCY3rFUldM0NbDCrP86vWsh61wj8yY517E= github.com/anacrolix/log v0.3.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.13.1/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= @@ -105,8 +105,8 @@ github.com/anacrolix/missinggo/perf v1.0.0 
h1:7ZOGYziGEBytW49+KmYGTaNfnwUqP1HBsy6BqESAJVw= github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ= github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbUBVQQW4QeRkNCGY= github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA= -github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56 h1:+VSnod9Zipey/E5mDTrhooV9y8A8ZaUHSzG/TnrIHug= -github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0= +github.com/anacrolix/missinggo/v2 v2.10.0 h1:pg0iO4Z/UhP2MAnmGcaMtp5ZP9kyWsusENWN9aolrkY= +github.com/anacrolix/missinggo/v2 v2.10.0/go.mod h1:nCRMW6bRCMOVcw5z9BnSYKF+kDbtenx+hQuphf4bK8Y= github.com/anacrolix/mmsg v1.0.1 h1:TxfpV7kX70m3f/O7ielL/2I3OFkMPjrRCPo7+4X5AWw= github.com/anacrolix/mmsg v1.0.1/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc= github.com/anacrolix/multiless v0.4.0 h1:lqSszHkliMsZd2hsyrDvHOw4AbYWa+ijQ66LzbjqWjM= @@ -114,8 +114,8 @@ github.com/anacrolix/multiless v0.4.0/go.mod h1:zJv1JF9AqdZiHwxqPgjuOZDGWER6nyE48WBCi/OOrMM= github.com/anacrolix/squirrel v0.6.4 h1:K6ABRMCms0xwpEIdY3kAaDBUqiUeUYCKLKI0yHTr9IQ= github.com/anacrolix/squirrel v0.6.4/go.mod h1:0kFVjOLMOKVOet6ja2ac1vTOrqVbLj2zy2Fjp7+dkE8= github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= -github.com/anacrolix/stm v0.4.1-0.20221221005312-96d17df0e496 h1:aMiRi2kOOd+nG64suAmFMVnNK2E6GsnLif7ia9tI3cA= -github.com/anacrolix/stm v0.4.1-0.20221221005312-96d17df0e496/go.mod h1:DBm8/1OXm4A4RZ6Xa9u/eOsjeAXCaoRYvd2JzlskXeM= +github.com/anacrolix/stm v0.5.0 h1:9df1KBpttF0TzLgDq51Z+TEabZKMythqgx89f1FQJt8= +github.com/anacrolix/stm v0.5.0/go.mod h1:MOwrSy+jCm8Y7HYfMAwPj7qWVu7XoVvjOiYwJmpeB/M= github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk= github.com/anacrolix/sync v0.3.0/go.mod 
h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= github.com/anacrolix/sync v0.5.4 h1:yXZLIjXh/G+Rh2mYGCAPmszmF/fvEPadDy7/pPChpKM= diff --git a/internal/request-strategy/ajwerner-btree.go b/internal/request-strategy/ajwerner-btree.go index 73ace30370253d7e3ebc562a40fbd034c3fed54d..81e2d6f3d8c3e40927f61d2503cd84c8483268f7 100644 --- a/internal/request-strategy/ajwerner-btree.go +++ b/internal/request-strategy/ajwerner-btree.go @@ -8,6 +8,11 @@ type ajwernerBtree struct { btree btree.Set[PieceRequestOrderItem] } +func (a *ajwernerBtree) Contains(item PieceRequestOrderItem) bool { + _, ok := a.btree.Get(item) + return ok +} + var _ Btree = (*ajwernerBtree)(nil) func NewAjwernerBtree() *ajwernerBtree { diff --git a/internal/request-strategy/order.go b/internal/request-strategy/order.go index d3ede920ddf6a155a49202253dc1901c73341aa0..e62398bd7f97c12b8374ec94166b9cf8b1e6c06d 100644 --- a/internal/request-strategy/order.go +++ b/internal/request-strategy/order.go @@ -13,7 +13,6 @@ type ( RequestIndex uint32 ChunkIndex = RequestIndex Request = types.Request - pieceIndex = types.PieceIndex piecePriority = types.PiecePriority // This can be made into a type-param later, will be great for testing. ChunkSpec = types.ChunkSpec @@ -25,7 +24,7 @@ // per-client? func pieceOrderLess(i, j *PieceRequestOrderItem) multiless.Computation { return multiless.New().Int( int(j.State.Priority), int(i.State.Priority), - // TODO: Should we match on complete here to prevent churn when availability changes? + // TODO: Should we match on complete here to prevent churn when availability changes? (Answer: Yes). 
).Bool( j.State.Partial, i.State.Partial, ).Int( @@ -36,24 +35,22 @@ i.State.Availability, j.State.Availability, ).Int( i.Key.Index, j.Key.Index, ).Lazy(func() multiless.Computation { - return multiless.New().Cmp(bytes.Compare( - i.Key.InfoHash[:], - j.Key.InfoHash[:], - )) + a := i.Key.InfoHash.Value() + b := j.Key.InfoHash.Value() + return multiless.New().Cmp(bytes.Compare(a[:], b[:])) }) } -// Returns true if the piece should be considered against the unverified bytes limit. This is -// based on whether the callee intends to request from the piece. Pieces submitted to this -// callback passed Piece.Request and so are ready for immediate download. +// This did return true if the piece should be considered against the unverified bytes limit. But +// that would cause thrashing on completion: The order should be stable. This is now a 3-tuple +// iterator. type RequestPieceFunc func(ih metainfo.Hash, pieceIndex int, orderState PieceRequestOrderState) bool // Calls f with requestable pieces in order. func GetRequestablePieces( input Input, pro *PieceRequestOrder, - // Returns true if the piece should be considered against the unverified bytes limit. This is - // based on whether the callee intends to request from the piece. Pieces submitted to this - // callback passed Piece.Request and so are ready for immediate download. + // Pieces submitted to this callback passed Piece.Request and so are ready for immediate + // download. 
requestPiece RequestPieceFunc, ) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage @@ -67,9 +64,9 @@ allTorrentsUnverifiedBytes int64 maxUnverifiedBytes = input.MaxUnverifiedBytes() ) pro.tree.Scan(func(item PieceRequestOrderItem) bool { - ih := item.Key.InfoHash - var t = input.Torrent(ih) - var piece = t.Piece(item.Key.Index) + ih := item.Key.InfoHash.Value() + t := input.Torrent(ih) + piece := t.Piece(item.Key.Index) pieceLength := t.PieceLength() // Storage limits will always apply against requestable pieces, since we need to keep the // highest priority pieces, even if they're complete or in an undesirable state. @@ -83,7 +80,7 @@ if piece.Request() { if !requestPiece(ih, item.Key.Index, item.State) { // No blocks are being considered from this piece, so it won't result in unverified // bytes. - return true + return false } } else if !piece.CountUnverified() { // The piece is pristine, and we're not considering it for requests. diff --git a/internal/request-strategy/piece-request-order.go b/internal/request-strategy/piece-request-order.go index 3e572aedd25ac6fe961b1f3c7bb2819d6ab8e29b..ccd647fe3fb0b2d7aebb44a8c4d2d074fd8d5eda 100644 --- a/internal/request-strategy/piece-request-order.go +++ b/internal/request-strategy/piece-request-order.go @@ -2,8 +2,10 @@ package requestStrategy import ( "iter" + "unique" g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/metainfo" ) @@ -11,7 +13,9 @@ type Btree interface { Delete(PieceRequestOrderItem) Add(PieceRequestOrderItem) + // TODO: Add an iterator variant of this and benchmark. 
Scan(func(PieceRequestOrderItem) bool) + Contains(PieceRequestOrderItem) bool } func NewPieceOrder(btree Btree, cap int) *PieceRequestOrder { @@ -27,8 +31,8 @@ keys map[PieceRequestOrderKey]PieceRequestOrderState } type PieceRequestOrderKey struct { + InfoHash unique.Handle[metainfo.Hash] Index int - InfoHash metainfo.Hash } type PieceRequestOrderState struct { @@ -94,3 +98,9 @@ return yield(item) }) } } + +func (me *PieceRequestOrder) Get(key PieceRequestOrderKey) (ret g.Option[PieceRequestOrderState]) { + ret.Value, ret.Ok = me.keys[key] + panicif.NotEq(ret.Ok, me.tree.Contains(PieceRequestOrderItem{key, ret.Value})) + return +} diff --git a/internal/request-strategy/tidwall-btree.go b/internal/request-strategy/tidwall-btree.go index 88bbafd2a2dfdad42ad7be4935def1aa88d469bc..b3efe1c0d74b1969994deffd05d6b8a784784e74 100644 --- a/internal/request-strategy/tidwall-btree.go +++ b/internal/request-strategy/tidwall-btree.go @@ -35,3 +35,8 @@ func (me *tidwallBtree) Delete(item PieceRequestOrderItem) { _, deleted := me.tree.DeleteHint(item, me.PathHint) mustValue(deleted, item) } + +func (me *tidwallBtree) Contains(item PieceRequestOrderItem) bool { + _, ok := me.tree.Get(item) + return ok +} diff --git a/issue97_test.go b/issue97_test.go index 8453fad1cdc48efa7cf6a872c88d2cf9f4e6fb2b..b933784638c9df8b9f3a7d9bc75b7dae1eea9cd9 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -17,9 +17,7 @@ cs := storage.NewFile(td) defer cs.Close() tt := cl.newTorrent(metainfo.Hash{1}, cs) mi := testutil.GreetingMetaInfo() - info, err := mi.UnmarshalInfo() - require.NoError(t, err) - require.NoError(t, tt.setInfo(&info)) + require.NoError(t, tt.SetInfoBytes(mi.InfoBytes)) + go tt.piece(0).VerifyDataContext(t.Context()) require.NoError(t, tt.storage.Close()) - tt.hashPiece(0) } diff --git a/justfile b/justfile index 9a52ffbaedb67892ff9b91e183b19f3a2ea62e51..92e52bbdfb5e8ab613e2b9453acaa7cfa3842d9c 100644 --- a/justfile +++ b/justfile @@ -4,8 +4,8 @@ act: act -j test --matrix 
go-version:'1.24' --env-file .empty.env -test: build-possum - go test -race ./... +test *args: build-possum + go test -race -failfast {{ args }} ./... build-possum: cd storage/possum/lib && cargo build diff --git a/math.go b/math.go index 0aefb4f7b64c682bfd86df2e152a2d149db2549a..0594fd29bae897c299b169721bfbacdf882e35d5 100644 --- a/math.go +++ b/math.go @@ -4,8 +4,13 @@ import ( "golang.org/x/exp/constraints" ) +// a/b rounding up func intCeilDiv[T constraints.Unsigned](a, b T) T { // This still sux for negative numbers due to truncating division. But I don't know that we need // or that ceil division makes sense for negative numbers. return (a + b - 1) / b } + +func nextMultiple[T constraints.Integer](x, multiple T) T { + return x + multiple - x%multiple +} diff --git a/metainfo/info.go b/metainfo/info.go index 6fc46dbb2b39543edd4638d90373fef2c7626aa4..d092d51ff89733675e01c005d7371d6eb9bb180b 100644 --- a/metainfo/info.go +++ b/metainfo/info.go @@ -190,8 +190,6 @@ if !yield(fi) { return } } - return - } } diff --git a/misc.go b/misc.go index ba4597ed7cdc86e4be713431917042bfbbb3d45b..b0016781e1623af71331740e7c0e194bdc662635 100644 --- a/misc.go +++ b/misc.go @@ -6,6 +6,7 @@ "net" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/panicif" "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" @@ -105,11 +106,12 @@ return nil } func chunkIndexSpec(index, pieceLength, chunkSize pp.Integer) ChunkSpec { - ret := ChunkSpec{index * chunkSize, chunkSize} - if ret.Begin+ret.Length > pieceLength { - ret.Length = pieceLength - ret.Begin + begin := index * chunkSize + panicif.GreaterThanOrEqual(begin, pieceLength) + return ChunkSpec{ + Begin: begin, + Length: min(chunkSize, pieceLength-begin), } - return ret } func comparePeerTrust(l, r *Peer) int { diff --git a/misc_test.go b/misc_test.go index d8c0c7aab2850b0abc14c49e2f00102ac1397d16..d81a1f26d8d39756707d2b285096bb0adc74abe7 100644 --- a/misc_test.go +++ 
b/misc_test.go @@ -5,8 +5,6 @@ "reflect" "strings" "testing" - "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/missinggo/v2/bitmap" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" ) @@ -21,22 +19,6 @@ check(13, 5, 0, newRequest(0, 0, 5), true) check(13, 5, 3, newRequest(0, 0, 5), true) check(13, 5, 11, newRequest(2, 0, 3), true) check(13, 5, 13, Request{}, false) -} - -func BenchmarkIterBitmapsDistinct(t *testing.B) { - t.ReportAllocs() - for i := 0; i < t.N; i += 1 { - var skip, first, second bitmap.Bitmap - skip.Add(1) - first.Add(1, 0, 3) - second.Add(1, 2, 0) - skipCopy := skip.Copy() - t.StartTimer() - output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)) - t.StopTimer() - assert.Equal(t, []interface{}{0, 3, 2}, output) - assert.Equal(t, []bitmap.BitIndex{1}, skip.ToSortedSlice()) - } } func TestSpewConnStats(t *testing.T) { diff --git a/mmap-span/mmap-span.go b/mmap-span/mmap-span.go index 55031040b7f863aeddda843bb323bb19ebf51803..1617d86477318af850cd35eb6a7126d5ba80d15b 100644 --- a/mmap-span/mmap-span.go +++ b/mmap-span/mmap-span.go @@ -2,10 +2,11 @@ package mmapSpan import ( "errors" - "fmt" "io" "io/fs" "sync" + + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/segments" ) @@ -30,14 +31,11 @@ segmentLocater: index, } } -func (ms *MMapSpan) Flush() (errs []error) { +func (ms *MMapSpan) Flush() (err error) { ms.mu.RLock() defer ms.mu.RUnlock() for _, mMap := range ms.mMaps { - err := mMap.Flush() - if err != nil { - errs = append(errs, err) - } + err = errors.Join(err, mMap.Flush()) } return } @@ -78,21 +76,14 @@ copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64, ) (n int) { - ms.segmentLocater.Locate( - segments.Extent{off, int64(len(p))}, - func(i int, e segments.Extent) bool { - mMapBytes := ms.mMaps[i].Bytes()[e.Start:] - // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes)) - _n := copyBytes(copyArgs(p, 
mMapBytes)) - p = p[_n:] - n += _n - - if segments.Int(_n) != e.Length { - panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length)) - } - return true - }, - ) + for i, e := range ms.segmentLocater.LocateIter(segments.Extent{off, int64(len(p))}) { + mMapBytes := ms.mMaps[i].Bytes()[e.Start:] + // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes)) + _n := copyBytes(copyArgs(p, mMapBytes)) + p = p[_n:] + n += _n + panicif.NotEq(segments.Int(_n), e.Length) + } return } diff --git a/peer-impl.go b/peer-impl.go index aa0fe9feae101d5a1e9b459804d9342598eabfcc..25fe5a72b9591e5ca09938146ef01c9b352ac277 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -31,7 +31,7 @@ onGotInfo(info *metainfo.Info) // Drop connection. This may be a no-op if there is no connection. drop() // Rebuke the peer - ban() + providedBadData() String() string // Per peer-impl lines for WriteStatus. peerImplStatusLines() []string @@ -53,4 +53,5 @@ checkReceivedChunk(ri RequestIndex, msg *pp.Message, req Request) (intended bool, err error) // Whether we're expecting to receive chunks because we have outstanding requests. Used for // example to calculate download rate. expectingChunks() bool + allConnStatsImplField(*AllConnStats) *ConnStats } diff --git a/peer.go b/peer.go index 6bcddd495b06e8914268e324009a9c465c5905c8..251be54a1b0556ea0802482583b542e21aae171e 100644 --- a/peer.go +++ b/peer.go @@ -1,8 +1,10 @@ package torrent import ( + "context" "fmt" "io" + "iter" "log/slog" "net" "strings" @@ -13,8 +15,8 @@ "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" . "github.com/anacrolix/generics" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/internal/alloclim" @@ -29,7 +31,8 @@ Peer struct { // First to ensure 64-bit alignment for atomics. See #262. 
_stats ConnStats - t *Torrent + cl *Client + t *Torrent legacyPeerImpl peerImpl newHotPeerImpl @@ -39,6 +42,8 @@ RemoteAddr PeerRemoteAddr Discovery PeerSource trusted bool closed chansync.SetOnce + closedCtx context.Context + closedCtxCancel context.CancelFunc lastUsefulChunkReceived time.Time lastStartedExpectingToReceiveChunks time.Time @@ -60,9 +65,6 @@ bannableAddr Option[bannableAddr] // True if the connection is operating over MSE obfuscation. headerEncrypted bool cryptoMethod mse.CryptoMethod - // Set true after we've added our ConnStats generated during handshake to - // other ConnStat instances as determined when the *Torrent became known. - reconciledHandshakeStats bool lastMessageReceived time.Time completedHandshake time.Time @@ -223,6 +225,10 @@ func (p *Peer) close() { if !p.closed.Set() { return } + // Not set until Torrent is known. + if p.closedCtx != nil { + p.closedCtxCancel() + } if p.updateRequestsTimer != nil { p.updateRequestsTimer.Stop() } @@ -281,49 +287,31 @@ // The function takes a message to be sent, and returns true if more messages // are okay. type messageWriter func(pp.Message) bool -// Emits the indices in the Bitmaps bms in order, never repeating any index. -// skip is mutated during execution, and its initial values will never be -// emitted. -func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { - return func(cb iter.Callback) { - for _, bm := range bms { - if !iter.All( - func(_i interface{}) bool { - i := _i.(int) - if skip.Contains(bitmap.BitIndex(i)) { - return true - } - skip.Add(bitmap.BitIndex(i)) - return cb(i) - }, - bm.Iter, - ) { - return - } - } +// All ConnStats that include this connection. Some objects are not known until the handshake is +// complete, after which it's expected to reconcile the differences. +func (cn *Peer) modifyRelevantConnStats(f func(*ConnStats)) { + // Every peer has basic ConnStats for now. 
+ f(&cn._stats) + incAll := func(stats *ConnStats) bool { + f(stats) + return true } + cn.upstreamConnStats()(incAll) } -// After handshake, we know what Torrent and Client stats to include for a -// connection. -func (cn *Peer) postHandshakeStats(f func(*ConnStats)) { - t := cn.t - f(&t.connStats) - f(&t.cl.connStats) -} - -// All ConnStats that include this connection. Some objects are not known -// until the handshake is complete, after which it's expected to reconcile the -// differences. -func (cn *Peer) allStats(f func(*ConnStats)) { - f(&cn._stats) - if cn.reconciledHandshakeStats { - cn.postHandshakeStats(f) +// Yields relevant upstream ConnStats. Skips Torrent if it isn't set. +func (cn *Peer) upstreamConnStats() iter.Seq[*ConnStats] { + return func(yield func(*ConnStats) bool) { + // PeerConn can be nil when it hasn't completed handshake. + if cn.t != nil { + cn.relevantConnStats(&cn.t.connStats)(yield) + } + cn.relevantConnStats(&cn.cl.connStats)(yield) } } func (cn *Peer) readBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) + cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) } func (c *Peer) lastHelpful() (ret time.Time) { @@ -355,7 +343,7 @@ } } func (c *Peer) doChunkReadStats(size int64) { - c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) }) + c.modifyRelevantConnStats(func(cs *ConnStats) { cs.receivedChunk(size) }) } // Handle a received chunk from a peer. TODO: Break this out into non-wire protocol specific @@ -395,16 +383,16 @@ // Do we actually want this chunk? 
if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) ChunksReceived.Add("redundant", 1) - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } piece := t.piece(ppReq.Index.Int()) - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) + c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if intended { - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) + c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { f(ReceivedUsefulDataEvent{c, msg}) @@ -540,10 +528,6 @@ } return ret } -func (cn *Peer) stats() *ConnStats { - return &cn._stats -} - func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { pc, ok := p.legacyPeerImpl.(*PeerConn) return pc, ok @@ -566,3 +550,16 @@ if p.bannableAddr.Ok { p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData) } } + +func (p *Peer) initClosedCtx() { + panicif.NotNil(p.closedCtx) + p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx) +} + +// Iterates base and peer-impl specific ConnStats from all. 
+func (p *Peer) relevantConnStats(all *AllConnStats) iter.Seq[*ConnStats] { + return func(yield func(*ConnStats) bool) { + yield(&all.ConnStats) + yield(p.peerImpl.allConnStatsImplField(all)) + } +} diff --git a/peerconn.go b/peerconn.go index a8d466ea970e04eca717f41ae69fb1853b6fefdb..573ae30ed47564698a278e29c4b605882b29dc05 100644 --- a/peerconn.go +++ b/peerconn.go @@ -3,7 +3,6 @@ import ( "bufio" "bytes" - "context" "errors" "fmt" "io" @@ -15,12 +14,14 @@ "strconv" "strings" "sync/atomic" "time" + "weak" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/generics" . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/multiless" "golang.org/x/time/rate" @@ -98,7 +99,6 @@ // The peer has everything. This can occur due to a special message, when // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool - // TODO: How are pending cancels handled for webseed peers? requestState requestStrategy.PeerRequestState peerRequestDataAllocLimiter alloclim.Limiter @@ -112,6 +112,15 @@ // Hash pieces received from the peer, mapped from pieces root to piece layer hashes. This way // we can verify all the pieces for a file when they're all arrived before submitting them to // the torrent. receivedHashPieces map[[32]byte][][32]byte + + peerRequestServerRunning bool + // Set true after we've added our ConnStats generated during handshake to other ConnStat + // instances as determined when the *Torrent became known. 
+ reconciledHandshakeStats bool +} + +func (*PeerConn) allConnStatsImplField(stats *AllConnStats) *ConnStats { + return &stats.PeerConns } func (cn *PeerConn) lastWriteUploadRate() float64 { @@ -582,11 +591,11 @@ } torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1) } } - cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) + cn.modifyRelevantConnStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) } func (cn *PeerConn) wroteBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) + cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) } func (c *PeerConn) fastEnabled() bool { @@ -670,17 +679,41 @@ allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)), } c.peerRequests[r] = value if startFetch { - // TODO: Limit peer request data read concurrency. - go c.peerRequestDataReader(r, value) + c.startPeerRequestServer() } return nil } -func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { +func (c *PeerConn) startPeerRequestServer() { + if !c.peerRequestServerRunning { + go c.peerRequestServer() + c.peerRequestServerRunning = true + } +} + +func (c *PeerConn) peerRequestServer() { +again: + c.locker().Lock() + if !c.closed.IsSet() { + for r, state := range c.peerRequests { + if state.data == nil { + c.locker().Unlock() + c.servePeerRequest(r, state) + goto again + } + } + } + panicif.False(c.peerRequestServerRunning) + c.peerRequestServerRunning = false + c.locker().Unlock() +} + +// TODO: Return an error then let caller filter on conditions. +func (c *PeerConn) servePeerRequest(r Request, prs *peerRequestState) { // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, // or fail to read and then cleanup. Also, we used to hang here if the reservation was never // dropped, that was fixed. 
- ctx := context.Background() + ctx := c.closedCtx err := prs.allocReservation.Wait(ctx) if err != nil { c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err) @@ -689,6 +722,10 @@ } b, err := c.readPeerRequestData(r) c.locker().Lock() defer c.locker().Unlock() + // This function should remove work from peerRequests so peerRequestServer does not stall. + defer func() { + panicif.True(MapContains(c.peerRequests, r) && prs.data == nil) + }() if err != nil { c.peerRequestDataReadFailed(err, r) } else { @@ -1128,7 +1165,7 @@ func (cn *PeerConn) drop() { cn.t.dropConnection(cn) } -func (cn *PeerConn) ban() { +func (cn *PeerConn) providedBadData() { cn.t.cl.banPeerIP(cn.remoteIp()) } @@ -1150,13 +1187,12 @@ }) } func (c *PeerConn) setTorrent(t *Torrent) { - if c.t != nil { - panic("connection already associated with a torrent") - } + panicif.NotNil(c.t) c.t = t + c.initClosedCtx() c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t) c.setPeerLoggers(t.logger, t.slogger()) - t.reconcileHandshakeStats(c.peerPtr()) + c.reconcileHandshakeStats() } func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { @@ -1686,7 +1722,7 @@ cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ cn.t.requestState[r] = requestState{ - peer: cn, + peer: weak.Make(cn), when: time.Now(), } cn.updateExpectingChunks() @@ -1840,3 +1876,23 @@ } return } + +// Reconcile bytes transferred before connection was associated with a torrent. +func (c *PeerConn) reconcileHandshakeStats() { + panicif.True(c.reconciledHandshakeStats) + if c._stats != (ConnStats{ + // Handshakes should only increment these fields: + BytesWritten: c._stats.BytesWritten, + BytesRead: c._stats.BytesRead, + }) { + panic("bad stats") + } + // Add the stat data so far to relevant Torrent stats that were skipped before the handshake + // completed. 
+ c.relevantConnStats(&c.t.connStats)(func(cs *ConnStats) bool { + cs.BytesRead.Add(c._stats.BytesRead.Int64()) + cs.BytesWritten.Add(c._stats.BytesWritten.Int64()) + return true + }) + c.reconciledHandshakeStats = true +} diff --git a/peerconn_test.go b/peerconn_test.go index 396948538842230508e3be5baaadd989c6ae6b58..4a4e7740f72c99d7a3acf9545ce3a65646f9c577 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -11,7 +11,7 @@ "sync" "testing" g "github.com/anacrolix/generics" - qt "github.com/go-quicktest/qt" + "github.com/go-quicktest/qt" "github.com/stretchr/testify/require" "golang.org/x/time/rate" @@ -28,7 +28,9 @@ cl.init(TestingConfig(t)) cl.initLogger() c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrentForTesting()) + cl.lock() err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}) + cl.unlock() qt.Assert(t, qt.IsNil(err)) r, w := io.Pipe() // c.r = r @@ -90,9 +92,7 @@ } func BenchmarkConnectionMainReadLoop(b *testing.B) { var cl Client - cl.init(&ClientConfig{ - DownloadRateLimiter: unlimited, - }) + cl.init(&ClientConfig{}) cl.initLogger() ts := &torrentStorage{} t := cl.newTorrentForTesting() @@ -267,23 +267,25 @@ func TestHaveAllThenBitfield(t *testing.T) { cl := newTestingClient(t) tt := cl.newTorrentForTesting() - // cl.newConnection() + //pc := cl.newConnection(nil, newConnectionOpts{}) pc := PeerConn{ Peer: Peer{t: tt}, } pc.initRequestState() pc.legacyPeerImpl = &pc tt.conns[&pc] = struct{}{} + g.InitNew(&pc.callbacks) qt.Assert(t, qt.IsNil(pc.onPeerSentHaveAll())) qt.Check(t, qt.DeepEquals(pc.t.connsWithAllPieces, map[*Peer]struct{}{&pc.Peer: {}})) pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) qt.Check(t, qt.Equals(pc.peerMinPieces, 6)) qt.Check(t, qt.HasLen(pc.t.connsWithAllPieces, 0)) - qt.Assert(t, qt.IsNil(pc.t.setInfo(&metainfo.Info{ - PieceLength: 0, + qt.Assert(t, qt.IsNil(pc.t.setInfoUnlocked(&metainfo.Info{ + Name: "herp", + Length: 7, + 
PieceLength: 1, Pieces: make([]byte, pieceHash.Size()*7), }))) - pc.t.onSetInfo() qt.Check(t, qt.Equals(tt.numPieces(), 7)) qt.Check(t, qt.DeepEquals(tt.pieceAvailabilityRuns(), []pieceAvailabilityRun{ // The last element of the bitfield is irrelevant, as the Torrent actually only has 7 diff --git a/piece.go b/piece.go index fc17714b1b05c5a9e7e606ed6b32a8807d4eba48..1a039750f5095bb276966856686e6f2546c610de 100644 --- a/piece.go +++ b/piece.go @@ -7,6 +7,7 @@ "fmt" "iter" "sync" + "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" @@ -61,7 +62,7 @@ // Currently being hashed. hashing bool // The piece state may have changed, and is being synchronized with storage. marking bool - // The Completion.Ok field from the storage layer. + // The Completion.Ok field cached from the storage layer. storageCompletionOk bool } @@ -85,10 +86,8 @@ } return p.t.storage.PieceWithHash(p.Info(), pieceHash) } -func (p *Piece) Flush() { - if p.t.storage.Flush != nil { - _ = p.t.storage.Flush() - } +func (p *Piece) Flush() error { + return p.Storage().Flush() } func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool { @@ -107,7 +106,8 @@ func (p *Piece) numDirtyChunks() chunkIndexType { return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex]( &p.t.dirtyChunks, p.requestIndexBegin(), - p.requestIndexMaxEnd())) + p.t.pieceRequestIndexBegin(p.index+1), + )) } func (p *Piece) unpendChunkIndex(i chunkIndexType) { @@ -155,9 +155,9 @@ func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool { return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk) } -func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] { +func (p *Piece) iterCleanChunks(it *roaring.IntIterator) iter.Seq[chunkIndexType] { return func(yield func(chunkIndexType) bool) { - it := p.t.dirtyChunks.Iterator() + it.Initialize(&p.t.dirtyChunks.Bitmap) begin := uint32(p.requestIndexBegin()) end := 
uint32(p.requestIndexMaxEnd()) it.AdvanceIfNeeded(begin) @@ -173,7 +173,7 @@ } } func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) { - for some := range p.iterCleanChunks() { + for some := range p.iterCleanChunks(&p.t.cl.roaringIntIterator) { return g.Some(some) } return @@ -228,6 +228,9 @@ // external control of hashing concurrency. func (p *Piece) VerifyDataContext(ctx context.Context) error { locker := p.t.cl.locker() locker.Lock() + if p.t.closed.IsSet() { + return errTorrentClosed + } target, err := p.t.queuePieceCheck(p.index) locker.Unlock() if err != nil { @@ -292,7 +295,24 @@ return } func (p *Piece) ignoreForRequests() bool { - return p.hashing || p.marking || !p.haveHash() || p.t.pieceComplete(p.index) || p.queuedForHash() + // Ordered by cheapest checks and most likely to persist first. + + // There's a method that gets this with complete, but that requires a bitmap lookup. Defer that. + if !p.storageCompletionOk { + // Piece completion unknown. + return true + } + if p.hashing || p.marking || !p.haveHash() || p.t.dataDownloadDisallowed.IsSet() { + return true + } + // This is valid after we know that storage completion has been cached. + if p.t.pieceComplete(p.index) { + return true + } + if p.queuedForHash() { + return true + } + return false } // This is the priority adjusted for piece state like completion, hashing etc. @@ -313,6 +333,7 @@ defer p.t.cl.unlock() p.t.updatePieceCompletion(p.index) } +// TODO: Probably don't include Completion.Err? func (p *Piece) completion() (ret storage.Completion) { ret.Ok = p.storageCompletionOk if ret.Ok { @@ -337,10 +358,14 @@ // The maximum end request index for the piece. Some of the requests might not be valid, it's for // cleaning up arrays and bitmaps in broad strokes. 
func (p *Piece) requestIndexMaxEnd() RequestIndex { - return p.t.pieceRequestIndexBegin(p.index + 1) + new := min(p.t.pieceRequestIndexBegin(p.index+1), p.t.maxEndRequest()) + if false { + old := p.t.pieceRequestIndexBegin(p.index + 1) + panicif.NotEq(new, old) + } + return new } -// TODO: Make this peer-only? func (p *Piece) availability() int { return len(p.t.connsWithAllPieces) + p.relativeAvailability } diff --git a/rate.go b/rate.go index b711cc915c2f90dc0f5ba2c19d627a12da40acbc..d81eda20e92ac3498a5b7b6a473fb73efbfe3178 100644 --- a/rate.go +++ b/rate.go @@ -1,18 +1,32 @@ package torrent import ( + "math" + "golang.org/x/time/rate" ) -// 64 KiB used to be a rough default buffer for sockets on Windows. I'm sure it's bigger -// these days. What about the read buffer size mentioned elsewhere? Note this is also used for -// webseeding since that shares the download rate limiter by default. -const defaultDownloadRateLimiterBurst = 1 << 16 +// 64 KiB used to be a rough default buffer for sockets on Windows. I'm sure it's bigger these days. +// What about the read buffer size mentioned elsewhere? Note this is also used for webseeding since +// that shares the download rate limiter by default. 1 MiB is the default max read frame size for +// HTTP/2, +const defaultMinDownloadRateLimiterBurst = 1 << 20 // Sets rate limiter burst if it's set to zero which is used to request the default by our API. func setRateLimiterBurstIfZero(l *rate.Limiter, def int) { - if l.Burst() == 0 && l.Limit() != rate.Inf { + // Set it to something reasonable if the limit is Inf, in case the limit is dynamically adjusted + // and the user doesn't know what value to use. Assume the original limit is in a reasonable + // ballpark. + if l != nil && l.Burst() == 0 { // What if the limit is greater than what can be represented by int? l.SetBurst(def) } } + +// Sets rate limiter burst if it's set to zero which is used to request the default by our API. 
+func setDefaultDownloadRateLimiterBurstIfZero(l *rate.Limiter) { + setRateLimiterBurstIfZero(l, int( + max( + min(EffectiveDownloadRateLimit(l), math.MaxInt), + defaultMinDownloadRateLimiterBurst))) +} diff --git a/ratelimitreader.go b/ratelimitreader.go index 63a561762068b9ea6f46baaaba54b22aeb7ded8c..39df9326283f7d19c5a5843b494a1f17a2dc7666 100644 --- a/ratelimitreader.go +++ b/ratelimitreader.go @@ -8,12 +8,28 @@ "github.com/anacrolix/missinggo/v2/panicif" "golang.org/x/time/rate" ) +func newRateLimitedReader(r io.Reader, l *rate.Limiter) io.Reader { + if l == nil { + // Avoids taking Limiter lock to check limit, and allows type assertions to bypass Read. + return r + } + return rateLimitedReader{ + l: l, + r: r, + } +} + type rateLimitedReader struct { l *rate.Limiter r io.Reader } -func (me *rateLimitedReader) Read(b []byte) (n int, err error) { +func (me rateLimitedReader) Read(b []byte) (n int, err error) { + // Avoid truncating the read if everything is permitted anyway. + if me.l.Limit() == rate.Inf { + return me.r.Read(b) + } + // If the burst is zero, let the limiter method handle errors. if me.l.Burst() != 0 { b = b[:min(len(b), me.l.Burst())] } diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 3db71c49e2d8e367d4f376cd788544dbf9db793e..e67bb9f0e3c0d12c7999074adc172d11846ae879 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -32,7 +32,8 @@ } // I don't think we need this for correctness purposes, but it must be faster to look up the Torrent // input because it's locked to a given Torrent. It would be easy enough to drop in the -// multi-torrent version in this place and compare. +// multi-torrent version in this place and compare. TODO: With unique.Handle on infohash, this would +// not be necessary anymore. I don't think it's provided any performance benefit for some time now. 
type requestStrategyInputSingleTorrent struct { requestStrategyInputCommon t *Torrent diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go index cd0f85eaad15b7729ada382e2af6233dcf98884b..e3b9ef36089361cea08d615847396aed525e0b59 100644 --- a/request-strategy-impls_test.go +++ b/request-strategy-impls_test.go @@ -112,7 +112,7 @@ peer.peerChoking = false //b.StopTimer() b.ResetTimer() //b.ReportAllocs() - for _ = range iter.N(b.N) { + for range b.N { storageClient.completed = 0 for pieceIndex := range iter.N(numPieces) { tor.updatePieceCompletion(pieceIndex) diff --git a/requesting.go b/requesting.go index c45a502b06d297958c919dfa782752d0033c2688..44327445959cec45ec99e1006fda64f90c826a11 100644 --- a/requesting.go +++ b/requesting.go @@ -1,6 +1,7 @@ package torrent import ( + "cmp" "context" "encoding/gob" "fmt" @@ -78,7 +79,8 @@ // A request index is a chunk indexed across the entire torrent. It's a single integer and can // be converted to a protocol request. TODO: This should be private. RequestIndex = requestStrategy.RequestIndex // This is request index but per-piece. - chunkIndexType = requestStrategy.ChunkIndex + chunkIndexType = requestStrategy.ChunkIndex + webseedSliceIndex RequestIndex ) type desiredPeerRequests struct { @@ -127,8 +129,8 @@ return ml.MustLess() } leftRequestState := t.requestState[leftRequest] rightRequestState := t.requestState[rightRequest] - leftPeer := leftRequestState.peer - rightPeer := rightRequestState.peer + leftPeer := leftRequestState.peer.Value() + rightPeer := rightRequestState.peer.Value() // Prefer chunks already requested from this peer. ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) // Prefer unrequested chunks. @@ -153,18 +155,32 @@ // in received order, so the most recently-requested is the one that has the longest until // it will be served and therefore is the best candidate to cancel. 
ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) } - ml = ml.Int( - leftPiece.Availability, - rightPiece.Availability) - if priority == PiecePriorityReadahead { - // TODO: For readahead in particular, it would be even better to consider distance from the - // reader position so that reads earlier in a torrent don't starve reads later in the - // torrent. This would probably require reconsideration of how readahead priority works. - ml = ml.Int(leftPieceIndex, rightPieceIndex) + // Just trigger on any webseed requests present on the Torrent. That suggests that the Torrent + // or files are prioritized enough to compete with PeerConn requests. Later we could filter on + // webseeds actually requesting or supporting requests for the pieces we're comparing. + if t.hasActiveWebseedRequests() { + // Prefer the highest possible request index, since webseeds prefer the lowest. Additionally, + // this should mean remote clients serve in reverse order so we meet webseeds responses in + // the middle. + ml = ml.Cmp(-cmp.Compare(leftRequest, rightRequest)) } else { - ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + ml = ml.Int( + leftPiece.Availability, + rightPiece.Availability) + if priority == PiecePriorityReadahead { + // TODO: For readahead in particular, it would be even better to consider distance from the + // reader position so that reads earlier in a torrent don't starve reads later in the + // torrent. This would probably require reconsideration of how readahead priority works. + ml = ml.Int(leftPieceIndex, rightPieceIndex) + } else { + ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + } + ml = multiless.EagerOrdered(ml, leftRequest, rightRequest) } - return ml.Less() + // Prefer request indexes in order for storage write performance. Since the heap request heap + // does not contain duplicates, if we order at the request index level we should never have any + // ambiguity. 
+ return ml.MustLess() } type desiredRequestState struct { @@ -208,10 +224,10 @@ var it typedRoaring.Iterator[RequestIndex] t.getRequestablePieces( func(ih metainfo.Hash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool { if ih != *t.canonicalShortInfohash() { - return false + return true } if !p.peerHasPiece(pieceIndex) { - return false + return true } requestHeap.pieceStates[pieceIndex].Set(pieceExtra) allowedFast := p.peerAllowedFast.Contains(pieceIndex) diff --git a/segments/index.go b/segments/index.go index acbba430e2672c3bb8ab53bd98febc2f7eb96137..c897411ed38e4ab9e1933e722577004aa423f986 100644 --- a/segments/index.go +++ b/segments/index.go @@ -1,8 +1,9 @@ package segments import ( + "cmp" "iter" - "sort" + "slices" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/panicif" @@ -10,7 +11,7 @@ ) func NewIndex(segments LengthIter) (ret Index) { var start Length - for l, ok := segments(); ok; l, ok = segments() { + for l := range segments { ret.segments = append(ret.segments, Extent{start, l}) start += l } @@ -25,64 +26,48 @@ func NewIndexFromSegments(segments []Extent) Index { return Index{segments} } -func (me Index) iterSegments() func() (Extent, bool) { - var lastEnd g.Option[Int] - return func() (ret Extent, ok bool) { - if len(me.segments) == 0 { - return +// Yields segments as extents with Start relative to the previous segment's end. +func (me Index) iterSegments(startIndex int) iter.Seq[Extent] { + return func(yield func(Extent) bool) { + var lastEnd g.Option[Int] + for _, cur := range me.segments[startIndex:] { + ret := Extent{ + // Why ignore initial start on the first segment? 
+ Start: cur.Start - lastEnd.UnwrapOr(cur.Start), + Length: cur.Length, + } + lastEnd.Set(cur.End()) + if !yield(ret) { + return + } } - cur := me.segments[0] - me.segments = me.segments[1:] - ret.Start = cur.Start - lastEnd.UnwrapOr(cur.Start) - ret.Length = cur.Length - lastEnd.Set(cur.End()) - ok = true - return } } -// Returns true if the callback returns false early, or extents are found in the index for all parts -// of the given extent. TODO: This might not handle discontiguous extents. To be tested. Needed for -// BitTorrent v2 possibly. -func (me Index) Locate(e Extent, output Callback) bool { - first := sort.Search(len(me.segments), func(i int) bool { - _e := me.segments[i] - return _e.End() > e.Start - }) - if first == len(me.segments) { - return e.Length == 0 - } - e.Start -= me.segments[first].Start - // The extent is before the first segment. - if e.Start < 0 { - e.Length += e.Start - e.Start = 0 - } - me.segments = me.segments[first:] - return ScanConsecutive(me.iterSegments(), e, func(i int, e Extent) bool { - return output(i+first, e) - }) -} - func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] { return func(yield func(int, Extent) bool) { - first := sort.Search(len(me.segments), func(i int) bool { - _e := me.segments[i] - return _e.End() > e.Start + // We find the first segment that ends after the start of the target extent. + first, eq := slices.BinarySearchFunc(me.segments, e.Start, func(elem Extent, target Int) int { + return cmp.Compare(elem.End(), target+1) }) + //fmt.Printf("binary search for %v in %v returned %v\n", e.Start, me.segments, first) if first == len(me.segments) { return } + _ = eq e.Start -= me.segments[first].Start // The extent is before the first segment. 
if e.Start < 0 { e.Length += e.Start e.Start = 0 } - me.segments = me.segments[first:] - ScanConsecutive(me.iterSegments(), e, func(i int, e Extent) bool { - return yield(i+first, e) - }) + i := first + for cons := range scanConsecutive(me.iterSegments(first), e) { + if !yield(i, cons) { + return + } + i++ + } } } diff --git a/segments/segments.go b/segments/segments.go index 83f1ea5c18b15d91531484f6dea2a82029938717..eb7fedb260cbd17da46aa5640c1c740e98ee1fc6 100644 --- a/segments/segments.go +++ b/segments/segments.go @@ -1,5 +1,9 @@ package segments +import ( + "iter" +) + type Int = int64 type Length = Int @@ -14,56 +18,37 @@ } type ( Callback = func(segmentIndex int, segmentBounds Extent) bool - LengthIter = func() (Length, bool) - ConsecutiveExtentIter = func() (Extent, bool) + LengthIter = iter.Seq[Length] + ConsecutiveExtentIter = iter.Seq[Extent] ) -// Returns true if callback returns false early, or all segments in the haystack for the needle are -// found. -func Scan(haystack LengthIter, needle Extent, callback Callback) bool { - return ScanConsecutive( - func() (Extent, bool) { - l, ok := haystack() - return Extent{0, l}, ok - }, - needle, - callback, - ) -} +// TODO: Does this handle discontiguous extents? +func scanConsecutive(haystack ConsecutiveExtentIter, needle Extent) iter.Seq[Extent] { + return func(yield func(Extent) bool) { + // Extents have been found in the haystack, and we're waiting for the needle to end. This is + // kind of for backwards compatibility for some tests that expect to have zero-length extents. + startedNeedle := false + next, stop := iter.Pull(haystack) + defer stop() + for needle.Length != 0 { + l, ok := next() + if !ok { + return + } -// Returns true if callback returns false early, or all segments in the haystack for the needle are -// found. TODO: Does this handle discontiguous extents? 
-func ScanConsecutive(haystack ConsecutiveExtentIter, needle Extent, callback Callback) bool { - i := 0 - // Extents have been found in the haystack, and we're waiting for the needle to end. This is - // kind of for backwards compatibility for some tests that expect to have zero-length extents. - startedNeedle := false - for needle.Length != 0 { - l, ok := haystack() - if !ok { - return false - } - - e1 := Extent{ - Start: max(needle.Start-l.Start, 0), - } - e1.Length = max(min(l.Length, needle.End()-l.Start)-e1.Start, 0) - needle.Start = max(0, needle.Start-l.End()) - needle.Length -= e1.Length + l.Start - if e1.Length > 0 || (startedNeedle && needle.Length != 0) { - if !callback(i, e1) { - return true + e1 := Extent{ + Start: max(needle.Start-l.Start, 0), + } + e1.Length = max(min(l.Length, needle.End()-l.Start)-e1.Start, 0) + needle.Start = max(0, needle.Start-l.End()) + needle.Length -= e1.Length + l.Start + if e1.Length > 0 || (startedNeedle && needle.Length != 0) { + if !yield(e1) { + return + } + startedNeedle = true } - startedNeedle = true } - i++ - } - return true -} - -func LocaterFromLengthIter(li LengthIter) Locater { - return func(e Extent, c Callback) bool { - return Scan(li, e, c) } } diff --git a/segments/segments_test.go b/segments/segments_test.go index 1620f77bd5cba9b0c09a164a3e822bc15de7678c..4723926d2a7995ec95b394f64a814aab8c6fc829 100644 --- a/segments/segments_test.go +++ b/segments/segments_test.go @@ -1,22 +1,14 @@ package segments import ( + "slices" "testing" - qt "github.com/go-quicktest/qt" + "github.com/go-quicktest/qt" ) func LengthIterFromSlice(ls []Length) LengthIter { - return func() (Length, bool) { - switch len(ls) { - case 0: - return -1, false - default: - l := ls[0] - ls = ls[1:] - return l, true - } - } + return slices.Values(ls) } type ScanCallbackValue struct { @@ -86,14 +78,63 @@ {0, 1537}, {0, 1536}, {0, 667}, }) -} - -func TestScan(t *testing.T) { - testLocater(t, LocaterFromLengthIter) + checkContiguous(t, newLocater, 
+ []Length{0, 2, 0, 2, 0}, // 128737588 + Extent{1, 2}, + 1, + []Extent{ + {1, 1}, + {0, 0}, + {0, 1}, + }) + checkContiguous(t, newLocater, + []Length{2, 0, 2, 0}, // 128737588 + Extent{1, 3}, + 0, + []Extent{ + {1, 1}, + {0, 0}, + {0, 2}, + }) + checkContiguous(t, newLocater, + []Length{2, 0, 1, 0, 0, 1}, + Extent{3, 2}, + 5, + []Extent{ + {0, 1}, + }) + checkContiguous(t, newLocater, + []Length{2, 0, 1, 0, 0, 1}, + Extent{2, 2}, + 2, + []Extent{ + {0, 1}, + {0, 0}, + {0, 0}, + {0, 1}, + }) + checkContiguous(t, newLocater, + []Length{}, + Extent{1, 1}, + 0, + []Extent{}) + checkContiguous(t, newLocater, + []Length{0}, + Extent{1, 1}, + 0, + []Extent{}) } -func TestIndex(t *testing.T) { +func TestIndexLocateIter(t *testing.T) { testLocater(t, func(li LengthIter) Locater { - return NewIndex(li).Locate + index := NewIndex(li) + return func(extent Extent, callback Callback) bool { + for i, e := range index.LocateIter(extent) { + if !callback(i, e) { + return false + } + } + return true + } }) } diff --git a/smartban.go b/smartban.go index 857ca2914d3cbd97ddcca9995bc7b491a44461d7..dd7d6662f7e0d07ab261094a79ef64e000aa1d63 100644 --- a/smartban.go +++ b/smartban.go @@ -11,6 +11,7 @@ ) type bannableAddr = netip.Addr +// TODO: Should be keyed on weak[Peer]. 
type smartBanCache = smartban.Cache[bannableAddr, RequestIndex, uint64] type blockCheckingWriter struct { diff --git a/sources.go b/sources.go index 18aa6975478a0f97183f03ec1af14d3cfa7164fa..46eafbdbf67e765cb53d654104091440dc2c2c43 100644 --- a/sources.go +++ b/sources.go @@ -36,12 +36,15 @@ ctx := t.getInfoCtx for { var retry g.Option[time.Duration] retry, err = t.trySource(source) - if err == nil || ctx.Err() != nil || !retry.Ok { + if err == nil || ctx.Err() != nil { return } t.slogger().Warn("error using torrent source", "source", source, "err", err) + if !retry.Ok { + return + } select { - case <-time.After(retry.Value): + case <-time.After(retry.Unwrap()): case <-ctx.Done(): } } @@ -56,15 +59,15 @@ if ctx.Err() != nil { return } var mi metainfo.MetaInfo - mi, err = getTorrentSource(ctx, source, t.cl.httpClient) + mi, err = getTorrentSource(ctx, source, t.cl.config.MetainfoSourcesClient) if ctx.Err() != nil { return } if err != nil { - retry.Set(time.Duration(rand.Int64N(int64(time.Minute)))) + retry.Set(time.Minute + time.Duration(rand.Int64N(int64(time.Minute)))) return } - err = t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) + err = t.cl.config.MetainfoSourcesMerger(t, &mi) return } diff --git a/spec.go b/spec.go index e8381943179795c152f46a0b8594780205897fed..27274c5eeb0954a6e33aa0eb5f4b96441d052dd2 100644 --- a/spec.go +++ b/spec.go @@ -27,10 +27,6 @@ // The combination of the "xs" and "as" fields in magnet links, for now. 
Sources []string // BEP 52 "piece layers" from metainfo PieceLayers map[string]string - - // Whether to allow data download or upload - DisallowDataUpload bool - DisallowDataDownload bool } func TorrentSpecFromMagnetUri(uri string) (spec *TorrentSpec, err error) { diff --git a/storage/file-client.go b/storage/file-client.go index f4ffb0378b091ea01ee7f1aaaac459a560b1d2f9..793b7e58d90e31bd0ad261bee9139ec83999e37e 100644 --- a/storage/file-client.go +++ b/storage/file-client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "os" "path/filepath" g "github.com/anacrolix/generics" @@ -70,6 +71,29 @@ func (me *fileClientImpl) Close() error { return me.opts.PieceCompletion.Close() } +var defaultFileIo func() fileIo = func() fileIo { + return &mmapFileIo{} +} + +func init() { + s, ok := os.LookupEnv("TORRENT_STORAGE_DEFAULT_FILE_IO") + if !ok { + return + } + switch s { + case "mmap": + defaultFileIo = func() fileIo { + return &mmapFileIo{} + } + case "classic": + defaultFileIo = func() fileIo { + return classicFileIo{} + } + default: + panic(s) + } +} + func (fs *fileClientImpl) OpenTorrent( ctx context.Context, info *metainfo.Info, @@ -104,6 +128,7 @@ files, metainfoFileInfos, info.FileSegmentsIndex(), infoHash, + defaultFileIo(), fs, } if t.partFiles() { @@ -116,6 +141,5 @@ } return TorrentImpl{ Piece: t.Piece, Close: t.Close, - Flush: t.Flush, }, nil } diff --git a/storage/file-io-classic.go b/storage/file-io-classic.go new file mode 100644 index 0000000000000000000000000000000000000000..bb54e5a773a01c1d00a7e9d5eb8402278a933ce4 --- /dev/null +++ b/storage/file-io-classic.go @@ -0,0 +1,32 @@ +package storage + +import ( + "os" +) + +type classicFileIo struct{} + +func (me classicFileIo) flush(name string, offset, nbytes int64) error { + return fsync(name) +} + +func (me classicFileIo) openForSharedRead(name string) (sharedFileIf, error) { + return sharedFiles.Open(name) +} + +func (me classicFileIo) openForRead(name string) (fileReader, error) { + f, err := 
os.Open(name) + return classicFileReader{f}, err +} + +func (classicFileIo) openForWrite(p string, size int64) (f fileWriter, err error) { + return openFileExtra(p, os.O_WRONLY) +} + +type classicFileReader struct { + *os.File +} + +func (c classicFileReader) seekData(offset int64) (ret int64, err error) { + return seekData(c.File, offset) +} diff --git a/storage/file-io-common.go b/storage/file-io-common.go new file mode 100644 index 0000000000000000000000000000000000000000..ffc87cc672a96ee30ad84609ce0e9e14983d2ea1 --- /dev/null +++ b/storage/file-io-common.go @@ -0,0 +1,35 @@ +package storage + +import ( + "errors" + "io/fs" + "os" + "path/filepath" + + "github.com/anacrolix/missinggo/v2/panicif" +) + +// Opens file for write, creating dirs and fixing permissions as necessary. +func openFileExtra(p string, osRdwr int) (f *os.File, err error) { + panicif.NotZero(osRdwr & ^(os.O_RDONLY | os.O_RDWR | os.O_WRONLY)) + flag := osRdwr | os.O_CREATE + f, err = os.OpenFile(p, flag, filePerm) + if err == nil { + return + } + if errors.Is(err, fs.ErrNotExist) { + err = os.MkdirAll(filepath.Dir(p), dirPerm) + if err != nil { + return + } + } else if errors.Is(err, fs.ErrPermission) { + err = os.Chmod(p, filePerm) + if err != nil { + return + } + } else { + return + } + f, err = os.OpenFile(p, flag, filePerm) + return +} diff --git a/storage/file-io-mmap.go b/storage/file-io-mmap.go new file mode 100644 index 0000000000000000000000000000000000000000..422c1c0af118ad026fa26022f2091e4222490ab9 --- /dev/null +++ b/storage/file-io-mmap.go @@ -0,0 +1,220 @@ +package storage + +import ( + "fmt" + "io" + "io/fs" + "os" + "sync" + "sync/atomic" + + g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" + "github.com/edsrzf/mmap-go" +) + +type mmapFileIo struct { + mu sync.RWMutex + paths map[string]*fileMmap +} + +func (me *mmapFileIo) flush(name string, offset, nbytes int64) error { + // Since we are only flushing writes that we created, and we don't 
currently unmap files after + // we've opened them, then if the mmap doesn't exist yet then there's nothing to flush. + me.mu.RLock() + defer me.mu.RUnlock() + v, ok := me.paths[name] + if !ok { + return nil + } + if !v.writable { + return nil + } + return msync(v.m, int(offset), int(nbytes)) +} + +type fileMmap struct { + m mmap.MMap + writable bool + refs atomic.Int32 +} + +func (me *fileMmap) dec() error { + if me.refs.Add(-1) == 0 { + return me.m.Unmap() + } + return nil +} + +func (me *fileMmap) inc() { + panicif.LessThanOrEqual(me.refs.Add(1), 0) +} + +func (m *mmapFileIo) openForSharedRead(name string) (_ sharedFileIf, err error) { + return m.openReadOnly(name) +} + +func (m *mmapFileIo) openForRead(name string) (_ fileReader, err error) { + sh, err := m.openReadOnly(name) + if err != nil { + return + } + return &mmapFileHandle{ + shared: sh, + }, nil +} + +func (m *mmapFileIo) openReadOnly(name string) (_ *mmapSharedFileHandle, err error) { + m.mu.Lock() + defer m.mu.Unlock() + v, ok := m.paths[name] + if ok { + return newMmapFile(v), nil + } + f, err := os.Open(name) + if err != nil { + return + } + defer f.Close() + mm, err := mmap.Map(f, mmap.RDONLY, 0) + if err != nil { + err = fmt.Errorf("mapping file: %w", err) + return + } + v = m.addNewMmap(name, mm, false) + return newMmapFile(v), nil +} + +func (m *mmapFileIo) openForWrite(name string, size int64) (_ fileWriter, err error) { + m.mu.Lock() + defer m.mu.Unlock() + v, ok := m.paths[name] + if ok { + if int64(len(v.m)) == size && v.writable { + v.inc() + return newMmapFile(v), nil + } else { + v.dec() + g.MustDelete(m.paths, name) + } + } + f, err := openFileExtra(name, os.O_RDWR) + if err != nil { + return + } + defer f.Close() + err = f.Truncate(size) + if err != nil { + err = fmt.Errorf("error truncating file: %w", err) + return + } + mm, err := mmap.Map(f, mmap.RDWR, 0) + if err != nil { + return + } + // This can happen due to filesystem changes outside our control. Don't be naive. 
+ if int64(len(mm)) != size { + err = fmt.Errorf("new mmap has wrong size %v, expected %v", len(mm), size) + mm.Unmap() + return + } + return newMmapFile(m.addNewMmap(name, mm, true)), nil +} + +func newMmapFile(f *fileMmap) *mmapSharedFileHandle { + ret := &mmapSharedFileHandle{ + f: f, + } + ret.f.inc() + return ret +} + +func (me *mmapFileIo) addNewMmap(name string, mm mmap.MMap, writable bool) *fileMmap { + v := &fileMmap{ + m: mm, + writable: writable, + } + // One for the store, one for the caller. + v.refs.Store(1) + g.MakeMapIfNil(&me.paths) + g.MapMustAssignNew(me.paths, name, v) + return v +} + +var _ fileIo = (*mmapFileIo)(nil) + +type mmapSharedFileHandle struct { + f *fileMmap + close sync.Once +} + +func (m *mmapSharedFileHandle) WriteAt(p []byte, off int64) (n int, err error) { + //fmt.Println("mmapSharedFileHandle.WriteAt", off, len(p), len(m.f.m)) + n = copy(m.f.m[off:], p) + return +} + +func (m *mmapSharedFileHandle) WriteTo(w io.Writer) (n int64, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mmapSharedFileHandle) ReadAt(p []byte, off int64) (n int, err error) { + n = copy(p, m.f.m[off:]) + if n < len(p) { + if off < 0 { + err = fs.ErrInvalid + return + } + } + if off+int64(n) == int64(len(m.f.m)) { + err = io.EOF + } + return +} + +func (m *mmapSharedFileHandle) Close() (err error) { + m.close.Do(func() { + err = m.f.dec() + }) + return +} + +type mmapFileHandle struct { + shared *mmapSharedFileHandle + pos int64 +} + +func (me *mmapFileHandle) WriteTo(w io.Writer) (n int64, err error) { + b := me.shared.f.m + if me.pos >= int64(len(b)) { + return + } + n1, err := w.Write(b[me.pos:]) + n = int64(n1) + me.pos += n + return +} + +func (me *mmapFileHandle) Close() error { + return me.shared.Close() +} + +func (me *mmapFileHandle) Read(p []byte) (n int, err error) { + if me.pos > int64(len(me.shared.f.m)) { + err = io.EOF + return + } + n = copy(p, me.shared.f.m[me.pos:]) + me.pos += int64(n) + if me.pos >= 
int64(len(me.shared.f.m)) { + err = io.EOF + } + return +} + +func (me *mmapFileHandle) seekData(offset int64) (ret int64, err error) { + me.pos = offset + ret = offset + return +} diff --git a/storage/file-io.go b/storage/file-io.go new file mode 100644 index 0000000000000000000000000000000000000000..f7c4495eb6dda458f8d663a775e1bbec79ac574b --- /dev/null +++ b/storage/file-io.go @@ -0,0 +1,25 @@ +package storage + +import ( + "io" +) + +type fileWriter interface { + io.WriterAt + io.Closer +} + +type fileReader interface { + // Seeks to the next data in the file. If hole-seeking/sparse-files are not supported, should + // seek to the offset. + seekData(offset int64) (ret int64, err error) + io.WriterTo + io.ReadCloser +} + +type fileIo interface { + openForSharedRead(name string) (sharedFileIf, error) + openForRead(name string) (fileReader, error) + openForWrite(name string, size int64) (fileWriter, error) + flush(name string, offset, nbytes int64) error +} diff --git a/storage/file-misc.go b/storage/file-misc.go index 27e0c1266def7fdb4d965ed646532a3d06475dd8..73396200bd0c8d5567d89ea30493be940a35e463 100644 --- a/storage/file-misc.go +++ b/storage/file-misc.go @@ -1,9 +1,7 @@ package storage import ( - "errors" "io" - "io/fs" "os" "path/filepath" "sync" @@ -18,18 +16,20 @@ func minFileLengthsForTorrentExtent( fileSegmentsIndex segments.Index, off, n int64, each func(fileIndex int, length int64) bool, -) bool { - return fileSegmentsIndex.Locate(segments.Extent{ +) { + for fileIndex, segmentBounds := range fileSegmentsIndex.LocateIter(segments.Extent{ Start: off, Length: n, - }, func(fileIndex int, segmentBounds segments.Extent) bool { - return each(fileIndex, segmentBounds.Start+segmentBounds.Length) - }) + }) { + if !each(fileIndex, segmentBounds.Start+segmentBounds.Length) { + return + } + } } func fsync(filePath string) (err error) { f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm) - if err != nil && !errors.Is(err, fs.ErrNotExist) { + if err != nil { 
return } defer f.Close() diff --git a/storage/file-piece.go b/storage/file-piece.go index 654ea155c9060c1de3cfadbd899991345d924e70..76ec477e25c30c3c9241dffbb2c166894fb89bf1 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -2,6 +2,7 @@ package storage import ( "errors" + "expvar" "fmt" "io" "io/fs" @@ -28,8 +29,22 @@ var _ interface { PieceImpl //PieceReaderer io.WriterTo + Flusher } = (*filePieceImpl)(nil) +func (me *filePieceImpl) Flush() (err error) { + for fileIndex, extent := range me.fileExtents() { + file := me.t.file(fileIndex) + name := me.t.pathForWrite(&file) + err1 := me.t.io.flush(name, extent.Start, extent.Length) + if err1 != nil { + err = errors.Join(err, fmt.Errorf("flushing %q: %w", name, err1)) + return + } + } + return nil +} + func (me *filePieceImpl) logger() *slog.Logger { return me.t.client.opts.Logger } @@ -45,9 +60,13 @@ Length: me.p.Length(), } } +func (me *filePieceImpl) fileExtents() iter.Seq2[int, segments.Extent] { + return me.t.segmentLocater.LocateIter(me.extent()) +} + func (me *filePieceImpl) pieceFiles() iter.Seq[file] { return func(yield func(file) bool) { - for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) { + for fileIndex := range me.fileExtents() { f := me.t.file(fileIndex) if !yield(f) { return @@ -73,16 +92,15 @@ } func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] { return func(yield func(int, segments.Extent) bool) { + pieceExtent := me.extent() noFiles := true - for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) { + for i, extent := range me.t.segmentLocater.LocateIter(pieceExtent) { noFiles = false if !yield(i, extent) { return } } - if noFiles { - panic("files do not cover piece extent") - } + panicif.NotEq(noFiles, pieceExtent.Length == 0) } } @@ -214,95 +232,51 @@ } func (me *filePieceImpl) promotePartFile(f file) (err error) { + if !me.partFiles() { + return nil + } f.mu.Lock() defer f.mu.Unlock() f.race++ - if me.partFiles() { - err = 
me.exclRenameIfExists(f.partFilePath(), f.safeOsPath) - if err != nil { - return - } - } - info, err := os.Stat(f.safeOsPath) + renamed, err := me.exclRenameIfExists(f.partFilePath(), f.safeOsPath) if err != nil { - err = fmt.Errorf("statting file: %w", err) return } - // Clear writability for the file. - err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222) - if err != nil { - err = fmt.Errorf("setting file to read-only: %w", err) + if !renamed { return } + err = os.Chmod(f.safeOsPath, filePerm&^0o222) + if err != nil { + me.logger().Info("error setting promoted file to read-only", "file", f.safeOsPath, "err", err) + err = nil + } return } // Rename from if exists, and if so, to must not exist. -func (me *filePieceImpl) exclRenameIfExists(from, to string) error { - if true { - // Might be cheaper to check source exists than to create destination regardless. - _, err := os.Stat(from) - if errors.Is(err, fs.ErrNotExist) { - return nil - } - if err != nil { - return err - } - } - panicif.Eq(from, to) - // We don't want anyone reading or writing to this until the rename completes. - f, err := os.OpenFile(to, os.O_CREATE|os.O_EXCL, 0) - if errors.Is(err, fs.ErrExist) { - _, err = os.Stat(from) - if errors.Is(err, fs.ErrNotExist) { - return nil - } - if err != nil { - return err - } - return errors.New("source and destination files both exist") - } - if err != nil { - return fmt.Errorf("exclusively creating destination file: %w", err) - } - f.Close() +func (me *filePieceImpl) exclRenameIfExists(from, to string) (renamed bool, err error) { err = os.Rename(from, to) if err != nil { if errors.Is(err, fs.ErrNotExist) { - // Someone else has moved it already. - return nil + err = nil } - // If we can't rename it, remove the blocking destination file we made. Maybe the remove - // error should be logged separately since it's not actionable. 
- return errors.Join(err, os.Remove(to)) + return } + renamed = true me.logger().Debug("renamed file", "from", from, "to", to) - return nil + return } func (me *filePieceImpl) onFileNotComplete(f file) (err error) { + if !me.partFiles() { + return + } f.mu.Lock() defer f.mu.Unlock() f.race++ - if me.partFiles() { - err = me.exclRenameIfExists(f.safeOsPath, f.partFilePath()) - if err != nil { - err = fmt.Errorf("restoring part file: %w", err) - return - } - } - info, err := os.Stat(me.pathForWrite(&f)) - if errors.Is(err, fs.ErrNotExist) { - return nil - } + _, err = me.exclRenameIfExists(f.safeOsPath, f.partFilePath()) if err != nil { - err = fmt.Errorf("statting file: %w", err) - return - } - // Ensure the file is writable - err = os.Chmod(me.pathForWrite(&f), info.Mode().Perm()|(filePerm&0o222)) - if err != nil { - err = fmt.Errorf("setting file writable: %w", err) + err = fmt.Errorf("restoring part file: %w", err) return } return @@ -316,23 +290,104 @@ func (me *filePieceImpl) partFiles() bool { return me.t.partFiles() } +type zeroReader struct{} + +func (me zeroReader) Read(p []byte) (n int, err error) { + clear(p) + return len(p), nil +} + func (me *filePieceImpl) WriteTo(w io.Writer) (n int64, err error) { for fileIndex, extent := range me.iterFileSegments() { - file := me.t.file(fileIndex) - var f *os.File - f, err = me.t.openFile(file) + var n1 int64 + n1, err = me.writeFileTo(w, fileIndex, extent) + n += n1 if err != nil { return } - f.Seek(extent.Start, io.SeekStart) + panicif.GreaterThan(n1, extent.Length) + if n1 < extent.Length { + return + } + panicif.NotEq(n1, extent.Length) + } + return +} + +var ( + packageExpvarMap = expvar.NewMap("torrentStorage") +) + +type limitWriter struct { + rem int64 + w io.Writer +} + +func (me *limitWriter) Write(p []byte) (n int, err error) { + n, err = me.w.Write(p[:min(int64(len(p)), me.rem)]) + me.rem -= int64(n) + if err != nil { + return + } + p = p[n:] + if len(p) > 0 { + err = io.ErrShortWrite + } + return +} + 
+func (me *filePieceImpl) writeFileTo(w io.Writer, fileIndex int, extent segments.Extent) (written int64, err error) { + if extent.Length == 0 { + return + } + file := me.t.file(fileIndex) + // Do we want io.WriterTo here, or are we happy to let that be type asserted in io.CopyN? + var f fileReader + f, err = me.t.openFile(file) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + err = nil + } + return + } + defer f.Close() + panicif.GreaterThan(extent.End(), file.FileInfo.Length) + extentRemaining := extent.Length + var dataOffset int64 + dataOffset, err = f.seekData(extent.Start) + if err == io.EOF { + return + } + panicif.Err(err) + panicif.LessThan(dataOffset, extent.Start) + if dataOffset > extent.Start { + // Write zeroes until the end of the hole we're in. var n1 int64 - n1, err = io.CopyN(w, f, extent.Length) - n += n1 - f.Close() + n := min(dataOffset-extent.Start, extent.Length) + n1, err = io.CopyN(w, zeroReader{}, n) + packageExpvarMap.Add("bytesReadSkippedHole", n1) + written += n1 if err != nil { return } + panicif.NotEq(n1, n) + extentRemaining -= n1 } + var n1 int64 + if true { + n1, err = f.WriteTo(&limitWriter{ + rem: extentRemaining, + w: w, + }) + // limitWriter will block f from writing too much. + if n1 == extentRemaining { + err = nil + } + } else { + n1, err = io.CopyN(w, f, extentRemaining) + } + packageExpvarMap.Add("bytesReadNotSkipped", n1) + written += n1 return } diff --git a/storage/file-torrent-io.go b/storage/file-torrent-io.go index c2080b05f83635af79dfe4c6344db48f77ae6f98..2c2b1b63bd89521e31be029bd966934da1d901ca 100644 --- a/storage/file-torrent-io.go +++ b/storage/file-torrent-io.go @@ -4,8 +4,8 @@ import ( "errors" "io" "io/fs" - "os" - "path/filepath" + + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/segments" ) @@ -48,65 +48,56 @@ } // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF. 
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { - fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool { + for i, e := range fst.fts.segmentLocater.LocateIter( + segments.Extent{off, int64(len(b))}, + ) { n1, err1 := fst.readFileAt(fst.fts.file(i), b[:e.Length], e.Start) n += n1 b = b[n1:] - err = err1 - return err == nil // && int64(n1) == e.Length - }) - if len(b) != 0 && err == nil { + if segments.Int(n1) == e.Length { + switch err1 { + // ReaderAt.ReadAt contract. + case nil, io.EOF: + default: + err = err1 + return + } + } else { + panicif.Nil(err1) + err = err1 + return + } + } + if len(b) != 0 { + // We're at the end of the torrent. err = io.EOF } return } -func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) { - // It might be possible to have a writable handle shared files cache if we need it. - fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath) - p := fst.fts.pathForWrite(&file) - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - if err == nil { - return - } - if errors.Is(err, fs.ErrNotExist) { - err = os.MkdirAll(filepath.Dir(p), dirPerm) +func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { + for i, e := range fst.fts.segmentLocater.LocateIter( + segments.Extent{off, int64(len(p))}, + ) { + var f fileWriter + f, err = fst.fts.openForWrite(fst.fts.file(i)) if err != nil { return } - } else if errors.Is(err, fs.ErrPermission) { - err = os.Chmod(p, filePerm) + var n1 int + n1, err = f.WriteAt(p[:e.Length], e.Start) + closeErr := f.Close() + n += n1 + p = p[n1:] + if err == nil { + err = closeErr + } + if err == nil && int64(n1) != e.Length { + err = io.ErrShortWrite + } if err != nil { return } } - f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm) - return -} - -func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { - // log.Printf("write at %v: %v bytes", off, 
len(p)) - fst.fts.segmentLocater.Locate( - segments.Extent{off, int64(len(p))}, - func(i int, e segments.Extent) bool { - var f *os.File - f, err = fst.openForWrite(fst.fts.file(i)) - if err != nil { - return false - } - var n1 int - n1, err = f.WriteAt(p[:e.Length], e.Start) - // log.Printf("%v %v wrote %v: %v", i, e, n1, err) - closeErr := f.Close() - n += n1 - p = p[n1:] - if err == nil { - err = closeErr - } - if err == nil && int64(n1) != e.Length { - err = io.ErrShortWrite - } - return err == nil - }) return } diff --git a/storage/file-torrent.go b/storage/file-torrent.go index 55200d4f2a2c0aaabbf97fc7947441f7d8520083..fac6ab7f62dc1ed406031fd58511a4b8860a8011 100644 --- a/storage/file-torrent.go +++ b/storage/file-torrent.go @@ -9,6 +9,7 @@ "log/slog" "os" "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" @@ -20,6 +21,7 @@ files []fileExtra metainfoFileInfos []metainfo.FileInfo segmentLocater segments.Index infoHash metainfo.Hash + io fileIo // Save memory by pointing to the other data. client *fileClientImpl } @@ -57,18 +59,24 @@ fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length()) } else if !errors.Is(err, fs.ErrNotExist) { fts.logger().Warn("error checking file size", "err", err) } + // Ensure all pieces associated with a file are not marked as complete (at most unknown). for pieceIndex := f.beginPieceIndex(); pieceIndex < f.endPieceIndex(); pieceIndex++ { notComplete[pieceIndex] = true } } for i, nc := range notComplete { if nc { - // Use whatever the piece completion has, or trigger a hash. 
- continue - } - err := fts.setPieceCompletion(i, true) - if err != nil { - return fmt.Errorf("setting piece %v completion: %w", i, err) + c := fts.getCompletion(i) + if c.Complete { + // TODO: We need to set unknown so that verification of the data we do have could + // occur naturally but that'll be a big change. + panicif.Err(fts.setPieceCompletion(i, false)) + } + } else { + err := fts.setPieceCompletion(i, true) + if err != nil { + return fmt.Errorf("setting piece %v completion: %w", i, err) + } } } return nil @@ -107,17 +115,6 @@ func (fs *fileTorrentImpl) Close() error { return nil } -func (fts *fileTorrentImpl) Flush() error { - for i := range fts.files { - f := fts.file(i) - fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath) - if err := fsync(fts.pathForWrite(&f)); err != nil { - return err - } - } - return nil -} - func (fts *fileTorrentImpl) file(index int) file { return file{ Info: fts.info, @@ -132,26 +129,32 @@ file.mu.RLock() // Fine to open once under each name on a unix system. We could make the shared file keys more // constrained, but it shouldn't matter. TODO: Ensure at most one of the names exist. if me.partFiles() { - f, err = sharedFiles.Open(file.partFilePath()) + f, err = me.io.openForSharedRead(file.partFilePath()) } if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) { - f, err = sharedFiles.Open(file.safeOsPath) + f, err = me.io.openForSharedRead(file.safeOsPath) } file.mu.RUnlock() return } -// Open file for reading. -func (me *fileTorrentImpl) openFile(file file) (f *os.File, err error) { +// Open file for reading. Not a shared handle if that matters. +func (me *fileTorrentImpl) openFile(file file) (f fileReader, err error) { file.mu.RLock() // Fine to open once under each name on a unix system. We could make the shared file keys more // constrained, but it shouldn't matter. TODO: Ensure at most one of the names exist. 
if me.partFiles() { - f, err = os.Open(file.partFilePath()) + f, err = me.io.openForRead(file.partFilePath()) } if err == nil && f == nil || errors.Is(err, fs.ErrNotExist) { - f, err = os.Open(file.safeOsPath) + f, err = me.io.openForRead(file.safeOsPath) } file.mu.RUnlock() return } + +func (fst *fileTorrentImpl) openForWrite(file file) (_ fileWriter, err error) { + // It might be possible to have a writable handle shared files cache if we need it. + fst.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath) + return fst.io.openForWrite(fst.pathForWrite(&file), file.FileInfo.Length) +} diff --git a/storage/interface.go b/storage/interface.go index 9aadfc8a4988fc3e2c6d66111f71ca28a5207297..a5acc56f62ebf92e786b2075c72178da50ed8b55 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -32,7 +32,6 @@ Piece func(p metainfo.Piece) PieceImpl // Preferred over PieceWithHash. Called with the piece hash if it's available. PieceWithHash func(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl Close func() error - Flush func() error // Storages that share the same space, will provide equal pointers. The function is called once // to determine the storage for torrents sharing the same function pointer, and mutated in // place. @@ -62,6 +61,10 @@ // Returns the state of a piece. Typically, this is implemented in some kind of storage to avoid // rehashing, and cheap checks are performed here. (The implementation maintains a cache in // Torrent). Completion() Completion +} + +type Flusher interface { + Flush() error } // Completion state of a piece. 
diff --git a/storage/mmap.go b/storage/mmap.go index 9587927608739af7fee6d7533a680ded145357be..6b8478d8460fdc5e15928ddf94c6e8da6327a2c2 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -46,7 +46,7 @@ infoHash: infoHash, span: span, pc: s.pc, } - return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err + return TorrentImpl{Piece: t.Piece, Close: t.Close}, err } func (s *mmapClientImpl) Close() error { @@ -61,9 +61,8 @@ } func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl { return mmapStoragePiece{ - pc: ts.pc, + t: ts, p: p, - ih: ts.infoHash, ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()), WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()), } @@ -73,28 +72,27 @@ func (ts *mmapTorrentStorage) Close() error { return ts.span.Close() } -func (ts *mmapTorrentStorage) Flush() error { - errs := ts.span.Flush() - if len(errs) > 0 { - return errs[0] - } - return nil -} - type mmapStoragePiece struct { - pc PieceCompletionGetSetter - p metainfo.Piece - ih metainfo.Hash + t *mmapTorrentStorage + p metainfo.Piece io.ReaderAt io.WriterAt +} + +var _ Flusher = mmapStoragePiece{} + +func (me mmapStoragePiece) Flush() error { + // TODO: Flush just the regions of the files we care about. At least this is no worse than it + // was previously. 
+ return me.t.span.Flush() } func (me mmapStoragePiece) pieceKey() metainfo.PieceKey { - return metainfo.PieceKey{me.ih, me.p.Index()} + return metainfo.PieceKey{me.t.infoHash, me.p.Index()} } func (sp mmapStoragePiece) Completion() Completion { - c, err := sp.pc.Get(sp.pieceKey()) + c, err := sp.t.pc.Get(sp.pieceKey()) if err != nil { panic(err) } @@ -102,11 +100,11 @@ return c } func (sp mmapStoragePiece) MarkComplete() error { - return sp.pc.Set(sp.pieceKey(), true) + return sp.t.pc.Set(sp.pieceKey(), true) } func (sp mmapStoragePiece) MarkNotComplete() error { - return sp.pc.Set(sp.pieceKey(), false) + return sp.t.pc.Set(sp.pieceKey(), false) } func mMapTorrent(md *metainfo.Info, location string) (mms *mmapSpan.MMapSpan, err error) { diff --git a/storage/sys_unix.go b/storage/sys_unix.go new file mode 100644 index 0000000000000000000000000000000000000000..625f420f10ee3e8314bb7ec273c1b87386d87191 --- /dev/null +++ b/storage/sys_unix.go @@ -0,0 +1,24 @@ +//go:build unix + +package storage + +import ( + "io" + "os" + + "github.com/edsrzf/mmap-go" + "golang.org/x/sys/unix" +) + +func seekData(f *os.File, offset int64) (ret int64, err error) { + ret, err = unix.Seek(int(f.Fd()), offset, unix.SEEK_DATA) + if err == unix.ENXIO { + // File has no more data. Treat as short write like io.CopyN. + err = io.EOF + } + return +} + +func msync(mm mmap.MMap, offset, nbytes int) error { + return unix.Msync(mm[offset:offset+nbytes], unix.MS_SYNC) +} diff --git a/storage/sys_windows.go b/storage/sys_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..10ce40f868051c888d1b01b9c79d80d874f1783b --- /dev/null +++ b/storage/sys_windows.go @@ -0,0 +1,17 @@ +package storage + +import ( + "io" + "os" + + "github.com/edsrzf/mmap-go" +) + +func seekData(f *os.File, offset int64) (ret int64, err error) { + return f.Seek(offset, io.SeekStart) +} + +func msync(mm mmap.MMap, offset, nbytes int) error { + // Fuck you Windows you suck. 
TODO: Use windows.FlushViewOfFile. + return mm.Flush() +} diff --git a/storage/wrappers.go b/storage/wrappers.go index bcb9d21ec588b1fd4ac139816d81c099fe22e63d..22e3812d9790626d020b756153248aae6cc209dc 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -7,7 +7,6 @@ "io" "os" g "github.com/anacrolix/generics" - "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent/metainfo" ) @@ -90,7 +89,6 @@ // } if off+int64(len(b)) > p.mip.Length() { panic("write overflows piece") } - b = missinggo.LimitLen(b, p.mip.Length()-off) return p.PieceImpl.WriteAt(b, off) } @@ -105,7 +103,7 @@ if off >= p.mip.Length() { err = io.EOF return } - b = missinggo.LimitLen(b, p.mip.Length()-off) + b = b[:min(int64(len(b)), p.mip.Length()-off)] if len(b) == 0 { return } @@ -144,6 +142,13 @@ }{ p, nopCloser{}, }, nil +} + +func (p Piece) Flush() error { + if fl, ok := p.PieceImpl.(Flusher); ok { + return fl.Flush() + } + return nil } type nopCloser struct{} diff --git a/t.go b/t.go index 5b42a10170ed1b41e59ce71fa88f796128ec8901..ceea328ad0a409a7d3b91e27546efd50bfc28171 100644 --- a/t.go +++ b/t.go @@ -96,18 +96,20 @@ return int64(t.pieces[piece].bytesLeft()) } -// Drop the torrent from the client, and close it. It's always safe to do -// this. No data corruption can, or should occur to either the torrent's data, -// or connected peers. +// Drop the torrent from the client, and close it. It's always safe to do this. No data corruption +// can, or should occur to either the torrent's data, or connected peers. func (t *Torrent) Drop() { - var wg sync.WaitGroup - defer wg.Wait() + if t.closed.IsSet() { + return + } t.cl.lock() defer t.cl.unlock() - err := t.cl.dropTorrent(t, &wg) - if err != nil { - panic(err) + if t.closed.IsSet() { + return } + var wg sync.WaitGroup + t.close(&wg) + wg.Wait() } // Number of bytes of the entire torrent we have completed. This is the sum of @@ -291,6 +293,7 @@ } return ret } +// TODO: Misleading method name. Webseed peers are not PeerConns. 
func (t *Torrent) WebseedPeerConns() []*Peer { t.cl.rLock() defer t.cl.rUnlock() diff --git a/test_test.go b/test_test.go index 11ebb3b3d11fe65f69966f28499de59ab389e6d1..c65039004288269f155c43a7ea6a5837975d4da8 100644 --- a/test_test.go +++ b/test_test.go @@ -5,16 +5,17 @@ import ( "testing" + "github.com/go-quicktest/qt" + "github.com/anacrolix/torrent/metainfo" ) func newTestingClient(t testing.TB) *Client { - cl := new(Client) - cl.init(TestingConfig(t)) + cl, err := NewClient(TestingConfig(t)) + qt.Assert(t, qt.IsNil(err)) t.Cleanup(func() { cl.Close() }) - cl.initLogger() return cl } diff --git a/tests/issue-930/server.go b/tests/issue-930/server.go index 5507f53da18c5104c6f662c759bd8d32e1419b1f..c5bedf386d1772954863cdda0a29c6a0b7f0c90d 100644 --- a/tests/issue-930/server.go +++ b/tests/issue-930/server.go @@ -39,7 +39,6 @@ }() index++ w.Write([]byte("OK")) - return }) if err := http.ListenAndServe(":8080", mux); err != nil { diff --git a/time_test.go b/time_test.go new file mode 100644 index 0000000000000000000000000000000000000000..10cbafc73d7b9237c864a6f3c625bf9c74366b21 --- /dev/null +++ b/time_test.go @@ -0,0 +1 @@ +package torrent diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go index 1e6cde0b50c5711e8156e25c3499427ac9222711..f50f6664b108f1afd8361ad149a9f8c99ca1f20e 100644 --- a/torrent-piece-request-order.go +++ b/torrent-piece-request-order.go @@ -20,6 +20,7 @@ key := t.pieceRequestOrderKey(pieceIndex) if t.hasStorageCap() { return pro.pieces.Update(key, t.requestStrategyPieceOrderState(pieceIndex)) } + // TODO: This might eject a piece that could count toward being unverified? 
pending := !t.ignorePieceForRequests(pieceIndex) if pending { newState := t.requestStrategyPieceOrderState(pieceIndex) diff --git a/torrent-stats.go b/torrent-stats.go index 999206078d87cc6b30ef2bde497251eaaa8db9a5..2f67ac29a0e3ee5d5001a20aca8b36523666cab2 100644 --- a/torrent-stats.go +++ b/torrent-stats.go @@ -7,11 +7,24 @@ // Due to ConnStats, may require special alignment on some platforms. See // https://github.com/anacrolix/torrent/issues/383. type TorrentStats struct { + AllConnStats + TorrentStatCounters + TorrentGauges +} + +type AllConnStats struct { // Aggregates stats over all connections past and present. Some values may not have much meaning // in the aggregate context. ConnStats - TorrentStatCounters - TorrentGauges + WebSeeds ConnStats + PeerConns ConnStats +} + +func (me *AllConnStats) Copy() (ret AllConnStats) { + ret.ConnStats = me.ConnStats.Copy() + ret.WebSeeds = me.WebSeeds.Copy() + ret.PeerConns = me.PeerConns.Copy() + return } // Instantaneous metrics in Torrents, and aggregated for Clients. diff --git a/torrent.go b/torrent.go index bbbbf63459029b3eaf8176696df567dc2c92aac3..53c87c498314582de506e9a3d9d273fbbe0e39e8 100644 --- a/torrent.go +++ b/torrent.go @@ -21,6 +21,7 @@ "text/tabwriter" "time" "unique" "unsafe" + "weak" "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" @@ -66,7 +67,7 @@ // available, see .Info and .GotInfo. type Torrent struct { // Torrent-level aggregate statistics. First in struct to ensure 64-bit // alignment. See #262. - connStats ConnStats + connStats AllConnStats counters TorrentStatCounters cl *Client @@ -179,14 +180,14 @@ activePieceHashes int connsWithAllPieces map[*Peer]struct{} - // Last active request for each chunks. TODO: Change to PeerConn specific? + // Last active PeerConn request for each chunk. requestState map[RequestIndex]requestState // Chunks we've written to since the corresponding piece was last checked. 
dirtyChunks typedRoaring.Bitmap[RequestIndex] pex pexState - // Is On when all pieces are complete. + // Is On when all pieces are complete, no hashing is pending or occurring. complete chansync.Flag // Torrent sources in use keyed by the source string. string -> error. If the slot is occupied @@ -246,7 +247,7 @@ t.updatePieceRequestOrderPiece(i) } func (t *Torrent) incPieceAvailability(i pieceIndex) { - // If we don't the info, this should be reconciled when we do. + // If we don't have the info, this should be reconciled when we do. if t.haveInfo() { p := t.piece(i) p.relativeAvailability++ @@ -327,7 +328,7 @@ if t.ignoreUnverifiedPieceCompletion && p.numVerifies == 0 { return } if t.storage == nil { - return storage.Completion{Complete: false, Ok: true} + return storage.Completion{Complete: false, Ok: false} } return p.Storage().Completion() } @@ -557,7 +558,7 @@ } func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey { return request_strategy.PieceRequestOrderKey{ - InfoHash: *t.canonicalShortInfohash(), + InfoHash: unique.Make(*t.canonicalShortInfohash()), Index: i, } } @@ -576,7 +577,7 @@ panic(p.relativeAvailability) } p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i) t.addRequestOrderPiece(i) - t.updatePieceCompletion(i) + t.setInitialPieceCompletionFromStorage(i) t.queueInitialPieceCheck(i) } t.cl.event.Broadcast() @@ -656,6 +657,18 @@ return } t.onSetInfo() return nil +} + +// Used in tests. +func (t *Torrent) setInfoUnlocked(info *metainfo.Info) (err error) { + t.cl.lock() + defer t.cl.unlock() + err = t.setInfo(info) + if err != nil { + return + } + t.onSetInfo() + return } func (t *Torrent) haveAllMetadataPieces() bool { @@ -853,6 +866,14 @@ }()) } fmt.Fprintln(w) } + // Note this might be shared with other torrents. 
+ fmt.Fprintf(w, "Piece request order length: %v\n", func() any { + pro := t.getPieceRequestOrder() + if pro == nil { + return nil + } + return pro.Len() + }()) fmt.Fprintf(w, "Piece length: %s\n", func() string { if t.haveInfo() { @@ -990,7 +1011,7 @@ }(), UrlList: func() []string { ret := make([]string, 0, len(t.webSeeds)) for url := range t.webSeeds { - ret = append(ret, string(url)) + ret = append(ret, url.Value()) } return ret }(), @@ -1057,11 +1078,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) { return pieceIndex(t._completedPieces.GetCardinality()) } -func (t *Torrent) close(wg *sync.WaitGroup) (err error) { - if !t.closed.Set() { - err = errors.New("already closed") - return - } +func (t *Torrent) close(wg *sync.WaitGroup) { + // Should only be called from the Client. + panicif.False(t.closed.Set()) + t.eachShortInfohash(func(short [20]byte) { + delete(t.cl.torrentsByShortHash, short) + }) t.closedCtxCancel(errTorrentClosed) t.getInfoCtxCancel(errTorrentClosed) for _, f := range t.onClose { @@ -1092,6 +1114,14 @@ t.pex.Reset() t.cl.event.Broadcast() t.pieceStateChanges.Close() t.updateWantPeersEvent() + g.MustDelete(t.cl.torrents, t) + // This doesn't work yet because requests remove themselves after they close, and we don't + // remove them synchronously. + if false { + if len(t.cl.torrents) == 0 { + panicif.NotZero(len(t.cl.activeWebseedRequests)) + } + } return } @@ -1148,7 +1178,7 @@ return } func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType { - return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) + return chunkIndexType(intCeilDiv(t.pieceLength(piece), t.chunkSize)) } func (t *Torrent) chunksPerRegularPiece() chunkIndexType { @@ -1202,7 +1232,8 @@ } func (t *Torrent) hashPiece(piece pieceIndex) ( correct bool, - // These are peers that sent us blocks that differ from what we hash here. + // These are peers that sent us blocks that differ from what we hash here. 
TODO: Track Peer not + // bannable addr for peer types that are rebuked differently. differingPeers map[bannableAddr]struct{}, err error, ) { @@ -1250,8 +1281,8 @@ h := merkle.NewHash() differingPeers, err = t.hashPieceWithSpecificHash(piece, h) var sum [32]byte // What about the final piece in a torrent? From BEP 52: "The layer is chosen so that one - // hash covers piece length bytes.". Note that if a piece doesn't have a hash in piece - // layers it's because it's not larger than the piece length. + // hash covers piece length bytes". Note that if a piece doesn't have a hash in piece layers + // it's because it's not larger than the piece length. sumExactly(sum[:], func(b []byte) []byte { return h.SumMinLength(b, int(t.info.PieceLength)) }) @@ -1300,9 +1331,6 @@ p := t.piece(piece) storagePiece := p.Storage() var written int64 written, err = storagePiece.WriteTo(w) - if err == nil && written != int64(p.length()) { - err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length()) - } t.countBytesHashed(written) return } @@ -1542,13 +1570,16 @@ return } func (t *Torrent) updatePiecePriority(piece pieceIndex, reason updateRequestReason) { - t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason) + //t.logger.Slogger().Debug("updatePiecePriority", "piece", piece, "reason", reason) if t.updatePiecePriorityNoRequests(piece) && !t.disableTriggers { t.updatePeerRequestsForPiece(piece, reason) } } func (t *Torrent) updateAllPiecePriorities(reason updateRequestReason) { + if !t.haveInfo() { + return + } t.updatePiecePriorities(0, t.numPieces(), reason) } @@ -1695,38 +1726,72 @@ } return } -// Pulls piece completion state from storage and performs any state updates if it changes. 
-func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { - p := t.piece(piece) +func (t *Torrent) setPieceCompletion(piece pieceIndex, uncached g.Option[bool]) { + changed := t.setCachedPieceCompletion(piece, uncached) + t.afterSetPieceCompletion(piece, changed) +} + +func (t *Torrent) setPieceCompletionFromStorage(piece pieceIndex) bool { + changed := t.setCachedPieceCompletionFromStorage(piece) + t.afterSetPieceCompletion(piece, changed) + return changed +} + +func (t *Torrent) setInitialPieceCompletionFromStorage(piece pieceIndex) { + t.setCachedPieceCompletionFromStorage(piece) + t.afterSetPieceCompletion(piece, true) +} + +// Sets the cached piece completion directly from storage. +func (t *Torrent) setCachedPieceCompletionFromStorage(piece pieceIndex) bool { uncached := t.pieceCompleteUncached(piece) - // This isn't being handled. Here we should probably be storing Option[bool] for completion and - // filtering out errors. Also, errors should probably disable downloading here too. - panicif.Err(uncached.Err) + if uncached.Err != nil { + t.slogger().Error("error getting piece completion", "err", uncached.Err) + t.disallowDataDownloadLocked() + } + return t.setCachedPieceCompletion(piece, g.OptionFromTuple(uncached.Complete, uncached.Ok)) +} + +// Returns true if the value was changed. +func (t *Torrent) setCachedPieceCompletion(piece int, uncached g.Option[bool]) bool { + p := t.piece(piece) + // TODO: Here we should probably be storing Option[bool] for completion and filtering out + // errors. 
cached := p.completion() - changed := cached != uncached - complete := uncached.Ok && uncached.Complete + cachedOpt := g.OptionFromTuple(cached.Complete, cached.Ok) + changed := cachedOpt != uncached p.storageCompletionOk = uncached.Ok x := uint32(piece) - if complete { + if uncached.Ok && uncached.Value { t._completedPieces.Add(x) - t.openNewConns() } else { t._completedPieces.Remove(x) } + return changed + +} + +// Pulls piece completion state from storage and performs any state updates if it changes. +func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { + return t.setPieceCompletionFromStorage(piece) +} + +// Pulls piece completion state from storage and performs any state updates if it changes. +func (t *Torrent) afterSetPieceCompletion(piece pieceIndex, changed bool) { + p := t.piece(piece) + cmpl := p.completion() + complete := cmpl.Ok && cmpl.Complete + if complete { + t.openNewConns() + } p.t.updatePieceRequestOrderPiece(piece) - t.updateComplete() + t.deferUpdateComplete() if complete && len(p.dirtiers) != 0 { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } if changed { - //slog.Debug( - // "piece completion changed", - // slog.Int("piece", piece), - // slog.Any("from", cached), - // slog.Any("to", uncached)) t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion") } - return changed } // Non-blocking read. Client lock is not required. 
@@ -1863,6 +1928,9 @@ } } torrent.Add("deleted connections", 1) c.deleteAllRequests("Torrent.deletePeerConn") + if len(t.conns) == 0 { + panicif.NotZero(len(t.requestState)) + } t.assertPendingRequests() if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 { panic(t.connsWithAllPieces) @@ -2318,7 +2386,7 @@ return } func (t *Torrent) statsLocked() (ret TorrentStats) { - ret.ConnStats = copyCountFields(&t.connStats) + ret.AllConnStats = t.connStats.Copy() ret.TorrentStatCounters = copyCountFields(&t.counters) ret.TorrentGauges = t.gauges() return @@ -2342,23 +2410,6 @@ t.peers.Each(func(peer PeerInfo) { peers[peer.Addr.String()] = struct{}{} }) return len(peers) -} - -// Reconcile bytes transferred before connection was associated with a -// torrent. -func (t *Torrent) reconcileHandshakeStats(c *Peer) { - if c._stats != (ConnStats{ - // Handshakes should only increment these fields: - BytesWritten: c._stats.BytesWritten, - BytesRead: c._stats.BytesRead, - }) { - panic("bad stats") - } - c.postHandshakeStats(func(cs *ConnStats) { - cs.BytesRead.Add(c._stats.BytesRead.Int64()) - cs.BytesWritten.Add(c._stats.BytesWritten.Int64()) - }) - c.reconciledHandshakeStats = true } // Returns true if the connection is added. @@ -2419,20 +2470,14 @@ } if t.closed.IsSet() { return false } - if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { - return false + if t.needData() { + return true } - return true + return t.seeding() && t.haveAnyPieces() } func (t *Torrent) wantAnyConns() bool { - if !t.networkingEnabled.Bool() { - return false - } - if t.closed.IsSet() { - return false - } - if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { + if !t.newConnsAllowed() { return false } return len(t.conns) < t.maxEstablishedConns @@ -2443,6 +2488,7 @@ if !t.newConnsAllowed() { return false } if len(t.conns) < t.maxEstablishedConns { + // Shortcut: We can take any connection direction right now. 
return true } numIncomingConns := len(t.conns) - t.numOutgoingConns() @@ -2457,6 +2503,7 @@ if !t.newConnsAllowed() { return false } if len(t.conns) < t.maxEstablishedConns { + // Shortcut: We can take any connection direction right now. return true } numIncomingConns := len(t.conns) - t.numOutgoingConns() @@ -2486,9 +2533,6 @@ return oldMax } func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { - t.logger.LazyLog(log.Debug, func() log.Msg { - return log.Fstr("hashed piece %d (passed=%t)", piece, passed) - }) p := t.piece(piece) p.numVerifies++ p.numVerifiesCond.Broadcast() @@ -2517,18 +2561,17 @@ t.deferPublishPieceStateChange(piece) }() if passed { - if len(p.dirtiers) != 0 { - // Don't increment stats above connection-level for every involved connection. - t.allStats((*ConnStats).incrementPiecesDirtiedGood) - } - for c := range p.dirtiers { - c._stats.incrementPiecesDirtiedGood() - } + t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedGood) t.clearPieceTouchers(piece) hasDirty := p.hasDirtyChunks() t.cl.unlock() if hasDirty { - p.Flush() // You can be synchronous here! + // This could return fs.ErrNotExist, and that would be unexpected since we haven't + // marked it complete yet, and nobody should have moved it. + err := p.Flush() // You can be synchronous here! + if err != nil { + t.slogger().Warn("error flushing piece storage", "piece", piece, "err", err) + } } p.race++ err := p.Storage().MarkComplete() @@ -2541,17 +2584,12 @@ if t.closed.IsSet() { return } t.pendAllChunkSpecs(piece) + t.setPieceCompletion(piece, g.Some(true)) } else { if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil { // Peers contributed to all the data for this piece hash failure, and the failure was // not due to errors in the storage (such as data being dropped in a cache). - - // Increment Torrent and above stats, and then specific connections. 
- t.allStats((*ConnStats).incrementPiecesDirtiedBad) - for c := range p.dirtiers { - // Y u do dis peer?! - c.stats().incrementPiecesDirtiedBad() - } + t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedBad) bannableTouchers := make([]*Peer, 0, len(p.dirtiers)) for c := range p.dirtiers { @@ -2588,7 +2626,7 @@ t.slogger().Info( "piece failed hash. banning peer", "piece", piece, "peer", c) - c.ban() + c.providedBadData() // TODO: Check if we now have no available peers for pieces we want. } } @@ -2605,10 +2643,12 @@ t.cl.lock() if t.closed.IsSet() { return } - t.onIncompletePiece(piece) + // Set it directly without querying storage again. It makes no difference if the lock is + // held since it can be clobbered right after again anyway. This comes after onIncompletePiece + // because that's how it was before. + t.setPieceCompletion(p.index, g.Some(false)) } - t.updatePieceCompletion(piece) } func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { @@ -2634,7 +2674,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { if t.pieceAllDirty(piece) { t.pendAllChunkSpecs(piece) } - if !t.wantPieceIndex(piece) { + if t.ignorePieceForRequests(piece) { // t.logger.Printf("piece %d incomplete and unwanted", piece) return } @@ -2656,31 +2696,31 @@ } }) } +// Torrent piece hashers are sticky and will try to keep hashing pieces in the same Torrent to keep +// the storage hot.
func (t *Torrent) startPieceHashers() error { if t.closed.IsSet() { return errTorrentClosed } - for t.startPieceHasher() { + for t.considerStartingHashers() { + if !t.startSinglePieceHasher() { + break + } } return nil } -func (t *Torrent) startPieceHasher() bool { - if t.storage == nil { - return false - } - if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent { +func (t *Torrent) startSinglePieceHasher() bool { + pi := t.getPieceToHash() + if !pi.Ok { return false } - pi := t.getPieceToHash() - if pi.Ok { - t.startHash(pi.Value) - go t.pieceHasher(pi.Value) - return true - } - return false + t.startHash(pi.Value) + go t.pieceHasher(pi.Value) + return true } +// Sticky to a Torrent. Might as well since that keeps the storage hot. func (t *Torrent) pieceHasher(initial pieceIndex) { t.finishHash(initial) for { @@ -2693,18 +2733,19 @@ t.startHash(pi) t.cl.unlock() t.finishHash(pi) } + t.cl.startPieceHashers() t.cl.unlock() } func (t *Torrent) startHash(pi pieceIndex) { p := t.piece(pi) t.piecesQueuedForHash.Remove(pi) - t.deferUpdateComplete() p.hashing = true t.deferPublishPieceStateChange(pi) - t.updatePiecePriority(pi, "Torrent.startPieceHasher") + t.updatePiecePriority(pi, "Torrent.startHash") t.storageLock.RLock() t.activePieceHashes++ + t.cl.activePieceHashers++ } func (t *Torrent) getPieceToHash() (_ g.Option[pieceIndex]) { @@ -2758,17 +2799,23 @@ } t.storageLock.RUnlock() t.cl.lock() if correct { - for peer := range failedPeers { - t.cl.banPeerIP(peer.AsSlice()) - t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index) + if len(failedPeers) > 0 { + for peer := range failedPeers { + t.cl.banPeerIP(peer.AsSlice()) + t.slogger().Info("smart banned peer", "peer", peer, "piece", index) + } + t.dropBannedPeers() } - t.dropBannedPeers() t.smartBanCache.ForgetBlockSeq(iterRange(t.pieceRequestIndexBegin(index), t.pieceRequestIndexBegin(index+1))) } p.hashing = false t.pieceHashed(index, correct, copyErr) 
t.updatePiecePriority(index, "Torrent.finishHash") t.activePieceHashes-- + if t.activePieceHashes == 0 { + t.deferUpdateComplete() + } + t.cl.activePieceHashers-- } // Return the connections that touched a piece, and clear the entries while doing it. @@ -2915,13 +2962,6 @@ return }()) } -// All stats that include this Torrent. Useful when we want to increment ConnStats but not for every -// connection. -func (t *Torrent) allStats(f func(*ConnStats)) { - f(&t.connStats) - f(&t.cl.connStats) -} - func (t *Torrent) hashingPiece(i pieceIndex) bool { return t.pieces[i].hashing } @@ -2971,7 +3011,12 @@ func (t *Torrent) AllowDataDownload() { t.cl.lock() defer t.cl.unlock() - t.dataDownloadDisallowed.Clear() + // Can't move this outside the lock because other users require it to be unchanged while the + // Client lock is held? + if !t.dataDownloadDisallowed.Clear() { + return + } + t.updateAllPiecePriorities("data download allowed") t.iterPeers(func(p *Peer) { p.onNeedUpdateRequests("allow data download") }) @@ -2981,6 +3026,9 @@ // Enables uploading data, if it was disabled. func (t *Torrent) AllowDataUpload() { t.cl.lock() defer t.cl.unlock() + if !t.dataUploadDisallowed { + return + } t.dataUploadDisallowed = false t.iterPeers(func(p *Peer) { p.onNeedUpdateRequests("allow data upload") @@ -3049,7 +3097,8 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { if t.cl.config.DisableWebseeds { return false } - if _, ok := t.webSeeds[webseedUrlKey(url)]; ok { + urlKey := webseedUrlKey(unique.Make(url)) + if _, ok := t.webSeeds[urlKey]; ok { return false } // I don't think Go http supports pipelining requests. However, we can have more ready to go @@ -3062,10 +3111,10 @@ // anything. const defaultMaxRequests = 2 ws := webseedPeer{ peer: Peer{ - t: t, - outgoing: true, - Network: "http", - reconciledHandshakeStats: true, + cl: t.cl, + t: t, + outgoing: true, + Network: "http", // TODO: Set ban prefix? 
RemoteAddr: remoteAddrFromUrl(url), callbacks: t.callbacks(), @@ -3077,16 +3126,15 @@ MaxRequests: defaultMaxRequests, ResponseBodyRateLimiter: t.cl.config.DownloadRateLimiter, }, hostKey: t.deriveWebSeedHostKey(url), + url: urlKey, } + ws.peer.initClosedCtx() for _, opt := range opts { opt(&ws.client) } - setRateLimiterBurstIfZero(ws.client.ResponseBodyRateLimiter, defaultDownloadRateLimiterBurst) + setDefaultDownloadRateLimiterBurstIfZero(ws.client.ResponseBodyRateLimiter) ws.client.ResponseBodyWrapper = func(r io.Reader) io.Reader { - return &rateLimitedReader{ - l: ws.client.ResponseBodyRateLimiter, - r: r, - } + return newRateLimitedReader(r, ws.client.ResponseBodyRateLimiter) } g.MakeMapWithCap(&ws.activeRequests, ws.client.MaxRequests) ws.locker = t.cl.locker() @@ -3102,7 +3150,7 @@ ws.peer.peerImpl = &ws if t.haveInfo() { ws.onGotInfo(t.info) } - t.webSeeds[webseedUrlKey(url)] = &ws + t.webSeeds[urlKey] = &ws ws.peer.onNeedUpdateRequests("Torrent.addWebSeed") return true } @@ -3150,7 +3198,20 @@ } func (t *Torrent) updateComplete() { // TODO: Announce complete to trackers? - t.complete.SetBool(t.haveAllPieces()) + t.complete.SetBool(t.isComplete()) +} + +func (t *Torrent) isComplete() bool { + if t.activePieceHashes != 0 { + return false + } + if !t.piecesQueuedForHash.IsEmpty() { + return false + } + if !t.haveAllPieces() { + return false + } + return true } func (t *Torrent) cancelRequest(r RequestIndex) *PeerConn { @@ -3166,8 +3227,14 @@ } return p } -func (t *Torrent) requestingPeer(r RequestIndex) *PeerConn { - return t.requestState[r].peer +func (t *Torrent) requestingPeer(r RequestIndex) (ret *PeerConn) { + state, ok := t.requestState[r] + if !ok { + return nil + } + ret = state.peer.Value() + panicif.Nil(ret) + return } func (t *Torrent) addConnWithAllPieces(p *Peer) { @@ -3184,6 +3251,8 @@ return ok } func (t *Torrent) numActivePeers() int { + // TODO: Webseeds are "active" if they can serve any data. 
That means we need to track what + // pieces they're able to provide. return len(t.conns) + len(t.webSeeds) } @@ -3228,7 +3297,7 @@ return stats } type requestState struct { - peer *PeerConn + peer weak.Pointer[PeerConn] when time.Time } @@ -3526,7 +3595,7 @@ } func (t *Torrent) wantReceiveChunk(reqIndex RequestIndex) bool { pi := t.pieceIndexOfRequestIndex(reqIndex) - if !t.wantPieceIndex(pi) { + if t.ignorePieceForRequests(pi) { return false } if t.haveRequestIndexChunk(reqIndex) { @@ -3541,3 +3610,120 @@ return errTorrentClosed } return nil } + +func (t *Torrent) considerStartingHashers() bool { + if t.storage == nil { + return false + } + if t.activePieceHashes >= t.cl.config.PieceHashersPerTorrent { + return false + } + if !t.cl.canStartPieceHashers() { + return false + } + if t.piecesQueuedForHash.IsEmpty() { + return false + } + return true +} + +func (t *Torrent) getFile(fileIndex int) *File { + return (*t.files)[fileIndex] +} + +func (t *Torrent) fileMightBePartial(fileIndex int) bool { + f := t.getFile(fileIndex) + return t.piecesMightBePartial(f.BeginPieceIndex(), f.EndPieceIndex()) +} + +func (t *Torrent) expandPieceRangeToFullFiles(beginPieceIndex, endPieceIndex pieceIndex) (expandedBegin, expandedEnd pieceIndex) { + // Expand the piece range to include all pieces of the files in the original range. + firstFile := t.getFile(t.piece(beginPieceIndex).beginFile) + lastFile := t.getFile(t.piece(endPieceIndex-1).endFile - 1) + expandedBegin = firstFile.BeginPieceIndex() + expandedEnd = lastFile.EndPieceIndex() + return +} + +// Pieces in the range [begin, end) may have partially complete files. Note we only check for dirty chunks and either all or no pieces being complete. +func (t *Torrent) filesInPieceRangeMightBePartial(begin, end pieceIndex) bool { + begin, end = t.expandPieceRangeToFullFiles(begin, end) + return t.piecesMightBePartial(begin, end) +} + +// Pieces in the range [begin, end) may have partially complete files. 
Note we only check for dirty chunks and either all or no pieces being complete. +func (t *Torrent) filesInRequestRangeMightBePartial(beginRequest, endRequest RequestIndex) bool { + if beginRequest == endRequest { + return false + } + beginPiece := t.pieceIndexOfRequestIndex(beginRequest) + endPiece := pieceIndex(intCeilDiv(endRequest, t.chunksPerRegularPiece())) + return t.filesInPieceRangeMightBePartial(beginPiece, endPiece) +} + +// Pieces in the range [begin, end) are dirty, or in a mixed completion state. +func (t *Torrent) piecesMightBePartial(beginPieceIndex, endPieceIndex int) bool { + // Check for dirtied chunks. + if t.dirtyChunks.IntersectsWithInterval( + uint64(t.pieceRequestIndexBegin(beginPieceIndex)), + uint64(t.pieceRequestIndexBegin(endPieceIndex)), + ) { + // We have dirty chunks. Even if the file is complete, this could mean a partial file has + // been started. + return true + } + // Check for mixed completion. + var r roaring.Bitmap + r.AddRange(uint64(beginPieceIndex), uint64(endPieceIndex)) + switch t._completedPieces.AndCardinality(&r) { + case 0, uint64(endPieceIndex - beginPieceIndex): + // We have either no pieces or all pieces and no dirty chunks. + return false + default: + // We're somewhere in-between. + return true + } +} + +func (t *Torrent) hasActiveWebseedRequests() bool { + for _, p := range t.webSeeds { + for req := range p.activeRequests { + if !req.cancelled.Load() { + return true + } + } + } + return false +} + +// Increment pieces dirtied for conns and aggregate upstreams. +func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStats) bool) { + if len(p.dirtiers) == 0 { + // Avoid allocating map. + return + } + // 4 == 2 peerImpls (PeerConn and webseedPeer) and 1 base * one AllConnStats for each of Torrent + // and Client. + distinctUpstreamConnStats := make(map[*ConnStats]struct{}, 6) + for c := range p.dirtiers { + // Apply directly for each peer to avoid allocation. 
+ inc(&c._stats) + // Collect distinct upstream connection stats. + count := 0 + for cs := range c.upstreamConnStats() { + distinctUpstreamConnStats[cs] = struct{}{} + count++ + } + // All dirtiers should have both Torrent and Client stats for both base and impl-ConnStats. + panicif.NotEq(count, 4) + } + // TODO: Have a debug assert/dev logging version of this. + panicif.GreaterThan(len(distinctUpstreamConnStats), 6) + maps.Keys(distinctUpstreamConnStats)(inc) +} + +// Maximum end request index for the torrent (one past the last). There might be other requests that +// don't make sense if padding files and v2 are in use. +func (t *Torrent) maxEndRequest() RequestIndex { + return RequestIndex(intCeilDiv(uint64(t.length()), t.chunkSize.Uint64())) +} diff --git a/torrent_test.go b/torrent_test.go index 6a3d7ca1f395971d096ea8c93c9fd5c253f47fcf..eacf5c44d702063af58fd66ca89fefcbab2fea59 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -7,13 +7,12 @@ "io" "net" "os" "path/filepath" - "sync" "testing" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" - qt "github.com/go-quicktest/qt" + "github.com/go-quicktest/qt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -227,7 +226,7 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) { var err error cl := newTestingClient(t) - mi, info := testutil.Greeting.Generate(5) + mi, _ := testutil.Greeting.Generate(5) tt := cl.newTorrentOpt(AddTorrentOpts{InfoHash: mi.HashInfoBytes()}) tt.setChunkSize(2) g.MakeMapIfNil(&tt.conns) @@ -236,15 +235,17 @@ pc.t = tt pc.legacyPeerImpl = &pc pc.initRequestState() g.InitNew(&pc.callbacks) + tt.cl.lock() tt.conns[&pc] = struct{}{} err = pc.peerSentHave(0) + tt.cl.unlock() qt.Assert(t, qt.IsNil(err)) - err = tt.setInfo(&info) + err = tt.SetInfoBytes(mi.InfoBytes) qt.Assert(t, qt.IsNil(err)) - tt.onSetInfo() + tt.cl.lock() err = pc.peerSentHaveNone() + tt.cl.unlock() qt.Assert(t, qt.IsNil(err)) - var 
wg sync.WaitGroup - tt.close(&wg) + tt.Drop() tt.assertAllPiecesRelativeAvailabilityZero() } diff --git a/webseed-peer.go b/webseed-peer.go index 35fd8ffef86a251634b3f8612efeb26627340661..a8f3d2eab225c4e150d3edbc33adc78c6c6ef92f 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -5,9 +5,9 @@ "context" "errors" "fmt" "io" - "iter" "log/slog" "math/rand" + "runtime/pprof" "strings" "sync" "time" @@ -24,19 +24,41 @@ ) type webseedPeer struct { // First field for stats alignment. - peer Peer - client webseed.Client - activeRequests map[*webseedRequest]struct{} - locker sync.Locker - lastUnhandledErr time.Time - hostKey webseedHostKeyHandle + peer Peer + logger *slog.Logger + client webseed.Client + activeRequests map[*webseedRequest]struct{} + locker sync.Locker + hostKey webseedHostKeyHandle + // We need this to look ourselves up in the Client.activeWebseedRequests map. + url webseedUrlKey + + // When requests are allowed to resume. If Zero, then anytime. + penanceComplete time.Time + lastCrime error +} + +func (me *webseedPeer) suspended() bool { + return me.lastCrime != nil && time.Now().Before(me.penanceComplete) +} + +func (me *webseedPeer) convict(err error, term time.Duration) { + if me.suspended() { + return + } + me.lastCrime = err + me.penanceComplete = time.Now().Add(term) +} + +func (*webseedPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats { + return &stats.WebSeeds } func (me *webseedPeer) cancelAllRequests() { // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we // Close requests instead? for req := range me.activeRequests { - req.Cancel() + req.Cancel("all requests cancelled") } } @@ -48,7 +70,11 @@ return false } // Webseed requests are issued globally so per-connection reasons or handling make no sense. 
-func (me *webseedPeer) onNeedUpdateRequests(updateRequestReason) {} +func (me *webseedPeer) onNeedUpdateRequests(reason updateRequestReason) { + // Too many reasons here: Can't predictably determine when we need to rerun updates. + // TODO: Can trigger this when we have Client-level active-requests map. + //me.peer.cl.scheduleImmediateWebseedRequestUpdate(reason) +} func (me *webseedPeer) expectingChunks() bool { return len(me.activeRequests) > 0 @@ -58,11 +84,6 @@ func (me *webseedPeer) checkReceivedChunk(RequestIndex, *pp.Message, Request) (bool, error) { return true, nil } -func (me *webseedPeer) numRequests() int { - // What about unassigned requests? TODO: Don't allow those. - return len(me.activeRequests) -} - func (me *webseedPeer) lastWriteUploadRate() float64 { // We never upload to webseeds. return 0 @@ -73,7 +94,12 @@ func (me *webseedPeer) peerImplStatusLines() []string { lines := []string{ me.client.Url, - fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } + if me.lastCrime != nil { + lines = append(lines, fmt.Sprintf("last crime: %v", me.lastCrime)) + } + if me.suspended() { + lines = append(lines, fmt.Sprintf("suspended for %v more", time.Until(me.penanceComplete))) } if len(me.activeRequests) > 0 { elems := make([]string, 0, len(me.activeRequests)) @@ -102,18 +128,6 @@ // Webseeds check the next request is wanted before reading it. 
func (ws *webseedPeer) handleCancel(RequestIndex) {} -func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] { - return func(yield func(*webseedRequest) bool) { - for wr := range ws.activeRequests { - if r >= wr.next && r < wr.end { - if !yield(wr) { - return - } - } - } - } -} - func (ws *webseedPeer) requestIndexTorrentOffset(r RequestIndex) int64 { return ws.peer.t.requestIndexBegin(r) } @@ -125,9 +139,10 @@ endOff := t.requestIndexEnd(end - 1) return webseed.RequestSpec{start, endOff - start} } -func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) { - extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end)) +func (ws *webseedPeer) spawnRequest(begin, end RequestIndex, logger *slog.Logger) { + extWsReq := ws.client.StartNewRequest(ws.peer.closedCtx, ws.intoSpec(begin, end), logger) wsReq := webseedRequest{ + logger: logger, request: extWsReq, begin: begin, next: begin, @@ -135,11 +150,15 @@ end: end, } if ws.hasOverlappingRequests(begin, end) { if webseed.PrintDebug { - fmt.Printf("webseedPeer.spawnRequest: overlapping request for %v[%v-%v)\n", ws.peer.t.name(), begin, end) + logger.Warn("webseedPeer.spawnRequest: request overlaps existing") } ws.peer.t.cl.dumpCurrentWebseedRequests() } ws.activeRequests[&wsReq] = struct{}{} + t := ws.peer.t + cl := t.cl + g.MakeMapIfNil(&cl.activeWebseedRequests) + g.MapMustAssignNew(cl.activeWebseedRequests, ws.getRequestKey(&wsReq), &wsReq) ws.peer.updateExpectingChunks() panicif.Zero(ws.hostKey) ws.peer.t.cl.numWebSeedRequests[ws.hostKey]++ @@ -149,7 +168,20 @@ "begin", begin, "end", end, "len", end-begin, ) - go ws.runRequest(&wsReq) + go func() { + // Detach cost association from webseed update requests routine. + pprof.SetGoroutineLabels(context.Background()) + ws.runRequest(&wsReq) + }() +} + +func (me *webseedPeer) getRequestKey(wr *webseedRequest) webseedUniqueRequestKey { + // This is used to find the request in the Client's active requests map. 
+ return webseedUniqueRequestKey{ + url: me.url, + t: me.peer.t, + sliceIndex: me.peer.t.requestIndexToWebseedSliceIndex(wr.begin), + } } func (me *webseedPeer) hasOverlappingRequests(begin, end RequestIndex) bool { @@ -164,8 +196,11 @@ } return false } -func readChunksErrorLevel(err error, req *webseedRequest) slog.Level { +func (ws *webseedPeer) readChunksErrorLevel(err error, req *webseedRequest) slog.Level { if req.cancelled.Load() { + return slog.LevelDebug + } + if ws.peer.closedCtx.Err() != nil { return slog.LevelDebug } var h2e http2.GoAwayError @@ -191,7 +226,7 @@ } // Ensure the body reader and response are closed. webseedRequest.Close() if err != nil { - level := readChunksErrorLevel(err, webseedRequest) + level := ws.readChunksErrorLevel(err, webseedRequest) ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err) torrent.Add("webseed request error count", 1) // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any @@ -206,26 +241,23 @@ ws.slogger().Debug("webseed request ended") locker.Lock() // Delete this entry after waiting above on an error, to prevent more requests. 
ws.deleteActiveRequest(webseedRequest) - if err != nil { - ws.peer.onNeedUpdateRequests("webseedPeer request errored") + cl := ws.peer.cl + if err == nil && cl.numWebSeedRequests[ws.hostKey] == webseedHostRequestConcurrency/2 { + cl.updateWebseedRequestsWithReason("webseedPeer.runRequest low water") + } else if cl.numWebSeedRequests[ws.hostKey] == 0 { + cl.updateWebseedRequestsWithReason("webseedPeer.runRequest zero requests") } - ws.peer.t.cl.updateWebseedRequestsWithReason("webseedPeer request completed") locker.Unlock() } func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) { g.MustDelete(ws.activeRequests, wr) - ws.peer.t.cl.numWebSeedRequests[ws.hostKey]-- + cl := ws.peer.cl + cl.numWebSeedRequests[ws.hostKey]-- + g.MustDelete(cl.activeWebseedRequests, ws.getRequestKey(wr)) ws.peer.updateExpectingChunks() } -func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool { - for range ws.activeRequestsForIndex(index) { - return false - } - return true -} - func (ws *webseedPeer) connectionFlags() string { return "WS" } @@ -233,8 +265,8 @@ // Maybe this should drop all existing connections, or something like that. func (ws *webseedPeer) drop() {} -func (cn *webseedPeer) ban() { - cn.peer.close() +func (cn *webseedPeer) providedBadData() { + cn.convict(errors.New("provided bad data"), time.Minute) } func (ws *webseedPeer) onClose() { @@ -254,7 +286,9 @@ func (ws *webseedPeer) maxChunkDiscard() RequestIndex { return RequestIndex(int(intCeilDiv(webseed.MaxDiscardBytes, ws.peer.t.chunkSize))) } -func (ws *webseedPeer) keepReading(wr *webseedRequest) bool { +func (ws *webseedPeer) wantedChunksInDiscardWindow(wr *webseedRequest) bool { + // Shouldn't call this if request is at the end already. 
+ panicif.GreaterThanOrEqual(wr.next, wr.end) for ri := wr.next; ri < wr.end && ri <= wr.next+ws.maxChunkDiscard(); ri++ { if ws.wantChunk(ri) { return true } @@ -277,13 +311,25 @@ buf = buf[:chunkLen] var n int n, err = io.ReadFull(wr.request.Body, buf) ws.peer.readBytes(int64(n)) + reqCtxErr := context.Cause(wr.request.Context()) + if errors.Is(err, reqCtxErr) { + err = reqCtxErr + } if webseed.PrintDebug && wr.cancelled.Load() { fmt.Printf("webseed read %v after cancellation: %v\n", n, err) } if err != nil { + // TODO: Pick out missing files or associate error with file. See also + // webseed.ReadRequestPartError. + var badResponse webseed.ErrBadResponse + if errors.As(err, &badResponse) { + ws.convict(badResponse, time.Minute) + } err = fmt.Errorf("reading chunk: %w", err) return } + // TODO: This happens outside Client lock, and stats can be written out of sync with each + // other. Why even bother with atomics? ws.peer.doChunkReadStats(int64(n)) // TODO: Clean up the parameters for receiveChunk. msg.Piece = buf @@ -297,8 +343,10 @@ wr.next++ err = ws.peer.receiveChunk(&msg) stop := err != nil || wr.next >= wr.end if !stop { - if !ws.keepReading(wr) { - wr.Cancel() + if !ws.wantedChunksInDiscardWindow(wr) { + // This cancels the stream, but we don't stop reading, to make the most of the + // buffered body. + wr.Cancel("no wanted chunks in discard window") } } ws.peer.locker().Unlock() diff --git a/webseed-request.go b/webseed-request.go index 62e2f1eaf60c1fe5ea79c3982dab368aeacbe718..bf267fa3c69058b19f5bfe6a1c993f9f8bc017c6 100644 --- a/webseed-request.go +++ b/webseed-request.go @@ -2,6 +2,7 @@ package torrent import ( "fmt" + "log/slog" "sync/atomic" "github.com/anacrolix/torrent/webseed" @@ -11,6 +12,7 @@ // A wrapper around webseed.Request with extra state for webseedPeer. type webseedRequest struct { // Fingers out. request webseed.Request + logger *slog.Logger // First assigned in the range. begin RequestIndex // The next to be read.
@@ -25,11 +27,11 @@ me.request.Close() } // Record that it was exceptionally cancelled. -func (me *webseedRequest) Cancel() { - me.request.Cancel() +func (me *webseedRequest) Cancel(cause string) { + me.request.Cancel(stringError(cause)) if !me.cancelled.Swap(true) { if webseed.PrintDebug { - fmt.Printf("cancelled webseed request\n") + me.logger.Debug("cancelled", "cause", cause) } } } diff --git a/webseed-requesting.go b/webseed-requesting.go index 71a1ec7079de481c285eac68c65b9d63b4d3bed6..81166b5ee9867b955c4d5a6d4577437f43cb7c42 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -1,31 +1,47 @@ package torrent import ( + "bytes" "cmp" + "context" "fmt" "iter" + "log/slog" "maps" + "os" + "runtime/pprof" "strings" "sync" + "time" "unique" g "github.com/anacrolix/generics" "github.com/anacrolix/generics/heap" "github.com/anacrolix/missinggo/v2/panicif" + "github.com/davecgh/go-spew/spew" "github.com/anacrolix/torrent/internal/request-strategy" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/webseed" ) -const defaultRequestsPerWebseedHost = 10 +// Default is based on experience with CloudFlare. +var webseedHostRequestConcurrency = initIntFromEnv("TORRENT_WEBSEED_HOST_REQUEST_CONCURRENCY", 25, 0) type ( webseedHostKey string webseedHostKeyHandle = unique.Handle[webseedHostKey] - webseedUrlKey string + webseedUrlKey unique.Handle[string] ) +func (me webseedUrlKey) Value() string { + return unique.Handle[string](me).Value() +} + +func (me webseedUrlKey) String() string { + return me.Value() +} + /* - Go through all the requestable pieces in order of priority, availability, whether there are peer requests, partial, infohash. - For each piece calculate files involved. Record each file not seen before and the piece index. @@ -33,19 +49,14 @@ - Cancel any outstanding requests that don't match a final file/piece-index pair. - Initiate missing requests that fit into the available limits. 
*/ func (cl *Client) updateWebseedRequests() { - type aprioriMapValue struct { - // Change to request index? - startOffset int64 - webseedRequestOrderValue - } - aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue) + aprioriMap := make(map[webseedUniqueRequestKey]aprioriMapValue) for uniqueKey, value := range cl.iterPossibleWebseedRequests() { - cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey] + cur, ok := aprioriMap[uniqueKey] if ok { // Shared in the lookup above. t := uniqueKey.t - hasPeerConnRequest := func(offset int64) bool { - reqIndex := t.getRequestIndexContainingOffset(offset) + // TODO: Change to "slice has requests" + hasPeerConnRequest := func(reqIndex RequestIndex) bool { return t.requestingPeer(reqIndex) != nil } // Skip the webseed request unless it has a higher priority, is less requested by peer @@ -53,62 +64,97 @@ // conns, or has a lower start offset. Including peer conn requests here will bump // webseed requests in favour of peer conns unless there's nothing else to do. if cmp.Or( cmp.Compare(value.priority, cur.priority), - compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)), - cmp.Compare(cur.startOffset, uniqueKey.startOffset), + compareBool(hasPeerConnRequest(cur.startRequest), hasPeerConnRequest(value.startRequest)), + cmp.Compare(cur.startRequest, value.startRequest), ) <= 0 { continue } } - aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value} + aprioriMap[uniqueKey] = value + } + // This includes startRequest in the key. This means multiple webseed requests can exist in the + // same webseed request slice. + existingRequests := make(map[webseedUniqueRequestKey]webseedRequestOrderValue) + existingRequestCount := 0 + // TODO: Maintain a cache of active requests in the Client, and use them to filter proposed + // requests (same result but less allocations). 
+ for key, value := range cl.iterCurrentWebseedRequests() { + existingRequests[key] = value + existingRequestCount++ } - existingRequests := maps.Collect(cl.iterCurrentWebseedRequests()) + // Check "active" current webseed request cardinality matches expectation. + panicif.NotEq(len(existingRequests), existingRequestCount) // We don't need the value but maybe cloning is just faster anyway? unusedExistingRequests := maps.Clone(existingRequests) type heapElem struct { webseedUniqueRequestKey webseedRequestOrderValue + // Not sure this is even worth it now. + mightHavePartialFiles bool } // Build the request heap, merging existing requests if they match. heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests)) for key, value := range aprioriMap { - fullKey := webseedUniqueRequestKey{key, value.startOffset} - heapValue := value.webseedRequestOrderValue - // If there's a matching existing request, make sure to include a reference to it in the - // heap value and deduplicate it. - existingValue, ok := existingRequests[fullKey] - if ok { - // Priorities should have been generated the same. - panicif.NotEq(value.priority, existingValue.priority) - // A-priori map should not have existing request associated with it. TODO: a-priori map - // value shouldn't need some fields. - panicif.NotZero(value.existingWebseedRequest) - heapValue.existingWebseedRequest = existingValue.existingWebseedRequest - // Now the values should match exactly. - panicif.NotEq(heapValue, existingValue) - g.MustDelete(unusedExistingRequests, fullKey) + if g.MapContains(existingRequests, key) { + // Prefer the existing request always + continue } heapSlice = append(heapSlice, heapElem{ - fullKey, - heapValue, + key, + webseedRequestOrderValue{ + aprioriMapValue: value, + }, + key.t.filesInRequestRangeMightBePartial( + value.startRequest, + key.t.endRequestForAlignedWebseedResponse(value.startRequest), + ), }) } // Add remaining existing requests. 
- for key := range unusedExistingRequests { - heapSlice = append(heapSlice, heapElem{key, existingRequests[key]}) + for key, value := range unusedExistingRequests { + // Don't reconsider existing requests that aren't wanted anymore. + if key.t.dataDownloadDisallowed.IsSet() { + continue + } + wr := value.existingWebseedRequest + heapSlice = append(heapSlice, heapElem{ + key, + existingRequests[key], + key.t.filesInRequestRangeMightBePartial(wr.next, wr.end), + }) } aprioriHeap := heap.InterfaceForSlice( &heapSlice, func(l heapElem, r heapElem) bool { - // Prefer the highest priority, then existing requests, then longest remaining file extent. - return cmp.Or( + // Not stable ordering but being sticky to existing webseeds should be enough. + ret := cmp.Or( + // Prefer highest priority -cmp.Compare(l.priority, r.priority), - // Existing requests are assigned the priority of the piece they're reading next. + // Then existing requests compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil), - // This won't thrash because we already preferred existing requests, so we'll finish out small extents. - -cmp.Compare( - l.t.Files()[l.fileIndex].length-l.startOffset, - r.t.Files()[r.fileIndex].length-r.startOffset), - ) < 0 + // Prefer not competing with active peer connections. + compareBool(len(l.t.conns) > 0, len(r.t.conns) > 0), + // Try to complete partial slices first. + -compareBool(l.mightHavePartialFiles, r.mightHavePartialFiles), + // No need to prefer longer files anymore now that we're using slices? + //// Longer files first. + //-cmp.Compare(l.longestFile().Unwrap(), r.longestFile().Unwrap()), + // Easier to debug than infohashes... + cmp.Compare(l.t.info.Name, r.t.info.Name), + bytes.Compare(l.t.canonicalShortInfohash()[:], r.t.canonicalShortInfohash()[:]), + // It's possible for 2 heap elements to have the same slice index from the same + // torrent, but they'll differ in existingWebseedRequest and be sorted before this. 
+ // Doing earlier chunks first means more compact files for partial file hashing. + cmp.Compare(l.sliceIndex, r.sliceIndex), + ) + // Requests should be unique unless they're for different URLs. + if ret == 0 && l.url == r.url { + cfg := spew.NewDefaultConfig() + cfg.Dump(l) + cfg.Dump(r) + panic("webseed request heap ordering is not stable") + } + return ret < 0 }, ) @@ -123,22 +169,28 @@ // Pulling the pregenerated form avoids unique.Handle, and possible URL parsing and error // handling overhead. Need the value to avoid looking this up again. costKey := elem.costKey panicif.Zero(costKey) - if len(plan.byCost[costKey]) >= defaultRequestsPerWebseedHost { + if elem.existingWebseedRequest == nil { + // Existing requests might be within the allowed discard range. + panicif.Eq(elem.priority, PiecePriorityNone) + } + panicif.True(elem.t.dataDownloadDisallowed.IsSet()) + panicif.True(elem.t.closed.IsSet()) + if len(plan.byCost[costKey]) >= webseedHostRequestConcurrency { continue } g.MakeMapIfNil(&plan.byCost) requestKey := elem.webseedUniqueRequestKey - plan.byCost[costKey] = append(plan.byCost[costKey], requestKey) + plan.byCost[costKey] = append(plan.byCost[costKey], plannedWebseedRequest{ + url: elem.url, + t: elem.t, + startIndex: elem.startRequest, + }) delete(unwantedExistingRequests, requestKey) } // Cancel any existing requests that are no longer wanted. 
- for key, value := range unwantedExistingRequests { - if webseed.PrintDebug { - fmt.Printf("cancelling deprioritized existing webseed request %v\n", key) - } - key.t.slogger().Debug("cancelling deprioritized existing webseed request", "webseedUrl", key.url, "fileIndex", key.fileIndex) - value.existingWebseedRequest.Cancel() + for _, value := range unwantedExistingRequests { + value.existingWebseedRequest.Cancel("deprioritized") } printPlan := sync.OnceFunc(func() { @@ -148,57 +200,123 @@ //fmt.Println(formatMap(existingRequests)) } }) - for costKey, requestKeys := range plan.byCost { - for _, requestKey := range requestKeys { + // TODO: Do we deduplicate requests across different webseeds? + + for costKey, plannedRequests := range plan.byCost { + for _, request := range plannedRequests { // This could happen if a request is cancelled but hasn't removed itself from the active // list yet. This helps with backpressure as the requests can sleep to rate limit. if !cl.underWebSeedHttpRequestLimit(costKey) { break } - if g.MapContains(existingRequests, requestKey) { + existingRequestKey := request.toChunkedWebseedRequestKey() + if g.MapContains(existingRequests, existingRequestKey) { + // A request exists to the webseed slice already. This doesn't check the request + // indexes match. + + // Check we didn't just cancel the same request. + panicif.True(g.MapContains(unwantedExistingRequests, existingRequestKey)) continue } - t := requestKey.t - peer := t.webSeeds[requestKey.url] + t := request.t + peer := t.webSeeds[request.url] panicif.NotEq(peer.hostKey, costKey) printPlan() - begin := t.getRequestIndexContainingOffset(requestKey.startOffset) - fileEnd := t.endRequestIndexForFileIndex(requestKey.fileIndex) - last := begin - for { - if !t.wantReceiveChunk(last) { - break - } - if last >= fileEnd-1 { - break - } - last++ - } - // Request shouldn't exist if this occurs. - panicif.LessThan(last, begin) - // Hello C++ my old friend. 
- end := last + 1 - if webseed.PrintDebug && end != fileEnd { - fmt.Printf("shortened webseed request for %v: [%v-%v) to [%v-%v)\n", - requestKey.filePath(), begin, fileEnd, begin, end) - } - panicif.GreaterThan(end, fileEnd) - peer.spawnRequest(begin, end) + + debugLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + AddSource: true, + })).With( + "webseedUrl", request.url, + "webseedChunkIndex", request.sliceIndex) + + begin := request.startIndex + end := t.getWebseedRequestEnd(begin, debugLogger) + panicif.LessThanOrEqual(end, begin) + + peer.spawnRequest(begin, end, debugLogger) } } } +func (t *Torrent) getWebseedRequestEnd(begin RequestIndex, debugLogger *slog.Logger) RequestIndex { + chunkEnd := t.endRequestForAlignedWebseedResponse(begin) + if true { + // Pending fix to pendingPieces matching piece request order due to missing initial pieces + // checks? + return chunkEnd + } + panicif.False(t.wantReceiveChunk(begin)) + last := begin + for { + if !t.wantReceiveChunk(last) { + break + } + if last >= chunkEnd-1 { + break + } + last++ + } + end := last + 1 + panicif.GreaterThan(end, chunkEnd) + if webseed.PrintDebug && end != chunkEnd { + debugLogger.Debug( + "shortened webseed request", + "from", endExclusiveString(begin, chunkEnd), + "to", endExclusiveString(begin, end)) + } + return end +} + +// Cloudflare caches up to 512 MB responses by default. This is also an alignment. Making this +// smaller will allow requests to complete a smaller set of files faster. +var webseedRequestChunkSize = initUIntFromEnv[uint64]("TORRENT_WEBSEED_REQUEST_CHUNK_SIZE", 64<<20, 64) + +// Can return the same as start if the request is at the end of the torrent. 
+func (t *Torrent) endRequestForAlignedWebseedResponse(start RequestIndex) RequestIndex { + end := min(t.maxEndRequest(), nextMultiple(start, t.chunksPerAlignedWebseedResponse())) + return end +} + +func (t *Torrent) chunksPerAlignedWebseedResponse() RequestIndex { + // This is the same as webseedRequestChunkSize, but in terms of RequestIndex. + return RequestIndex(webseedRequestChunkSize / t.chunkSize.Uint64()) +} + +func (t *Torrent) requestIndexToWebseedSliceIndex(requestIndex RequestIndex) webseedSliceIndex { + return webseedSliceIndex(requestIndex / t.chunksPerAlignedWebseedResponse()) +} + func (cl *Client) dumpCurrentWebseedRequests() { if webseed.PrintDebug { fmt.Println("current webseed requests:") for key, value := range cl.iterCurrentWebseedRequests() { - fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority) + fmt.Printf("\t%v: %v, priority %v\n", key, value.existingWebseedRequest, value.priority) } } } type webseedRequestPlan struct { - byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey + byCost map[webseedHostKeyHandle][]plannedWebseedRequest +} + +// Needed components to generate a webseed request. +type plannedWebseedRequest struct { + url webseedUrlKey + t *Torrent + startIndex RequestIndex +} + +func (me *plannedWebseedRequest) sliceIndex() webseedSliceIndex { + return me.t.requestIndexToWebseedSliceIndex(me.startIndex) +} + +func (me *plannedWebseedRequest) toChunkedWebseedRequestKey() webseedUniqueRequestKey { + return webseedUniqueRequestKey{ + url: me.url, + t: me.t, + sliceIndex: me.sliceIndex(), + } } func (me webseedRequestPlan) String() string { @@ -213,38 +331,28 @@ return strings.TrimSuffix(sb.String(), "\n") } // Distinct webseed request data when different offsets are not allowed. 
-type aprioriWebseedRequestKey struct { - t *Torrent - fileIndex int - url webseedUrlKey +type webseedUniqueRequestKey struct { + url webseedUrlKey + t *Torrent + sliceIndex webseedSliceIndex } -func (me *aprioriWebseedRequestKey) filePath() string { - return me.t.Files()[me.fileIndex].Path() -} - -func (me *aprioriWebseedRequestKey) String() string { - return fmt.Sprintf("%v from %v", me.filePath(), me.url) -} - -// Distinct webseed request when different offsets to the same object are allowed. -type webseedUniqueRequestKey struct { - aprioriWebseedRequestKey - startOffset int64 +type aprioriMapValue struct { + costKey webseedHostKeyHandle + priority PiecePriority + startRequest RequestIndex } -func (me webseedUniqueRequestKey) String() string { - return me.aprioriWebseedRequestKey.String() + " at " + fmt.Sprintf("0x%x", me.startOffset) +func (me *webseedUniqueRequestKey) String() string { + return fmt.Sprintf("slice %v from %v", me.sliceIndex, me.url) } // Non-distinct proposed webseed request data. type webseedRequestOrderValue struct { - // The associated webseed request per host limit. - costKey webseedHostKeyHandle + aprioriMapValue // Used for cancellation if this is deprioritized. Also, a faster way to sort for existing // requests. existingWebseedRequest *webseedRequest - priority PiecePriority } func (me webseedRequestOrderValue) String() string { @@ -252,8 +360,8 @@ return fmt.Sprintf("%#v", me) } // Yields possible webseed requests by piece. Caller should filter and prioritize these. 
-func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { - return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { +func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, aprioriMapValue] { + return func(yield func(webseedUniqueRequestKey, aprioriMapValue) bool) { for key, value := range cl.pieceRequestOrder { input := key.getRequestStrategyInput(cl) requestStrategy.GetRequestablePieces( @@ -261,38 +369,43 @@ input, value.pieces, func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool { t := cl.torrentsByShortHash[ih] + if len(t.webSeeds) == 0 { + return true + } p := t.piece(pieceIndex) cleanOpt := p.firstCleanChunk() if !cleanOpt.Ok { - // Could almost return true here, as clearly something is going on with the piece. - return false + return true } // Pretty sure we want this and not the order state priority. That one is for // client piece request order and ignores other states like hashing, marking // etc. Order state priority would be faster otherwise. priority := p.effectivePriority() - for i, e := range p.fileExtents(int64(cleanOpt.Value) * int64(t.chunkSize)) { - for url, ws := range t.webSeeds { - // Return value from this function (RequestPieceFunc) doesn't terminate - // iteration, so propagate that to not handling the yield return value. 
- yield( - webseedUniqueRequestKey{ - aprioriWebseedRequestKey{ - t: t, - fileIndex: i, - url: url, - }, - e.Start, - }, - webseedRequestOrderValue{ - priority: priority, - costKey: ws.hostKey, - }, - ) + firstRequest := p.requestIndexBegin() + cleanOpt.Value + panicif.GreaterThanOrEqual(firstRequest, t.maxEndRequest()) + webseedSliceIndex := t.requestIndexToWebseedSliceIndex(firstRequest) + for url, ws := range t.webSeeds { + if ws.suspended() { + continue + } + // Return value from this function (RequestPieceFunc) doesn't terminate + // iteration, so propagate that to not handling the yield return value. + if !yield( + webseedUniqueRequestKey{ + t: t, + sliceIndex: webseedSliceIndex, + url: url, + }, + aprioriMapValue{ + priority: priority, + costKey: ws.hostKey, + startRequest: firstRequest, + }, + ) { + return false } } - // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen. - return false + return true }, ) } @@ -302,11 +415,12 @@ } func (cl *Client) updateWebseedRequestsWithReason(reason updateRequestReason) { // Should we wrap this with pprof labels? - cl.scheduleImmediateWebseedRequestUpdate() + cl.scheduleImmediateWebseedRequestUpdate(reason) } func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { + // TODO: This entire thing can be a single map on Client ("active webseed requests"). for t := range cl.torrents { for url, ws := range t.webSeeds { for ar := range ws.activeRequests { @@ -314,25 +428,29 @@ if ar.next >= ar.end { // This request is done, so don't yield it. continue } - off := t.requestIndexBegin(ar.next) - opt := t.fileSegmentsIndex.Unwrap().LocateOffset(off) - if !opt.Ok { - continue + // Don't spawn requests before old requests are cancelled. 
+ if false { + if ar.cancelled.Load() { + cl.slogger.Debug("iter current webseed requests: skipped cancelled webseed request") + // This should prevent overlapping webseed requests that are just filling + // slots waiting to cancel from conflicting. + continue + } } - p := t.pieceForOffset(off) + p := t.piece(t.pieceIndexOfRequestIndex(ar.next)) if !yield( webseedUniqueRequestKey{ - aprioriWebseedRequestKey{ - t: t, - fileIndex: opt.Value.Index, - url: url, - }, - opt.Value.Offset, + t: t, + sliceIndex: t.requestIndexToWebseedSliceIndex(ar.next), + url: url, }, webseedRequestOrderValue{ - priority: p.effectivePriority(), - existingWebseedRequest: ar, - costKey: ws.hostKey, + aprioriMapValue{ + priority: p.effectivePriority(), + costKey: ws.hostKey, + startRequest: ar.next, + }, + ar, }, ) { return @@ -343,26 +461,58 @@ } } } -func (cl *Client) scheduleImmediateWebseedRequestUpdate() { +func (cl *Client) scheduleImmediateWebseedRequestUpdate(reason updateRequestReason) { if !cl.webseedRequestTimer.Stop() { // Timer function already running, let it do its thing. return } // Set the timer to fire right away (this will coalesce consecutive updates without forcing an // update on every call to this method). Since we're holding the Client lock, and we cancelled - // the timer and it wasn't active, nobody else should have reset it before us. + // the timer, and it wasn't active, nobody else should have reset it before us. Do we need to + // introduce a "reason" field here, (albeit Client-level?). + cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, reason) panicif.True(cl.webseedRequestTimer.Reset(0)) } func (cl *Client) updateWebseedRequestsTimerFunc() { + if cl.closed.IsSet() { + return + } + // This won't get set elsewhere if the timer has fired, which it has for us to be here. 
+ cl.webseedUpdateReason = cmp.Or(cl.webseedUpdateReason, "timer") cl.lock() defer cl.unlock() cl.updateWebseedRequestsAndResetTimer() } func (cl *Client) updateWebseedRequestsAndResetTimer() { - cl.updateWebseedRequests() - // Timer should always be stopped before the last call. + pprof.Do(context.Background(), pprof.Labels( + "reason", string(cl.webseedUpdateReason), + ), func(_ context.Context) { + started := time.Now() + reason := cl.webseedUpdateReason + cl.webseedUpdateReason = "" + cl.updateWebseedRequests() + panicif.NotZero(cl.webseedUpdateReason) + if webseed.PrintDebug { + now := time.Now() + fmt.Printf("%v: updateWebseedRequests took %v (reason: %v)\n", now, now.Sub(started), reason) + } + }) + // Timer should always be stopped before the last call. TODO: Don't reset timer if there's + // nothing to do (no possible requests in update). panicif.True(cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval)) } + +type endExclusive[T any] struct { + start, end T +} + +func (me endExclusive[T]) String() string { + return fmt.Sprintf("[%v-%v)", me.start, me.end) +} + +func endExclusiveString[T any](start, end T) string { + return endExclusive[T]{start, end}.String() +} diff --git a/webseed/client.go b/webseed/client.go index a4d432747d38cb6e0a05e417b38d0f38b5018892..acbad8f20448d99e308c96d5b6e6bb2312f3cafa 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -5,13 +5,15 @@ "context" "errors" "fmt" "io" - "log" "log/slog" "net/http" "os" + "runtime/pprof" "strings" + "sync" "github.com/RoaringBitmap/roaring" + g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/panicif" "github.com/dustin/go-humanize" "golang.org/x/time/rate" @@ -36,25 +38,30 @@ type RequestSpec = segments.Extent type requestPart struct { - req *http.Request - e segments.Extent - do func() (*http.Response, error) - // Wrap http response bodies for such things as download rate limiting. 
- responseBodyWrapper ResponseBodyWrapper + req *http.Request + e segments.Extent + do func() (*http.Response, error) + fileIndex int } type Request struct { - cancel func() + // So you can view it from externally. + ctx context.Context + cancel context.CancelCauseFunc Body io.Reader // Closed with error to unstick copy routine when context isn't checked. bodyPipe *io.PipeReader } -func (r Request) Cancel() { - r.cancel() +func (r *Request) Context() context.Context { + return r.ctx } -func (r Request) Close() { +func (r *Request) Cancel(cause error) { + r.cancel(cause) +} + +func (r *Request) Close() { // We aren't cancelling because we want to know if we can keep receiving buffered data after // cancellation. PipeReader.Close always returns nil. _ = r.bodyPipe.Close() @@ -64,7 +71,7 @@ type Client struct { Logger *slog.Logger HttpClient *http.Client Url string - // Max concurrent requests to a WebSeed for a given torrent. + // Max concurrent requests to a WebSeed for a given torrent. TODO: Unused. MaxRequests int fileIndex *segments.Index @@ -73,7 +80,7 @@ // The pieces we can request with the Url. We're more likely to ban/block at the file-level // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece // level. We can map our file-level adjustments to the pieces here. This probably need to be // private in the future, if Client ever starts removing pieces. TODO: This belongs in - // webseedPeer. + // webseedPeer. TODO: Unused. Pieces roaring.Bitmap // This wraps http.Response bodies, for example to limit the download rate. ResponseBodyWrapper ResponseBodyWrapper @@ -94,56 +101,56 @@ me.info = info me.Pieces.AddRange(0, uint64(info.NumPieces())) } -type RequestResult struct { - Bytes []byte - Err error -} - // Returns the URL for the given file index. This is assumed to be globally unique. 
func (ws *Client) UrlForFileIndex(fileIndex int) string { return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper) } -func (ws *Client) StartNewRequest(r RequestSpec) Request { - ctx, cancel := context.WithCancel(context.TODO()) +func (ws *Client) StartNewRequest(ctx context.Context, r RequestSpec, debugLogger *slog.Logger) Request { + ctx, cancel := context.WithCancelCause(ctx) var requestParts []requestPart - if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool { + for i, e := range ws.fileIndex.LocateIter(r) { req, err := newRequest( ctx, ws.Url, i, ws.info, e.Start, e.Length, ws.PathEscaper, ) - if err != nil { - panic(err) - } + panicif.Err(err) part := requestPart{ - req: req, - e: e, - responseBodyWrapper: ws.ResponseBodyWrapper, + req: req, + e: e, + fileIndex: i, } - part.do = func() (*http.Response, error) { + part.do = func() (resp *http.Response, err error) { + resp, err = ws.HttpClient.Do(req) if PrintDebug { - fmt.Printf( - "doing request for %q (file size %v), Range: %q\n", - req.URL, - humanize.Bytes(uint64(ws.fileIndex.Index(i).Length)), - req.Header.Get("Range"), - ) + if err == nil { + debugLogger.Debug( + "request for part", + "url", req.URL, + "part-length", humanize.IBytes(uint64(e.Length)), + "part-file-offset", humanize.IBytes(uint64(e.Start)), + "CF-Cache-Status", resp.Header.Get("CF-Cache-Status"), + ) + } } - return ws.HttpClient.Do(req) + return } requestParts = append(requestParts, part) - return true - }) { - panic("request out of file bounds") } + // Technically what we want to ensure is that all parts exist consecutively. If the file data + // isn't consecutive, then it is piece aligned and we wouldn't need to be doing multiple + // requests. TODO: Assert this. 
+ panicif.Zero(len(requestParts)) body, w := io.Pipe() req := Request{ + ctx: ctx, cancel: cancel, Body: body, bodyPipe: body, } go func() { + pprof.SetGoroutineLabels(context.Background()) err := ws.readRequestPartResponses(ctx, w, requestParts) panicif.Err(w.CloseWithError(err)) }() @@ -177,23 +184,29 @@ "url", part.req.URL) } } +var bufPool = &sync.Pool{New: func() any { + return g.PtrTo(make([]byte, 128<<10)) // 128 KiB. 4x the default. +}} + // Reads the part in full. All expected bytes must be returned or there will an error returned. func (me *Client) recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error { defer resp.Body.Close() var body io.Reader = resp.Body - if part.responseBodyWrapper != nil { - body = part.responseBodyWrapper(body) + if a := me.ResponseBodyWrapper; a != nil { + body = a(body) } - // Prevent further accidental use - resp.Body = nil + // We did set resp.Body to nil here, but I'm worried the HTTP machinery might do something + // funny. if ctx.Err() != nil { - return ctx.Err() + return context.Cause(ctx) } switch resp.StatusCode { case http.StatusPartialContent: // The response should be just as long as we requested. 
me.checkContentLength(resp, part, part.e.Length) - copied, err := io.Copy(w, body) + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + copied, err := io.CopyBuffer(w, body, *buf) if err != nil { return err } @@ -209,9 +222,14 @@ if discard > MaxDiscardBytes { return ErrBadResponse{"resp status ok but requested range", resp} } if discard != 0 { - log.Printf("resp status ok but requested range [url=%q, range=%q]", - part.req.URL, - part.req.Header.Get("Range")) + if PrintDebug { + fmt.Printf("resp status ok but requested range [url=%q, range=%q]", + part.req.URL, + part.req.Header.Get("Range")) + } + me.Logger.Debug("resp status ok but requested range", + "url", part.req.URL, + "range", part.req.Header.Get("Range")) } // Instead of discarding, we could try receiving all the chunks present in the response // body. I don't know how one would handle multiple chunk requests resulting in an OK @@ -239,15 +257,34 @@ } var ErrTooFast = errors.New("making requests too fast") +// Contains info for callers to act (like ignoring particular files or rate limiting). +type ReadRequestPartError struct { + FileIndex int + Err error +} + +func (me ReadRequestPartError) Unwrap() error { + return me.Err +} + +func (r ReadRequestPartError) Error() string { + return fmt.Sprintf("reading request part for file index %v: %v", r.FileIndex, r.Err) +} + func (me *Client) readRequestPartResponses(ctx context.Context, w io.Writer, parts []requestPart) (err error) { for _, part := range parts { var resp *http.Response resp, err = part.do() + // TODO: Does debugging caching belong here? 
if err == nil { err = me.recvPartResult(ctx, w, part, resp) } if err != nil { err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err) + err = ReadRequestPartError{ + FileIndex: part.fileIndex, + Err: err, + } break } } diff --git a/webseed/request.go b/webseed/request.go index ce8e0cd58894c3616d60fc7c752e06588fb082f1..ff679aceb4f3b17a177619a3b5898f427e1c3ea4 100644 --- a/webseed/request.go +++ b/webseed/request.go @@ -75,7 +75,7 @@ if err != nil { return nil, err } // We avoid Range requests if we can. We check the Content-Length elsewhere so that early - // detection is not lost. + // detection is not lost. TODO: Try disabling this for CloudFlare? if offset != 0 || length != fileInfo.Length { req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) } diff --git a/zero-reader.go b/zero-reader.go index 1d0a899b3afa1b01551edeb24d232b1108e2e1cc..92fc8059b253dcf0843701c15ddcae51aca2ddab 100644 --- a/zero-reader.go +++ b/zero-reader.go @@ -1,5 +1,6 @@ package torrent +// TODO: This should implement extra methods to make io.CopyN more efficient. var zeroReader zeroReaderType type zeroReaderType struct{}