Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Remove unused peer stuff in request strategy
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/multiless"
14
15         request_strategy "github.com/anacrolix/torrent/request-strategy"
16 )
17
18 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
19         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
20         for _, t := range cl.torrents {
21                 if !t.haveInfo() {
22                         // This would be removed if metadata is handled here. We have to guard against not
23                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
24                         // is the same.
25                         continue
26                 }
27                 rst := request_strategy.Torrent{
28                         InfoHash:       t.infoHash,
29                         ChunksPerPiece: t.chunksPerRegularPiece(),
30                 }
31                 if t.storage != nil {
32                         rst.Capacity = t.storage.Capacity
33                 }
34                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
35                 for i := range t.pieces {
36                         p := &t.pieces[i]
37                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
38                                 Request:           !t.ignorePieceForRequests(i),
39                                 Priority:          p.purePriority(),
40                                 Partial:           t.piecePartiallyDownloaded(i),
41                                 Availability:      p.availability,
42                                 Length:            int64(p.length()),
43                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
44                                 IterPendingChunks: &p.undirtiedChunksIter,
45                         })
46                 }
47                 ts = append(ts, rst)
48         }
49         return request_strategy.Input{
50                 Torrents:           ts,
51                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
52         }
53 }
54
55 func init() {
56         gob.Register(peerId{})
57 }
58
59 type peerId struct {
60         *Peer
61         ptr uintptr
62 }
63
64 func (p peerId) Uintptr() uintptr {
65         return p.ptr
66 }
67
68 func (p peerId) GobEncode() (b []byte, _ error) {
69         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
70                 Data: uintptr(unsafe.Pointer(&p.ptr)),
71                 Len:  int(unsafe.Sizeof(p.ptr)),
72                 Cap:  int(unsafe.Sizeof(p.ptr)),
73         }
74         return
75 }
76
77 func (p *peerId) GobDecode(b []byte) error {
78         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
79                 panic(len(b))
80         }
81         ptr := unsafe.Pointer(&b[0])
82         p.ptr = *(*uintptr)(ptr)
83         log.Printf("%p", ptr)
84         dst := reflect.SliceHeader{
85                 Data: uintptr(unsafe.Pointer(&p.Peer)),
86                 Len:  int(unsafe.Sizeof(p.Peer)),
87                 Cap:  int(unsafe.Sizeof(p.Peer)),
88         }
89         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
90         return nil
91 }
92
93 type (
94         RequestIndex   = request_strategy.RequestIndex
95         chunkIndexType = request_strategy.ChunkIndex
96 )
97
98 type peerRequests struct {
99         requestIndexes       []RequestIndex
100         peer                 *Peer
101         torrentStrategyInput request_strategy.Torrent
102 }
103
104 func (p *peerRequests) Len() int {
105         return len(p.requestIndexes)
106 }
107
108 func (p *peerRequests) Less(i, j int) bool {
109         leftRequest := p.requestIndexes[i]
110         rightRequest := p.requestIndexes[j]
111         t := p.peer.t
112         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
113         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
114         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
115         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
116         pending := func(index RequestIndex, current bool) int {
117                 ret := t.pendingRequests.Get(index)
118                 if current {
119                         ret--
120                 }
121                 // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
122                 // resolved.
123                 if ret < 0 {
124                         panic(ret)
125                 }
126                 return ret
127         }
128         ml := multiless.New()
129         // Push requests that can't be served right now to the end. But we don't throw them away unless
130         // there's a better alternative. This is for when we're using the fast extension and get choked
131         // but our requests could still be good when we get unchoked.
132         if p.peer.peerChoking {
133                 ml = ml.Bool(
134                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
135                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
136                 )
137         }
138         ml = ml.Int(
139                 pending(leftRequest, leftCurrent),
140                 pending(rightRequest, rightCurrent))
141         ml = ml.Bool(!leftCurrent, !rightCurrent)
142         ml = ml.Int(
143                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
144                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
145         )
146         ml = ml.Int(
147                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
148                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
149         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
150         ml = ml.Uint32(leftRequest, rightRequest)
151         return ml.MustLess()
152 }
153
154 func (p *peerRequests) Swap(i, j int) {
155         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
156 }
157
158 func (p *peerRequests) Push(x interface{}) {
159         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
160 }
161
162 func (p *peerRequests) Pop() interface{} {
163         last := len(p.requestIndexes) - 1
164         x := p.requestIndexes[last]
165         p.requestIndexes = p.requestIndexes[:last]
166         return x
167 }
168
169 type desiredRequestState struct {
170         Requests   []RequestIndex
171         Interested bool
172 }
173
174 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
175         input := p.t.cl.getRequestStrategyInput()
176         requestHeap := peerRequests{
177                 peer: p,
178         }
179         for _, t := range input.Torrents {
180                 if t.InfoHash == p.t.infoHash {
181                         requestHeap.torrentStrategyInput = t
182                         break
183                 }
184         }
185         request_strategy.GetRequestablePieces(
186                 input,
187                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
188                         if t.InfoHash != p.t.infoHash {
189                                 return
190                         }
191                         if !p.peerHasPiece(pieceIndex) {
192                                 return
193                         }
194                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
195                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
196                                 r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
197                                 //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
198                                 //      return
199                                 //}
200                                 if !allowedFast {
201                                         // We must signal interest to request this
202                                         desired.Interested = true
203                                         // We can make or will allow sustaining a request here if we're not choked, or
204                                         // have made the request previously (presumably while unchoked), and haven't had
205                                         // the peer respond yet (and the request was retained because we are using the
206                                         // fast extension).
207                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
208                                                 // We can't request this right now.
209                                                 return
210                                         }
211                                 }
212                                 requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
213                         })
214                 },
215         )
216         p.t.assertPendingRequests()
217         heap.Init(&requestHeap)
218         for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
219                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
220                 desired.Requests = append(desired.Requests, requestIndex)
221         }
222         return
223 }
224
225 func (p *Peer) maybeUpdateActualRequestState() bool {
226         if p.needRequestUpdate == "" {
227                 return true
228         }
229         var more bool
230         pprof.Do(
231                 context.Background(),
232                 pprof.Labels("update request", p.needRequestUpdate),
233                 func(_ context.Context) {
234                         next := p.getDesiredRequestState()
235                         more = p.applyRequestState(next)
236                 },
237         )
238         return more
239 }
240
241 // Transmit/action the request state to the peer.
242 func (p *Peer) applyRequestState(next desiredRequestState) bool {
243         current := &p.actualRequestState
244         if !p.setInterested(next.Interested) {
245                 return false
246         }
247         more := true
248         cancel := current.Requests.Clone()
249         for _, ri := range next.Requests {
250                 cancel.Remove(ri)
251         }
252         cancel.Iterate(func(req uint32) bool {
253                 more = p.cancel(req)
254                 return more
255         })
256         if !more {
257                 return false
258         }
259         for _, req := range next.Requests {
260                 if p.cancelledRequests.Contains(req) {
261                         // Waiting for a reject or piece message, which will suitably trigger us to update our
262                         // requests, so we can skip this one with no additional consideration.
263                         continue
264                 }
265                 // The cardinality of our desired requests shouldn't exceed the max requests since it's used
266                 // in the calculation of the requests. However, if we cancelled requests and they haven't
267                 // been rejected or serviced yet with the fast extension enabled, we can end up with more
268                 // extra outstanding requests. We could subtract the number of outstanding cancels from the
269                 // next request cardinality, but peers might not like that.
270                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
271                         //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
272                         //      next.Requests.GetCardinality(),
273                         //      p.cancelledRequests.GetCardinality(),
274                         //      current.Requests.GetCardinality(),
275                         //      p.nominalMaxRequests(),
276                         //)
277                         break
278                 }
279                 more = p.mustRequest(req)
280                 if !more {
281                         break
282                 }
283         }
284         p.updateRequestsTimer.Stop()
285         if more {
286                 p.needRequestUpdate = ""
287                 if !current.Requests.IsEmpty() {
288                         p.updateRequestsTimer.Reset(3 * time.Second)
289                 }
290         }
291         return more
292 }