9 "github.com/RoaringBitmap/roaring"
10 "github.com/anacrolix/log"
11 "github.com/anacrolix/torrent/metainfo"
12 pp "github.com/anacrolix/torrent/peer_protocol"
13 "github.com/anacrolix/torrent/webseed"
// webseedPeer holds the state for a peer that is backed by a webseed client.
// NOTE(review): only some of the struct's fields are visible in this view.
16 type webseedPeer struct {
// In-flight webseed requests, keyed by the torrent Request each one serves. Entries are added
// and removed in doRequest.
18 activeRequests map[Request]webseed.Request
// Signalled by _request, broadcast by onClose, and waited on by requester.
19 requesterCond sync.Cond
21 // Number of requester routines.
// Compile-time assertion that *webseedPeer satisfies the peerImpl interface.
25 var _ peerImpl = (*webseedPeer)(nil)
// connStatusString returns a status string for this peer.
// NOTE(review): the body is not visible in this view — confirm what it reports. The receiver is
// named `me` here while most other webseedPeer methods use `ws`; consider unifying.
27 func (me *webseedPeer) connStatusString() string {
// String describes this peer by the quoted URL of its webseed client.
31 func (ws *webseedPeer) String() string {
32 return fmt.Sprintf("webseed peer for %q", ws.client.Url)
// onGotInfo passes the torrent info to the webseed client, then iterates the client's Pieces
// bitmap and calls incPieceAvailability on the torrent for every piece index it contains.
35 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
36 ws.client.SetInfo(info)
37 // There should probably be a callback in Client instead, so it can remove pieces at its whim
39 ws.client.Pieces.Iterate(func(x uint32) bool {
40 ws.peer.t.incPieceAvailability(pieceIndex(x))
// writeInterested takes the desired interested state and returns a bool.
// NOTE(review): the body is not visible in this view — confirm what is returned.
45 func (ws *webseedPeer) writeInterested(interested bool) bool {
// _cancel handles cancellation of the request with index r. It looks up the matching entry in
// activeRequests, panics if the peer's deleteRequest reports that r was not present, and asks
// the peer to update its requests when it is running low on them.
// NOTE(review): the statements that use `active`/`ok` and the return value are not visible here.
49 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
50 active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
53 if !ws.peer.deleteRequest(r) {
54 panic("cancelled webseed request should exist")
56 if ws.peer.isLowOnRequests() {
57 ws.peer.updateRequests("webseedPeer._cancel")
// intoSpec converts a torrent Request into a webseed.RequestSpec built from the offset returned
// by the torrent's requestOffset and the request length widened to int64.
// NOTE(review): the composite literal is unkeyed; go vet's composites check flags unkeyed
// literals of imported struct types, so naming the fields would be more robust.
63 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
64 return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
// _request signals requesterCond, waking one goroutine waiting on it (requester waits on this
// condition). The request itself is not issued here.
// NOTE(review): the returned bool is not visible in this view.
67 func (ws *webseedPeer) _request(r Request) bool {
68 ws.requesterCond.Signal()
// doRequest creates a webseed request for r, records it in activeRequests, hands it to
// requestResultHandler, and finally removes it from activeRequests.
72 func (ws *webseedPeer) doRequest(r Request) {
73 webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
74 ws.activeRequests[r] = webseedRequest
// requesterCond.L is released around the handler call, which blocks receiving the result.
// NOTE(review): the scope enclosing this Unlock/deferred Lock pair is not visible here.
76 ws.requesterCond.L.Unlock()
77 defer ws.requesterCond.L.Lock()
78 ws.requestResultHandler(r, webseedRequest)
80 delete(ws.activeRequests, r)
// requester runs with requesterCond.L held (released on return). Until the peer is closed it
// iterates the peer's actual request state, checks each request against activeRequests, and
// waits on requesterCond.
// NOTE(review): what happens for requests that are or are not already active is not visible in
// this view; presumably inactive ones are passed to doRequest — confirm.
83 func (ws *webseedPeer) requester() {
84 ws.requesterCond.L.Lock()
85 defer ws.requesterCond.L.Unlock()
87 for !ws.peer.closed.IsSet() {
89 ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
90 r := ws.peer.t.requestIndexToRequest(x)
91 if _, ok := ws.activeRequests[r]; ok {
101 ws.requesterCond.Wait()
// connectionFlags returns a string of flags for this peer.
// NOTE(review): the body is not visible in this view — confirm the value returned.
105 func (ws *webseedPeer) connectionFlags() string {
// drop is a no-op for webseed peers: the method body is empty.
109 // TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
110 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
111 func (ws *webseedPeer) drop() {}
// handleUpdateRequests delegates to the embedded peer's maybeUpdateActualRequestState.
113 func (ws *webseedPeer) handleUpdateRequests() {
114 ws.peer.maybeUpdateActualRequestState()
// onClose logs "closing" at debug level, deletes all of the peer's requests, walks the active
// webseed requests, and broadcasts on requesterCond to wake every goroutine waiting on it.
// NOTE(review): the loop body is not visible here; presumably each active request is cancelled.
117 func (ws *webseedPeer) onClose() {
118 ws.peer.logger.WithLevel(log.Debug).Print("closing")
119 ws.peer.deleteAllRequests()
120 for _, r := range ws.activeRequests {
123 ws.requesterCond.Broadcast()
// requestResultHandler waits for the outcome of webseedRequest and applies it to the peer.
// Read statistics are recorded from the length of the returned bytes before the error is
// inspected. On error the request is reported via remoteRejectedRequest; the received bytes
// are also handed to receiveChunk as a pp.Message.
// NOTE(review): several lines (including the lock acquisition paired with the deferred unlock,
// and the branch structure around receiveChunk) are not visible in this view.
126 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
// Blocks until the webseed request delivers its result.
127 result := <-webseedRequest.Result
128 // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
129 // sure if we can divine which errors indicate cancellation on our end without hitting the
131 ws.peer.doChunkReadStats(int64(len(result.Bytes)))
132 ws.peer.readBytes(int64(len(result.Bytes)))
134 defer ws.peer.t.cl.unlock()
135 if ws.peer.t.closed.IsSet() {
138 if result.Err != nil {
// Errors that are context.Canceled, or that arrive once the peer is closed, skip this branch.
139 if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
140 ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
141 // cfg := spew.NewDefaultConfig()
142 // cfg.DisableMethods = true
143 // cfg.Dump(result.Err)
// NOTE(review): this uses the package-level log.Printf, unlike the peer logger one line above.
144 log.Printf("closing %v", ws)
147 ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
149 err := ws.peer.receiveChunk(&pp.Message{
// isLowOnRequests reports whether the number of requests in the peer's actual request state is
// below maxRequests.
// NOTE(review): receiver is `me` here while most other webseedPeer methods use `ws`.
161 func (me *webseedPeer) isLowOnRequests() bool {
162 return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
// peerPieces returns a pointer to the webseed client's Pieces bitmap; it is not a copy, so the
// caller sees the client's own bitmap.
165 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
166 return &me.client.Pieces
// peerHasAllPieces reports whether the webseed has every piece (all) and whether that answer is
// known. When the torrent has its info, all is true iff the client's Pieces bitmap holds exactly
// numPieces entries, and known is true.
// NOTE(review): the branch taken when the torrent has no info is not visible in this view.
169 func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
170 if !cn.peer.t.haveInfo() {
173 return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true