]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Merge branch 'master' into peer-requesting
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "time"
5         "unsafe"
6
7         "github.com/anacrolix/missinggo/v2/bitmap"
8
9         "github.com/anacrolix/chansync"
10         request_strategy "github.com/anacrolix/torrent/request-strategy"
11 )
12
13 // Calculate requests individually for each peer.
14 const peerRequesting = true
15
16 func (cl *Client) requester() {
17         for {
18                 update := func() chansync.Signaled {
19                         cl.lock()
20                         defer cl.unlock()
21                         cl.doRequests()
22                         return cl.updateRequests.Signaled()
23                 }()
24                 minWait := time.After(100 * time.Millisecond)
25                 maxWait := time.After(1000 * time.Millisecond)
26                 select {
27                 case <-cl.closed.Done():
28                         return
29                 case <-minWait:
30                 case <-maxWait:
31                 }
32                 select {
33                 case <-cl.closed.Done():
34                         return
35                 case <-update:
36                 case <-maxWait:
37                 }
38         }
39 }
40
41 func (cl *Client) tickleRequester() {
42         cl.updateRequests.Broadcast()
43 }
44
45 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
46         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
47         for _, t := range cl.torrents {
48                 if !t.haveInfo() {
49                         // This would be removed if metadata is handled here. We have to guard against not
50                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
51                         // is the same.
52                         continue
53                 }
54                 rst := request_strategy.Torrent{
55                         InfoHash:       t.infoHash,
56                         ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
57                 }
58                 if t.storage != nil {
59                         rst.Capacity = t.storage.Capacity
60                 }
61                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
62                 for i := range t.pieces {
63                         p := &t.pieces[i]
64                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
65                                 Request:           !t.ignorePieceForRequests(i),
66                                 Priority:          p.purePriority(),
67                                 Partial:           t.piecePartiallyDownloaded(i),
68                                 Availability:      p.availability,
69                                 Length:            int64(p.length()),
70                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
71                                 IterPendingChunks: p.iterUndirtiedChunks,
72                         })
73                 }
74                 t.iterPeers(func(p *Peer) {
75                         if p.closed.IsSet() {
76                                 return
77                         }
78                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
79                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
80                         }
81                         p.piecesReceivedSinceLastRequestUpdate = 0
82                         rst.Peers = append(rst.Peers, request_strategy.Peer{
83                                 HasPiece:    p.peerHasPiece,
84                                 MaxRequests: p.nominalMaxRequests(),
85                                 HasExistingRequest: func(r RequestIndex) bool {
86                                         return p.actualRequestState.Requests.Contains(r)
87                                 },
88                                 Choking: p.peerChoking,
89                                 PieceAllowedFast: func(i pieceIndex) bool {
90                                         return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
91                                 },
92                                 DownloadRate: p.downloadRate(),
93                                 Age:          time.Since(p.completedHandshake),
94                                 Id: peerId{
95                                         Peer: p,
96                                         ptr:  uintptr(unsafe.Pointer(p)),
97                                 },
98                         })
99                 })
100                 ts = append(ts, rst)
101         }
102         return request_strategy.Input{
103                 Torrents:           ts,
104                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
105         }
106 }
107
108 func (cl *Client) doRequests() {
109         nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
110         for p, state := range nextPeerStates {
111                 setPeerNextRequestState(p, state)
112         }
113 }
114
115 type peerId struct {
116         *Peer
117         ptr uintptr
118 }
119
120 func (p peerId) Uintptr() uintptr {
121         return p.ptr
122 }
123
124 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
125         p := _p.(peerId).Peer
126         p.nextRequestState = rp
127         p.onNextRequestStateChanged()
128 }
129
130 type RequestIndex = request_strategy.RequestIndex
131 type chunkIndexType = request_strategy.ChunkIndex
132
133 func (p *Peer) applyNextRequestState() bool {
134         if peerRequesting {
135                 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
136                         return true
137                 }
138                 type piece struct {
139                         index   int
140                         endGame bool
141                 }
142                 var pieceOrder []piece
143                 request_strategy.GetRequestablePieces(
144                         p.t.cl.getRequestStrategyInput(),
145                         func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
146                                 if t.InfoHash != p.t.infoHash {
147                                         return
148                                 }
149                                 if !p.peerHasPiece(pieceIndex) {
150                                         return
151                                 }
152                                 pieceOrder = append(pieceOrder, piece{
153                                         index:   pieceIndex,
154                                         endGame: rsp.Priority == PiecePriorityNow,
155                                 })
156                         },
157                 )
158                 more := true
159                 interested := false
160                 for _, endGameIter := range []bool{false, true} {
161                         for _, piece := range pieceOrder {
162                                 tp := p.t.piece(piece.index)
163                                 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
164                                         req := cs + tp.requestIndexOffset()
165                                         if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
166                                                 return
167                                         }
168                                         interested = true
169                                         more = p.setInterested(true)
170                                         if !more {
171                                                 return
172                                         }
173                                         if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
174                                                 return
175                                         }
176                                         if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
177                                                 return
178                                         }
179                                         var err error
180                                         more, err = p.request(req)
181                                         if err != nil {
182                                                 panic(err)
183                                         }
184                                 })
185                                 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
186                                         break
187                                 }
188                                 if !more {
189                                         break
190                                 }
191                         }
192                         if !more {
193                                 break
194                         }
195                 }
196                 if !more {
197                         return false
198                 }
199                 if !interested {
200                         p.setInterested(false)
201                 }
202                 return more
203         }
204
205         next := p.nextRequestState
206         current := p.actualRequestState
207         if !p.setInterested(next.Interested) {
208                 return false
209         }
210         more := true
211         current.Requests.Iterate(func(req uint32) bool {
212                 if !next.Requests.Contains(req) {
213                         more = p.cancel(req)
214                         return more
215                 }
216                 return true
217         })
218         if !more {
219                 return false
220         }
221         next.Requests.Iterate(func(req uint32) bool {
222                 // This could happen if the peer chokes us between the next state being generated, and us
223                 // trying to transmit the state.
224                 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
225                         return true
226                 }
227                 var err error
228                 more, err = p.request(req)
229                 if err != nil {
230                         panic(err)
231                 } /* else {
232                         log.Print(req)
233                 } */
234                 return more
235         })
236         return more
237 }