From 87b3196ec2b7002276519b1aefb80ab37ee59ed1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 26 Jun 2025 22:51:05 +1000 Subject: [PATCH] Add global webseed requests on a timer --- client.go | 28 +--- cmp.go | 11 ++ dht.go | 8 + go.mod | 2 +- go.sum | 2 + internal/request-strategy/order.go | 3 +- piece.go | 19 ++- request-strategy-impls.go | 2 +- segments/index.go | 22 +++ t.go | 3 +- torrent.go | 24 ++- webseed-peer.go | 44 +----- webseed-requesting.go | 233 +++++++++++++++++++++++++++-- webseed/client.go | 5 + webseed/request.go | 19 ++- 15 files changed, 333 insertions(+), 92 deletions(-) create mode 100644 cmp.go diff --git a/client.go b/client.go index 74d5d7e1..18919d9e 100644 --- a/client.go +++ b/client.go @@ -33,7 +33,6 @@ import ( "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" "github.com/cespare/xxhash" - "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" gbtree "github.com/google/btree" "github.com/pion/webrtc/v4" @@ -52,6 +51,8 @@ import ( "github.com/anacrolix/torrent/webtorrent" ) +const webseedRequestUpdateTimerInterval = time.Second + // Clients contain zero or more Torrents. A Client manages a blocklist, the // TCP/UDP protocol ports, and DHT as desired. type Client struct { @@ -105,6 +106,8 @@ type Client struct { defaultLocalLtepProtocolMap LocalLtepProtocolMap upnpMappings []*upnpMapping + + webseedRequestTimer *time.Timer } type ipStr string @@ -142,24 +145,7 @@ func (cl *Client) LocalPort() (port int) { return } -func writeDhtServerStatus(w io.Writer, s DhtServer) { - dhtStats := s.Stats() - fmt.Fprintf(w, " ID: %x\n", s.ID()) - spew.Fdump(w, dhtStats) -} - -func compareBool(a, b bool) int { - if a == b { - return 0 - } - if b { - return -1 - } - return 1 -} - -// Writes out a human readable status of the client, such as for writing to a -// HTTP status page. +// Writes out a human-readable status of the client, such as for writing to an HTTP status page. func (cl *Client) WriteStatus(_w io.Writer) { cl.rLock() defer cl.rUnlock() @@ -402,6 +388,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, } + cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc) + return } @@ -1937,7 +1925,7 @@ func (cl *Client) Stats() ClientStats { func (cl *Client) underWebSeedHttpRequestLimit(key webseedHostKeyHandle) bool { panicif.Zero(key) - return cl.numWebSeedRequests[key] < 5 + return cl.numWebSeedRequests[key] < defaultRequestsPerWebseedHost } func (cl *Client) countWebSeedHttpRequests() (num int) { diff --git a/cmp.go b/cmp.go new file mode 100644 index 00000000..31cc2bf3 --- /dev/null +++ b/cmp.go @@ -0,0 +1,11 @@ +package torrent + +func compareBool(a, b bool) int { + if a == b { + return 0 + } + if b { + return -1 + } + return 1 +} diff --git a/dht.go b/dht.go index 77975a2f..e001bd66 100644 --- a/dht.go +++ b/dht.go @@ -1,12 +1,14 @@ package torrent import ( + "fmt" "io" "net" "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" peer_store "github.com/anacrolix/dht/v2/peer-store" + "github.com/davecgh/go-spew/spew" ) // DHT server interface for use by a Torrent or Client. It's reasonable for this to make assumptions @@ -60,3 +62,9 @@ func (me AnacrolixDhtServerWrapper) Ping(addr *net.UDPAddr) { } var _ DhtServer = AnacrolixDhtServerWrapper{} + +func writeDhtServerStatus(w io.Writer, s DhtServer) { + dhtStats := s.Stats() + fmt.Fprintf(w, " ID: %x\n", s.ID()) + spew.Fdump(w, dhtStats) +} diff --git a/go.mod b/go.mod index ea4c4c3c..38d4c9f7 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/anacrolix/gostdapp v0.1.0 github.com/anacrolix/log v0.16.1-0.20250526073428-5cb74e15092b github.com/anacrolix/missinggo v1.3.0 - github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797 + github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56 github.com/anacrolix/multiless v0.4.0 github.com/anacrolix/possum/go v0.3.2 github.com/anacrolix/squirrel v0.6.4 diff --git a/go.sum b/go.sum index 73a83a0a..c9b7cd27 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbU github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA= github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797 h1:VAfIW3RwRBTZM7V6auEZC0eBPo94ht/R6ywrADNA0q8= github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0= +github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56 h1:+VSnod9Zipey/E5mDTrhooV9y8A8ZaUHSzG/TnrIHug= +github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0= github.com/anacrolix/mmsg v1.0.1 h1:TxfpV7kX70m3f/O7ielL/2I3OFkMPjrRCPo7+4X5AWw= github.com/anacrolix/mmsg v1.0.1/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc= github.com/anacrolix/multiless v0.4.0 h1:lqSszHkliMsZd2hsyrDvHOw4AbYWa+ijQ66LzbjqWjM= diff --git a/internal/request-strategy/order.go b/internal/request-strategy/order.go index bffe6a7a..d3ede920 100644 --- a/internal/request-strategy/order.go +++ b/internal/request-strategy/order.go @@ -30,7 +30,8 @@ func pieceOrderLess(i, j *PieceRequestOrderItem) multiless.Computation { j.State.Partial, i.State.Partial, ).Int( // If this is done with relative availability, do we lose some determinism? If completeness - // is used, would that push this far enough down? + // is used, would that push this far enough down? What happens if we have a piece in the + // order, but it has availability 0? i.State.Availability, j.State.Availability, ).Int( i.Key.Index, j.Key.Index, diff --git a/piece.go b/piece.go index 37bb588b..b1982f0a 100644 --- a/piece.go +++ b/piece.go @@ -11,6 +11,7 @@ import ( g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/panicif" + "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/merkle" "github.com/anacrolix/torrent/metainfo" @@ -38,7 +39,8 @@ type Piece struct { numVerifiesCond chansync.BroadcastCond publicPieceState PieceState - priority PiecePriority + // Piece-specific priority. There are other priorities like File and Reader. + priority PiecePriority // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is // incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and // the Peer isn't recorded in Torrent.connsWithAllPieces. @@ -249,7 +251,7 @@ func (p *Piece) SetPriority(prio PiecePriority) { // This is priority based only on piece, file and reader priorities. func (p *Piece) purePriority() (ret PiecePriority) { - for f := range p.files() { + for _, f := range p.files() { ret.Raise(f.prio) } if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) { @@ -379,10 +381,10 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) { return } -func (p *Piece) files() iter.Seq[*File] { - return func(yield func(*File) bool) { +func (p *Piece) files() iter.Seq2[int, *File] { + return func(yield func(int, *File) bool) { for i := p.beginFile; i < p.endFile; i++ { - if !yield((*p.t.files)[i]) { + if !yield(i, (*p.t.files)[i]) { return } } @@ -423,3 +425,10 @@ func (p *Piece) publishStateChange() { }) } } + +func (p *Piece) fileExtents() iter.Seq2[int, segments.Extent] { + return p.t.info.FileSegmentsIndex().LocateIter(segments.Extent{ + p.torrentBeginOffset(), + segments.Int(p.length()), + }) +} diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 500110c3..3db71c49 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -81,7 +81,7 @@ func (r requestStrategyPiece) CountUnverified() bool { } func (r requestStrategyPiece) Request() bool { - return !r.p.ignoreForRequests() && r.p.purePriority() != PiecePriorityNone + return r.p.effectivePriority() > PiecePriorityNone } var _ requestStrategy.Piece = requestStrategyPiece{} diff --git a/segments/index.go b/segments/index.go index 73dca69a..f217bf5c 100644 --- a/segments/index.go +++ b/segments/index.go @@ -5,6 +5,7 @@ import ( "sort" g "github.com/anacrolix/generics" + "github.com/anacrolix/missinggo/v2/panicif" ) func NewIndex(segments LengthIter) (ret Index) { @@ -84,3 +85,24 @@ func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] { }) } } + +type IndexAndOffset struct { + Index int + Offset int64 +} + +// Returns the Extent that contains the given extent, if it exists. Panics if Extents overlap on the +// offset. +func (me Index) LocateOffset(off int64) (ret g.Option[IndexAndOffset]) { + // I think an Extent needs to have a non-zero to match against it? That's what this method is + // defining. + for i, e := range me.LocateIter(Extent{off, 1}) { + panicif.True(ret.Ok) + panicif.NotEq(e.Length, 1) + ret.Set(IndexAndOffset{ + Index: i, + Offset: e.Start, + }) + } + return +} diff --git a/t.go b/t.go index cbad1959..5b42a101 100644 --- a/t.go +++ b/t.go @@ -205,7 +205,8 @@ func (t *Torrent) CancelPieces(begin, end pieceIndex) { func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex, reason updateRequestReason) { for i := begin; i < end; i++ { - p := &t.pieces[i] + p := t.piece(i) + // Intentionally cancelling only the piece-specific priority here. if p.priority == PiecePriorityNone { continue } diff --git a/torrent.go b/torrent.go index b39ccffd..56ccf78a 100644 --- a/torrent.go +++ b/torrent.go @@ -119,9 +119,9 @@ type Torrent struct { _chunksPerRegularPiece chunkIndexType - webSeeds map[string]*webseedPeer - // Active peer connections, running message stream loops. TODO: Make this - // open (not-closed) connections only. + webSeeds map[webseedUrlKey]*webseedPeer + // Active peer connections, running message stream loops. TODO: Make this open (not-closed) + // connections only. conns map[*PeerConn]struct{} maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are @@ -983,7 +983,7 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { UrlList: func() []string { ret := make([]string, 0, len(t.webSeeds)) for url := range t.webSeeds { - ret = append(ret, url) + ret = append(ret, string(url)) } return ret }(), @@ -1117,6 +1117,12 @@ func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) { return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off) } +func (t *Torrent) getRequestIndexContainingOffset(off int64) RequestIndex { + req, ok := t.offsetRequest(off) + panicif.False(ok) + return t.requestIndexFromRequest(req) +} + func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) { n, err := t.piece(piece).Storage().WriteAt(data, begin) if err == nil && n != len(data) { @@ -3019,7 +3025,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { if t.cl.config.DisableWebseeds { return false } - if _, ok := t.webSeeds[url]; ok { + if _, ok := t.webSeeds[webseedUrlKey(url)]; ok { return false } // I don't think Go http supports pipelining requests. However, we can have more ready to go @@ -3074,7 +3080,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { if t.haveInfo() { ws.onGotInfo(t.info) } - t.webSeeds[url] = &ws + t.webSeeds[webseedUrlKey(url)] = &ws ws.peer.onNeedUpdateRequests("Torrent.addWebSeed") return true } @@ -3489,3 +3495,9 @@ func (t *Torrent) withSlogger(base *slog.Logger) *slog.Logger { }), "ih", *t.canonicalShortInfohash())) } + +func (t *Torrent) endRequestIndexForFileIndex(fileIndex int) RequestIndex { + f := t.Files()[fileIndex] + end := intCeilDiv(uint64(f.offset)+uint64(f.length), t.chunkSize.Uint64()) + return RequestIndex(end) +} diff --git a/webseed-peer.go b/webseed-peer.go index 5de29d77..0ad53d8c 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -53,6 +53,7 @@ func (me *webseedPeer) moreRequestsAllowed() bool { } func (me *webseedPeer) updateRequests() { + return if !me.shouldUpdateRequests() { return } @@ -308,49 +309,6 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) { return } -// -//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) { -// err = ws.readChunks(wr) -// switch { -// case err == nil: -// case ws.peer.closed.IsSet(): -// case errors.Is(err, context.Canceled): -// case errors.Is(err, webseed.ErrTooFast): -// default: -// -// } -// ws.peer.t.cl.lock() -// defer ws.peer.t.cl.unlock() -// if ws.peer.t.closed.IsSet() { -// return nil -// } -// if err != nil { -// switch { -// case errors.Is(err, context.Canceled): -// case errors.Is(err, webseed.ErrTooFast): -// case ws.peer.closed.IsSet(): -// default: -// ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) -// // // Here lies my attempt to extract something concrete from Go's error system. RIP. -// // cfg := spew.NewDefaultConfig() -// // cfg.DisableMethods = true -// // cfg.Dump(result.Err) -// -// if webseedPeerCloseOnUnhandledError { -// log.Printf("closing %v", ws) -// ws.peer.close() -// } else { -// ws.lastUnhandledErr = time.Now() -// } -// } -// if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { -// panic("invalid reject") -// } -// return err -// } -// return err -//} - func (me *webseedPeer) peerPieces() *roaring.Bitmap { return &me.client.Pieces } diff --git a/webseed-requesting.go b/webseed-requesting.go index 99b8d25f..d79d72c3 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -1,15 +1,24 @@ package torrent import ( + "cmp" + "iter" + "maps" "unique" - requestStrategy2 "github.com/anacrolix/torrent/internal/request-strategy" + g "github.com/anacrolix/generics" + "github.com/anacrolix/generics/heap" + "github.com/anacrolix/missinggo/v2/panicif" + "github.com/anacrolix/torrent/internal/request-strategy" "github.com/anacrolix/torrent/metainfo" ) +const defaultRequestsPerWebseedHost = 5 + type ( webseedHostKey string webseedHostKeyHandle = unique.Handle[webseedHostKey] + webseedUrlKey string ) /* @@ -21,16 +30,174 @@ type ( This was a globally aware webseed requestor algorithm that is probably going to be abandoned. */ func (cl *Client) abandonedUpdateWebSeedRequests() { - for key, value := range cl.pieceRequestOrder { - input := key.getRequestStrategyInput(cl) - requestStrategy2.GetRequestablePieces( - input, - value.pieces, - func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy2.PieceRequestOrderState) bool { - return true - }, - ) + type aprioriMapValue struct { + startOffset int64 + webseedRequestOrderValue + } + aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue) + for uniqueKey, value := range cl.iterWebseed() { + cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey] + if !ok || cmp.Or( + cmp.Compare(value.priority, cur.priority), + cmp.Compare(cur.startOffset, uniqueKey.startOffset), + ) > 0 { + aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value} + } + } + existingRequests := maps.Collect(cl.iterCurrentWebseedRequests()) + // TODO: Try maps.Clone here? We don't need the value but maybe cloning is just faster anyway? + unusedExistingRequests := make(map[webseedUniqueRequestKey]struct{}, len(existingRequests)) + for key := range existingRequests { + unusedExistingRequests[key] = struct{}{} + } + type heapElem struct { + webseedUniqueRequestKey + webseedRequestOrderValue + } + // Build the request heap, merging existing requests if they match. + heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests)) + for key, value := range aprioriMap { + fullKey := webseedUniqueRequestKey{key, value.startOffset} + heapValue := value.webseedRequestOrderValue + // If there's a matching existing request, make sure to include a reference to it in the + // heap value and deduplicate it. + existingValue, ok := existingRequests[fullKey] + if ok { + // Priorities should have been generated the same. + panicif.NotEq(value.priority, existingValue.priority) + // A-priori map should not have existing request associated with it. TODO: a-priori map + // value shouldn't need some fields. + panicif.NotZero(value.existingWebseedRequest) + heapValue.existingWebseedRequest = existingValue.existingWebseedRequest + // Now the values should match exactly. + panicif.NotEq(heapValue, existingValue) + g.MustDelete(unusedExistingRequests, fullKey) + } + heapSlice = append(heapSlice, heapElem{ + fullKey, + heapValue, + }) + } + // Add remaining existing requests. + for key := range unusedExistingRequests { + heapSlice = append(heapSlice, heapElem{key, existingRequests[key]}) + } + aprioriHeap := heap.InterfaceForSlice( + &heapSlice, + func(l heapElem, r heapElem) bool { + // Prefer the highest priority, then existing requests, then longest remaining file extent. + return cmp.Or( + cmp.Compare(l.priority, r.priority), + // Existing requests are assigned the priority of the piece they're reading next. + compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil), + // This won't thrash because we already preferred existing requests, so we'll finish out small extents. + -cmp.Compare( + l.t.Files()[l.fileIndex].length-l.startOffset, + r.t.Files()[r.fileIndex].length-r.startOffset), + ) < 0 + }, + ) + + unwantedExistingRequests := maps.Clone(existingRequests) + + heap.Init(aprioriHeap) + var plan webseedRequestPlan + for aprioriHeap.Len() > 0 { + elem := heap.Pop(aprioriHeap) + // Pulling the pregenerated form avoids unique.Handle, and possible URL parsing and error + // handling overhead. Need the value to avoid looking this up again. + costKey := elem.costKey + panicif.Zero(costKey) + if len(plan.byCost[costKey]) >= defaultRequestsPerWebseedHost { + continue + } + g.MakeMapIfNil(&plan.byCost) + requestKey := elem.webseedUniqueRequestKey + plan.byCost[costKey] = append(plan.byCost[costKey], requestKey) + delete(unwantedExistingRequests, requestKey) + } + + // Cancel any existing requests that are no longer wanted. + for key, value := range unwantedExistingRequests { + key.t.slogger().Debug("cancelling deprioritized existing webseed request", "webseedUrl", key.url, "fileIndex", key.fileIndex) + value.existingWebseedRequest.Cancel() + } + + for _, requestKeys := range plan.byCost { + for _, requestKey := range requestKeys { + if g.MapContains(existingRequests, requestKey) { + continue + } + t := requestKey.t + // Run the request to the end of the file for now. TODO: Set a reasonable end so the + // remote doesn't oversend. + t.webSeeds[requestKey.url].spawnRequest( + t.getRequestIndexContainingOffset(requestKey.startOffset), + t.endRequestIndexForFileIndex(requestKey.fileIndex)) + } + } +} + +type webseedRequestPlan struct { + byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey +} + +type aprioriWebseedRequestKey struct { + t *Torrent + fileIndex int + url webseedUrlKey +} + +// To allow multiple requests to the object. +type webseedUniqueRequestKey struct { + aprioriWebseedRequestKey + startOffset int64 +} + +type webseedRequestOrderValue struct { + priority PiecePriority + // Used for cancellation if this is deprioritized. Also might be a faster way to sort for + // existing requests. + existingWebseedRequest *webseedRequest + costKey webseedHostKeyHandle +} + +// Yields possible webseed requests by piece. Caller should filter and prioritize these. TODO: +// Doesn't handle dirty chunks. +func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { + return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { + for key, value := range cl.pieceRequestOrder { + input := key.getRequestStrategyInput(cl) + requestStrategy.GetRequestablePieces( + input, + value.pieces, + func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool { + t := cl.torrentsByShortHash[ih] + for i, e := range cl.torrentsByShortHash[ih].piece(pieceIndex).fileExtents() { + for url, ws := range t.webSeeds { + yield( + webseedUniqueRequestKey{ + aprioriWebseedRequestKey{ + t: t, + fileIndex: i, + url: url, + }, + e.Start, + }, + webseedRequestOrderValue{ + priority: orderState.Priority, + costKey: ws.hostKey, + }, + ) + } + } + // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen. + return false + }, + ) + } } + } func (cl *Client) updateWebSeedRequests(reason updateRequestReason) { @@ -40,3 +207,49 @@ func (cl *Client) updateWebSeedRequests(reason updateRequestReason) { } } } + +func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] { + return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) { + for t := range cl.torrents { + for url, ws := range t.webSeeds { + for ar := range ws.activeRequests { + off := t.requestIndexBegin(ar.next) + opt := t.info.FileSegmentsIndex().LocateOffset(off) + if !opt.Ok { + continue + } + p := t.pieceForOffset(off) + if !yield( + webseedUniqueRequestKey{ + aprioriWebseedRequestKey{ + t: t, + fileIndex: opt.Value.Index, + url: url, + }, + opt.Value.Offset, + }, + webseedRequestOrderValue{ + priority: p.effectivePriority(), + existingWebseedRequest: ar, + costKey: ws.hostKey, + }, + ) { + return + } + } + } + } + } +} + +func (cl *Client) updateWebseedRequests() { + cl.abandonedUpdateWebSeedRequests() + // Should have already run to get here. + cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval) +} + +func (cl *Client) updateWebseedRequestsTimerFunc() { + cl.lock() + defer cl.unlock() + cl.updateWebseedRequests() +} diff --git a/webseed/client.go b/webseed/client.go index fb5c824e..52d4bb81 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -75,6 +75,11 @@ type RequestResult struct { Err error } +// Returns the URL for the given file index. This is assumed to be globally unique. +func (ws *Client) UrlForFileIndex(fileIndex int) string { + return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper) +} + func (ws *Client) StartNewRequest(r RequestSpec) Request { ctx, cancel := context.WithCancel(context.TODO()) var requestParts []requestPart diff --git a/webseed/request.go b/webseed/request.go index 63b21665..5ec7afc0 100644 --- a/webseed/request.go +++ b/webseed/request.go @@ -46,19 +46,30 @@ func trailingPath( } // Creates a request per BEP 19. -func newRequest( - ctx context.Context, +func urlForFileIndex( url_ string, fileIndex int, info *metainfo.Info, - offset, length int64, pathEscaper PathEscaper, -) (*http.Request, error) { +) string { fileInfo := info.UpvertedFiles()[fileIndex] if strings.HasSuffix(url_, "/") { // BEP specifies that we append the file path. We need to escape each component of the path // for things like spaces and '#'. url_ += trailingPath(info.BestName(), fileInfo.BestPath(), pathEscaper) } + return url_ +} + +// Creates a request per BEP 19. +func newRequest( + ctx context.Context, + url_ string, fileIndex int, + info *metainfo.Info, + offset, length int64, + pathEscaper PathEscaper, +) (*http.Request, error) { + fileInfo := info.UpvertedFiles()[fileIndex] + url_ = urlForFileIndex(url_, fileIndex, info, pathEscaper) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url_, nil) if err != nil { return nil, err -- 2.51.0