]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Remove peer piece inclination and piece request order
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "container/heap"
5         "encoding/gob"
6         "reflect"
7         "time"
8         "unsafe"
9
10         "github.com/RoaringBitmap/roaring"
11         "github.com/anacrolix/chansync/events"
12         "github.com/anacrolix/log"
13         "github.com/anacrolix/missinggo/v2/bitmap"
14         "github.com/anacrolix/multiless"
15
16         request_strategy "github.com/anacrolix/torrent/request-strategy"
17 )
18
19 // Calculate requests individually for each peer.
20 const peerRequesting = true
21
22 func (cl *Client) requester() {
23         for {
24                 update := func() events.Signaled {
25                         cl.lock()
26                         defer cl.unlock()
27                         cl.doRequests()
28                         return cl.updateRequests.Signaled()
29                 }()
30                 minWait := time.After(100 * time.Millisecond)
31                 maxWait := time.After(1000 * time.Millisecond)
32                 select {
33                 case <-cl.closed.Done():
34                         return
35                 case <-minWait:
36                 case <-maxWait:
37                 }
38                 select {
39                 case <-cl.closed.Done():
40                         return
41                 case <-update:
42                 case <-maxWait:
43                 }
44         }
45 }
46
47 func (cl *Client) tickleRequester() {
48         cl.updateRequests.Broadcast()
49 }
50
51 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
52         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
53         for _, t := range cl.torrents {
54                 if !t.haveInfo() {
55                         // This would be removed if metadata is handled here. We have to guard against not
56                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
57                         // is the same.
58                         continue
59                 }
60                 rst := request_strategy.Torrent{
61                         InfoHash:       t.infoHash,
62                         ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
63                 }
64                 if t.storage != nil {
65                         rst.Capacity = t.storage.Capacity
66                 }
67                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
68                 for i := range t.pieces {
69                         p := &t.pieces[i]
70                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
71                                 Request:           !t.ignorePieceForRequests(i),
72                                 Priority:          p.purePriority(),
73                                 Partial:           t.piecePartiallyDownloaded(i),
74                                 Availability:      p.availability,
75                                 Length:            int64(p.length()),
76                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
77                                 IterPendingChunks: p.undirtiedChunksIter(),
78                         })
79                 }
80                 t.iterPeers(func(p *Peer) {
81                         if p.closed.IsSet() {
82                                 return
83                         }
84                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
85                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
86                         }
87                         p.piecesReceivedSinceLastRequestUpdate = 0
88                         rst.Peers = append(rst.Peers, request_strategy.Peer{
89                                 Pieces:           *p.newPeerPieces(),
90                                 MaxRequests:      p.nominalMaxRequests(),
91                                 ExistingRequests: p.actualRequestState.Requests,
92                                 Choking:          p.peerChoking,
93                                 PieceAllowedFast: p.peerAllowedFast,
94                                 DownloadRate:     p.downloadRate(),
95                                 Age:              time.Since(p.completedHandshake),
96                                 Id: peerId{
97                                         Peer: p,
98                                         ptr:  uintptr(unsafe.Pointer(p)),
99                                 },
100                         })
101                 })
102                 ts = append(ts, rst)
103         }
104         return request_strategy.Input{
105                 Torrents:           ts,
106                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
107         }
108 }
109
110 func (cl *Client) doRequests() {
111         input := cl.getRequestStrategyInput()
112         nextPeerStates := request_strategy.Run(input)
113         for p, state := range nextPeerStates {
114                 setPeerNextRequestState(p, state)
115         }
116 }
117
118 func init() {
119         gob.Register(peerId{})
120 }
121
122 type peerId struct {
123         *Peer
124         ptr uintptr
125 }
126
127 func (p peerId) Uintptr() uintptr {
128         return p.ptr
129 }
130
131 func (p peerId) GobEncode() (b []byte, _ error) {
132         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
133                 Data: uintptr(unsafe.Pointer(&p.ptr)),
134                 Len:  int(unsafe.Sizeof(p.ptr)),
135                 Cap:  int(unsafe.Sizeof(p.ptr)),
136         }
137         return
138 }
139
140 func (p *peerId) GobDecode(b []byte) error {
141         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
142                 panic(len(b))
143         }
144         ptr := unsafe.Pointer(&b[0])
145         p.ptr = *(*uintptr)(ptr)
146         log.Printf("%p", ptr)
147         dst := reflect.SliceHeader{
148                 Data: uintptr(unsafe.Pointer(&p.Peer)),
149                 Len:  int(unsafe.Sizeof(p.Peer)),
150                 Cap:  int(unsafe.Sizeof(p.Peer)),
151         }
152         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
153         return nil
154 }
155
156 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
157         p := _p.(peerId).Peer
158         p.nextRequestState = rp
159         p.onNextRequestStateChanged()
160 }
161
162 type RequestIndex = request_strategy.RequestIndex
163 type chunkIndexType = request_strategy.ChunkIndex
164
165 type peerRequests struct {
166         requestIndexes       []RequestIndex
167         peer                 *Peer
168         torrentStrategyInput request_strategy.Torrent
169 }
170
171 func (p peerRequests) Len() int {
172         return len(p.requestIndexes)
173 }
174
175 func (p peerRequests) Less(i, j int) bool {
176         leftRequest := p.requestIndexes[i]
177         rightRequest := p.requestIndexes[j]
178         t := p.peer.t
179         leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
180         rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
181         leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
182         rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
183         pending := func(index RequestIndex, current bool) int {
184                 ret := t.pendingRequests[index]
185                 if current {
186                         ret--
187                 }
188                 return ret
189         }
190         ml := multiless.New()
191         ml = ml.Int(
192                 pending(leftRequest, leftCurrent),
193                 pending(rightRequest, rightCurrent))
194         ml = ml.Bool(rightCurrent, leftCurrent)
195         ml = ml.Int(
196                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
197                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
198         ml = ml.Int(
199                 int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
200                 int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
201         ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
202         ml = ml.Uint32(leftRequest, rightRequest)
203         return ml.MustLess()
204 }
205
206 func (p peerRequests) Swap(i, j int) {
207         p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
208 }
209
210 func (p *peerRequests) Push(x interface{}) {
211         p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
212 }
213
214 func (p *peerRequests) Pop() interface{} {
215         last := len(p.requestIndexes) - 1
216         x := p.requestIndexes[last]
217         p.requestIndexes = p.requestIndexes[:last]
218         return x
219 }
220
221 func (p *Peer) getDesiredRequestState() (desired requestState) {
222         input := p.t.cl.getRequestStrategyInput()
223         requestHeap := peerRequests{
224                 requestIndexes: nil,
225                 peer:           p,
226         }
227         for _, t := range input.Torrents {
228                 if t.InfoHash == p.t.infoHash {
229                         requestHeap.torrentStrategyInput = t
230                         break
231                 }
232         }
233         request_strategy.GetRequestablePieces(
234                 input,
235                 func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
236                         if t.InfoHash != p.t.infoHash {
237                                 return
238                         }
239                         if !p.peerHasPiece(pieceIndex) {
240                                 return
241                         }
242                         rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
243                                 requestHeap.requestIndexes = append(
244                                         requestHeap.requestIndexes,
245                                         p.t.pieceRequestIndexOffset(pieceIndex)+ci)
246                         })
247                 },
248         )
249         heap.Init(&requestHeap)
250         for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
251                 requestIndex := heap.Pop(&requestHeap).(RequestIndex)
252                 pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
253                 allowedFast := p.peerAllowedFast.Contains(pieceIndex)
254                 if !allowedFast {
255                         desired.Interested = true
256                 }
257                 if allowedFast || !p.peerChoking {
258                         desired.Requests.Add(requestIndex)
259                 }
260         }
261         return
262 }
263
264 func (p *Peer) applyNextRequestState() bool {
265         next := p.getDesiredRequestState()
266         current := p.actualRequestState
267         if !p.setInterested(next.Interested) {
268                 return false
269         }
270         more := true
271         cancel := roaring.AndNot(&current.Requests, &next.Requests)
272         cancel.Iterate(func(req uint32) bool {
273                 more = p.cancel(req)
274                 return more
275         })
276         if !more {
277                 return false
278         }
279         next.Requests.Iterate(func(req uint32) bool {
280                 // This could happen if the peer chokes us between the next state being generated, and us
281                 // trying to transmit the state.
282                 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
283                         return true
284                 }
285                 var err error
286                 more, err = p.request(req)
287                 if err != nil {
288                         panic(err)
289                 } /* else {
290                         log.Print(req)
291                 } */
292                 return more
293         })
294         return more
295 }