]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Export request_strategy.GetRequestablePieces
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "time"
5         "unsafe"
6
7         "github.com/anacrolix/missinggo/v2/bitmap"
8
9         "github.com/anacrolix/chansync"
10         request_strategy "github.com/anacrolix/torrent/request-strategy"
11 )
12
13 func (cl *Client) requester() {
14         for {
15                 update := func() chansync.Signaled {
16                         cl.lock()
17                         defer cl.unlock()
18                         cl.doRequests()
19                         return cl.updateRequests.Signaled()
20                 }()
21                 minWait := time.After(100 * time.Millisecond)
22                 maxWait := time.After(1000 * time.Millisecond)
23                 select {
24                 case <-cl.closed.Done():
25                         return
26                 case <-minWait:
27                 case <-maxWait:
28                 }
29                 select {
30                 case <-cl.closed.Done():
31                         return
32                 case <-update:
33                 case <-maxWait:
34                 }
35         }
36 }
37
38 func (cl *Client) tickleRequester() {
39         cl.updateRequests.Broadcast()
40 }
41
42 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
43         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
44         for _, t := range cl.torrents {
45                 rst := request_strategy.Torrent{
46                         StableId: uintptr(unsafe.Pointer(t)),
47                 }
48                 if t.storage != nil {
49                         rst.Capacity = t.storage.Capacity
50                 }
51                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
52                 for i := range t.pieces {
53                         p := &t.pieces[i]
54                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
55                                 Request:           !t.ignorePieceForRequests(i),
56                                 Priority:          p.purePriority(),
57                                 Partial:           t.piecePartiallyDownloaded(i),
58                                 Availability:      p.availability,
59                                 Length:            int64(p.length()),
60                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
61                                 IterPendingChunks: p.iterUndirtiedChunks,
62                         })
63                 }
64                 t.iterPeers(func(p *Peer) {
65                         if p.closed.IsSet() {
66                                 return
67                         }
68                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
69                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
70                         }
71                         p.piecesReceivedSinceLastRequestUpdate = 0
72                         rst.Peers = append(rst.Peers, request_strategy.Peer{
73                                 HasPiece:    p.peerHasPiece,
74                                 MaxRequests: p.nominalMaxRequests(),
75                                 HasExistingRequest: func(r request_strategy.Request) bool {
76                                         _, ok := p.actualRequestState.Requests[r]
77                                         return ok
78                                 },
79                                 Choking: p.peerChoking,
80                                 PieceAllowedFast: func(i pieceIndex) bool {
81                                         return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
82                                 },
83                                 DownloadRate: p.downloadRate(),
84                                 Age:          time.Since(p.completedHandshake),
85                                 Id: peerId{
86                                         Peer: p,
87                                         ptr:  uintptr(unsafe.Pointer(p)),
88                                 },
89                         })
90                 })
91                 ts = append(ts, rst)
92         }
93         return request_strategy.Input{
94                 Torrents:           ts,
95                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
96         }
97 }
98
99 func (cl *Client) doRequests() {
100         nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
101         for p, state := range nextPeerStates {
102                 setPeerNextRequestState(p, state)
103         }
104 }
105
106 type peerId struct {
107         *Peer
108         ptr uintptr
109 }
110
111 func (p peerId) Uintptr() uintptr {
112         return p.ptr
113 }
114
115 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
116         p := _p.(peerId).Peer
117         p.nextRequestState = rp
118         p.onNextRequestStateChanged()
119 }
120
121 func (p *Peer) applyNextRequestState() bool {
122         next := p.nextRequestState
123         current := p.actualRequestState
124         if !p.setInterested(next.Interested) {
125                 return false
126         }
127         for req := range current.Requests {
128                 if _, ok := next.Requests[req]; !ok {
129                         if !p.cancel(req) {
130                                 return false
131                         }
132                 }
133         }
134         for req := range next.Requests {
135                 more, err := p.request(req)
136                 if err != nil {
137                         panic(err)
138                 } /* else {
139                         log.Print(req)
140                 } */
141                 if !more {
142                         return false
143                 }
144         }
145         return true
146 }