]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Remove unused Client.updateRequests
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/multiless"
14
15         request_strategy "github.com/anacrolix/torrent/request-strategy"
16 )
17
18 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
19         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
20         for _, t := range cl.torrents {
21                 if !t.haveInfo() {
22                         // This would be removed if metadata is handled here. We have to guard against not
23                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
24                         // is the same.
25                         continue
26                 }
27                 rst := request_strategy.Torrent{
28                         InfoHash:       t.infoHash,
29                         ChunksPerPiece: t.chunksPerRegularPiece(),
30                 }
31                 if t.storage != nil {
32                         rst.Capacity = t.storage.Capacity
33                 }
34                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
35                 for i := range t.pieces {
36                         p := &t.pieces[i]
37                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
38                                 Request:           !t.ignorePieceForRequests(i),
39                                 Priority:          p.purePriority(),
40                                 Partial:           t.piecePartiallyDownloaded(i),
41                                 Availability:      p.availability,
42                                 Length:            int64(p.length()),
43                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
44                                 IterPendingChunks: &p.undirtiedChunksIter,
45                         })
46                 }
47                 t.iterPeers(func(p *Peer) {
48                         if p.closed.IsSet() {
49                                 return
50                         }
51                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
52                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
53                         }
54                         p.piecesReceivedSinceLastRequestUpdate = 0
55                         rst.Peers = append(rst.Peers, request_strategy.Peer{
56                                 Pieces:           *p.newPeerPieces(),
57                                 MaxRequests:      p.nominalMaxRequests(),
58                                 ExistingRequests: p.actualRequestState.Requests,
59                                 Choking:          p.peerChoking,
60                                 PieceAllowedFast: p.peerAllowedFast,
61                                 DownloadRate:     p.downloadRate(),
62                                 Age:              time.Since(p.completedHandshake),
63                                 Id: peerId{
64                                         Peer: p,
65                                         ptr:  uintptr(unsafe.Pointer(p)),
66                                 },
67                         })
68                 })
69                 ts = append(ts, rst)
70         }
71         return request_strategy.Input{
72                 Torrents:           ts,
73                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
74         }
75 }
76
77 func init() {
78         gob.Register(peerId{})
79 }
80
81 type peerId struct {
82         *Peer
83         ptr uintptr
84 }
85
86 func (p peerId) Uintptr() uintptr {
87         return p.ptr
88 }
89
90 func (p peerId) GobEncode() (b []byte, _ error) {
91         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
92                 Data: uintptr(unsafe.Pointer(&p.ptr)),
93                 Len:  int(unsafe.Sizeof(p.ptr)),
94                 Cap:  int(unsafe.Sizeof(p.ptr)),
95         }
96         return
97 }
98
99 func (p *peerId) GobDecode(b []byte) error {
100         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
101                 panic(len(b))
102         }
103         ptr := unsafe.Pointer(&b[0])
104         p.ptr = *(*uintptr)(ptr)
105         log.Printf("%p", ptr)
106         dst := reflect.SliceHeader{
107                 Data: uintptr(unsafe.Pointer(&p.Peer)),
108                 Len:  int(unsafe.Sizeof(p.Peer)),
109                 Cap:  int(unsafe.Sizeof(p.Peer)),
110         }
111         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
112         return nil
113 }
114
115 type (
116         RequestIndex   = request_strategy.RequestIndex
117         chunkIndexType = request_strategy.ChunkIndex
118 )
119
120 type peerRequests struct {
121         requestIndexes       []RequestIndex
122         peer                 *Peer
123         torrentStrategyInput request_strategy.Torrent
124 }
125
126 func (p *peerRequests) Len() int {
127         return len(p.requestIndexes)
128 }
129
130 func (p *peerRequests) Less(i, j int) bool {
131         leftRequest := p.requestIndexes[i]
132         rightRequest := p.requestIndexes[j]
133         t := p.peer.t
134         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
135         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
136         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
137         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
138         pending := func(index RequestIndex, current bool) int {
139                 ret := t.pendingRequests.Get(index)
140                 if current {
141                         ret--
142                 }
143                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
144                 // resolved.
145                 if ret < 0 {
146                         panic(ret)
147                 }
148                 return ret
149         }
150         ml := multiless.New()
151         // Push requests that can't be served right now to the end. But we don't throw them away unless
152         // there's a better alternative. This is for when we're using the fast extension and get choked
153         // but our requests could still be good when we get unchoked.
154         if p.peer.peerChoking {
155                 ml = ml.Bool(
156                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
157                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
158                 )
159         }
160         ml = ml.Int(
161                 pending(leftRequest, leftCurrent),
162                 pending(rightRequest, rightCurrent))
163         ml = ml.Bool(!leftCurrent, !rightCurrent)
164         ml = ml.Int(
165                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
166                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
167         )
168         ml = ml.Int(
169                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
170                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
171         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
172         ml = ml.Uint32(leftRequest, rightRequest)
173         return ml.MustLess()
174 }
175
176 func (p *peerRequests) Swap(i, j int) {
177         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
178 }
179
180 func (p *peerRequests) Push(x interface{}) {
181         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
182 }
183
184 func (p *peerRequests) Pop() interface{} {
185         last := len(p.requestIndexes) - 1
186         x := p.requestIndexes[last]
187         p.requestIndexes = p.requestIndexes[:last]
188         return x
189 }
190
191 type desiredRequestState struct {
192         Requests   []RequestIndex
193         Interested bool
194 }
195
196 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
197         input := p.t.cl.getRequestStrategyInput()
198         requestHeap := peerRequests{
199                 peer: p,
200         }
201         for _, t := range input.Torrents {
202                 if t.InfoHash == p.t.infoHash {
203                         requestHeap.torrentStrategyInput = t
204                         break
205                 }
206         }
207         request_strategy.GetRequestablePieces(
208                 input,
209                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
210                         if t.InfoHash != p.t.infoHash {
211                                 return
212                         }
213                         if !p.peerHasPiece(pieceIndex) {
214                                 return
215                         }
216                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
217                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
218                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
219                                 //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
220                                 //      return
221                                 //}
222                                 if !allowedFast {
223                                         // We must signal interest to request this
224                                         desired.Interested = true
225                                         // We can make or will allow sustaining a request here if we're not choked, or
226                                         // have made the request previously (presumably while unchoked), and haven't had
227                                         // the peer respond yet (and the request was retained because we are using the
228                                         // fast extension).
229                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
230                                                 // We can't request this right now.
231                                                 return
232                                         }
233                                 }
234                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
235                         })
236                 },
237         )
238         p.t.assertPendingRequests()
239         heap.Init(&requestHeap)
240         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
241                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
242                 desired.Requests = append(desired.Requests, requestIndex)
243         }
244         return
245 }
246
247 func (p *Peer) maybeUpdateActualRequestState() bool {
248         if p.needRequestUpdate == "" {
249                 return true
250         }
251         var more bool
252         pprof.Do(
253                 context.Background(),
254                 pprof.Labels("update request", p.needRequestUpdate),
255                 func(_ context.Context) {
256                         next := p.getDesiredRequestState()
257                         more = p.applyRequestState(next)
258                 },
259         )
260         return more
261 }
262
263 // Transmit/action the request state to the peer.
264 func (p *Peer) applyRequestState(next desiredRequestState) bool {
265         current := &p.actualRequestState
266         if !p.setInterested(next.Interested) {
267                 return false
268         }
269         more := true
270         cancel := current.Requests.Clone()
271         for _, ri := range next.Requests {
272                 cancel.Remove(ri)
273         }
274         cancel.Iterate(func(req uint32) bool {
275                 more = p.cancel(req)
276                 return more
277         })
278         if !more {
279                 return false
280         }
281         for _, req := range next.Requests {
282                 if p.cancelledRequests.Contains(req) {
283                         // Waiting for a reject or piece message, which will suitably trigger us to update our
284                         // requests, so we can skip this one with no additional consideration.
285                         continue
286                 }
287                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
288                 // in the calculation of the requests. However, if we cancelled requests and they haven't
289                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
290                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
291                 // next request cardinality, but peers might not like that.
292                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
293                         //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
294                         //      next.Requests.GetCardinality(),
295                         //      p.cancelledRequests.GetCardinality(),
296                         //      current.Requests.GetCardinality(),
297                         //      p.nominalMaxRequests(),
298                         //)
299                         break
300                 }
301                 more = p.mustRequest(req)
302                 if !more {
303                         break
304                 }
305         }
306         p.updateRequestsTimer.Stop()
307         if more {
308                 p.needRequestUpdate = ""
309                 if !current.Requests.IsEmpty() {
310                         p.updateRequestsTimer.Reset(3 * time.Second)
311                 }
312         }
313         return more
314 }