package torrent
import (
+ "context"
"fmt"
"io"
"log/slog"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/internal/alloclim"
Discovery PeerSource
trusted bool
closed chansync.SetOnce
+ closedCtx context.Context
+ closedCtxCancel context.CancelFunc
lastUsefulChunkReceived time.Time
lastStartedExpectingToReceiveChunks time.Time
if !p.closed.Set() {
return
}
+ // Not set until Torrent is known.
+ if p.closedCtx != nil {
+ p.closedCtxCancel()
+ }
if p.updateRequestsTimer != nil {
p.updateRequestsTimer.Stop()
}
p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData)
}
}
+
+func (p *Peer) initClosedCtx() {
+ panicif.NotNil(p.closedCtx)
+ p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx)
+}
import (
"bufio"
"bytes"
- "context"
"errors"
"fmt"
"io"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/multiless"
"golang.org/x/time/rate"
// we can verify all the pieces for a file when they're all arrived before submitting them to
// the torrent.
receivedHashPieces map[[32]byte][][32]byte
+
+ peerRequestServerRunning bool
}
func (cn *PeerConn) lastWriteUploadRate() float64 {
}
c.peerRequests[r] = value
if startFetch {
- // TODO: Limit peer request data read concurrency.
- go c.peerRequestDataReader(r, value)
+ c.startPeerRequestServer()
}
return nil
}
-func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
+func (c *PeerConn) startPeerRequestServer() {
+ if !c.peerRequestServerRunning {
+ go c.peerRequestServer()
+ c.peerRequestServerRunning = true
+ }
+}
+
+func (c *PeerConn) peerRequestServer() {
+again:
+ c.locker().Lock()
+ if !c.closed.IsSet() {
+ for r, state := range c.peerRequests {
+ if state.data == nil {
+ c.locker().Unlock()
+ c.servePeerRequest(r, state)
+ goto again
+ }
+ }
+ }
+ panicif.False(c.peerRequestServerRunning)
+ c.peerRequestServerRunning = false
+ c.locker().Unlock()
+ return
+}
+
+// TODO: Return an error then let caller filter on conditions.
+func (c *PeerConn) servePeerRequest(r Request, prs *peerRequestState) {
// Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
// or fail to read and then cleanup. Also, we used to hang here if the reservation was never
// dropped, that was fixed.
- ctx := context.Background()
+ ctx := c.closedCtx
err := prs.allocReservation.Wait(ctx)
if err != nil {
c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
b, err := c.readPeerRequestData(r)
c.locker().Lock()
defer c.locker().Unlock()
+ // This function should remove work from peerRequests so peerRequestServer does not stall.
+ defer func() {
+ panicif.True(MapContains(c.peerRequests, r) && prs.data == nil)
+ }()
if err != nil {
c.peerRequestDataReadFailed(err, r)
} else {
}
func (c *PeerConn) setTorrent(t *Torrent) {
- if c.t != nil {
- panic("connection already associated with a torrent")
- }
+ panicif.NotNil(c.t)
c.t = t
+ c.initClosedCtx()
c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
c.setPeerLoggers(t.logger, t.slogger())
t.reconcileHandshakeStats(c.peerPtr())