]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add global webseed requests on a timer
authorMatt Joiner <anacrolix@gmail.com>
Thu, 26 Jun 2025 12:51:05 +0000 (22:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 26 Jun 2025 12:51:05 +0000 (22:51 +1000)
15 files changed:
client.go
cmp.go [new file with mode: 0644]
dht.go
go.mod
go.sum
internal/request-strategy/order.go
piece.go
request-strategy-impls.go
segments/index.go
t.go
torrent.go
webseed-peer.go
webseed-requesting.go
webseed/client.go
webseed/request.go

index 74d5d7e1c21b03a52aff48504163e10c7d868b98..18919d9e5987f33f28389e2dfad15bfd717e50b8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -33,7 +33,6 @@ import (
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
        "github.com/cespare/xxhash"
-       "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        gbtree "github.com/google/btree"
        "github.com/pion/webrtc/v4"
@@ -52,6 +51,8 @@ import (
        "github.com/anacrolix/torrent/webtorrent"
 )
 
+const webseedRequestUpdateTimerInterval = time.Second
+
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
 // TCP/UDP protocol ports, and DHT as desired.
 type Client struct {
@@ -105,6 +106,8 @@ type Client struct {
        defaultLocalLtepProtocolMap LocalLtepProtocolMap
 
        upnpMappings []*upnpMapping
+
+       webseedRequestTimer *time.Timer
 }
 
 type ipStr string
@@ -142,24 +145,7 @@ func (cl *Client) LocalPort() (port int) {
        return
 }
 
-func writeDhtServerStatus(w io.Writer, s DhtServer) {
-       dhtStats := s.Stats()
-       fmt.Fprintf(w, " ID: %x\n", s.ID())
-       spew.Fdump(w, dhtStats)
-}
-
-func compareBool(a, b bool) int {
-       if a == b {
-               return 0
-       }
-       if b {
-               return -1
-       }
-       return 1
-}
-
-// Writes out a human readable status of the client, such as for writing to a
-// HTTP status page.
+// Writes out a human-readable status of the client, such as for writing to an HTTP status page.
 func (cl *Client) WriteStatus(_w io.Writer) {
        cl.rLock()
        defer cl.rUnlock()
@@ -402,6 +388,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                },
        }
 
+       cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
+
        return
 }
 
@@ -1937,7 +1925,7 @@ func (cl *Client) Stats() ClientStats {
 
 func (cl *Client) underWebSeedHttpRequestLimit(key webseedHostKeyHandle) bool {
        panicif.Zero(key)
-       return cl.numWebSeedRequests[key] < 5
+       return cl.numWebSeedRequests[key] < defaultRequestsPerWebseedHost
 }
 
 func (cl *Client) countWebSeedHttpRequests() (num int) {
diff --git a/cmp.go b/cmp.go
new file mode 100644 (file)
index 0000000..31cc2bf
--- /dev/null
+++ b/cmp.go
@@ -0,0 +1,11 @@
+package torrent
+
+func compareBool(a, b bool) int {
+       if a == b {
+               return 0
+       }
+       if b {
+               return -1
+       }
+       return 1
+}
diff --git a/dht.go b/dht.go
index 77975a2f5d19337db3fd18e32d106a4b5fc39542..e001bd66d870788bcfd37683a379a674eb2323bb 100644 (file)
--- a/dht.go
+++ b/dht.go
@@ -1,12 +1,14 @@
 package torrent
 
 import (
+       "fmt"
        "io"
        "net"
 
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/dht/v2/krpc"
        peer_store "github.com/anacrolix/dht/v2/peer-store"
+       "github.com/davecgh/go-spew/spew"
 )
 
 // DHT server interface for use by a Torrent or Client. It's reasonable for this to make assumptions
@@ -60,3 +62,9 @@ func (me AnacrolixDhtServerWrapper) Ping(addr *net.UDPAddr) {
 }
 
 var _ DhtServer = AnacrolixDhtServerWrapper{}
+
+func writeDhtServerStatus(w io.Writer, s DhtServer) {
+       dhtStats := s.Stats()
+       fmt.Fprintf(w, " ID: %x\n", s.ID())
+       spew.Fdump(w, dhtStats)
+}
diff --git a/go.mod b/go.mod
index ea4c4c3c5ff905b2da2bfd48f64168f570ff7a99..38d4c9f7f15b55ae8042c4b55c9a7878893d70dd 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
        github.com/anacrolix/gostdapp v0.1.0
        github.com/anacrolix/log v0.16.1-0.20250526073428-5cb74e15092b
        github.com/anacrolix/missinggo v1.3.0
-       github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797
+       github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56
        github.com/anacrolix/multiless v0.4.0
        github.com/anacrolix/possum/go v0.3.2
        github.com/anacrolix/squirrel v0.6.4
diff --git a/go.sum b/go.sum
index 73a83a0ad06aa20bc1350f1be540a54c3c1de1bd..c9b7cd270b3d7a7df01535e93341a844b4f64dde 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -107,6 +107,8 @@ github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbU
 github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA=
 github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797 h1:VAfIW3RwRBTZM7V6auEZC0eBPo94ht/R6ywrADNA0q8=
 github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0=
+github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56 h1:+VSnod9Zipey/E5mDTrhooV9y8A8ZaUHSzG/TnrIHug=
+github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0=
 github.com/anacrolix/mmsg v1.0.1 h1:TxfpV7kX70m3f/O7ielL/2I3OFkMPjrRCPo7+4X5AWw=
 github.com/anacrolix/mmsg v1.0.1/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc=
 github.com/anacrolix/multiless v0.4.0 h1:lqSszHkliMsZd2hsyrDvHOw4AbYWa+ijQ66LzbjqWjM=
index bffe6a7a84297540e27cc58316dc04bec36ae8b9..d3ede920ddf6a155a49202253dc1901c73341aa0 100644 (file)
@@ -30,7 +30,8 @@ func pieceOrderLess(i, j *PieceRequestOrderItem) multiless.Computation {
                j.State.Partial, i.State.Partial,
        ).Int(
                // If this is done with relative availability, do we lose some determinism? If completeness
-               // is used, would that push this far enough down?
+               // is used, would that push this far enough down? What happens if we have a piece in the
+               // order, but it has availability 0?
                i.State.Availability, j.State.Availability,
        ).Int(
                i.Key.Index, j.Key.Index,
index 37bb588bc396bc0cb9a4d53977bdad5ee9effe49..b1982f0a3fac3e1ef2c2195d0d673646c98bd12e 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -11,6 +11,7 @@ import (
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/panicif"
+       "github.com/anacrolix/torrent/segments"
 
        "github.com/anacrolix/torrent/merkle"
        "github.com/anacrolix/torrent/metainfo"
@@ -38,7 +39,8 @@ type Piece struct {
        numVerifiesCond chansync.BroadcastCond
 
        publicPieceState PieceState
-       priority         PiecePriority
+       // Piece-specific priority. There are other priorities like File and Reader.
+       priority PiecePriority
        // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
        // incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and
        // the Peer isn't recorded in Torrent.connsWithAllPieces.
@@ -249,7 +251,7 @@ func (p *Piece) SetPriority(prio PiecePriority) {
 
 // This is priority based only on piece, file and reader priorities.
 func (p *Piece) purePriority() (ret PiecePriority) {
-       for f := range p.files() {
+       for _, f := range p.files() {
                ret.Raise(f.prio)
        }
        if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
@@ -379,10 +381,10 @@ func (p *Piece) obtainHashV2() (hash [32]byte, err error) {
        return
 }
 
-func (p *Piece) files() iter.Seq[*File] {
-       return func(yield func(*File) bool) {
+func (p *Piece) files() iter.Seq2[int, *File] {
+       return func(yield func(int, *File) bool) {
                for i := p.beginFile; i < p.endFile; i++ {
-                       if !yield((*p.t.files)[i]) {
+                       if !yield(i, (*p.t.files)[i]) {
                                return
                        }
                }
@@ -423,3 +425,10 @@ func (p *Piece) publishStateChange() {
                })
        }
 }
+
+func (p *Piece) fileExtents() iter.Seq2[int, segments.Extent] {
+       return p.t.info.FileSegmentsIndex().LocateIter(segments.Extent{
+               p.torrentBeginOffset(),
+               segments.Int(p.length()),
+       })
+}
index 500110c3f27002379b20ee57288b5ca1748e4abe..3db71c49e2d8e367d4f376cd788544dbf9db793e 100644 (file)
@@ -81,7 +81,7 @@ func (r requestStrategyPiece) CountUnverified() bool {
 }
 
 func (r requestStrategyPiece) Request() bool {
-       return !r.p.ignoreForRequests() && r.p.purePriority() != PiecePriorityNone
+       return r.p.effectivePriority() > PiecePriorityNone
 }
 
 var _ requestStrategy.Piece = requestStrategyPiece{}
index 73dca69a0029224a4b6e5b9f36a541a5ba61112e..f217bf5cf964b09170b04ad7b1da7537357935e2 100644 (file)
@@ -5,6 +5,7 @@ import (
        "sort"
 
        g "github.com/anacrolix/generics"
+       "github.com/anacrolix/missinggo/v2/panicif"
 )
 
 func NewIndex(segments LengthIter) (ret Index) {
@@ -84,3 +85,24 @@ func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] {
                })
        }
 }
+
+type IndexAndOffset struct {
+       Index  int
+       Offset int64
+}
+
+// Returns the Extent that contains the given extent, if it exists. Panics if Extents overlap on the
+// offset.
+func (me Index) LocateOffset(off int64) (ret g.Option[IndexAndOffset]) {
+       // I think an Extent needs to have a non-zero to match against it? That's what this method is
+       // defining.
+       for i, e := range me.LocateIter(Extent{off, 1}) {
+               panicif.True(ret.Ok)
+               panicif.NotEq(e.Length, 1)
+               ret.Set(IndexAndOffset{
+                       Index:  i,
+                       Offset: e.Start,
+               })
+       }
+       return
+}
diff --git a/t.go b/t.go
index cbad1959f9ccc255e37b0a85106a5f2f2ea65fa8..5b42a10170ed1b41e59ce71fa88f796128ec8901 100644 (file)
--- a/t.go
+++ b/t.go
@@ -205,7 +205,8 @@ func (t *Torrent) CancelPieces(begin, end pieceIndex) {
 
 func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex, reason updateRequestReason) {
        for i := begin; i < end; i++ {
-               p := &t.pieces[i]
+               p := t.piece(i)
+               // Intentionally cancelling only the piece-specific priority here.
                if p.priority == PiecePriorityNone {
                        continue
                }
index b39ccffdeebf953fadf8205fadc18230da2acdbc..56ccf78a2d474e0a304a81f89f4415ecc07fc59b 100644 (file)
@@ -119,9 +119,9 @@ type Torrent struct {
 
        _chunksPerRegularPiece chunkIndexType
 
-       webSeeds map[string]*webseedPeer
-       // Active peer connections, running message stream loops. TODO: Make this
-       // open (not-closed) connections only.
+       webSeeds map[webseedUrlKey]*webseedPeer
+       // Active peer connections, running message stream loops. TODO: Make this open (not-closed)
+       // connections only.
        conns               map[*PeerConn]struct{}
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
@@ -983,7 +983,7 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
                UrlList: func() []string {
                        ret := make([]string, 0, len(t.webSeeds))
                        for url := range t.webSeeds {
-                               ret = append(ret, url)
+                               ret = append(ret, string(url))
                        }
                        return ret
                }(),
@@ -1117,6 +1117,12 @@ func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
        return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
 }
 
+func (t *Torrent) getRequestIndexContainingOffset(off int64) RequestIndex {
+       req, ok := t.offsetRequest(off)
+       panicif.False(ok)
+       return t.requestIndexFromRequest(req)
+}
+
 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
        n, err := t.piece(piece).Storage().WriteAt(data, begin)
        if err == nil && n != len(data) {
@@ -3019,7 +3025,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
        if t.cl.config.DisableWebseeds {
                return false
        }
-       if _, ok := t.webSeeds[url]; ok {
+       if _, ok := t.webSeeds[webseedUrlKey(url)]; ok {
                return false
        }
        // I don't think Go http supports pipelining requests. However, we can have more ready to go
@@ -3074,7 +3080,7 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
        if t.haveInfo() {
                ws.onGotInfo(t.info)
        }
-       t.webSeeds[url] = &ws
+       t.webSeeds[webseedUrlKey(url)] = &ws
        ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
        return true
 }
@@ -3489,3 +3495,9 @@ func (t *Torrent) withSlogger(base *slog.Logger) *slog.Logger {
                }),
                "ih", *t.canonicalShortInfohash()))
 }
+
+func (t *Torrent) endRequestIndexForFileIndex(fileIndex int) RequestIndex {
+       f := t.Files()[fileIndex]
+       end := intCeilDiv(uint64(f.offset)+uint64(f.length), t.chunkSize.Uint64())
+       return RequestIndex(end)
+}
index 5de29d77aa549c55db9d8816b391aa1fd118ec34..0ad53d8c8bfafe2b5fcca7f1f79d2632da212d69 100644 (file)
@@ -53,6 +53,7 @@ func (me *webseedPeer) moreRequestsAllowed() bool {
 }
 
 func (me *webseedPeer) updateRequests() {
+       return
        if !me.shouldUpdateRequests() {
                return
        }
@@ -308,49 +309,6 @@ func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
        return
 }
 
-//
-//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) {
-//     err = ws.readChunks(wr)
-//     switch {
-//     case err == nil:
-//     case ws.peer.closed.IsSet():
-//     case errors.Is(err, context.Canceled):
-//     case errors.Is(err, webseed.ErrTooFast):
-//     default:
-//
-//     }
-//     ws.peer.t.cl.lock()
-//     defer ws.peer.t.cl.unlock()
-//     if ws.peer.t.closed.IsSet() {
-//             return nil
-//     }
-//     if err != nil {
-//             switch {
-//             case errors.Is(err, context.Canceled):
-//             case errors.Is(err, webseed.ErrTooFast):
-//             case ws.peer.closed.IsSet():
-//             default:
-//                     ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
-//                     // // Here lies my attempt to extract something concrete from Go's error system. RIP.
-//                     // cfg := spew.NewDefaultConfig()
-//                     // cfg.DisableMethods = true
-//                     // cfg.Dump(result.Err)
-//
-//                     if webseedPeerCloseOnUnhandledError {
-//                             log.Printf("closing %v", ws)
-//                             ws.peer.close()
-//                     } else {
-//                             ws.lastUnhandledErr = time.Now()
-//                     }
-//             }
-//             if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
-//                     panic("invalid reject")
-//             }
-//             return err
-//     }
-//     return err
-//}
-
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
        return &me.client.Pieces
 }
index 99b8d25f9a279c54d61494f9b0c22c54799dde9c..d79d72c3eb024b1d7a2fb6b3fa56361bb2fb7747 100644 (file)
@@ -1,15 +1,24 @@
 package torrent
 
 import (
+       "cmp"
+       "iter"
+       "maps"
        "unique"
 
-       requestStrategy2 "github.com/anacrolix/torrent/internal/request-strategy"
+       g "github.com/anacrolix/generics"
+       "github.com/anacrolix/generics/heap"
+       "github.com/anacrolix/missinggo/v2/panicif"
+       "github.com/anacrolix/torrent/internal/request-strategy"
        "github.com/anacrolix/torrent/metainfo"
 )
 
+const defaultRequestsPerWebseedHost = 5
+
 type (
        webseedHostKey       string
        webseedHostKeyHandle = unique.Handle[webseedHostKey]
+       webseedUrlKey        string
 )
 
 /*
@@ -21,16 +30,174 @@ type (
 This was a globally aware webseed requestor algorithm that is probably going to be abandoned.
 */
 func (cl *Client) abandonedUpdateWebSeedRequests() {
-       for key, value := range cl.pieceRequestOrder {
-               input := key.getRequestStrategyInput(cl)
-               requestStrategy2.GetRequestablePieces(
-                       input,
-                       value.pieces,
-                       func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy2.PieceRequestOrderState) bool {
-                               return true
-                       },
-               )
+       type aprioriMapValue struct {
+               startOffset int64
+               webseedRequestOrderValue
+       }
+       aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
+       for uniqueKey, value := range cl.iterWebseed() {
+               cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
+               if !ok || cmp.Or(
+                       cmp.Compare(value.priority, cur.priority),
+                       cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+               ) > 0 {
+                       aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+               }
+       }
+       existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
+       // TODO: Try maps.Clone here? We don't need the value but maybe cloning is just faster anyway?
+       unusedExistingRequests := make(map[webseedUniqueRequestKey]struct{}, len(existingRequests))
+       for key := range existingRequests {
+               unusedExistingRequests[key] = struct{}{}
+       }
+       type heapElem struct {
+               webseedUniqueRequestKey
+               webseedRequestOrderValue
+       }
+       // Build the request heap, merging existing requests if they match.
+       heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
+       for key, value := range aprioriMap {
+               fullKey := webseedUniqueRequestKey{key, value.startOffset}
+               heapValue := value.webseedRequestOrderValue
+               // If there's a matching existing request, make sure to include a reference to it in the
+               // heap value and deduplicate it.
+               existingValue, ok := existingRequests[fullKey]
+               if ok {
+                       // Priorities should have been generated the same.
+                       panicif.NotEq(value.priority, existingValue.priority)
+                       // A-priori map should not have existing request associated with it. TODO: a-priori map
+                       // value shouldn't need some fields.
+                       panicif.NotZero(value.existingWebseedRequest)
+                       heapValue.existingWebseedRequest = existingValue.existingWebseedRequest
+                       // Now the values should match exactly.
+                       panicif.NotEq(heapValue, existingValue)
+                       g.MustDelete(unusedExistingRequests, fullKey)
+               }
+               heapSlice = append(heapSlice, heapElem{
+                       fullKey,
+                       heapValue,
+               })
+       }
+       // Add remaining existing requests.
+       for key := range unusedExistingRequests {
+               heapSlice = append(heapSlice, heapElem{key, existingRequests[key]})
+       }
+       aprioriHeap := heap.InterfaceForSlice(
+               &heapSlice,
+               func(l heapElem, r heapElem) bool {
+                       // Prefer the highest priority, then existing requests, then longest remaining file extent.
+                       return cmp.Or(
+                               cmp.Compare(l.priority, r.priority),
+                               // Existing requests are assigned the priority of the piece they're reading next.
+                               compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
+                               // This won't thrash because we already preferred existing requests, so we'll finish out small extents.
+                               -cmp.Compare(
+                                       l.t.Files()[l.fileIndex].length-l.startOffset,
+                                       r.t.Files()[r.fileIndex].length-r.startOffset),
+                       ) < 0
+               },
+       )
+
+       unwantedExistingRequests := maps.Clone(existingRequests)
+
+       heap.Init(aprioriHeap)
+       var plan webseedRequestPlan
+       for aprioriHeap.Len() > 0 {
+               elem := heap.Pop(aprioriHeap)
+               // Pulling the pregenerated form avoids unique.Handle, and possible URL parsing and error
+               // handling overhead. Need the value to avoid looking this up again.
+               costKey := elem.costKey
+               panicif.Zero(costKey)
+               if len(plan.byCost[costKey]) >= defaultRequestsPerWebseedHost {
+                       continue
+               }
+               g.MakeMapIfNil(&plan.byCost)
+               requestKey := elem.webseedUniqueRequestKey
+               plan.byCost[costKey] = append(plan.byCost[costKey], requestKey)
+               delete(unwantedExistingRequests, requestKey)
+       }
+
+       // Cancel any existing requests that are no longer wanted.
+       for key, value := range unwantedExistingRequests {
+               key.t.slogger().Debug("cancelling deprioritized existing webseed request", "webseedUrl", key.url, "fileIndex", key.fileIndex)
+               value.existingWebseedRequest.Cancel()
+       }
+
+       for _, requestKeys := range plan.byCost {
+               for _, requestKey := range requestKeys {
+                       if g.MapContains(existingRequests, requestKey) {
+                               continue
+                       }
+                       t := requestKey.t
+                       // Run the request to the end of the file for now. TODO: Set a reasonable end so the
+                       // remote doesn't oversend.
+                       t.webSeeds[requestKey.url].spawnRequest(
+                               t.getRequestIndexContainingOffset(requestKey.startOffset),
+                               t.endRequestIndexForFileIndex(requestKey.fileIndex))
+               }
+       }
+}
+
+type webseedRequestPlan struct {
+       byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
+}
+
+type aprioriWebseedRequestKey struct {
+       t         *Torrent
+       fileIndex int
+       url       webseedUrlKey
+}
+
+// To allow multiple requests to the object.
+type webseedUniqueRequestKey struct {
+       aprioriWebseedRequestKey
+       startOffset int64
+}
+
+type webseedRequestOrderValue struct {
+       priority PiecePriority
+       // Used for cancellation if this is deprioritized. Also might be a faster way to sort for
+       // existing requests.
+       existingWebseedRequest *webseedRequest
+       costKey                webseedHostKeyHandle
+}
+
+// Yields possible webseed requests by piece. Caller should filter and prioritize these. TODO:
+// Doesn't handle dirty chunks.
+func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+       return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+               for key, value := range cl.pieceRequestOrder {
+                       input := key.getRequestStrategyInput(cl)
+                       requestStrategy.GetRequestablePieces(
+                               input,
+                               value.pieces,
+                               func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool {
+                                       t := cl.torrentsByShortHash[ih]
+                                       for i, e := range cl.torrentsByShortHash[ih].piece(pieceIndex).fileExtents() {
+                                               for url, ws := range t.webSeeds {
+                                                       yield(
+                                                               webseedUniqueRequestKey{
+                                                                       aprioriWebseedRequestKey{
+                                                                               t:         t,
+                                                                               fileIndex: i,
+                                                                               url:       url,
+                                                                       },
+                                                                       e.Start,
+                                                               },
+                                                               webseedRequestOrderValue{
+                                                                       priority: orderState.Priority,
+                                                                       costKey:  ws.hostKey,
+                                                               },
+                                                       )
+                                               }
+                                       }
+                                       // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen.
+                                       return false
+                               },
+                       )
+               }
        }
+
 }
 
 func (cl *Client) updateWebSeedRequests(reason updateRequestReason) {
@@ -40,3 +207,49 @@ func (cl *Client) updateWebSeedRequests(reason updateRequestReason) {
                }
        }
 }
+
+func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+       return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+               for t := range cl.torrents {
+                       for url, ws := range t.webSeeds {
+                               for ar := range ws.activeRequests {
+                                       off := t.requestIndexBegin(ar.next)
+                                       opt := t.info.FileSegmentsIndex().LocateOffset(off)
+                                       if !opt.Ok {
+                                               continue
+                                       }
+                                       p := t.pieceForOffset(off)
+                                       if !yield(
+                                               webseedUniqueRequestKey{
+                                                       aprioriWebseedRequestKey{
+                                                               t:         t,
+                                                               fileIndex: opt.Value.Index,
+                                                               url:       url,
+                                                       },
+                                                       opt.Value.Offset,
+                                               },
+                                               webseedRequestOrderValue{
+                                                       priority:               p.effectivePriority(),
+                                                       existingWebseedRequest: ar,
+                                                       costKey:                ws.hostKey,
+                                               },
+                                       ) {
+                                               return
+                                       }
+                               }
+                       }
+               }
+       }
+}
+
+func (cl *Client) updateWebseedRequests() {
+       cl.abandonedUpdateWebSeedRequests()
+       // Should have already run to get here.
+       cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval)
+}
+
+func (cl *Client) updateWebseedRequestsTimerFunc() {
+       cl.lock()
+       defer cl.unlock()
+       cl.updateWebseedRequests()
+}
index fb5c824e0d4b720f1fb183c504f9d48c678f1af5..52d4bb818a91260af1954ebe760e8fef703ffbf8 100644 (file)
@@ -75,6 +75,11 @@ type RequestResult struct {
        Err   error
 }
 
+// Returns the URL for the given file index. This is assumed to be globally unique.
+func (ws *Client) UrlForFileIndex(fileIndex int) string {
+       return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper)
+}
+
 func (ws *Client) StartNewRequest(r RequestSpec) Request {
        ctx, cancel := context.WithCancel(context.TODO())
        var requestParts []requestPart
index 63b21665252b30fbfc31ae7402121e91526318b6..5ec7afc08377a46404242e9e369e0ddf9d5ae020 100644 (file)
@@ -46,19 +46,30 @@ func trailingPath(
 }
 
 // Creates a request per BEP 19.
-func newRequest(
-       ctx context.Context,
+func urlForFileIndex(
        url_ string, fileIndex int,
        info *metainfo.Info,
-       offset, length int64,
        pathEscaper PathEscaper,
-) (*http.Request, error) {
+) string {
        fileInfo := info.UpvertedFiles()[fileIndex]
        if strings.HasSuffix(url_, "/") {
                // BEP specifies that we append the file path. We need to escape each component of the path
                // for things like spaces and '#'.
                url_ += trailingPath(info.BestName(), fileInfo.BestPath(), pathEscaper)
        }
+       return url_
+}
+
+// Creates a request per BEP 19.
+func newRequest(
+       ctx context.Context,
+       url_ string, fileIndex int,
+       info *metainfo.Info,
+       offset, length int64,
+       pathEscaper PathEscaper,
+) (*http.Request, error) {
+       fileInfo := info.UpvertedFiles()[fileIndex]
+       url_ = urlForFileIndex(url_, fileIndex, info, pathEscaper)
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url_, nil)
        if err != nil {
                return nil, err