return p.t.dirtyChunks.Contains(p.requestIndexBegin() + chunk)
}
-func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
- it := p.t.dirtyChunks.Iterator()
- begin := uint32(p.requestIndexBegin())
- end := uint32(p.requestIndexMaxEnd())
- it.AdvanceIfNeeded(begin)
- for next := begin; next < end; next++ {
- if !it.HasNext() || it.Next() != next {
- return g.Some(chunkIndexType(next - begin))
+func (p *Piece) iterCleanChunks() iter.Seq[chunkIndexType] {
+ return func(yield func(chunkIndexType) bool) {
+ it := p.t.dirtyChunks.Iterator()
+ begin := uint32(p.requestIndexBegin())
+ end := uint32(p.requestIndexMaxEnd())
+ it.AdvanceIfNeeded(begin)
+ for next := begin; next < end; next++ {
+ if !it.HasNext() || it.Next() != next {
+ if !yield(chunkIndexType(next - begin)) {
+ return
+ }
+ }
}
+ return
+ }
+}
+
+func (p *Piece) firstCleanChunk() (_ g.Option[chunkIndexType]) {
+ for some := range p.iterCleanChunks() {
+ return g.Some(some)
}
return
}
next: begin,
end: end,
}
+ if ws.hasOverlappingRequests(begin, end) {
+ if webseed.PrintDebug {
+ fmt.Printf("webseedPeer.spawnRequest: overlapping request for %v[%v-%v)\n", ws.peer.t.name(), begin, end)
+ }
+ ws.peer.t.cl.dumpCurrentWebseedRequests()
+ }
ws.activeRequests[&wsReq] = struct{}{}
ws.peer.updateExpectingChunks()
panicif.Zero(ws.hostKey)
go ws.runRequest(&wsReq)
}
+func (me *webseedPeer) hasOverlappingRequests(begin, end RequestIndex) bool {
+ for req := range me.activeRequests {
+ if req.cancelled.Load() {
+ continue
+ }
+ if begin < req.end && end >= req.begin {
+ return true
+ }
+ }
+ return false
+}
+
func readChunksErrorLevel(err error, req *webseedRequest) slog.Level {
if req.cancelled.Load() {
return slog.LevelDebug
locker := ws.locker
err := ws.readChunks(webseedRequest)
if webseed.PrintDebug && webseedRequest.next < webseedRequest.end {
- fmt.Printf("webseed peer stopped reading chunks early: %v\n", err)
+ fmt.Printf("webseed peer request %v in %v stopped reading chunks early: %v\n", webseedRequest, ws.peer.t.name(), err)
+ if err != nil {
+ fmt.Printf("error type: %T\n", err)
+ }
+ if err == nil {
+ ws.peer.t.cl.dumpCurrentWebseedRequests()
+ }
}
// Ensure the body reader and response are closed.
webseedRequest.Close()
err = fmt.Errorf("processing chunk: %w", err)
}
if stop {
+ // TODO: Keep reading until the buffer is drained.
return
}
}
"github.com/anacrolix/torrent/webseed"
)
-const defaultRequestsPerWebseedHost = 5
+const defaultRequestsPerWebseedHost = 10
type (
webseedHostKey string
*/
func (cl *Client) globalUpdateWebSeedRequests() {
type aprioriMapValue struct {
+ // Change to request index?
startOffset int64
webseedRequestOrderValue
}
aprioriMap := make(map[aprioriWebseedRequestKey]aprioriMapValue)
- for uniqueKey, value := range cl.iterWebseed() {
+ for uniqueKey, value := range cl.iterPossibleWebseedRequests() {
cur, ok := aprioriMap[uniqueKey.aprioriWebseedRequestKey]
- // Set the webseed request if it doesn't exist, or if the new one has a higher priority or
- // starts earlier in the file.
- if !ok || cmp.Or(
- cmp.Compare(value.priority, cur.priority),
- cmp.Compare(cur.startOffset, uniqueKey.startOffset),
- ) > 0 {
- aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
+ if ok {
+ // Shared in the lookup above.
+ t := uniqueKey.t
+ hasPeerConnRequest := func(offset int64) bool {
+ reqIndex := t.getRequestIndexContainingOffset(offset)
+ return t.requestingPeer(reqIndex) != nil
+ }
+ // Skip the webseed request unless it has a higher priority, is less requested by peer
+ // conns, or has a lower start offset. Including peer conn requests here will bump
+ // webseed requests in favour of peer conns unless there's nothing else to do.
+ if cmp.Or(
+ cmp.Compare(value.priority, cur.priority),
+ compareBool(hasPeerConnRequest(cur.startOffset), hasPeerConnRequest(uniqueKey.startOffset)),
+ cmp.Compare(cur.startOffset, uniqueKey.startOffset),
+ ) <= 0 {
+ continue
+ }
}
+ aprioriMap[uniqueKey.aprioriWebseedRequestKey] = aprioriMapValue{uniqueKey.startOffset, value}
}
existingRequests := maps.Collect(cl.iterCurrentWebseedRequests())
// We don't need the value but maybe cloning is just faster anyway?
panicif.NotEq(peer.hostKey, costKey)
printPlan()
begin := t.getRequestIndexContainingOffset(requestKey.startOffset)
+ // TODO: Find an actual end, so we don't lose lots of data when requests are cancelled.
end := t.endRequestIndexForFileIndex(requestKey.fileIndex)
panicif.Eq(begin, end)
peer.spawnRequest(begin, end)
}
}
+func (cl *Client) dumpCurrentWebseedRequests() {
+ if webseed.PrintDebug {
+ fmt.Println("current webseed requests:")
+ for key, value := range cl.iterCurrentWebseedRequests() {
+ fmt.Printf("\t%v: %v, priority %v\n", key.filePath(), value.existingWebseedRequest, value.priority)
+ }
+ }
+}
+
type webseedRequestPlan struct {
byCost map[webseedHostKeyHandle][]webseedUniqueRequestKey
}
url webseedUrlKey
}
-func (me aprioriWebseedRequestKey) String() string {
- return fmt.Sprintf("%v from %v", me.t.Files()[me.fileIndex].Path(), me.url)
+func (me *aprioriWebseedRequestKey) filePath() string {
+ return me.t.Files()[me.fileIndex].Path()
+}
+
+func (me *aprioriWebseedRequestKey) String() string {
+ return fmt.Sprintf("%v from %v", me.filePath(), me.url)
}
// Distinct webseed request when different offsets to the same object are allowed.
// Non-distinct proposed webseed request data.
type webseedRequestOrderValue struct {
- priority PiecePriority
+ // The associated webseed request per host limit.
+ costKey webseedHostKeyHandle
// Used for cancellation if this is deprioritized. Also, a faster way to sort for existing
// requests.
existingWebseedRequest *webseedRequest
- // The associated webseed request per host limit.
- costKey webseedHostKeyHandle
+ priority PiecePriority
}
func (me webseedRequestOrderValue) String() string {
}
// Yields possible webseed requests by piece. Caller should filter and prioritize these.
-func (cl *Client) iterWebseed() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
+func (cl *Client) iterPossibleWebseedRequests() iter.Seq2[webseedUniqueRequestKey, webseedRequestOrderValue] {
return func(yield func(webseedUniqueRequestKey, webseedRequestOrderValue) bool) {
for key, value := range cl.pieceRequestOrder {
input := key.getRequestStrategyInput(cl)