X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=webseed-peer.go;h=5b6632badd8153ff0f685e75b4c4452c188ff34d;hb=HEAD;hp=f2ccbff5550df6cc414ad5f5f5b3e96dd35527af;hpb=c895a21a165090d8a37ac43350e46eb67a00dd4d;p=btrtrc.git diff --git a/webseed-peer.go b/webseed-peer.go index f2ccbff5..5b6632ba 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -4,32 +4,39 @@ import ( "context" "errors" "fmt" - "net/http" - "strings" + "math/rand" "sync" + "time" + + "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/log" - "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" - "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/webseed" ) +const ( + webseedPeerUnhandledErrorSleep = 5 * time.Second + webseedPeerCloseOnUnhandledError = false +) + type webseedPeer struct { - client webseed.Client - activeRequests map[Request]webseed.Request - requesterCond sync.Cond - peer Peer + // First field for stats alignment. + peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + lastUnhandledErr time.Time } var _ peerImpl = (*webseedPeer)(nil) -func (me *webseedPeer) writeBufferFull() bool { - return false -} - -func (me *webseedPeer) connStatusString() string { - return me.client.Url +func (me *webseedPeer) peerImplStatusLines() []string { + return []string{ + me.client.Url, + fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } } func (ws *webseedPeer) String() string { @@ -37,20 +44,27 @@ func (ws *webseedPeer) String() string { } func (ws *webseedPeer) onGotInfo(info *metainfo.Info) { - ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles())) - ws.client.Info = info + ws.client.SetInfo(info) + // There should be probably be a callback in Client instead, so it can remove pieces at its whim + // too. + ws.client.Pieces.Iterate(func(x uint32) bool { + ws.peer.t.incPieceAvailability(pieceIndex(x)) + return true + }) } func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) _cancel(r Request) bool { - active, ok := ws.activeRequests[r] - if ok { +func (ws *webseedPeer) _cancel(r RequestIndex) bool { + if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { active.Cancel() + // The requester is running and will handle the result. + return true } - return true + // There should be no requester handling this, so no further events will occur. + return false } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { @@ -62,27 +76,48 @@ func (ws *webseedPeer) _request(r Request) bool { return true } -func (ws *webseedPeer) doRequest(r Request) { +func (ws *webseedPeer) doRequest(r Request) error { webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) ws.activeRequests[r] = webseedRequest - func() { + err := func() error { ws.requesterCond.L.Unlock() defer ws.requesterCond.L.Lock() - ws.requestResultHandler(r, webseedRequest) + return ws.requestResultHandler(r, webseedRequest) }() delete(ws.activeRequests, r) + return err } -func (ws *webseedPeer) requester() { +func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Lock() defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { - for r := range ws.peer.actualRequestState.Requests { + // Restart is set if we don't need to wait for the requestCond before trying again. + restart := false + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { + r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { - continue + return true + } + err := ws.doRequest(r) + ws.requesterCond.L.Unlock() + if err != nil && !errors.Is(err, context.Canceled) { + log.Printf("requester %v: error doing webseed request %v: %v", i, r, err) } - ws.doRequest(r) + restart = true + if errors.Is(err, webseed.ErrTooFast) { + time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) + } + // Demeter is throwing a tantrum on Mount Olympus for this + ws.peer.t.cl.locker().RLock() + duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)) + ws.peer.t.cl.locker().RUnlock() + time.Sleep(duration) + ws.requesterCond.L.Lock() + return false + }) + if restart { goto start } ws.requesterCond.Wait() @@ -93,62 +128,95 @@ func (ws *webseedPeer) connectionFlags() string { return "WS" } -// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could -// return bool if this is even possible, and if it isn't, skip to the next drop candidate. +// Maybe this should drop all existing connections, or something like that. func (ws *webseedPeer) drop() {} -func (ws *webseedPeer) updateRequests() { +func (cn *webseedPeer) ban() { + cn.peer.close() +} + +func (ws *webseedPeer) handleUpdateRequests() { + // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized + // pieces. + go func() { + ws.peer.t.cl.lock() + defer ws.peer.t.cl.unlock() + ws.peer.maybeUpdateActualRequestState() + }() } func (ws *webseedPeer) onClose() { - ws.peer.logger.Print("closing") - for _, r := range ws.activeRequests { - r.Cancel() - } + ws.peer.logger.Levelf(log.Debug, "closing") + // Just deleting them means we would have to manually cancel active requests. + ws.peer.cancelAllRequests() + ws.peer.t.iterPeers(func(p *Peer) { + if p.isLowOnRequests() { + p.updateRequests("webseedPeer.onClose") + } + }) ws.requesterCond.Broadcast() } -func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { +func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error { result := <-webseedRequest.Result + close(webseedRequest.Result) // one-shot // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not // sure if we can divine which errors indicate cancellation on our end without hitting the // network though. - ws.peer.doChunkReadStats(int64(len(result.Bytes))) + if len(result.Bytes) != 0 || result.Err == nil { + // Increment ChunksRead and friends + ws.peer.doChunkReadStats(int64(len(result.Bytes))) + } + ws.peer.readBytes(int64(len(result.Bytes))) ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() - if result.Err != nil { - if !errors.Is(result.Err, context.Canceled) { + if ws.peer.t.closed.IsSet() { + return nil + } + err := result.Err + if err != nil { + switch { + case errors.Is(err, context.Canceled): + case errors.Is(err, webseed.ErrTooFast): + case ws.peer.closed.IsSet(): + default: ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) + // // Here lies my attempt to extract something concrete from Go's error system. RIP. + // cfg := spew.NewDefaultConfig() + // cfg.DisableMethods = true + // cfg.Dump(result.Err) + + if webseedPeerCloseOnUnhandledError { + log.Printf("closing %v", ws) + ws.peer.close() + } else { + ws.lastUnhandledErr = time.Now() + } } - // We need to filter out temporary errors, but this is a nightmare in Go. Currently a bad - // webseed URL can starve out the good ones due to the chunk selection algorithm. - const closeOnAllErrors = false - if closeOnAllErrors || - strings.Contains(result.Err.Error(), "unsupported protocol scheme") || - func() bool { - var err webseed.ErrBadResponse - if !errors.As(result.Err, &err) { - return false - } - return err.Response.StatusCode == http.StatusNotFound - }() { - ws.peer.close() - } else { - ws.peer.remoteRejectedRequest(r) - } - } else { - err := ws.peer.receiveChunk(&pp.Message{ - Type: pp.Piece, - Index: r.Index, - Begin: r.Begin, - Piece: result.Bytes, - }) - if err != nil { - panic(err) + if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { + panic("invalid reject") } + return err + } + err = ws.peer.receiveChunk(&pp.Message{ + Type: pp.Piece, + Index: r.Index, + Begin: r.Begin, + Piece: result.Bytes, + }) + if err != nil { + panic(err) } + return err } -func (me *webseedPeer) onNextRequestStateChanged() { - me.peer.applyNextRequestState() +func (me *webseedPeer) peerPieces() *roaring.Bitmap { + return &me.client.Pieces +} + +func (cn *webseedPeer) peerHasAllPieces() (all, known bool) { + if !cn.peer.t.haveInfo() { + return true, false + } + return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true }