Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Default to peer requesting disabled
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "time"
5         "unsafe"
6
7         "github.com/RoaringBitmap/roaring"
8         "github.com/anacrolix/missinggo/v2/bitmap"
9
10         "github.com/anacrolix/chansync"
11         request_strategy "github.com/anacrolix/torrent/request-strategy"
12 )
13
// peerRequesting selects how applyNextRequestState works. When true, requests are calculated
// individually for each peer at the moment they are applied. When false, the peer's previously
// computed nextRequestState is applied instead.
const peerRequesting = false
16
17 func (cl *Client) requester() {
18         for {
19                 update := func() chansync.Signaled {
20                         cl.lock()
21                         defer cl.unlock()
22                         cl.doRequests()
23                         return cl.updateRequests.Signaled()
24                 }()
25                 minWait := time.After(100 * time.Millisecond)
26                 maxWait := time.After(1000 * time.Millisecond)
27                 select {
28                 case <-cl.closed.Done():
29                         return
30                 case <-minWait:
31                 case <-maxWait:
32                 }
33                 select {
34                 case <-cl.closed.Done():
35                         return
36                 case <-update:
37                 case <-maxWait:
38                 }
39         }
40 }
41
// tickleRequester broadcasts on cl.updateRequests. The requester loop selects on a Signaled value
// obtained from the same field, so this lets it start its next pass before its maximum wait
// expires.
func (cl *Client) tickleRequester() {
	cl.updateRequests.Broadcast()
}
45
46 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
47         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
48         for _, t := range cl.torrents {
49                 if !t.haveInfo() {
50                         // This would be removed if metadata is handled here. We have to guard against not
51                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
52                         // is the same.
53                         continue
54                 }
55                 rst := request_strategy.Torrent{
56                         InfoHash:       t.infoHash,
57                         ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
58                 }
59                 if t.storage != nil {
60                         rst.Capacity = t.storage.Capacity
61                 }
62                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
63                 for i := range t.pieces {
64                         p := &t.pieces[i]
65                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
66                                 Request:           !t.ignorePieceForRequests(i),
67                                 Priority:          p.purePriority(),
68                                 Partial:           t.piecePartiallyDownloaded(i),
69                                 Availability:      p.availability,
70                                 Length:            int64(p.length()),
71                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
72                                 IterPendingChunks: p.iterUndirtiedChunks,
73                         })
74                 }
75                 t.iterPeers(func(p *Peer) {
76                         if p.closed.IsSet() {
77                                 return
78                         }
79                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
80                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
81                         }
82                         p.piecesReceivedSinceLastRequestUpdate = 0
83                         rst.Peers = append(rst.Peers, request_strategy.Peer{
84                                 HasPiece:    p.peerHasPiece,
85                                 MaxRequests: p.nominalMaxRequests(),
86                                 HasExistingRequest: func(r RequestIndex) bool {
87                                         return p.actualRequestState.Requests.Contains(r)
88                                 },
89                                 Choking: p.peerChoking,
90                                 PieceAllowedFast: func(i pieceIndex) bool {
91                                         return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
92                                 },
93                                 DownloadRate: p.downloadRate(),
94                                 Age:          time.Since(p.completedHandshake),
95                                 Id: peerId{
96                                         Peer: p,
97                                         ptr:  uintptr(unsafe.Pointer(p)),
98                                 },
99                         })
100                 })
101                 ts = append(ts, rst)
102         }
103         return request_strategy.Input{
104                 Torrents:           ts,
105                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
106         }
107 }
108
109 func (cl *Client) doRequests() {
110         nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
111         for p, state := range nextPeerStates {
112                 setPeerNextRequestState(p, state)
113         }
114 }
115
// peerId is the identifier given to the request strategy for a peer (the Id field built in
// getRequestStrategyInput). It embeds the *Peer so the peer can be recovered from the identifier
// afterwards.
type peerId struct {
	*Peer
	// ptr is set to the address of the embedded Peer, converted to uintptr.
	ptr uintptr
}
120
121 func (p peerId) Uintptr() uintptr {
122         return p.ptr
123 }
124
125 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
126         p := _p.(peerId).Peer
127         p.nextRequestState = rp
128         p.onNextRequestStateChanged()
129 }
130
131 type RequestIndex = request_strategy.RequestIndex
132 type chunkIndexType = request_strategy.ChunkIndex
133
134 func (p *Peer) applyNextRequestState() bool {
135         if peerRequesting {
136                 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
137                         return true
138                 }
139                 type piece struct {
140                         index   int
141                         endGame bool
142                 }
143                 var pieceOrder []piece
144                 request_strategy.GetRequestablePieces(
145                         p.t.cl.getRequestStrategyInput(),
146                         func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
147                                 if t.InfoHash != p.t.infoHash {
148                                         return
149                                 }
150                                 if !p.peerHasPiece(pieceIndex) {
151                                         return
152                                 }
153                                 pieceOrder = append(pieceOrder, piece{
154                                         index:   pieceIndex,
155                                         endGame: rsp.Priority == PiecePriorityNow,
156                                 })
157                         },
158                 )
159                 more := true
160                 interested := false
161                 for _, endGameIter := range []bool{false, true} {
162                         for _, piece := range pieceOrder {
163                                 tp := p.t.piece(piece.index)
164                                 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
165                                         req := cs + tp.requestIndexOffset()
166                                         if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
167                                                 return
168                                         }
169                                         interested = true
170                                         more = p.setInterested(true)
171                                         if !more {
172                                                 return
173                                         }
174                                         if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
175                                                 return
176                                         }
177                                         if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
178                                                 return
179                                         }
180                                         var err error
181                                         more, err = p.request(req)
182                                         if err != nil {
183                                                 panic(err)
184                                         }
185                                 })
186                                 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
187                                         break
188                                 }
189                                 if !more {
190                                         break
191                                 }
192                         }
193                         if !more {
194                                 break
195                         }
196                 }
197                 if !more {
198                         return false
199                 }
200                 if !interested {
201                         p.setInterested(false)
202                 }
203                 return more
204         }
205
206         next := p.nextRequestState
207         current := p.actualRequestState
208         if !p.setInterested(next.Interested) {
209                 return false
210         }
211         more := true
212         cancel := roaring.AndNot(&current.Requests, &next.Requests)
213         cancel.Iterate(func(req uint32) bool {
214                 more = p.cancel(req)
215                 return more
216         })
217         if !more {
218                 return false
219         }
220         next.Requests.Iterate(func(req uint32) bool {
221                 // This could happen if the peer chokes us between the next state being generated, and us
222                 // trying to transmit the state.
223                 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
224                         return true
225                 }
226                 var err error
227                 more, err = p.request(req)
228                 if err != nil {
229                         panic(err)
230                 } /* else {
231                         log.Print(req)
232                 } */
233                 return more
234         })
235         return more
236 }