Implement multiple chunk reads for webseed
authorMatt Joiner <anacrolix@gmail.com>
Fri, 16 May 2025 05:19:24 +0000 (15:19 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 16 May 2025 05:19:24 +0000 (15:19 +1000)
12 files changed:
config.go
math.go
misc.go
peer-impl.go
peer.go
peerconn.go
request-strategy-impls_test.go
requesting.go
torrent.go
webseed-peer.go
webseed-request.go
webseed/client.go

diff --git a/config.go b/config.go
index 2955f555a3ec00a74ea467c5b70f735f53788afd..e07278657f2c9263c45bfa4615cf70bc2ecb7401 100644
--- a/config.go
+++ b/config.go
@@ -152,7 +152,8 @@ type ClientConfig struct {
        // How long between writes before sending a keep alive message on a peer connection that we want
        // to maintain.
        KeepAliveTimeout time.Duration
-       // Maximum bytes to buffer per peer connection for peer request data before it is sent.
+       // Maximum bytes to buffer per peer connection for peer request data before it is sent. This
+       // must be >= the request chunk size from peers.
        MaxAllocPeerRequestDataPerConn int
 
        // The IP addresses as our peers should see them. May differ from the
diff --git a/math.go b/math.go
index 10cbafc73d7b9237c864a6f3c625bf9c74366b21..cd733e826249c601063df2a4fa6528ee72df2047 100644
--- a/math.go
+++ b/math.go
@@ -1 +1,11 @@
 package torrent
+
+import (
+       "golang.org/x/exp/constraints"
+)
+
+func intCeilDiv[T constraints.Integer](a, b T) T {
+       // This is still wrong for negative numbers due to truncating division, but it's not clear we
+       // need ceiling division for negative numbers, or that it would even make sense there.
+       return (a + b - 1) / b
+}
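
For illustration, a minimal standalone sketch of intCeilDiv's rounding behavior. The 2 MiB budget and 16 KiB chunk size mirror figures used elsewhere in this commit; the main function and concrete values are illustrative only:

    package main

    import (
        "fmt"

        "golang.org/x/exp/constraints"
    )

    func intCeilDiv[T constraints.Integer](a, b T) T {
        return (a + b - 1) / b
    }

    func main() {
        // A 2 MiB request budget over 16 KiB chunks: exactly 128 chunks.
        fmt.Println(intCeilDiv(2<<20, 16<<10)) // 128
        // Non-multiples round up instead of truncating.
        fmt.Println(intCeilDiv(10, 3)) // 4
    }
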
diff --git a/misc.go b/misc.go
index 12a454de1deccf7593f67556d47a897be352207f..ba4597ed7cdc86e4be713431917042bfbbb3d45b 100644
--- a/misc.go
+++ b/misc.go
@@ -125,26 +125,6 @@ func connIsIpv6(nc interface {
        return rip.To4() == nil && rip.To16() != nil
 }
 
-func maxInt(as ...int) int {
-       ret := as[0]
-       for _, a := range as[1:] {
-               if a > ret {
-                       ret = a
-               }
-       }
-       return ret
-}
-
-func minInt(as ...int) int {
-       ret := as[0]
-       for _, a := range as[1:] {
-               if a < ret {
-                       ret = a
-               }
-       }
-       return ret
-}
-
 var unlimited = rate.NewLimiter(rate.Inf, 0)
 
 type (
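
The removed variadic helpers are superseded by Go 1.21's builtin min and max, which the test diff below now uses directly. A quick sketch of the equivalence, with illustrative values:

    package main

    import "fmt"

    func main() {
        // Previously maxInt(3, 9, 1) and minInt(3, 9, 1); the builtins
        // likewise accept any number of arguments of an ordered type.
        fmt.Println(max(3, 9, 1)) // 9
        fmt.Println(min(3, 9, 1)) // 1
    }
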
diff --git a/peer-impl.go b/peer-impl.go
index d23ad6d8926dde84304e3b82f08f6081ac983ecd..f3e13077fb0b1d6da4678fbd099c2a55dd7d4eff 100644
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -6,7 +6,7 @@ import (
        "github.com/anacrolix/torrent/metainfo"
 )
 
-// Contains implementation details that differ between peer types, like Webseeds and regular
+// Contains implementation details that differ between peer types, like WebSeeds and regular
 // BitTorrent protocol connections. These methods are embedded in the child types of Peer for legacy
 // expectations that they exist on the child type. Some methods are underscore-prefixed to avoid collisions
 // with legacy PeerConn methods. New methods and calls that are fixed up should be migrated over to
@@ -46,4 +46,6 @@ type legacyPeerImpl interface {
 // Abstract methods implemented by subclasses of Peer.
 type newHotPeerImpl interface {
        lastWriteUploadRate() float64
+       // How many requests should be assigned to the peer.
+       nominalMaxRequests() maxRequests
 }
diff --git a/peer.go b/peer.go
index 2f4a9ea7b762ef7232b521b4ef0ae05a2f0d4556..113dcb31178235f547f34a709e83efca334c4306 100644
--- a/peer.go
+++ b/peer.go
@@ -58,7 +58,8 @@ type (
                lastChunkSent           time.Time
 
                // Stuff controlled by the local peer.
-               needRequestUpdate    updateRequestReason
+               needRequestUpdate updateRequestReason
+               // TODO: How are pending cancels handled for webseed peers?
                requestState         request_strategy.PeerRequestState
                updateRequestsTimer  *time.Timer
                lastRequestUpdate    time.Time
@@ -322,7 +323,7 @@ func (cn *Peer) writeStatus(w io.Writer) {
                &cn._stats.ChunksWritten,
                cn.requestState.Requests.GetCardinality(),
                cn.requestState.Cancelled.GetCardinality(),
-               cn.nominalMaxRequests(),
+               cn.peerImpl.nominalMaxRequests(),
                cn.PeerMaxRequests,
                len(cn.peerRequests),
                localClientReqq,
@@ -387,7 +388,7 @@ var (
 )
 
 // The actual value to use as the maximum number of outbound requests.
-func (cn *Peer) nominalMaxRequests() maxRequests {
+func (cn *PeerConn) nominalMaxRequests() maxRequests {
        // TODO: This should differ for webseeds...
        return max(1, min(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
 }
@@ -455,7 +456,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        return nil
 }
 
-func (cn *PeerConn) mustRequest(r RequestIndex) bool {
+func (cn *Peer) mustRequest(r RequestIndex) bool {
        more, err := cn.request(r)
        if err != nil {
                panic(err)
@@ -463,14 +464,14 @@ func (cn *PeerConn) mustRequest(r RequestIndex) bool {
        return more
 }
 
-func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
+func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
        if cn.requestState.Requests.Contains(r) {
                return true, nil
        }
-       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
        cn.requestState.Requests.Add(r)
@@ -479,13 +480,13 @@ func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
        }
        cn.validReceiveChunks[r]++
        cn.t.requestState[r] = requestState{
-               peer: &cn.Peer,
+               peer: cn,
                when: time.Now(),
        }
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
-               f(PeerRequestEvent{&cn.Peer, ppReq})
+               f(PeerRequestEvent{cn, ppReq})
        }
        return cn.legacyPeerImpl._request(ppReq), nil
 }
diff --git a/peerconn.go b/peerconn.go
index 94ec1f4e085090974bdc98a83ab1478a6f7b44e7..420af5be5b0e96abf41338828ab8c0251b0ca3aa 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -325,8 +325,11 @@ func (me *PeerConn) _request(r Request) bool {
        })
 }
 
-func (me *PeerConn) _cancel(r RequestIndex) bool {
+func (me *PeerConn) handleCancel(r RequestIndex) {
        me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+}
+
+func (me *PeerConn) acksCancels() bool {
        return me.remoteRejectsCancels()
 }
 
@@ -849,9 +852,8 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.Piece:
                        c.doChunkReadStats(int64(len(msg.Piece)))
                        err = c.receiveChunk(&msg)
-                       if len(msg.Piece) == int(t.chunkSize) {
-                               t.chunkPool.Put(&msg.Piece)
-                       }
+                       t.putChunkBuffer(msg.Piece)
+                       msg.Piece = nil
                        if err != nil {
                                err = fmt.Errorf("receiving chunk: %w", err)
                        }
@@ -1446,3 +1448,7 @@ func hashRequestFromMessage(m pp.Message) hashRequest {
                proofLayers: m.ProofLayers,
        }
 }
+
+func (me *PeerConn) peerPtr() *Peer {
+       return &me.Peer
+}
diff --git a/request-strategy-impls_test.go b/request-strategy-impls_test.go
index daa5221754f1658226b0cb55aae30a551a26a64f..3940da0e084cbfb98927f58872292fa313ad32aa 100644
--- a/request-strategy-impls_test.go
+++ b/request-strategy-impls_test.go
@@ -129,7 +129,7 @@ func BenchmarkRequestStrategy(b *testing.B) {
                        tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
                        // End of part that should be timed.
                        remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
-                       c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
+                       c.Assert(rs.Requests.requestIndexes, qt.HasLen, min(
                                remainingChunks,
                                int(cl.config.MaxUnverifiedBytes/chunkSize)))
                }
diff --git a/requesting.go b/requesting.go
index 958e11b887c5e8685591c29e4a03b6eb42487537..8680665de0c7f626f6204dd70af1e66d593997a9 100644
--- a/requesting.go
+++ b/requesting.go
@@ -184,7 +184,7 @@ func (t *Torrent) getRequestablePieces(f requestStrategy.RequestPieceFunc) {
 // This gets the best-case request state. That means handling pieces limited by capacity, preferring
 // earlier pieces, low availability etc. It pays no attention to existing requests on the peer or
 // other peers. Those are handled later.
-func (p *PeerConn) getDesiredRequestState() (desired desiredRequestState) {
+func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
        t := p.t
        if !t.haveInfo() {
                return
@@ -257,7 +257,9 @@ func (p *Peer) maybeUpdateActualRequestState() {
                        panic(since)
                }
        }
-       p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
+       if p.t.cl.config.Debug {
+               p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
+       }
        pprof.Do(
                context.Background(),
                pprof.Labels("update request", string(p.needRequestUpdate)),
@@ -304,7 +306,7 @@ func (p *Peer) allowSendNotInterested() bool {
 
 // Transmit/action the request state to the peer. This includes work-stealing from other peers and
 // some piece order randomization within the preferred state calculated earlier in next.
-func (p *PeerConn) applyRequestState(next desiredRequestState) {
+func (p *Peer) applyRequestState(next desiredRequestState) {
        current := &p.requestState
        // Make interest sticky
        if !next.Interested && p.requestState.Interested {
@@ -330,7 +332,7 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) {
                        break
                }
                numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
-               if numPending >= p.nominalMaxRequests() {
+               if numPending >= p.peerImpl.nominalMaxRequests() {
                        break
                }
                req := heap.Pop(requestHeap)
@@ -338,8 +340,8 @@ func (p *PeerConn) applyRequestState(next desiredRequestState) {
                        panic("changed")
                }
 
-               // don't add requests on reciept of a reject - because this causes request back
-               // to potentially permanently unresponive peers - which just adds network noise.  If
+               // don't add requests on receipt of a reject - because this causes request back
+               // to potentially permanently unresponsive peers - which just adds network noise.  If
                // the peer can handle more requests it will send an "unchoked" message - which
                // will cause it to get added back to the request queue
                if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
diff --git a/torrent.go b/torrent.go
index 5fbec4b3f9ca4dc4d025d5775b9835429046fa9a..da5b69554d2dd2de0084aa3f97439cfc55fa2a06 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1074,6 +1074,16 @@ func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
        }
 }
 
+// The first byte position of the request within the whole torrent.
+func (t *Torrent) requestIndexBegin(r RequestIndex) int64 {
+       return t.requestOffset(t.requestIndexToRequest(r))
+}
+
+func (t *Torrent) requestIndexEnd(r RequestIndex) int64 {
+       req := t.requestIndexToRequest(r)
+       return t.requestOffset(req) + int64(req.Length)
+}
+
 func (t *Torrent) requestOffset(r Request) int64 {
        return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
 }
@@ -3376,3 +3386,28 @@ func (t *Torrent) Complete() chansync.ReadOnlyFlag {
 func (t *Torrent) slogger() *slog.Logger {
        return t._slogger
 }
+
+// Get a chunk buffer from the pool. It should be returned when it's no longer in use. Do we
+// waste an allocation if we throw away the pointer it was stored with?
+func (t *Torrent) getChunkBuffer() []byte {
+       return *t.chunkPool.Get().(*[]byte)
+}
+
+func (t *Torrent) putChunkBuffer(b []byte) {
+       panicif.NotEq(cap(b), t.chunkSize.Int())
+       // Does this allocate? Are we amortizing against the cost of a large buffer?
+       t.chunkPool.Put(&b)
+}
+
+func (t *Torrent) withSlogger(base *slog.Logger) *slog.Logger {
+       return base.With(slog.Group(
+               "torrent",
+               "name", lazyLogValuer(func() any {
+                       opt := t.bestName()
+                       if opt.Ok {
+                               return opt.Value
+                       }
+                       return nil
+               }),
+               "ih", *t.canonicalShortInfohash()))
+}
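
On the questions in the comments above: as written, getChunkBuffer discards the pointer the buffer was stored with, so putChunkBuffer's &b heap-allocates a fresh *[]byte on every return to the pool. A hedged sketch of the conventional sync.Pool pattern that avoids this by keeping the pointer alive (chunkPool and chunkSize here are illustrative stand-ins, not the codebase's fields):

    package main

    import (
        "fmt"
        "sync"
    )

    const chunkSize = 16 << 10

    var chunkPool = sync.Pool{
        New: func() any {
            b := make([]byte, chunkSize)
            return &b // Store a pointer so Put/Get avoid copying the slice header.
        },
    }

    func main() {
        // Hold on to the *[]byte so returning it to the pool allocates nothing.
        bp := chunkPool.Get().(*[]byte)
        buf := (*bp)[:chunkSize]
        _ = buf // ... fill and consume the chunk ...
        chunkPool.Put(bp)
        fmt.Println("returned", cap(*bp), "bytes to the pool")
    }
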
diff --git a/webseed-peer.go b/webseed-peer.go
index 8804137afe7e34f24ad8b525e72e6df135ccb907..95fc2157620fd534df4d2896278e29b09e62f3db 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -4,39 +4,47 @@ import (
        "context"
        "errors"
        "fmt"
+       "io"
        "iter"
-       "math/rand"
+       "log/slog"
+       "slices"
        "sync"
        "time"
 
        "github.com/RoaringBitmap/roaring"
-       "github.com/anacrolix/log"
+       g "github.com/anacrolix/generics"
 
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
 )
 
-const (
-       webseedPeerCloseOnUnhandledError = false
-)
-
 type webseedPeer struct {
        // First field for stats alignment.
        peer             Peer
        client           webseed.Client
-       activeRequests   map[Request]webseed.Request
+       activeRequests   map[*webseedRequest]struct{}
        locker           sync.Locker
        lastUnhandledErr time.Time
 }
 
+func (me *webseedPeer) nominalMaxRequests() maxRequests {
+       // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
+       // we just allow 2 MiB worth of requests.
+       return intCeilDiv(2<<20, me.peer.t.chunkSize.Int())
+}
+
 func (me *webseedPeer) acksCancels() bool {
        return false
 }
 
 func (me *webseedPeer) updateRequests() {
-       //TODO implement me
-       panic("implement me")
+       p := &me.peer
+       next := p.getDesiredRequestState()
+       p.applyRequestState(next)
+       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
+       // Run this after all requests applied to the peer, so they can be batched up.
+       me.spawnRequests()
 }
 
 func (me *webseedPeer) lastWriteUploadRate() float64 {
@@ -71,51 +79,83 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) _cancel(r RequestIndex) bool {
-       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
-               active.Cancel()
-               // The requester is running and will handle the result.
-               return true
+func (ws *webseedPeer) handleCancel(r RequestIndex) {
+       for wr := range ws.activeRequestsForIndex(r) {
+               wr.request.Cancel()
        }
-       // There should be no requester handling this, so no further events will occur.
-       return false
 }
 
-func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
-       return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+func (ws *webseedPeer) activeRequestsForIndex(r RequestIndex) iter.Seq[*webseedRequest] {
+       return func(yield func(*webseedRequest) bool) {
+               for wr := range ws.activeRequests {
+                       if r < wr.next || r >= wr.end {
+                               continue
+                       }
+                       if !yield(wr) {
+                               return
+                       }
+               }
+       }
+}
+
+func (ws *webseedPeer) requestIndexTorrentOffset(r RequestIndex) int64 {
+       return ws.peer.t.requestIndexBegin(r)
+}
+
+func (ws *webseedPeer) intoSpec(begin, end RequestIndex) webseed.RequestSpec {
+       t := ws.peer.t
+       start := t.requestIndexBegin(begin)
+       endOff := t.requestIndexEnd(end - 1)
+       return webseed.RequestSpec{start, endOff - start}
 }
 
 func (ws *webseedPeer) _request(r Request) bool {
-       ws.spawnRequests()
        return true
 }
 
-func (ws *webseedPeer) spawnReq(r Request) {
-       webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
-       ws.activeRequests[r] = webseedRequest
-       go ws.doRequest(r, webseedRequest)
+func (ws *webseedPeer) spawnRequest(begin, end RequestIndex) {
+       extWsReq := ws.client.StartNewRequest(ws.intoSpec(begin, end))
+       wsReq := webseedRequest{
+               request: extWsReq,
+               begin:   begin,
+               next:    begin,
+               end:     end,
+       }
+       ws.activeRequests[&wsReq] = struct{}{}
+       ws.peer.logger.Slogger().Debug(
+               "starting webseed request",
+               "begin", begin,
+               "end", end,
+               "len", end-begin,
+               "avail", ws.peer.requestState.Requests.GetCardinality())
+       go ws.runRequest(&wsReq)
 }
 
-func (ws *webseedPeer) doRequest(r Request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) runRequest(webseedRequest *webseedRequest) {
        locker := ws.locker
-       err := ws.requestResultHandler(r, webseedRequest)
+       err := ws.readChunks(webseedRequest)
+       // Ensure the body reader and response are closed.
+       webseedRequest.request.Cancel()
+       locker.Lock()
        if err != nil {
-               level := log.Warning
+               level := slog.LevelWarn
                if errors.Is(err, context.Canceled) {
-                       level = log.Debug
-               }
-               ws.peer.logger.Levelf(level, "error doing webseed request %v: %v", r, err)
-               // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
-               // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
-               // it doesn't hurt to slow a few down if there are issues.
-               select {
-               case <-ws.peer.closed.Done():
-               case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+                       level = slog.LevelDebug
+               } else {
+                       panic(err)
                }
+               ws.slogger().Log(context.TODO(), level, "webseed request error", "err", err)
+               //      // This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down any
+               //      // kind of error. There are maxRequests (in Torrent.addWebSeed) requesters bouncing around
+               //      // it doesn't hurt to slow a few down if there are issues.
+               //      select {
+               //      case <-ws.peer.closed.Done():
+               //      case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
+               //      }
        }
-       locker.Lock()
+       //locker.Lock()
        // Delete this entry after waiting above on an error, to prevent more requests.
-       delete(ws.activeRequests, r)
+       delete(ws.activeRequests, webseedRequest)
        if err != nil {
                ws.peer.onNeedUpdateRequests("webseedPeer request errored")
        }
@@ -131,19 +171,65 @@ func (ws *webseedPeer) spawnRequests() {
                if !ok {
                        break
                }
-               ws.spawnReq(req)
+               end := seqLast(ws.iterConsecutiveInactiveRequests(req)).Unwrap()
+               ws.spawnRequest(req, end+1)
+       }
+}
+
+// Returns Some of the last item in an iter.Seq, or None if the sequence is empty.
+func seqLast[V any](seq iter.Seq[V]) (last g.Option[V]) {
+       for item := range seq {
+               last.Set(item)
+       }
+       return
+}
+
+func (ws *webseedPeer) iterConsecutiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
+       return func(yield func(RequestIndex) bool) {
+               for {
+                       if !ws.peer.requestState.Requests.Contains(begin) {
+                               return
+                       }
+                       if !yield(begin) {
+                               return
+                       }
+                       begin++
+               }
        }
 }
 
-func (ws *webseedPeer) inactiveRequests() iter.Seq[Request] {
-       return func(yield func(Request) bool) {
+func (ws *webseedPeer) iterConsecutiveInactiveRequests(begin RequestIndex) iter.Seq[RequestIndex] {
+       return func(yield func(RequestIndex) bool) {
+               for req := range ws.iterConsecutiveRequests(begin) {
+                       if !ws.inactiveRequestIndex(req) {
+                               return
+                       }
+                       if !yield(req) {
+                               return
+                       }
+               }
+       }
+}
+
+func (ws *webseedPeer) inactiveRequestIndex(index RequestIndex) bool {
+       for range ws.activeRequestsForIndex(index) {
+               return false
+       }
+       return true
+}
+
+func (ws *webseedPeer) inactiveRequests() iter.Seq[RequestIndex] {
+       return func(yield func(RequestIndex) bool) {
+               sorted := slices.Sorted(ws.peer.requestState.Requests.Iterator())
+               if len(sorted) != 0 {
+                       fmt.Println("inactiveRequests", sorted)
+               }
                for reqIndex := range ws.peer.requestState.Requests.Iterator() {
-                       r := ws.peer.t.requestIndexToRequest(reqIndex)
-                       _, ok := ws.activeRequests[r]
-                       if !ok {
-                               if !yield(r) {
-                                       return
-                               }
+                       if !ws.inactiveRequestIndex(reqIndex) {
+                               continue
+                       }
+                       if !yield(reqIndex) {
+                               return
                        }
                }
        }
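
spawnRequests (earlier hunk) composes these iterators to coalesce a run of consecutive inactive request indexes into a single HTTP range request. A standalone sketch of the same run-detection idea, with the request set simplified to a plain map (all names here are illustrative):

    package main

    import "fmt"

    // Advance from begin while consecutive indexes remain in the set.
    func consecutiveRun(requested map[int]bool, begin int) (end int) {
        end = begin
        for requested[end] {
            end++
        }
        return // Half-open: [begin, end) is the run.
    }

    func main() {
        requested := map[int]bool{3: true, 4: true, 5: true, 9: true}
        begin := 3
        end := consecutiveRun(requested, begin)
        // One webseed request would cover indexes [3, 6); 9 starts a new run.
        fmt.Printf("[%d, %d)\n", begin, end)
    }
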
@@ -171,7 +257,6 @@ func (ws *webseedPeer) handleOnNeedUpdateRequests() {
 }
 
 func (ws *webseedPeer) onClose() {
-       ws.peer.logger.Levelf(log.Debug, "closing")
        // Just deleting them means we would have to manually cancel active requests.
        ws.peer.cancelAllRequests()
        ws.peer.t.iterPeers(func(p *Peer) {
@@ -181,59 +266,81 @@ func (ws *webseedPeer) onClose() {
        })
 }
 
-func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
-       result := <-webseedRequest.Result
-       close(webseedRequest.Result) // one-shot
-       // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
-       // sure if we can divine which errors indicate cancellation on our end without hitting the
-       // network though.
-       if len(result.Bytes) != 0 || result.Err == nil {
-               // Increment ChunksRead and friends
-               ws.peer.doChunkReadStats(int64(len(result.Bytes)))
-       }
-       ws.peer.readBytes(int64(len(result.Bytes)))
-       ws.peer.t.cl.lock()
-       defer ws.peer.t.cl.unlock()
-       if ws.peer.t.closed.IsSet() {
-               return nil
-       }
-       err := result.Err
-       if err != nil {
-               switch {
-               case errors.Is(err, context.Canceled):
-               case errors.Is(err, webseed.ErrTooFast):
-               case ws.peer.closed.IsSet():
-               default:
-                       ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
-                       // // Here lies my attempt to extract something concrete from Go's error system. RIP.
-                       // cfg := spew.NewDefaultConfig()
-                       // cfg.DisableMethods = true
-                       // cfg.Dump(result.Err)
-
-                       if webseedPeerCloseOnUnhandledError {
-                               log.Printf("closing %v", ws)
-                               ws.peer.close()
-                       } else {
-                               ws.lastUnhandledErr = time.Now()
-                       }
+func (ws *webseedPeer) readChunks(wr *webseedRequest) (err error) {
+       t := ws.peer.t
+       buf := t.getChunkBuffer()
+       defer t.putChunkBuffer(buf)
+       for ; wr.next < wr.end; wr.next++ {
+               reqSpec := t.requestIndexToRequest(wr.next)
+               chunkLen := reqSpec.Length.Int()
+               buf = buf[:chunkLen]
+               var n int
+               n, err = io.ReadFull(wr.request.Body, buf)
+               ws.peer.readBytes(int64(n))
+               if err != nil {
+                       err = fmt.Errorf("reading chunk: %w", err)
+                       return
                }
-               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
-                       panic("invalid reject")
+               ws.peer.doChunkReadStats(int64(chunkLen))
+               ws.peer.locker().Lock()
+               err = ws.peer.receiveChunk(&pp.Message{
+                       Type:  pp.Piece,
+                       Piece: buf,
+                       Index: reqSpec.Index,
+                       Begin: reqSpec.Begin,
+               })
+               ws.peer.locker().Unlock()
+               if err != nil {
+                       err = fmt.Errorf("processing chunk: %w", err)
+                       return
                }
-               return err
        }
-       err = ws.peer.receiveChunk(&pp.Message{
-               Type:  pp.Piece,
-               Index: r.Index,
-               Begin: r.Begin,
-               Piece: result.Bytes,
-       })
-       if err != nil {
-               panic(err)
-       }
-       return err
+       return
 }
 
+//
+//func (ws *webseedPeer) requestResultHandler(wr *webseedRequest) (err error) {
+//     err = ws.readChunks(wr)
+//     switch {
+//     case err == nil:
+//     case ws.peer.closed.IsSet():
+//     case errors.Is(err, context.Canceled):
+//     case errors.Is(err, webseed.ErrTooFast):
+//     default:
+//
+//     }
+//     ws.peer.t.cl.lock()
+//     defer ws.peer.t.cl.unlock()
+//     if ws.peer.t.closed.IsSet() {
+//             return nil
+//     }
+//     if err != nil {
+//             switch {
+//             case errors.Is(err, context.Canceled):
+//             case errors.Is(err, webseed.ErrTooFast):
+//             case ws.peer.closed.IsSet():
+//             default:
+//                     ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+//                     // // Here lies my attempt to extract something concrete from Go's error system. RIP.
+//                     // cfg := spew.NewDefaultConfig()
+//                     // cfg.DisableMethods = true
+//                     // cfg.Dump(result.Err)
+//
+//                     if webseedPeerCloseOnUnhandledError {
+//                             log.Printf("closing %v", ws)
+//                             ws.peer.close()
+//                     } else {
+//                             ws.lastUnhandledErr = time.Now()
+//                     }
+//             }
+//             if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+//                     panic("invalid reject")
+//             }
+//             return err
+//     }
+//     return err
+//}
+
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
        return &me.client.Pieces
 }
@@ -244,3 +351,7 @@ func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
        }
        return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
 }
+
+func (me *webseedPeer) slogger() *slog.Logger {
+       return me.peer.logger.Slogger()
+}
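
readChunks above slices one streaming HTTP body into fixed-size chunks with io.ReadFull. A minimal sketch of that pattern, with a strings.Reader standing in for the response body and an illustrative chunk length:

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    func main() {
        body := strings.NewReader("abcdefghij") // Stand-in for a webseed response body.
        buf := make([]byte, 4)                  // Illustrative chunk length.
        for {
            n, err := io.ReadFull(body, buf)
            if err != nil {
                // io.ErrUnexpectedEOF marks a short final read; the real loop
                // instead sizes the last chunk from the request spec.
                fmt.Printf("tail %q (%v)\n", buf[:n], err)
                return
            }
            fmt.Printf("chunk %q\n", buf[:n])
        }
    }
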
diff --git a/webseed-request.go b/webseed-request.go
index 10cbafc73d7b9237c864a6f3c625bf9c74366b21..0f8440dbe3d5bdad3f4b2e567f3be04a4ceb9c5f 100644
--- a/webseed-request.go
+++ b/webseed-request.go
@@ -1 +1,16 @@
 package torrent
+
+import (
+       "github.com/anacrolix/torrent/webseed"
+)
+
+// A wrapper around webseed.Request with extra state for webseedPeer.
+type webseedRequest struct {
+       request webseed.Request
+       // First assigned in the range.
+       begin RequestIndex
+       // The next to be read.
+       next RequestIndex
+       // One greater than the end of the range.
+       end RequestIndex
+}
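
These fields describe a half-open range [begin, end) plus a read cursor, matching the containment test (r < wr.next || r >= wr.end) in webseed-peer.go's activeRequestsForIndex. A small sketch of the invariant, with types simplified and covers as a hypothetical helper:

    package main

    import "fmt"

    type rng struct{ begin, next, end uint32 }

    // An index is still covered by a running request if it hasn't been
    // consumed yet (>= next) and lies before the exclusive end.
    func (r rng) covers(i uint32) bool { return i >= r.next && i < r.end }

    func main() {
        r := rng{begin: 10, next: 12, end: 20}
        fmt.Println(r.covers(11)) // false: the cursor has already read past it
        fmt.Println(r.covers(12)) // true
        fmt.Println(r.covers(20)) // false: end is exclusive
    }
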
diff --git a/webseed/client.go b/webseed/client.go
index 9e9a96ee9e29cdd4bf8ed0c829c8087cf0356580..36f2f569cc860c24c6cb5fb95042eef266fa110a 100644
--- a/webseed/client.go
+++ b/webseed/client.go
@@ -1,7 +1,6 @@
 package webseed
 
 import (
-       "bytes"
        "context"
        "errors"
        "fmt"
@@ -11,6 +10,7 @@ import (
        "strings"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/missinggo/v2/panicif"
 
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/segments"
@@ -28,7 +28,9 @@ type requestPart struct {
 
 type Request struct {
        cancel func()
-       Result chan RequestResult
+       // Closed in the machinery when cancelled?
+       Body io.Reader
+       err  chan error
 }
 
 func (r Request) Cancel() {
@@ -47,7 +49,8 @@ type Client struct {
        // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
 level. We can map our file-level adjustments to the pieces here. This probably needs to be
        // private in the future, if Client ever starts removing pieces.
-       Pieces              roaring.Bitmap
+       Pieces roaring.Bitmap
+       // This wraps http.Response bodies, for example to limit the download rate.
        ResponseBodyWrapper ResponseBodyWrapper
        PathEscaper         PathEscaper
 }
@@ -95,16 +98,14 @@ func (ws *Client) StartNewRequest(r RequestSpec) Request {
        }) {
                panic("request out of file bounds")
        }
+       body, w := io.Pipe()
        req := Request{
                cancel: cancel,
-               Result: make(chan RequestResult, 1),
+               Body:   body,
        }
        go func() {
-               b, err := readRequestPartResponses(ctx, requestParts)
-               req.Result <- RequestResult{
-                       Bytes: b,
-                       Err:   err,
-               }
+               err := readRequestPartResponses(ctx, w, requestParts)
+               panicif.Err(w.CloseWithError(err))
        }()
        return req
 }
@@ -118,7 +119,8 @@ func (me ErrBadResponse) Error() string {
        return me.Msg
 }
 
-func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error {
+// Reads the part in full. All expected bytes must be returned, or an error is returned.
+func recvPartResult(ctx context.Context, w io.Writer, part requestPart, resp *http.Response) error {
        defer resp.Body.Close()
        var body io.Reader = resp.Body
        if part.responseBodyWrapper != nil {
@@ -131,7 +133,7 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
        }
        switch resp.StatusCode {
        case http.StatusPartialContent:
-               copied, err := io.Copy(buf, body)
+               copied, err := io.Copy(w, body)
                if err != nil {
                        return err
                }
@@ -144,30 +146,33 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
                // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
                // archive.org might be using a webserver implementation that refuses to do partial
                // responses to small files.
-               if part.e.Start < 48<<10 {
-                       if part.e.Start != 0 {
-                               log.Printf("resp status ok but requested range [url=%q, range=%q]",
-                                       part.req.URL,
-                                       part.req.Header.Get("Range"))
-                       }
-                       // Instead of discarding, we could try receiving all the chunks present in the response
-                       // body. I don't know how one would handle multiple chunk requests resulting in an OK
-                       // response for the same file. The request algorithm might be need to be smarter for
-                       // that.
-                       discarded, _ := io.CopyN(io.Discard, body, part.e.Start)
-                       if discarded != 0 {
-                               log.Printf("discarded %v bytes in webseed request response part", discarded)
-                       }
-                       _, err := io.CopyN(buf, body, part.e.Length)
-                       return err
-               } else {
+               discard := part.e.Start
+               if discard > 48<<10 {
                        return ErrBadResponse{"resp status ok but requested range", resp}
                }
+               if discard != 0 {
+                       log.Printf("resp status ok but requested range [url=%q, range=%q]",
+                               part.req.URL,
+                               part.req.Header.Get("Range"))
+               }
+               // Instead of discarding, we could try receiving all the chunks present in the response
+               // body. I don't know how one would handle multiple chunk requests resulting in an OK
+               // response for the same file. The request algorithm might need to be smarter for that.
+               discarded, err := io.CopyN(io.Discard, body, discard)
+               if err != nil {
+                       return fmt.Errorf("error discarding bytes from http ok response: %w", err)
+               }
+               panicif.NotEq(discarded, discard)
+               // Because the response is not partial-content aware, we intentionally limit how much
+               // of the body we read.
+               _, err = io.CopyN(w, body, part.e.Length)
+               return err
        case http.StatusServiceUnavailable:
                return ErrTooFast
        default:
+               // TODO: Could we have a slog.Valuer or something to allow callers to unpack reasonable values?
                return ErrBadResponse{
-                       fmt.Sprintf("unhandled response status code (%v)", resp.StatusCode),
+                       fmt.Sprintf("unhandled response status (%v)", resp.Status),
                        resp,
                }
        }
@@ -175,20 +180,17 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *
 
 var ErrTooFast = errors.New("making requests too fast")
 
-func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
-       var buf bytes.Buffer
+func readRequestPartResponses(ctx context.Context, w io.Writer, parts []requestPart) (err error) {
        for _, part := range parts {
                var resp *http.Response
                resp, err = part.do()
-
                if err == nil {
-                       err = recvPartResult(ctx, &buf, part, resp)
+                       err = recvPartResult(ctx, w, part, resp)
                }
-
                if err != nil {
                        err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
                        break
                }
        }
-       return buf.Bytes(), err
+       return
 }
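
StartNewRequest now streams part responses through an io.Pipe rather than buffering them whole: the writer goroutine propagates its terminal error via CloseWithError, and the reader observes that error only after draining the buffered data. A minimal sketch of the producer/consumer shape (the error text is illustrative):

    package main

    import (
        "errors"
        "fmt"
        "io"
    )

    func main() {
        body, w := io.Pipe()
        go func() {
            _, _ = w.Write([]byte("some chunk data"))
            // Readers see this error once they have consumed the written bytes.
            _ = w.CloseWithError(errors.New("making requests too fast"))
        }()
        data, err := io.ReadAll(body)
        fmt.Printf("read %q, err: %v\n", data, err)
    }
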