]> Sergey Matveev's repositories - btrtrc.git/blob - requesting.go
Rename file
[btrtrc.git] / requesting.go
1 package torrent
2
3 import (
4         "time"
5         "unsafe"
6
7         "github.com/anacrolix/missinggo/v2/bitmap"
8
9         "github.com/anacrolix/torrent/internal/chansync"
10         request_strategy "github.com/anacrolix/torrent/request-strategy"
11         "github.com/anacrolix/torrent/types"
12 )
13
14 func (cl *Client) requester() {
15         for {
16                 update := func() chansync.Signaled {
17                         cl.lock()
18                         defer cl.unlock()
19                         cl.doRequests()
20                         return cl.updateRequests.Signaled()
21                 }()
22                 // We can probably tune how often to heed this signal. TODO: Currently disabled to retain
23                 // existing behaviour, while the signalling is worked out.
24                 update = nil
25                 select {
26                 case <-cl.closed.LockedChan(cl.locker()):
27                         return
28                 case <-update:
29                 case <-time.After(100 * time.Millisecond):
30                 }
31         }
32 }
33
34 func (cl *Client) tickleRequester() {
35         cl.updateRequests.Broadcast()
36 }
37
38 func (cl *Client) doRequests() {
39         ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
40         for _, t := range cl.torrents {
41                 rst := request_strategy.Torrent{
42                         StableId: uintptr(unsafe.Pointer(t)),
43                 }
44                 if t.storage != nil {
45                         rst.Capacity = t.storage.Capacity
46                 }
47                 for i := range t.pieces {
48                         p := &t.pieces[i]
49                         rst.Pieces = append(rst.Pieces, request_strategy.Piece{
50                                 Request:          !t.ignorePieceForRequests(i),
51                                 Priority:         p.purePriority(),
52                                 Partial:          t.piecePartiallyDownloaded(i),
53                                 Availability:     p.availability,
54                                 Length:           int64(p.length()),
55                                 NumPendingChunks: int(t.pieceNumPendingChunks(i)),
56                                 IterPendingChunks: func(f func(types.ChunkSpec)) {
57                                         p.iterUndirtiedChunks(func(cs ChunkSpec) bool {
58                                                 f(cs)
59                                                 return true
60                                         })
61                                 },
62                         })
63                 }
64                 t.iterPeers(func(p *Peer) {
65                         if p.closed.IsSet() {
66                                 return
67                         }
68                         if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
69                                 p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
70                         }
71                         p.piecesReceivedSinceLastRequestUpdate = 0
72                         rst.Peers = append(rst.Peers, request_strategy.Peer{
73                                 HasPiece:    p.peerHasPiece,
74                                 MaxRequests: p.nominalMaxRequests(),
75                                 HasExistingRequest: func(r request_strategy.Request) bool {
76                                         _, ok := p.actualRequestState.Requests[r]
77                                         return ok
78                                 },
79                                 Choking: p.peerChoking,
80                                 PieceAllowedFast: func(i pieceIndex) bool {
81                                         return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
82                                 },
83                                 DownloadRate: p.downloadRate(),
84                                 Age:          time.Since(p.completedHandshake),
85                                 Id:           (*peerId)(p),
86                         })
87                 })
88                 ts = append(ts, rst)
89         }
90         nextPeerStates := request_strategy.Run(request_strategy.Input{
91                 Torrents:           ts,
92                 MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
93         })
94         for p, state := range nextPeerStates {
95                 setPeerNextRequestState(p, state)
96         }
97 }
98
99 type peerId Peer
100
101 func (p *peerId) Uintptr() uintptr {
102         return uintptr(unsafe.Pointer(p))
103 }
104
105 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
106         p := (*Peer)(_p.(*peerId))
107         p.nextRequestState = rp
108         p.onNextRequestStateChanged()
109 }
110
111 func (p *Peer) applyNextRequestState() bool {
112         next := p.nextRequestState
113         current := p.actualRequestState
114         if !p.setInterested(next.Interested) {
115                 return false
116         }
117         for req := range current.Requests {
118                 if _, ok := next.Requests[req]; !ok {
119                         if !p.cancel(req) {
120                                 return false
121                         }
122                 }
123         }
124         for req := range next.Requests {
125                 more, err := p.request(req)
126                 if err != nil {
127                         panic(err)
128                 } else {
129                         //log.Print(req)
130                 }
131                 if !more {
132                         return false
133                 }
134         }
135         return true
136 }