ChunkSpec = types.ChunkSpec
)
-type ClientPieceOrder struct {
- pieces []pieceRequestOrderPiece
-}
+// ClientPieceOrder no longer carries state between calls; the per-run
+// piece list now lives as locals in getRequestablePieces/DoRequests.
+type ClientPieceOrder struct{}
-type orderTorrent struct {
- *Torrent
+// filterTorrent replaces orderTorrent: a Torrent plus the transient
+// bookkeeping used while filtering its pieces for requestability.
+type filterTorrent struct {
+ Torrent
+ // Piece bytes accepted this pass but presumably not yet hash-verified;
+ // compared against MaxUnverifiedBytes in getRequestablePieces.
 unverifiedBytes int64
 // Potentially shared with other torrents.
 storageLeft *int64
- peers []*requestsPeer
-}
-
-type pieceRequestOrderPiece struct {
- t *orderTorrent
- index pieceIndex
- Piece
}
-func (me *ClientPieceOrder) Len() int {
- return len(me.pieces)
-}
-
-func (me ClientPieceOrder) sort() {
- sort.Slice(me.pieces, me.less)
-}
-
-func (me ClientPieceOrder) less(_i, _j int) bool {
- i := me.pieces[_i]
- j := me.pieces[_j]
- return multiless.New().Int(
- int(j.Priority), int(i.Priority),
- ).Bool(
- j.Partial, i.Partial,
- ).Int64(
- i.Availability, j.Availability,
- ).Int(
- i.index, j.index,
- ).Uintptr(
- i.t.StableId, j.t.StableId,
- ).MustLess()
+// sortFilterPieces folds the old Len/sort/less methods into one free
+// function. Order: higher Priority first, partially-downloaded pieces
+// first, then lower Availability, then piece index, with the owning
+// torrent's StableId as the final deterministic tiebreak.
+func sortFilterPieces(pieces []filterPiece) {
+ sort.Slice(pieces, func(_i, _j int) bool {
+ i := pieces[_i]
+ j := pieces[_j]
+ return multiless.New().Int(
+ int(j.Priority), int(i.Priority),
+ ).Bool(
+ j.Partial, i.Partial,
+ ).Int64(
+ i.Availability, j.Availability,
+ ).Int(
+ i.index, j.index,
+ ).Uintptr(
+ i.t.StableId, j.t.StableId,
+ ).MustLess()
+ })
}
type requestsPeer struct {
me.requestsInPiece++
}
-func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
- requestOrder.pieces = requestOrder.pieces[:0]
+// requestablePiece is a piece that survived filtering and whose pending
+// chunks are to be handed out to peers by DoRequests.
+type requestablePiece struct {
+ index pieceIndex
+ t Torrent
+ NumPendingChunks int
+ IterPendingChunks ChunksIter
+}
+
+// filterPiece pairs a Piece with its owning filterTorrent and index so
+// it can be sorted and filtered in getRequestablePieces.
+type filterPiece struct {
+ t *filterTorrent
+ index pieceIndex
+ Piece
+}
+
+// getRequestablePieces flattens every torrent's pieces into filterPieces,
+// sorts them with sortFilterPieces, then filters by remaining storage
+// capacity and the per-torrent unverified-bytes cap, returning the pieces
+// eligible for chunk requests. Peer bookkeeping moved out to DoRequests.
+func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
 // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
 // TorrentImpl.
 storageLeft := make(map[*func() *int64]*int64)
- orderTorrents := make([]*orderTorrent, 0, len(torrents))
+ var pieces []filterPiece
 for _, _t := range torrents {
 // TODO: We could do metainfo requests here.
- t := &orderTorrent{
+ t := &filterTorrent{
 Torrent: _t,
 unverifiedBytes: 0,
 }
 }
 t.storageLeft = storageLeft[key]
 }
- var peers []*requestsPeer
- for _, p := range t.Peers {
- peers = append(peers, &requestsPeer{
- Peer: p,
- nextState: PeerNextRequestState{
- Requests: make(map[Request]struct{}),
- },
- })
- }
 for i, tp := range t.Pieces {
- requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+ pieces = append(pieces, filterPiece{
 t: t,
 index: i,
 Piece: tp,
 })
- if tp.Request && tp.NumPendingChunks != 0 {
- for _, p := range peers {
- if p.canRequestPiece(i) {
- p.requestablePiecesRemaining++
- }
- }
- }
 }
- t.peers = peers
- orderTorrents = append(orderTorrents, t)
 }
- requestOrder.sort()
- for _, piece := range requestOrder.pieces {
+ sortFilterPieces(pieces)
+ for _, piece := range pieces {
 if left := piece.t.storageLeft; left != nil {
 if *left < int64(piece.Length) {
 continue
 continue
 }
 if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
- //log.Print("skipping piece")
 continue
 }
- allocatePendingChunks(piece, piece.t.peers)
 piece.t.unverifiedBytes += piece.Length
- //log.Print(piece.t.unverifiedBytes)
+ // Capture just the fields the allocation stage needs from the
+ // filter-stage piece.
+ ret = append(ret, requestablePiece{
+ index: piece.index,
+ t: piece.t.Torrent,
+ NumPendingChunks: piece.NumPendingChunks,
+ IterPendingChunks: piece.iterPendingChunksWrapper,
+ })
+ }
+ return
+}
+
+// TODO: We could do metainfo requests here.
+// DoRequests builds the next request state for every peer across the
+// given torrents: filter pieces, count per-peer requestable pieces, then
+// allocate pending chunks per piece.
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
+ requestPieces := getRequestablePieces(torrents)
+ // requestsPeer wrappers grouped by the owning torrent's StableId, each
+ // starting from an empty next-request state.
+ allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
+ for _, t := range torrents {
+ peers := make([]*requestsPeer, 0, len(t.Peers))
 for _, p := range t.Peers {
 peers = append(peers, &requestsPeer{
 Peer: p,
 nextState: PeerNextRequestState{
 Requests: make(map[Request]struct{}),
 },
 })
 }
+ allPeers[t.StableId] = peers
+ }
+ // Pre-count how many requestable pieces each peer could serve; the
+ // panic below fires if any count is left nonzero (counts are presumably
+ // consumed inside allocatePendingChunks — not visible here).
+ for _, piece := range requestPieces {
+ for _, peer := range allPeers[piece.t.StableId] {
+ if peer.canRequestPiece(piece.index) {
+ peer.requestablePiecesRemaining++
+ }
+ }
+ }
+ for _, piece := range requestPieces {
+ allocatePendingChunks(piece, allPeers[piece.t.StableId])
+ }
 ret := make(map[PeerId]PeerNextRequestState)
- for _, ots := range orderTorrents {
- for _, rp := range ots.peers {
+ for _, peers := range allPeers {
+ for _, rp := range peers {
 if rp.requestablePiecesRemaining != 0 {
 panic(rp.requestablePiecesRemaining)
 }
 return ret
 }
-func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
+// allocatePendingChunks distributes one piece's pending chunks among the
+// given peers, honoring already-existing requests via preallocation.
+func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
 peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
 for _, peer := range peers {
 peersForPiece = append(peersForPiece, &peersForPieceRequests{
 })
 }
 preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
- p.iterPendingChunksWrapper(func(spec ChunkSpec) {
+ p.IterPendingChunks(func(spec ChunkSpec) {
 req := Request{pp.Integer(p.index), spec}
 for _, peer := range peersForPiece {
 if h := peer.HasExistingRequest; h == nil || !h(req) {
 }
 })
 pendingChunksRemaining := int(p.NumPendingChunks)
- p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
+ p.IterPendingChunks(func(chunk types.ChunkSpec) {
 if _, ok := preallocated[chunk]; ok {
 return
 }
 }
 }
 peer.addNextRequest(req)
- return
+ // NOTE(review): break (unlike the old return) only ends the peer
+ // scan for this chunk — confirm nothing after this loop inside the
+ // closure is meant to be skipped.
+ break
 }
 })
 chunk:
 for chunk, prePeer := range preallocated {
+ // NOTE(review): decremented up front now (removal below), so every
+ // preallocated chunk is counted even on iterations that continue out
+ // early.
+ pendingChunksRemaining--
 req := Request{pp.Integer(p.index), chunk}
 prePeer.requestsInPiece--
 sortPeersForPiece(&req)
 continue
 }
 }
- pendingChunksRemaining--
 peer.addNextRequest(req)
 continue chunk
 }