]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Update chansync
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "encoding/gob"
5         "reflect"
6         "time"
7         "unsafe"
8
9         "github.com/RoaringBitmap/roaring"
10         "github.com/anacrolix/chansync/events"
11         "github.com/anacrolix/log"
12         "github.com/anacrolix/missinggo/v2/bitmap"
13
14         request_strategy "github.com/anacrolix/torrent/request-strategy"
15 )
16
// peerRequesting selects where requests are worked out. When true, each peer computes its own
// requests inside applyNextRequestState; when false, peers only apply the state handed to them by
// the client-wide planning pass.
const (
	peerRequesting = false
)
19
20 func (cl *Client) requester() {
21         for {
22                 update := func() events.Signaled {
23                         cl.lock()
24                         defer cl.unlock()
25                         cl.doRequests()
26                         return cl.updateRequests.Signaled()
27                 }()
28                 minWait := time.After(100 * time.Millisecond)
29                 maxWait := time.After(1000 * time.Millisecond)
30                 select {
31                 case <-cl.closed.Done():
32                         return
33                 case <-minWait:
34                 case <-maxWait:
35                 }
36                 select {
37                 case <-cl.closed.Done():
38                         return
39                 case <-update:
40                 case <-maxWait:
41                 }
42         }
43 }
44
45 func (cl *Client) tickleRequester() {
46         cl.updateRequests.Broadcast()
47 }
48
49 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
50         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
51         for _, t := range cl.torrents {
52                 if !t.haveInfo() {
53                         // This would be removed if metadata is handled here. We have to guard against not
54                         // knowing the piece size. If we have no info, we have no pieces too, so the end result
55                         // is the same.
56                         continue
57                 }
58                 rst := request_strategy.Torrent{
59                         InfoHash:       t.infoHash,
60                         ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
61                 }
62                 if t.storage != nil {
63                         rst.Capacity = t.storage.Capacity
64                 }
65                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
66                 for i := range t.pieces {
67                         p := &t.pieces[i]
68                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
69                                 Request:           !t.ignorePieceForRequests(i),
70                                 Priority:          p.purePriority(),
71                                 Partial:           t.piecePartiallyDownloaded(i),
72                                 Availability:      p.availability,
73                                 Length:            int64(p.length()),
74                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
75                                 IterPendingChunks: p.undirtiedChunksIter(),
76                         })
77                 }
78                 t.iterPeers(func(p *Peer) {
79                         if p.closed.IsSet() {
80                                 return
81                         }
82                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
83                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
84                         }
85                         p.piecesReceivedSinceLastRequestUpdate = 0
86                         rst.Peers = append(rst.Peers, request_strategy.Peer{
87                                 Pieces:           *p.newPeerPieces(),
88                                 MaxRequests:      p.nominalMaxRequests(),
89                                 ExistingRequests: p.actualRequestState.Requests,
90                                 Choking:          p.peerChoking,
91                                 PieceAllowedFast: p.peerAllowedFast,
92                                 DownloadRate:     p.downloadRate(),
93                                 Age:              time.Since(p.completedHandshake),
94                                 Id: peerId{
95                                         Peer: p,
96                                         ptr:  uintptr(unsafe.Pointer(p)),
97                                 },
98                         })
99                 })
100                 ts = append(ts, rst)
101         }
102         return request_strategy.Input{
103                 Torrents:           ts,
104                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
105         }
106 }
107
108 func (cl *Client) doRequests() {
109         input := cl.getRequestStrategyInput()
110         nextPeerStates := request_strategy.Run(input)
111         for p, state := range nextPeerStates {
112                 setPeerNextRequestState(p, state)
113         }
114 }
115
116 func init() {
117         gob.Register(peerId{})
118 }
119
120 type peerId struct {
121         *Peer
122         ptr uintptr
123 }
124
125 func (p peerId) Uintptr() uintptr {
126         return p.ptr
127 }
128
129 func (p peerId) GobEncode() (b []byte, _ error) {
130         *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
131                 Data: uintptr(unsafe.Pointer(&p.ptr)),
132                 Len:  int(unsafe.Sizeof(p.ptr)),
133                 Cap:  int(unsafe.Sizeof(p.ptr)),
134         }
135         return
136 }
137
138 func (p *peerId) GobDecode(b []byte) error {
139         if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
140                 panic(len(b))
141         }
142         ptr := unsafe.Pointer(&b[0])
143         p.ptr = *(*uintptr)(ptr)
144         log.Printf("%p", ptr)
145         dst := reflect.SliceHeader{
146                 Data: uintptr(unsafe.Pointer(&p.Peer)),
147                 Len:  int(unsafe.Sizeof(p.Peer)),
148                 Cap:  int(unsafe.Sizeof(p.Peer)),
149         }
150         copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
151         return nil
152 }
153
154 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
155         p := _p.(peerId).Peer
156         p.nextRequestState = rp
157         p.onNextRequestStateChanged()
158 }
159
160 type RequestIndex = request_strategy.RequestIndex
161 type chunkIndexType = request_strategy.ChunkIndex
162
163 func (p *Peer) applyNextRequestState() bool {
164         if peerRequesting {
165                 if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
166                         return true
167                 }
168                 type piece struct {
169                         index   int
170                         endGame bool
171                 }
172                 var pieceOrder []piece
173                 request_strategy.GetRequestablePieces(
174                         p.t.cl.getRequestStrategyInput(),
175                         func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
176                                 if t.InfoHash != p.t.infoHash {
177                                         return
178                                 }
179                                 if !p.peerHasPiece(pieceIndex) {
180                                         return
181                                 }
182                                 pieceOrder = append(pieceOrder, piece{
183                                         index:   pieceIndex,
184                                         endGame: rsp.Priority == PiecePriorityNow,
185                                 })
186                         },
187                 )
188                 more := true
189                 interested := false
190                 for _, endGameIter := range []bool{false, true} {
191                         for _, piece := range pieceOrder {
192                                 tp := p.t.piece(piece.index)
193                                 tp.iterUndirtiedChunks(func(cs chunkIndexType) {
194                                         req := cs + tp.requestIndexOffset()
195                                         if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
196                                                 return
197                                         }
198                                         interested = true
199                                         more = p.setInterested(true)
200                                         if !more {
201                                                 return
202                                         }
203                                         if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
204                                                 return
205                                         }
206                                         if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
207                                                 return
208                                         }
209                                         var err error
210                                         more, err = p.request(req)
211                                         if err != nil {
212                                                 panic(err)
213                                         }
214                                 })
215                                 if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
216                                         break
217                                 }
218                                 if !more {
219                                         break
220                                 }
221                         }
222                         if !more {
223                                 break
224                         }
225                 }
226                 if !more {
227                         return false
228                 }
229                 if !interested {
230                         p.setInterested(false)
231                 }
232                 return more
233         }
234
235         next := p.nextRequestState
236         current := p.actualRequestState
237         if !p.setInterested(next.Interested) {
238                 return false
239         }
240         more := true
241         cancel := roaring.AndNot(&current.Requests, &next.Requests)
242         cancel.Iterate(func(req uint32) bool {
243                 more = p.cancel(req)
244                 return more
245         })
246         if !more {
247                 return false
248         }
249         next.Requests.Iterate(func(req uint32) bool {
250                 // This could happen if the peer chokes us between the next state being generated, and us
251                 // trying to transmit the state.
252                 if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
253                         return true
254                 }
255                 var err error
256                 more, err = p.request(req)
257                 if err != nil {
258                         panic(err)
259                 } /* else {
260                         log.Print(req)
261                 } */
262                 return more
263         })
264         return more
265 }