X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=webseed-peer.go;h=678c805bd9b3b4590aae11f5d92e0be86661d0ab;hb=b18e824a6b9f549bce05e117b1355172d14ef3b3;hp=e5ba748e5df9fc4f71873368fed9a23b6b0f6b43;hpb=4e541951ebef5d2bc4d623c9bf1846040de881b9;p=btrtrc.git diff --git a/webseed-peer.go b/webseed-peer.go index e5ba748e..678c805b 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -1,91 +1,218 @@ package torrent import ( + "context" + "errors" "fmt" - "strings" + "math/rand" + "sync" + "time" + + "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/log" - "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" - "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/webseed" - "github.com/pkg/errors" +) + +const ( + webseedPeerUnhandledErrorSleep = 5 * time.Second + webseedPeerCloseOnUnhandledError = false ) type webseedPeer struct { - client webseed.Client - requests map[request]webseed.Request - peer peer + // First field for stats alignment. + peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + lastUnhandledErr time.Time } var _ peerImpl = (*webseedPeer)(nil) +func (me *webseedPeer) peerImplStatusLines() []string { + return []string{ + me.client.Url, + fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } +} + func (ws *webseedPeer) String() string { return fmt.Sprintf("webseed peer for %q", ws.client.Url) } func (ws *webseedPeer) onGotInfo(info *metainfo.Info) { - ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles())) - ws.client.Info = info -} - -func (ws *webseedPeer) _postCancel(r request) { - ws.cancel(r) + ws.client.SetInfo(info) + // There should be probably be a callback in Client instead, so it can remove pieces at its whim + // too. + ws.client.Pieces.Iterate(func(x uint32) bool { + ws.peer.t.incPieceAvailability(pieceIndex(x)) + return true + }) } func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) cancel(r request) bool { - ws.requests[r].Cancel() - return true +func (ws *webseedPeer) _cancel(r RequestIndex) bool { + if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { + active.Cancel() + // The requester is running and will handle the result. + return true + } + // There should be no requester handling this, so no further events will occur. + return false } -func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec { +func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} } -func (ws *webseedPeer) request(r request) bool { - webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) - ws.requests[r] = webseedRequest - go ws.requestResultHandler(r, webseedRequest) +func (ws *webseedPeer) _request(r Request) bool { + ws.requesterCond.Signal() return true } +func (ws *webseedPeer) doRequest(r Request) error { + webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) + ws.activeRequests[r] = webseedRequest + err := func() error { + ws.requesterCond.L.Unlock() + defer ws.requesterCond.L.Lock() + return ws.requestResultHandler(r, webseedRequest) + }() + delete(ws.activeRequests, r) + return err +} + +func (ws *webseedPeer) requester(i int) { + ws.requesterCond.L.Lock() + defer ws.requesterCond.L.Unlock() +start: + for !ws.peer.closed.IsSet() { + // Restart is set if we don't need to wait for the requestCond before trying again. + restart := false + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { + r := ws.peer.t.requestIndexToRequest(x) + if _, ok := ws.activeRequests[r]; ok { + return true + } + err := ws.doRequest(r) + ws.requesterCond.L.Unlock() + if err != nil && !errors.Is(err, context.Canceled) { + log.Printf("requester %v: error doing webseed request %v: %v", i, r, err) + } + restart = true + if errors.Is(err, webseed.ErrTooFast) { + time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) + } + time.Sleep(time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))) + ws.requesterCond.L.Lock() + return false + }) + if restart { + goto start + } + ws.requesterCond.Wait() + } +} + func (ws *webseedPeer) connectionFlags() string { return "WS" } -// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. +// Maybe this should drop all existing connections, or something like that. func (ws *webseedPeer) drop() {} -func (ws *webseedPeer) updateRequests() { - ws.peer.doRequestState() +func (cn *webseedPeer) ban() { + cn.peer.close() +} + +func (ws *webseedPeer) handleUpdateRequests() { + // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized + // pieces. + go func() { + ws.peer.t.cl.lock() + defer ws.peer.t.cl.unlock() + ws.peer.maybeUpdateActualRequestState() + }() } -func (ws *webseedPeer) _close() {} +func (ws *webseedPeer) onClose() { + ws.peer.logger.Levelf(log.Debug, "closing") + // Just deleting them means we would have to manually cancel active requests. + ws.peer.cancelAllRequests() + ws.peer.t.iterPeers(func(p *Peer) { + if p.isLowOnRequests() { + p.updateRequests("webseedPeer.onClose") + } + }) + ws.requesterCond.Broadcast() +} -func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) { +func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error { result := <-webseedRequest.Result + close(webseedRequest.Result) // one-shot + // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not + // sure if we can divine which errors indicate cancellation on our end without hitting the + // network though. + if len(result.Bytes) != 0 || result.Err == nil { + // Increment ChunksRead and friends + ws.peer.doChunkReadStats(int64(len(result.Bytes))) + } + ws.peer.readBytes(int64(len(result.Bytes))) ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() - if result.Err != nil { - ws.peer.logger.Printf("request %v rejected: %v", r, result.Err) - if strings.Contains(errors.Cause(result.Err).Error(), "unsupported protocol scheme") { - ws.peer.close() - } else { - ws.peer.remoteRejectedRequest(r) + if ws.peer.t.closed.IsSet() { + return nil + } + err := result.Err + if err != nil { + switch { + case errors.Is(err, context.Canceled): + case errors.Is(err, webseed.ErrTooFast): + case ws.peer.closed.IsSet(): + default: + ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) + // // Here lies my attempt to extract something concrete from Go's error system. RIP. + // cfg := spew.NewDefaultConfig() + // cfg.DisableMethods = true + // cfg.Dump(result.Err) + + if webseedPeerCloseOnUnhandledError { + log.Printf("closing %v", ws) + ws.peer.close() + } else { + ws.lastUnhandledErr = time.Now() + } } - } else { - err := ws.peer.receiveChunk(&pp.Message{ - Type: pp.Piece, - Index: r.Index, - Begin: r.Begin, - Piece: result.Bytes, - }) - if err != nil { - panic(err) + if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { + panic("invalid reject") } + return err + } + err = ws.peer.receiveChunk(&pp.Message{ + Type: pp.Piece, + Index: r.Index, + Begin: r.Begin, + Piece: result.Bytes, + }) + if err != nil { + panic(err) + } + return err +} + +func (me *webseedPeer) peerPieces() *roaring.Bitmap { + return &me.client.Pieces +} + +func (cn *webseedPeer) peerHasAllPieces() (all, known bool) { + if !cn.peer.t.haveInfo() { + return true, false } + return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true }