Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Ensure peerRequests.torrentStrategyInput is set
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/multiless"
14
15         request_strategy "github.com/anacrolix/torrent/request-strategy"
16 )
17
18 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
19         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
20         for _, t := range cl.torrents {
21                 if !t.haveInfo() {
22                         // This would be removed if metadata is handled here. We have to guard against not
23                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
24                         // is the same.
25                         continue
26                 }
27                 rst := request_strategy.Torrent{
28                         InfoHash:       t.infoHash,
29                         ChunksPerPiece: t.chunksPerRegularPiece(),
30                 }
31                 if t.storage != nil {
32                         rst.Capacity = t.storage.Capacity
33                 }
34                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
35                 for i := range t.pieces {
36                         p := &t.pieces[i]
37                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
38                                 Request:           !t.ignorePieceForRequests(i),
39                                 Priority:          p.purePriority(),
40                                 Partial:           t.piecePartiallyDownloaded(i),
41                                 Availability:      p.availability,
42                                 Length:            int64(p.length()),
43                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
44                                 IterPendingChunks: &p.undirtiedChunksIter,
45                         })
46                 }
47                 ts = append(ts, rst)
48         }
49         return request_strategy.Input{
50                 Torrents:           ts,
51                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
52         }
53 }
54
55 func init() {
56         gob.Register(peerId{})
57 }
58
// peerId identifies a peer by the address of its Peer value. It embeds the
// *Peer itself and additionally keeps the same identity as an integer in ptr.
// Only ptr is written out by GobEncode; GobDecode fills both fields from the
// decoded bytes.
type peerId struct {
	*Peer
	// ptr is the integer identity returned by Uintptr.
	ptr uintptr
}
63
64 func (p peerId) Uintptr() uintptr {
65         return p.ptr
66 }
67
68 func (p peerId) GobEncode() (b []byte, _ error) {
69         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
70                 Data: uintptr(unsafe.Pointer(&p.ptr)),
71                 Len:  int(unsafe.Sizeof(p.ptr)),
72                 Cap:  int(unsafe.Sizeof(p.ptr)),
73         }
74         return
75 }
76
77 func (p *peerId) GobDecode(b []byte) error {
78         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
79                 panic(len(b))
80         }
81         ptr := unsafe.Pointer(&b[0])
82         p.ptr = *(*uintptr)(ptr)
83         log.Printf("%p", ptr)
84         dst := reflect.SliceHeader{
85                 Data: uintptr(unsafe.Pointer(&p.Peer)),
86                 Len:  int(unsafe.Sizeof(p.Peer)),
87                 Cap:  int(unsafe.Sizeof(p.Peer)),
88         }
89         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
90         return nil
91 }
92
// Local names for request-strategy index types, so the rest of the package can
// refer to them without the package qualifier.
type (
	// RequestIndex is an alias of request_strategy.RequestIndex.
	RequestIndex = request_strategy.RequestIndex
	// chunkIndexType is an alias of request_strategy.ChunkIndex.
	chunkIndexType = request_strategy.ChunkIndex
)
97
// peerRequests is a heap of candidate request indexes for one peer. It
// implements heap.Interface through its Len, Less, Swap, Push and Pop methods.
type peerRequests struct {
	// requestIndexes is the backing slice of the heap.
	requestIndexes []RequestIndex
	// peer is the peer whose request and choke state Less consults.
	peer *Peer
	// torrentStrategyInput is the strategy snapshot of the peer's torrent. Less
	// dereferences it, so it must be set before any comparison happens.
	torrentStrategyInput *request_strategy.Torrent
}
103
104 func (p *peerRequests) Len() int {
105         return len(p.requestIndexes)
106 }
107
// Less orders two candidate requests for heap.Interface; the request that
// compares less is popped first. The criteria are applied in sequence through
// multiless, each presumably consulted only when the previous ones tie
// (confirm against the multiless documentation):
//
//  1. only while the peer is choking us: whether the request's piece is in the
//     peer's allowed-fast set;
//  2. the number of pending requests for the index, not counting this peer's
//     own outstanding request;
//  3. whether this peer already has the request outstanding;
//  4. piece priority, negated;
//  5. piece availability;
//  6. piece index;
//  7. request index.
//
// It dereferences p.torrentStrategyInput, so that field must be non-nil, and it
// panics if the adjusted pending count for either request is negative.
func (p *peerRequests) Less(i, j int) bool {
	leftRequest := p.requestIndexes[i]
	rightRequest := p.requestIndexes[j]
	t := p.peer.t
	// Map each request index to the piece it belongs to.
	leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
	rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
	// Whether this peer already has the request outstanding.
	leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
	rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
	// pending returns the torrent's pending-request count for index, minus one
	// when this peer itself currently holds the request.
	pending := func(index RequestIndex, current bool) int {
		ret := t.pendingRequests.Get(index)
		if current {
			ret--
		}
		// See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
		// resolved.
		if ret < 0 {
			panic(ret)
		}
		return ret
	}
	ml := multiless.New()
	// Push requests that can't be served right now to the end. But we don't throw them away unless
	// there's a better alternative. This is for when we're using the fast extension and get choked
	// but our requests could still be good when we get unchoked.
	if p.peer.peerChoking {
		ml = ml.Bool(
			!p.peer.peerAllowedFast.Contains(leftPieceIndex),
			!p.peer.peerAllowedFast.Contains(rightPieceIndex),
		)
	}
	// Compare by how many requests for the same index are pending apart from ours.
	ml = ml.Int(
		pending(leftRequest, leftCurrent),
		pending(rightRequest, rightCurrent))
	// Compare by whether we already hold the request.
	ml = ml.Bool(!leftCurrent, !rightCurrent)
	// Priority is negated so that a larger priority value yields a smaller key.
	ml = ml.Int(
		-int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
		-int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
	)
	ml = ml.Int(
		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
	// Final tie-breakers: piece index, then the request index itself.
	ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
	ml = ml.Uint32(leftRequest, rightRequest)
	return ml.MustLess()
}
153
154 func (p *peerRequests) Swap(i, j int) {
155         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
156 }
157
158 func (p *peerRequests) Push(x interface{}) {
159         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
160 }
161
162 func (p *peerRequests) Pop() interface{} {
163         last := len(p.requestIndexes) - 1
164         x := p.requestIndexes[last]
165         p.requestIndexes = p.requestIndexes[:last]
166         return x
167 }
168
// desiredRequestState is the request state we would like to have with a peer.
// It is produced by Peer.getDesiredRequestState and acted upon by
// Peer.applyRequestState.
type desiredRequestState struct {
	// Requests lists the request indexes to have outstanding, in the order they
	// were popped from the request heap.
	Requests []RequestIndex
	// Interested is the interest flag to signal to the peer.
	Interested bool
}
173
174 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
175         input := p.t.cl.getRequestStrategyInput()
176         requestHeap := peerRequests{
177                 peer: p,
178         }
179         for i := range input.Torrents {
180                 t := &input.Torrents[i]
181                 if t.InfoHash == p.t.infoHash {
182                         requestHeap.torrentStrategyInput = t
183                         break
184                 }
185         }
186         request_strategy.GetRequestablePieces(
187                 input,
188                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
189                         if t.InfoHash != p.t.infoHash {
190                                 return
191                         }
192                         if !p.peerHasPiece(pieceIndex) {
193                                 return
194                         }
195                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
196                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
197                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
198                                 //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
199                                 //      return
200                                 //}
201                                 if !allowedFast {
202                                         // We must signal interest to request this
203                                         desired.Interested = true
204                                         // We can make or will allow sustaining a request here if we're not choked, or
205                                         // have made the request previously (presumably while unchoked), and haven't had
206                                         // the peer respond yet (and the request was retained because we are using the
207                                         // fast extension).
208                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
209                                                 // We can't request this right now.
210                                                 return
211                                         }
212                                 }
213                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
214                         })
215                 },
216         )
217         p.t.assertPendingRequests()
218         heap.Init(&requestHeap)
219         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
220                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
221                 desired.Requests = append(desired.Requests, requestIndex)
222         }
223         return
224 }
225
226 func (p *Peer) maybeUpdateActualRequestState() bool {
227         if p.needRequestUpdate == "" {
228                 return true
229         }
230         var more bool
231         pprof.Do(
232                 context.Background(),
233                 pprof.Labels("update request", p.needRequestUpdate),
234                 func(_ context.Context) {
235                         next := p.getDesiredRequestState()
236                         more = p.applyRequestState(next)
237                 },
238         )
239         return more
240 }
241
242 // Transmit/action the request state to the peer.
243 func (p *Peer) applyRequestState(next desiredRequestState) bool {
244         current := &p.actualRequestState
245         if !p.setInterested(next.Interested) {
246                 return false
247         }
248         more := true
249         cancel := current.Requests.Clone()
250         for _, ri := range next.Requests {
251                 cancel.Remove(ri)
252         }
253         cancel.Iterate(func(req uint32) bool {
254                 more = p.cancel(req)
255                 return more
256         })
257         if !more {
258                 return false
259         }
260         for _, req := range next.Requests {
261                 if p.cancelledRequests.Contains(req) {
262                         // Waiting for a reject or piece message, which will suitably trigger us to update our
263                         // requests, so we can skip this one with no additional consideration.
264                         continue
265                 }
266                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
267                 // in the calculation of the requests. However, if we cancelled requests and they haven't
268                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
269                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
270                 // next request cardinality, but peers might not like that.
271                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
272                         //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
273                         //      next.Requests.GetCardinality(),
274                         //      p.cancelledRequests.GetCardinality(),
275                         //      current.Requests.GetCardinality(),
276                         //      p.nominalMaxRequests(),
277                         //)
278                         break
279                 }
280                 more = p.mustRequest(req)
281                 if !more {
282                         break
283                 }
284         }
285         p.updateRequestsTimer.Stop()
286         if more {
287                 p.needRequestUpdate = ""
288                 if !current.Requests.IsEmpty() {
289                         p.updateRequestsTimer.Reset(3 * time.Second)
290                 }
291         }
292         return more
293 }