"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
"github.com/cespare/xxhash"
- "github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
gbtree "github.com/google/btree"
"github.com/pion/webrtc/v4"
"github.com/anacrolix/torrent/webtorrent"
)
+const webseedRequestUpdateTimerInterval = time.Second
+
// Clients contain zero or more Torrents. A Client manages a blocklist, the
// TCP/UDP protocol ports, and DHT as desired.
type Client struct {
defaultLocalLtepProtocolMap LocalLtepProtocolMap
upnpMappings []*upnpMapping
+
+ webseedRequestTimer *time.Timer
}
type ipStr string
return
}
-func writeDhtServerStatus(w io.Writer, s DhtServer) {
- dhtStats := s.Stats()
- fmt.Fprintf(w, " ID: %x\n", s.ID())
- spew.Fdump(w, dhtStats)
-}
-
-func compareBool(a, b bool) int {
- if a == b {
- return 0
- }
- if b {
- return -1
- }
- return 1
-}
-
-// Writes out a human readable status of the client, such as for writing to a
-// HTTP status page.
+// Writes out a human-readable status of the client, such as for writing to an HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
cl.rLock()
defer cl.rUnlock()
},
}
+ cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
+
return
}
func (cl *Client) underWebSeedHttpRequestLimit(key webseedHostKeyHandle) bool {
panicif.Zero(key)
- return cl.numWebSeedRequests[key] < 5
+ return cl.numWebSeedRequests[key] < defaultRequestsPerWebseedHost
}
func (cl *Client) countWebSeedHttpRequests() (num int) {
--- /dev/null
+package torrent
+
+func compareBool(a, b bool) int {
+ if a == b {
+ return 0
+ }
+ if b {
+ return -1
+ }
+ return 1
+}
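+
+// For example, compareBool(false, true) == -1 and compareBool(true, false) == 1: false sorts
+// before true, matching cmp.Compare's ordering of 0 and 1.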
package torrent
import (
+ "fmt"
"io"
"net"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
peer_store "github.com/anacrolix/dht/v2/peer-store"
+ "github.com/davecgh/go-spew/spew"
)
// DHT server interface for use by a Torrent or Client. It's reasonable for this to make assumptions
}
var _ DhtServer = AnacrolixDhtServerWrapper{}
+
+func writeDhtServerStatus(w io.Writer, s DhtServer) {
+ dhtStats := s.Stats()
+ fmt.Fprintf(w, " ID: %x\n", s.ID())
+ spew.Fdump(w, dhtStats)
+}
github.com/anacrolix/gostdapp v0.1.0
github.com/anacrolix/log v0.16.1-0.20250526073428-5cb74e15092b
github.com/anacrolix/missinggo v1.3.0
- github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797
+ github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56
github.com/anacrolix/multiless v0.4.0
github.com/anacrolix/possum/go v0.3.2
github.com/anacrolix/squirrel v0.6.4
github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA=
github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797 h1:VAfIW3RwRBTZM7V6auEZC0eBPo94ht/R6ywrADNA0q8=
github.com/anacrolix/missinggo/v2 v2.8.1-0.20250610025550-ddd9eb198797/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0=
+github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56 h1:+VSnod9Zipey/E5mDTrhooV9y8A8ZaUHSzG/TnrIHug=
+github.com/anacrolix/missinggo/v2 v2.8.1-0.20250626123431-aa4691b19d56/go.mod h1:vVO5FEziQm+NFmJesc7StpkquZk+WJFCaL0Wp//2sa0=
github.com/anacrolix/mmsg v1.0.1 h1:TxfpV7kX70m3f/O7ielL/2I3OFkMPjrRCPo7+4X5AWw=
github.com/anacrolix/mmsg v1.0.1/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc=
github.com/anacrolix/multiless v0.4.0 h1:lqSszHkliMsZd2hsyrDvHOw4AbYWa+ijQ66LzbjqWjM=
j.State.Partial, i.State.Partial,
).Int(
// If this is done with relative availability, do we lose some determinism? If completeness
- // is used, would that push this far enough down?
+ // is used, would that push this far enough down? What happens if we have a piece in the
+ // order, but it has availability 0?
i.State.Availability, j.State.Availability,
).Int(
i.Key.Index, j.Key.Index,
g "github.com/anacrolix/generics"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/panicif"
+ "github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/merkle"
"github.com/anacrolix/torrent/metainfo"
numVerifiesCond chansync.BroadcastCond
publicPieceState PieceState
- priority PiecePriority
+ // Piece-specific priority. There are other priorities like File and Reader.
+ priority PiecePriority
// Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
// incremented for any piece a peer has, when Torrent.haveInfo is true and
// the Peer isn't recorded in Torrent.connsWithAllPieces.
// This is the priority based only on piece, file, and reader priorities.
func (p *Piece) purePriority() (ret PiecePriority) {
- for f := range p.files() {
+ for _, f := range p.files() {
ret.Raise(f.prio)
}
if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
return
}
-func (p *Piece) files() iter.Seq[*File] {
- return func(yield func(*File) bool) {
+func (p *Piece) files() iter.Seq2[int, *File] {
+ return func(yield func(int, *File) bool) {
for i := p.beginFile; i < p.endFile; i++ {
- if !yield((*p.t.files)[i]) {
+ if !yield(i, (*p.t.files)[i]) {
return
}
}
})
}
}
+
+func (p *Piece) fileExtents() iter.Seq2[int, segments.Extent] {
+ return p.t.info.FileSegmentsIndex().LocateIter(segments.Extent{
+ Start: p.torrentBeginOffset(),
+ Length: segments.Int(p.length()),
+ })
+}
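+
+// For example, a piece straddling the boundary of files 2 and 3 should yield (2, tail extent
+// of file 2) then (3, head extent of file 3), with extents in file-relative coordinates.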
}
func (r requestStrategyPiece) Request() bool {
- return !r.p.ignoreForRequests() && r.p.purePriority() != PiecePriorityNone
+ return r.p.effectivePriority() > PiecePriorityNone
}
var _ requestStrategy.Piece = requestStrategyPiece{}
"sort"
g "github.com/anacrolix/generics"
+ "github.com/anacrolix/missinggo/v2/panicif"
)
func NewIndex(segments LengthIter) (ret Index) {
})
}
}
+
+type IndexAndOffset struct {
+ Index int
+ Offset int64
+}
+
+// Returns the index of the segment containing the given offset, and the offset within that
+// segment, if such a segment exists. Panics if Extents overlap at the offset.
+func (me Index) LocateOffset(off int64) (ret g.Option[IndexAndOffset]) {
+ // An Extent seems to need a non-zero length to match against anything; this method defines
+ // that behaviour by probing with a length-1 Extent.
+ for i, e := range me.LocateIter(Extent{off, 1}) {
+ panicif.True(ret.Ok)
+ panicif.NotEq(e.Length, 1)
+ ret.Set(IndexAndOffset{
+ Index: i,
+ Offset: e.Start,
+ })
+ }
+ return
+}
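+
+// For example, for an Index over segment lengths [3, 5], LocateOffset(4) should return
+// {Index: 1, Offset: 1}: byte 4 falls one byte into the second segment.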
func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex, reason updateRequestReason) {
for i := begin; i < end; i++ {
- p := &t.pieces[i]
+ p := t.piece(i)
+ // Intentionally cancelling only the piece-specific priority here.
if p.priority == PiecePriorityNone {
continue
}
_chunksPerRegularPiece chunkIndexType
- webSeeds map[string]*webseedPeer
- // Active peer connections, running message stream loops. TODO: Make this
- // open (not-closed) connections only.
+ webSeeds map[webseedUrlKey]*webseedPeer
+ // Active peer connections, running message stream loops. TODO: Make this open (not-closed)
+ // connections only.
conns map[*PeerConn]struct{}
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
UrlList: func() []string {
ret := make([]string, 0, len(t.webSeeds))
for url := range t.webSeeds {
- ret = append(ret, url)
+ ret = append(ret, string(url))
}
return ret
}(),
return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
}
+func (t *Torrent) getRequestIndexContainingOffset(off int64) RequestIndex {
+ req, ok := t.offsetRequest(off)
+ panicif.False(ok)
+ return t.requestIndexFromRequest(req)
+}
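+
+// For example, with 16 KiB chunks and a piece length that's a multiple of the chunk size,
+// offset 100_000 falls in request index 100_000/16384 = 6.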
+
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
n, err := t.piece(piece).Storage().WriteAt(data, begin)
if err == nil && n != len(data) {
if t.cl.config.DisableWebseeds {
return false
}
- if _, ok := t.webSeeds[url]; ok {
+ if _, ok := t.webSeeds[webseedUrlKey(url)]; ok {
return false
}
// I don't think Go http supports pipelining requests. However, we can have more ready to go
if t.haveInfo() {
ws.onGotInfo(t.info)
}
- t.webSeeds[url] = &ws
+ t.webSeeds[webseedUrlKey(url)] = &ws
ws.peer.onNeedUpdateRequests("Torrent.addWebSeed")
return true
}
}),
"ih", *t.canonicalShortInfohash()))
}
+
+func (t *Torrent) endRequestIndexForFileIndex(fileIndex int) RequestIndex {
+ f := t.Files()[fileIndex]
+ end := intCeilDiv(uint64(f.offset)+uint64(f.length), t.chunkSize.Uint64())
+ return RequestIndex(end)
+}
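+
+// For example, a file spanning bytes [0, 100_000) with 16 KiB chunks (and chunk-aligned
+// pieces) ends at request index intCeilDiv(100_000, 16384) = 7, one past the chunk holding
+// the file's last byte.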
}
func (me *webseedPeer) updateRequests() {
+ // Deliberately disabled: webseed requests are now planned centrally by the Client's
+ // updateWebseedRequests timer, so the per-peer update below is skipped.
+ return
if !me.shouldUpdateRequests() {
return
}
return
}
-//
-//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) {
-// err = ws.readChunks(wr)
-// switch {
-// case err == nil:
-// case ws.peer.closed.IsSet():
-// case errors.Is(err, context.Canceled):
-// case errors.Is(err, webseed.ErrTooFast):
-// default:
-//
-// }
-// ws.peer.t.cl.lock()
-// defer ws.peer.t.cl.unlock()
-// if ws.peer.t.closed.IsSet() {
-// return nil
-// }
-// if err != nil {
-// switch {
-// case errors.Is(err, context.Canceled):
-// case errors.Is(err, webseed.ErrTooFast):
-// case ws.peer.closed.IsSet():
-// default:
-// ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
-// // // Here lies my attempt to extract something concrete from Go's error system. RIP.
-// // cfg := spew.NewDefaultConfig()
-// // cfg.DisableMethods = true
-// // cfg.Dump(result.Err)
-//
-// if webseedPeerCloseOnUnhandledError {
-// log.Printf("closing %v", ws)
-// ws.peer.close()
-// } else {
-// ws.lastUnhandledErr = time.Now()
-// }
-// }
-// if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
-// panic("invalid reject")
-// }
-// return err
-// }
-// return err
-//}
-
func (me *webseedPeer) peerPieces() *roaring.Bitmap {
return &me.client.Pieces
}
package torrent
import (
+ "cmp"
+ "iter"
+ "maps"
"unique"
- requestStrategy2 "github.com/anacrolix/torrent/internal/request-strategy"
+ g "github.com/anacrolix/generics"
+ "github.com/anacrolix/generics/heap"
+ "github.com/anacrolix/missinggo/v2/panicif"
+ "github.com/anacrolix/torrent/internal/request-strategy"
"github.com/anacrolix/torrent/metainfo"
)
+const defaultRequestsPerWebseedHost = 5
+
type (
webseedHostKey string
webseedHostKeyHandle = unique.Handle[webseedHostKey]
+ webseedUrlKey string
)
/*
This was a globally aware webseed requestor algorithm that is probably going to be abandoned.
*/
func (cl *Client) abandonedUpdateWebSeedRequests() {
- for key, value := range cl.pieceRequestOrder {
- input := key.getRequestStrategyInput(cl)
- requestStrategy2.GetRequestablePieces(
- input,
- value.pieces,
- func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy2.PieceRequestOrderState) bool {
- return true
- },
- )
+ type aprioriMapValue struct {
+ startOffset int64
+ webseedRequestOrderValue
+ }
+ aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
+ for uniqueKey, value := range cl.iterWebseed() {
+ cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
+ // Keep the highest-priority candidate, breaking ties by the earliest start offset.
+ if !ok || cmp.Or(
+ cmp.Compare(value.priority, cur.priority),
+ cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+ ) > 0 {
+ aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+ }
+ }
+ existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
+ // TODO: Try maps.Clone here? We don't need the value but maybe cloning is just faster anyway?
+ unusedExistingRequests := make(map[webseedUniqueRequestKey]struct{}, len(existingRequests))
+ for key := range existingRequests {
+ unusedExistingRequests[key] = struct{}{}
+ }
+ type heapElem struct {
+ webseedUniqueRequestKey
+ webseedRequestOrderValue
+ }
+ // Build the request heap, merging existing requests if they match.
+ heapSlice := make([]heapElem, 0, len(aprioriMap)+len(existingRequests))
+ for key, value := range aprioriMap {
+ fullKey := webseedUniqueRequestKey{key, value.startOffset}
+ heapValue := value.webseedRequestOrderValue
+ // If there's a matching existing request, make sure to include a reference to it in the
+ // heap value and deduplicate it.
+ existingValue, ok := existingRequests[fullKey]
+ if ok {
+ // Priorities should have been generated the same.
+ panicif.NotEq(value.priority, existingValue.priority)
+ // The a-priori map should not have an existing request associated with it. TODO: The
+ // a-priori map value shouldn't need some of these fields.
+ panicif.NotZero(value.existingWebseedRequest)
+ heapValue.existingWebseedRequest = existingValue.existingWebseedRequest
+ // Now the values should match exactly.
+ panicif.NotEq(heapValue, existingValue)
+ g.MustDelete(unusedExistingRequests, fullKey)
+ }
+ heapSlice = append(heapSlice, heapElem{
+ fullKey,
+ heapValue,
+ })
+ }
+ // Add remaining existing requests.
+ for key := range unusedExistingRequests {
+ heapSlice = append(heapSlice, heapElem{key, existingRequests[key]})
+ }
+ aprioriHeap := heap.InterfaceForSlice(
+ &heapSlice,
+ func(l heapElem, r heapElem) bool {
+ // Prefer the highest priority, then existing requests, then the longest remaining file
+ // extent.
+ return cmp.Or(
+ // Negated so the highest priority pops first from the min-heap.
+ -cmp.Compare(l.priority, r.priority),
+ // Existing requests are assigned the priority of the piece they're reading next.
+ compareBool(l.existingWebseedRequest == nil, r.existingWebseedRequest == nil),
+ // This won't thrash because we already preferred existing requests, so we'll finish out
+ // small extents.
+ -cmp.Compare(
+ l.t.Files()[l.fileIndex].length-l.startOffset,
+ r.t.Files()[r.fileIndex].length-r.startOffset),
+ ) < 0
+ },
+ )
+
+ unwantedExistingRequests := maps.Clone(existingRequests)
+
+ heap.Init(aprioriHeap)
+ var plan webseedRequestPlan
+ for aprioriHeap.Len() > 0 {
+ elem := heap.Pop(aprioriHeap)
+ // Using the pregenerated cost key avoids recreating the unique.Handle, and any URL parsing
+ // and error-handling overhead. Grab the value here so we don't look it up again.
+ costKey := elem.costKey
+ panicif.Zero(costKey)
+ if len(plan.byCost[costKey]) >= defaultRequestsPerWebseedHost {
+ continue
+ }
+ g.MakeMapIfNil(&plan.byCost)
+ requestKey := elem.webseedUniqueRequestKey
+ plan.byCost[costKey] = append(plan.byCost[costKey], requestKey)
+ delete(unwantedExistingRequests, requestKey)
+ }
+
+ // Cancel any existing requests that are no longer wanted.
+ for key, value := range unwantedExistingRequests {
+ key.t.slogger().Debug("cancelling deprioritized existing webseed request", "webseedUrl", key.url, "fileIndex", key.fileIndex)
+ value.existingWebseedRequest.Cancel()
+ }
+
+ for _, requestKeys := range plan.byCost {
+ for _, requestKey := range requestKeys {
+ if g.MapContains(existingRequests, requestKey) {
+ continue
+ }
+ t := requestKey.t
+ // Run the request to the end of the file for now. TODO: Set a reasonable end so the
+ // remote doesn't oversend.
+ t.webSeeds[requestKey.url].spawnRequest(
+ t.getRequestIndexContainingOffset(requestKey.startOffset),
+ t.endRequestIndexForFileIndex(requestKey.fileIndex))
+ }
+ }
+}
+
+type webseedRequestPlan struct {
+ byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
+}
+
+type aprioriWebseedRequestKey struct {
+ t *Torrent
+ fileIndex int
+ url webseedUrlKey
+}
+
+// Extends the a-priori key with a start offset, so multiple concurrent requests can target
+// the same file on the same webseed.
+type webseedUniqueRequestKey struct {
+ aprioriWebseedRequestKey
+ startOffset int64
+}
+
+type webseedRequestOrderValue struct {
+ priority PiecePriority
+ // Used for cancellation if this is deprioritized. Also might be a faster way to sort for
+ // existing requests.
+ existingWebseedRequest *webseedRequest
+ costKey webseedHostKeyHandle
+}
+
+// Yields possible webseed requests by piece. Caller should filter and prioritize these. TODO:
+// Doesn't handle dirty chunks.
+func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+ return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+ // Set once the consumer stops iteration, so yield is never called again afterwards.
+ stopped := false
+ for key, value := range cl.pieceRequestOrder {
+ input := key.getRequestStrategyInput(cl)
+ requestStrategy.GetRequestablePieces(
+ input,
+ value.pieces,
+ func(ih metainfo.Hash, pieceIndex int, orderState requestStrategy.PieceRequestOrderState) bool {
+ if stopped {
+ return false
+ }
+ t := cl.torrentsByShortHash[ih]
+ for i, e := range t.piece(pieceIndex).fileExtents() {
+ for url, ws := range t.webSeeds {
+ if !yield(
+ webseedUniqueRequestKey{
+ aprioriWebseedRequestKey{
+ t: t,
+ fileIndex: i,
+ url: url,
+ },
+ e.Start,
+ },
+ webseedRequestOrderValue{
+ priority: orderState.Priority,
+ costKey: ws.hostKey,
+ },
+ ) {
+ // GetRequestablePieces can't be aborted from its callback, so just stop yielding.
+ stopped = true
+ return false
+ }
+ }
+ }
+ // Pieces iterated here are only to select webseed requests. There's no guarantee they're chosen.
+ return false
+ },
+ )
+ if stopped {
+ return
+ }
+ }
}
}
func (cl *Client) updateWebSeedRequests(reason updateRequestReason) {
}
}
}
+
+func (cl *Client) iterCurrentWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+ return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
+ for t := range cl.torrents {
+ for url, ws := range t.webSeeds {
+ for ar := range ws.activeRequests {
+ off := t.requestIndexBegin(ar.next)
+ opt := t.info.FileSegmentsIndex().LocateOffset(off)
+ if !opt.Ok {
+ continue
+ }
+ p := t.pieceForOffset(off)
+ if !yield(
+ webseedUniqueRequestKey{
+ aprioriWebseedRequestKey{
+ t: t,
+ fileIndex: opt.Value.Index,
+ url: url,
+ },
+ opt.Value.Offset,
+ },
+ webseedRequestOrderValue{
+ priority: p.effectivePriority(),
+ existingWebseedRequest: ar,
+ costKey: ws.hostKey,
+ },
+ ) {
+ return
+ }
+ }
+ }
+ }
+ }
+}
+
+func (cl *Client) updateWebseedRequests() {
+ cl.abandonedUpdateWebSeedRequests()
+ // The timer should have already fired for us to get here, so it's safe to Reset.
+ cl.webseedRequestTimer.Reset(webseedRequestUpdateTimerInterval)
+}
+
+func (cl *Client) updateWebseedRequestsTimerFunc() {
+ cl.lock()
+ defer cl.unlock()
+ cl.updateWebseedRequests()
+}
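+
+// Note that driving the replan from a timer bounds webseed request churn to roughly once per
+// webseedRequestUpdateTimerInterval, rather than replanning on every piece state change.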
Err error
}
+// Returns the URL for the given file index. This is assumed to be globally unique.
+func (ws *Client) UrlForFileIndex(fileIndex int) string {
+ return urlForFileIndex(ws.Url, fileIndex, ws.info, ws.PathEscaper)
+}
+
func (ws *Client) StartNewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.TODO())
var requestParts []requestPart
}
-// Creates a request per BEP 19.
+// Creates the URL for the given file index per BEP 19.
-func newRequest(
- ctx context.Context,
+func urlForFileIndex(
url_ string, fileIndex int,
info *metainfo.Info,
- offset, length int64,
pathEscaper PathEscaper,
-) (*http.Request, error) {
+) string {
fileInfo := info.UpvertedFiles()[fileIndex]
if strings.HasSuffix(url_, "/") {
// BEP specifies that we append the file path. We need to escape each component of the path
// for things like spaces and '#'.
url_ += trailingPath(info.BestName(), fileInfo.BestPath(), pathEscaper)
}
+ return url_
+}
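+
+// For example (per BEP 19), a webseed URL "https://example.com/data/" for a multi-file
+// torrent named "pack" containing "a/b.txt" should yield
+// "https://example.com/data/pack/a/b.txt", with each path component escaped.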
+
+// Creates a request per BEP 19.
+func newRequest(
+ ctx context.Context,
+ url_ string, fileIndex int,
+ info *metainfo.Info,
+ offset, length int64,
+ pathEscaper PathEscaper,
+) (*http.Request, error) {
+ fileInfo := info.UpvertedFiles()[fileIndex]
+ url_ = urlForFileIndex(url_, fileIndex, info, pathEscaper)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url_, nil)
if err != nil {
return nil, err