]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Simplify PIece.iterUndirtiedChunks
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "time"
5         "unsafe"
6
7         "github.com/anacrolix/missinggo/v2/bitmap"
8
9         "github.com/anacrolix/chansync"
10         request_strategy "github.com/anacrolix/torrent/request-strategy"
11 )
12
13 func (cl *Client) requester() {
14         for {
15                 update := func() chansync.Signaled {
16                         cl.lock()
17                         defer cl.unlock()
18                         cl.doRequests()
19                         return cl.updateRequests.Signaled()
20                 }()
21                 select {
22                 case <-cl.closed.Done():
23                         return
24                 case <-time.After(100 * time.Millisecond):
25                 }
26                 select {
27                 case <-cl.closed.Done():
28                         return
29                 case <-update:
30                 case <-time.After(time.Second):
31                 }
32         }
33 }
34
35 func (cl *Client) tickleRequester() {
36         cl.updateRequests.Broadcast()
37 }
38
39 func (cl *Client) doRequests() {
40         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
41         for _, t := range cl.torrents {
42                 rst := request_strategy.Torrent{
43                         StableId: uintptr(unsafe.Pointer(t)),
44                 }
45                 if t.storage != nil {
46                         rst.Capacity = t.storage.Capacity
47                 }
48                 rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
49                 for i := range t.pieces {
50                         p := &t.pieces[i]
51                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
52                                 Request:           !t.ignorePieceForRequests(i),
53                                 Priority:          p.purePriority(),
54                                 Partial:           t.piecePartiallyDownloaded(i),
55                                 Availability:      p.availability,
56                                 Length:            int64(p.length()),
57                                 NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
58                                 IterPendingChunks: p.iterUndirtiedChunks,
59                         })
60                 }
61                 t.iterPeers(func(p *Peer) {
62                         if p.closed.IsSet() {
63                                 return
64                         }
65                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
66                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
67                         }
68                         p.piecesReceivedSinceLastRequestUpdate = 0
69                         rst.Peers = append(rst.Peers, request_strategy.Peer{
70                                 HasPiece:    p.peerHasPiece,
71                                 MaxRequests: p.nominalMaxRequests(),
72                                 HasExistingRequest: func(r request_strategy.Request) bool {
73                                         _, ok := p.actualRequestState.Requests[r]
74                                         return ok
75                                 },
76                                 Choking: p.peerChoking,
77                                 PieceAllowedFast: func(i pieceIndex) bool {
78                                         return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
79                                 },
80                                 DownloadRate: p.downloadRate(),
81                                 Age:          time.Since(p.completedHandshake),
82                                 Id: peerId{
83                                         Peer: p,
84                                         ptr:  uintptr(unsafe.Pointer(p)),
85                                 },
86                         })
87                 })
88                 ts = append(ts, rst)
89         }
90         nextPeerStates := request_strategy.Run(request_strategy.Input{
91                 Torrents:           ts,
92                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
93         })
94         for p, state := range nextPeerStates {
95                 setPeerNextRequestState(p, state)
96         }
97 }
98
99 type peerId struct {
100         *Peer
101         ptr uintptr
102 }
103
104 func (p peerId) Uintptr() uintptr {
105         return p.ptr
106 }
107
108 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
109         p := _p.(peerId).Peer
110         p.nextRequestState = rp
111         p.onNextRequestStateChanged()
112 }
113
114 func (p *Peer) applyNextRequestState() bool {
115         next := p.nextRequestState
116         current := p.actualRequestState
117         if !p.setInterested(next.Interested) {
118                 return false
119         }
120         for req := range current.Requests {
121                 if _, ok := next.Requests[req]; !ok {
122                         if !p.cancel(req) {
123                                 return false
124                         }
125                 }
126         }
127         for req := range next.Requests {
128                 more, err := p.request(req)
129                 if err != nil {
130                         panic(err)
131                 } /* else {
132                         log.Print(req)
133                 } */
134                 if !more {
135                         return false
136                 }
137         }
138         return true
139 }