]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Fix log message argument
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "context"
6         "encoding/gob"
7         "reflect"
8         "runtime/pprof"
9         "time"
10         "unsafe"
11
12         "github.com/RoaringBitmap/roaring"
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 func (cl *Client) tickleRequester() {
20         cl.updateRequests.Broadcast()
21 }
22
23 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
24         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
25         for _, t := range cl.torrents {
26                 if !t.haveInfo() {
27                         // This would be removed if metadata is handled here. We have to guard against not
28                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
29                         // is the same.
30                         continue
31                 }
32                 rst := request_strategy.Torrent{
33                         InfoHash:       t.infoHash,
34                         ChunksPerPiece: t.chunksPerRegularPiece(),
35                 }
36                 if t.storage != nil {
37                         rst.Capacity = t.storage.Capacity
38                 }
39                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
40                 for i := range t.pieces {
41                         p := &t.pieces[i]
42                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
43                                 Request:           !t.ignorePieceForRequests(i),
44                                 Priority:          p.purePriority(),
45                                 Partial:           t.piecePartiallyDownloaded(i),
46                                 Availability:      p.availability,
47                                 Length:            int64(p.length()),
48                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
49                                 IterPendingChunks: &p.undirtiedChunksIter,
50                         })
51                 }
52                 t.iterPeers(func(p *Peer) {
53                         if p.closed.IsSet() {
54                                 return
55                         }
56                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
57                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
58                         }
59                         p.piecesReceivedSinceLastRequestUpdate = 0
60                         rst.Peers = append(rst.Peers, request_strategy.Peer{
61                                 Pieces:           *p.newPeerPieces(),
62                                 MaxRequests:      p.nominalMaxRequests(),
63                                 ExistingRequests: p.actualRequestState.Requests,
64                                 Choking:          p.peerChoking,
65                                 PieceAllowedFast: p.peerAllowedFast,
66                                 DownloadRate:     p.downloadRate(),
67                                 Age:              time.Since(p.completedHandshake),
68                                 Id: peerId{
69                                         Peer: p,
70                                         ptr:  uintptr(unsafe.Pointer(p)),
71                                 },
72                         })
73                 })
74                 ts = append(ts, rst)
75         }
76         return request_strategy.Input{
77                 Torrents:           ts,
78                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
79         }
80 }
81
82 func init() {
83         gob.Register(peerId{})
84 }
85
86 type peerId struct {
87         *Peer
88         ptr uintptr
89 }
90
91 func (p peerId) Uintptr() uintptr {
92         return p.ptr
93 }
94
95 func (p peerId) GobEncode() (b []byte, _ error) {
96         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
97                 Data: uintptr(unsafe.Pointer(&p.ptr)),
98                 Len:  int(unsafe.Sizeof(p.ptr)),
99                 Cap:  int(unsafe.Sizeof(p.ptr)),
100         }
101         return
102 }
103
104 func (p *peerId) GobDecode(b []byte) error {
105         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
106                 panic(len(b))
107         }
108         ptr := unsafe.Pointer(&b[0])
109         p.ptr = *(*uintptr)(ptr)
110         log.Printf("%p", ptr)
111         dst := reflect.SliceHeader{
112                 Data: uintptr(unsafe.Pointer(&p.Peer)),
113                 Len:  int(unsafe.Sizeof(p.Peer)),
114                 Cap:  int(unsafe.Sizeof(p.Peer)),
115         }
116         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
117         return nil
118 }
119
120 type RequestIndex = request_strategy.RequestIndex
121 type chunkIndexType = request_strategy.ChunkIndex
122
123 type peerRequests struct {
124         requestIndexes       []RequestIndex
125         peer                 *Peer
126         torrentStrategyInput request_strategy.Torrent
127 }
128
129 func (p *peerRequests) Len() int {
130         return len(p.requestIndexes)
131 }
132
133 func (p *peerRequests) Less(i, j int) bool {
134         leftRequest := p.requestIndexes[i]
135         rightRequest := p.requestIndexes[j]
136         t := p.peer.t
137         leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
138         rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
139         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
140         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
141         pending := func(index RequestIndex, current bool) int {
142                 ret := t.pendingRequests.Get(index)
143                 if current {
144                         ret--
145                 }
146                 // I have a hunch that this could trigger for requests for chunks that are choked and not
147                 // allowed fast, since the current conn shouldn't already be included. It's a very specific
148                 // circumstance, and if it triggers I will fix it.
149                 if ret < 0 {
150                         panic(ret)
151                 }
152                 return ret
153         }
154         ml := multiless.New()
155         // Push requests that can't be served right now to the end. But we don't throw them away unless
156         // there's a better alternative. This is for when we're using the fast extension and get choked
157         // but our requests could still be good when we get unchoked.
158         if p.peer.peerChoking {
159                 ml = ml.Bool(
160                         !p.peer.peerAllowedFast.Contains(leftPieceIndex),
161                         !p.peer.peerAllowedFast.Contains(rightPieceIndex),
162                 )
163         }
164         ml = ml.Int(
165                 pending(leftRequest, leftCurrent),
166                 pending(rightRequest, rightCurrent))
167         ml = ml.Bool(!leftCurrent, !rightCurrent)
168         ml = ml.Int(
169                 -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
170                 -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
171         )
172         ml = ml.Int(
173                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
174                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
175         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
176         ml = ml.Uint32(leftRequest, rightRequest)
177         return ml.MustLess()
178 }
179
180 func (p *peerRequests) Swap(i, j int) {
181         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
182 }
183
184 func (p *peerRequests) Push(x interface{}) {
185         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
186 }
187
188 func (p *peerRequests) Pop() interface{} {
189         last := len(p.requestIndexes) - 1
190         x := p.requestIndexes[last]
191         p.requestIndexes = p.requestIndexes[:last]
192         return x
193 }
194
195 func (p *Peer) getDesiredRequestState() (desired requestState) {
196         input := p.t.cl.getRequestStrategyInput()
197         requestHeap := peerRequests{
198                 peer: p,
199         }
200         for _, t := range input.Torrents {
201                 if t.InfoHash == p.t.infoHash {
202                         requestHeap.torrentStrategyInput = t
203                         break
204                 }
205         }
206         request_strategy.GetRequestablePieces(
207                 input,
208                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
209                         if t.InfoHash != p.t.infoHash {
210                                 return
211                         }
212                         if !p.peerHasPiece(pieceIndex) {
213                                 return
214                         }
215                         allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
216                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
217                                 if !allowedFast {
218                                         // We must signal interest to request this..
219                                         desired.Interested = true
220                                         if p.peerChoking && !p.actualRequestState.Requests.Contains(ci) {
221                                                 // We can't request this right now.
222                                                 return
223                                         }
224                                 }
225                                 requestHeap.requestIndexes = append(
226                                         requestHeap.requestIndexes,
227                                         p.t.pieceRequestIndexOffset(pieceIndex)+ci)
228                         })
229                 },
230         )
231         heap.Init(&requestHeap)
232         for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
233                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
234                 desired.Requests.Add(requestIndex)
235         }
236         return
237 }
238
239 func (p *Peer) applyNextRequestState() bool {
240         if p.needRequestUpdate == "" {
241                 return true
242         }
243         var more bool
244         pprof.Do(
245                 context.Background(),
246                 pprof.Labels("update request", p.needRequestUpdate),
247                 func(_ context.Context) {
248                         next := p.getDesiredRequestState()
249                         more = p.applyRequestState(next)
250                 },
251         )
252         return more
253 }
254
255 func (p *Peer) applyRequestState(next requestState) bool {
256         current := p.actualRequestState
257         if !p.setInterested(next.Interested) {
258                 return false
259         }
260         more := true
261         cancel := roaring.AndNot(&current.Requests, &next.Requests)
262         cancel.Iterate(func(req uint32) bool {
263                 more = p.cancel(req)
264                 return more
265         })
266         if !more {
267                 return false
268         }
269         next.Requests.Iterate(func(req uint32) bool {
270                 if p.cancelledRequests.Contains(req) {
271                         log.Printf("waiting for cancelled request %v", req)
272                         return false
273                 }
274                 if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
275                         log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
276                                 next.Requests.GetCardinality(),
277                                 p.cancelledRequests.GetCardinality(),
278                                 p.nominalMaxRequests(),
279                         )
280                         return false
281                 }
282                 var err error
283                 more, err = p.request(req)
284                 if err != nil {
285                         panic(err)
286                 }
287                 return more
288         })
289         if more {
290                 p.needRequestUpdate = ""
291         }
292         return more
293 }