"sync"
"github.com/anacrolix/chansync"
+ "github.com/anacrolix/log"
)
type Reservation struct {
func (me *Reservation) Wait(ctx context.Context) error {
if me.n > me.l.Max {
- return fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max)
+ return log.WithLevel(
+ log.Warning,
+ fmt.Errorf("reservation for %v exceeds limiter max %v", me.n, me.l.Max),
+ )
}
select {
case <-ctx.Done():
}
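Note the pattern in Wait above: instead of logging at the point of failure, the error is tagged with a log level, and callers recover that level with log.ErrorLevel when they decide how to report it (as peerRequestDataReader does below). A minimal sketch of the round trip, using only the anacrolix/log calls that appear in this diff (logger stands in for any log.Logger):

    // Tag the error with the level it should be logged at.
    err := log.WithLevel(log.Warning, fmt.Errorf("reservation of %v exceeds max %v", 10, 5))
    // Later, log at whatever level the error carries.
    logger.Levelf(log.ErrorLevel(err), "wait failed: %v", err)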
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
- b, err := c.readPeerRequestData(r, prs)
+ // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
+ // or to fail the read and then clean up. Also, we used to hang here if the reservation was
+ // never dropped; that has since been fixed.
+ ctx := context.Background()
+ err := prs.allocReservation.Wait(ctx)
+ if err != nil {
+ c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
+ return
+ }
+ b, err := c.readPeerRequestData(r)
c.locker().Lock()
defer c.locker().Unlock()
if err != nil {
// https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313.
logLevel = log.Debug
}
- c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err)
+ c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err)
if c.t.closed.IsSet() {
return
}
}
}
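For context, the reservation lifecycle this hunk relies on looks roughly like the sketch below. Only Reservation.Wait and Limiter.Max appear in this diff; the package name and the Reserve and Drop calls are assumptions inferred from the "never dropped" comment above:

    l := &alloclim.Limiter{Max: 1 << 20} // shared budget for peer request data (size illustrative)
    res := l.Reserve(16 << 10)           // claim 16 KiB of the budget (assumed API)
    if err := res.Wait(ctx); err != nil {
        return err // cancelled, or the reservation can never be satisfied
    }
    defer res.Drop() // always drop, or later waiters hang (per the comment above)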
-func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
- // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
- // or fail to read and then cleanup.
- ctx := context.Background()
- err := prs.allocReservation.Wait(ctx)
- if err != nil {
- if ctx.Err() == nil {
- // The error is from the reservation itself. Something is very broken, or we're not
- // guarding against excessively large requests.
- err = log.WithLevel(log.Critical, err)
- }
- err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
- return nil, err
- }
+func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) {
b := make([]byte, r.Length)
p := c.t.info.Piece(int(r.Index))
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
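The read above maps the wire-level Request onto a torrent-global offset: Begin is relative to the start of the piece, so the absolute position is the piece's offset plus Begin. With hypothetical numbers:

    // With 256 KiB pieces, Request{Index: 3, Begin: 0x4000, Length: 0x4000} reads
    // 16 KiB starting at 3*262144 + 16384 = 802816 bytes into the torrent data.
    offset := int64(3)*262144 + int64(0x4000) // == 802816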
cfg.KeepAliveTimeout = time.Millisecond
cfg.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true)
cfg.Logger = log.Default.WithContextText(t.Name())
+ // 2 would suffice for the greeting test, but 5 is needed for a few other tests. This should be
+ // slightly higher than the usual chunk size, so the limit actually gets exercised in some
+ // tests.
+ cfg.MaxAllocPeerRequestDataPerConn = 5
//cfg.Debug = true
//cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string {
// t := m.Text()
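Outside the test harness, MaxAllocPeerRequestDataPerConn would presumably be set to a multiple of the standard 16 KiB chunk size rather than 5 bytes. A hedged sketch of non-test usage, assuming the field lives on the exported client config as the test setup suggests:

    cfg := torrent.NewDefaultClientConfig()
    // Allow roughly four standard 16 KiB chunks of peer-requested data to be
    // buffered per connection (numbers illustrative, not a recommended default).
    cfg.MaxAllocPeerRequestDataPerConn = 4 * 0x4000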
if len(bannableTouchers) >= 1 {
c := bannableTouchers[0]
if len(bannableTouchers) != 1 {
- t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
+ t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
} else {
// Turns out it's still useful to ban peers like this because if there's only a
// single peer for a piece, and we never progress that piece to completion, we