Sergey Matveev's repositories - btrtrc.git/blob - request-strategy.go
Pass tests with new full-client request strategy implementation
[btrtrc.git] / request-strategy.go
1 package torrent
2
3 import (
4         "sort"
5         "time"
6
7         "github.com/anacrolix/log"
8         "github.com/anacrolix/multiless"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/bradfitz/iter"
11 )
12
13 type clientPieceRequestOrder struct {
14         pieces []pieceRequestOrderPiece
15 }
16
17 type pieceRequestOrderPiece struct {
18         t            *Torrent
19         index        pieceIndex
20         prio         piecePriority
21         partial      bool
22         availability int
23 }
24
25 func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
26         for i := range iter.N(numPieces) {
27                 me.pieces = append(me.pieces, pieceRequestOrderPiece{
28                         t:     t,
29                         index: i,
30                 })
31         }
32 }
33
34 func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
35         newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
36         for _, p := range me.pieces {
37                 if p.t != t {
38                         newPieces = append(newPieces, p)
39                 }
40         }
41         me.pieces = newPieces
42 }
43
44 func (me clientPieceRequestOrder) sort() {
45         sort.SliceStable(me.pieces, me.less)
46 }
47
48 func (me clientPieceRequestOrder) update() {
49         for i := range me.pieces {
50                 p := &me.pieces[i]
51                 p.prio = p.t.piece(p.index).uncachedPriority()
52                 p.partial = p.t.piecePartiallyDownloaded(p.index)
53                 p.availability = p.t.pieceAvailability(p.index)
54         }
55 }
56
57 func (me clientPieceRequestOrder) less(_i, _j int) bool {
58         i := me.pieces[_i]
59         j := me.pieces[_j]
60         ml := multiless.New()
61         ml.Int(int(j.prio), int(i.prio))
62         ml.Bool(j.partial, i.partial)
63         ml.Int(i.availability, j.availability)
64         return ml.Less()
65 }
66
67 func (cl *Client) requester() {
68         for {
69                 func() {
70                         cl.lock()
71                         defer cl.unlock()
72                         cl.doRequests()
73                 }()
74                 select {
75                 case <-cl.closed.LockedChan(cl.locker()):
76                         return
77                 case <-time.After(10 * time.Millisecond):
78                 }
79         }
80 }
81
82 func (cl *Client) doRequests() {
83         requestOrder := clientPieceRequestOrder{}
84         allPeers := make(map[*Torrent][]*Peer)
85         storageCapacity := make(map[*Torrent]*int64)
86         for _, t := range cl.torrents {
87                 // TODO: We could do metainfo requests here.
88                 if t.haveInfo() {
89                         value := int64(t.usualPieceSize())
90                         storageCapacity[t] = &value
91                         requestOrder.addPieces(t, t.numPieces())
92                 }
93                 var peers []*Peer
94                 t.iterPeers(func(p *Peer) {
95                         peers = append(peers, p)
96                 })
97                 allPeers[t] = peers
98         }
99         requestOrder.update()
100         requestOrder.sort()
101         for _, p := range requestOrder.pieces {
102                 if p.t.ignorePieceForRequests(p.index) {
103                         continue
104                 }
105                 peers := allPeers[p.t]
106                 torrentPiece := p.t.piece(p.index)
107                 if left := storageCapacity[p.t]; left != nil {
108                         if *left < int64(torrentPiece.length()) {
109                                 continue
110                         }
111                         *left -= int64(torrentPiece.length())
112                 }
113                 p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
114                         for _, peer := range peers {
115                                 req := Request{pp.Integer(p.index), chunk}
116                                 _, err := peer.request(req)
117                                 if err == nil {
118                                         log.Printf("requested %v", req)
119                                         break
120                                 }
121                         }
122                         return true
123                 })
124         }
125         for _, t := range cl.torrents {
126                 t.iterPeers(func(p *Peer) {
127                         if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
128                                 p.setInterested(false)
129                         }
130                 })
131         }
132 }
133
134 //func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
135 //      chunkIndices := p.dirtyChunks().Copy()
136 //      chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
137 //      return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
138 //              ci, err := chunkIndices.RB.Select(uint32(i))
139 //              if err != nil {
140 //                      panic(err)
141 //              }
142 //              return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
143 //      })
144 //}
145
146 //
147 //func iterUnbiasedPieceRequestOrder(
148 //      cn requestStrategyConnection,
149 //      f func(piece pieceIndex) bool,
150 //      pieceRequestOrder []pieceIndex,
151 //) bool {
152 //      cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
153 //      for _, i := range pieceRequestOrder {
154 //              if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
155 //                      continue
156 //              }
157 //              if !f(i) {
158 //                      return false
159 //              }
160 //      }
161 //      return true
162 //}