"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
torrents map[InfoHash]*Torrent
+ pieceRequestOrder map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder
acceptLimiter map[ipStr]int
dialRateLimiter *rate.Limiter
func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+ p.t.updatePieceRequestOrder(p.index)
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+ p.t.updatePieceRequestOrder(p.index)
}
func (p *Piece) numChunks() chunkIndexType {
import (
"bytes"
"expvar"
+ "log"
"runtime"
"sort"
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
+ "github.com/google/btree"
"github.com/anacrolix/torrent/types"
)
func (me pieceSorter) Less(_i, _j int) bool {
i := me.get(_i)
j := me.get(_j)
+ return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
+}
+
+type pieceOrderInput struct {
+ PieceRequestOrderState
+ PieceRequestOrderKey
+}
+
+func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
).Int64(
i.Availability, j.Availability,
).Int(
- i.index, j.index,
+ i.Index, j.Index,
).Lazy(func() multiless.Computation {
return multiless.New().Cmp(bytes.Compare(
- i.t.InfoHash[:],
- j.t.InfoHash[:],
+ i.InfoHash[:],
+ j.InfoHash[:],
))
- }).MustLess()
+ })
}
type requestsPeer struct {
*Piece
}
+func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
+ ret.Partial = fp.Partial
+ ret.InfoHash = fp.t.InfoHash
+ ret.Availability = fp.Availability
+ ret.Priority = fp.Priority
+ ret.Index = fp.index
+ return
+}
+
var (
sortsMu sync.Mutex
sorts = map[*[]filterPiece][]int{}
}
// Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
- maxPieces := 0
- for i := range input.Torrents {
- maxPieces += len(input.Torrents[i].Pieces)
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
+ if false {
+ maxPieces := 0
+ for i := range input.Torrents {
+ maxPieces += len(input.Torrents[i].Pieces)
+ }
+ pieces := make([]filterPiece, 0, maxPieces)
+ for _t := range input.Torrents {
+ // TODO: We could do metainfo requests here.
+ t := &input.Torrents[_t]
+ for i := range t.Pieces {
+ pieces = append(pieces, filterPiece{
+ t: &input.Torrents[_t],
+ index: i,
+ Piece: &t.Pieces[i],
+ })
+ }
+ }
+ pieces = getSortedFilterPieces(pieces)
+ {
+ if len(pieces) != pro.tree.Len() {
+ panic("length doesn't match")
+ }
+ pieces := pieces
+ pro.tree.Ascend(func(i btree.Item) bool {
+ _i := i.(pieceRequestOrderItem)
+ ii := pieceOrderInput{
+ _i.state,
+ _i.key,
+ }
+ if pieces[0].toPieceOrderInput() != ii {
+ panic(_i)
+ }
+ pieces = pieces[1:]
+ return true
+ })
+ }
+ log.Printf("%v pieces passed", len(pieces))
}
- pieces := make([]filterPiece, 0, maxPieces)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
storageLeft = new(int64)
*storageLeft = *input.Capacity
}
- for _t := range input.Torrents {
- // TODO: We could do metainfo requests here.
- t := &input.Torrents[_t]
- for i := range t.Pieces {
- pieces = append(pieces, filterPiece{
- t: &input.Torrents[_t],
- index: i,
- Piece: &t.Pieces[i],
- })
- }
- }
- pieces = getSortedFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
torrentUnverifiedBytes := map[metainfo.Hash]int64{}
- for _, piece := range pieces {
+ pro.tree.Ascend(func(i btree.Item) bool {
+ _i := i.(pieceRequestOrderItem)
+ var piece *Piece
+ var t Torrent
+ for _, t = range input.Torrents {
+ if t.InfoHash == _i.key.InfoHash {
+ piece = &t.Pieces[_i.key.Index]
+ break
+ }
+ }
if left := storageLeft; left != nil {
if *left < piece.Length {
- continue
+ return true
}
*left -= piece.Length
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
- continue
+
+ return true
}
- if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
- continue
+ if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
+ return true
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
- continue
+ return true
}
- torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
+ torrentUnverifiedBytes[t.InfoHash] += piece.Length
allTorrentsUnverifiedBytes += piece.Length
- f(piece.t, piece.Piece, piece.index)
- }
+ f(&t, piece, _i.key.Index)
+ return true
+ })
return
}
--- /dev/null
+package request_strategy
+
+import (
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/google/btree"
+)
+
+func NewPieceOrder() *PieceRequestOrder {
+ return &PieceRequestOrder{
+ tree: btree.New(32),
+ keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
+ }
+}
+
+type PieceRequestOrder struct {
+ tree *btree.BTree
+ keys map[PieceRequestOrderKey]PieceRequestOrderState
+}
+
+type PieceRequestOrderKey struct {
+ InfoHash metainfo.Hash
+ Index int
+}
+
+type PieceRequestOrderState struct {
+ Priority piecePriority
+ Partial bool
+ Availability int64
+}
+
+type pieceRequestOrderItem struct {
+ key PieceRequestOrderKey
+ state PieceRequestOrderState
+}
+
+func (me pieceRequestOrderItem) Less(other btree.Item) bool {
+ otherConcrete := other.(pieceRequestOrderItem)
+ return pieceOrderLess(
+ pieceOrderInput{
+ PieceRequestOrderState: me.state,
+ PieceRequestOrderKey: me.key,
+ },
+ pieceOrderInput{
+ PieceRequestOrderState: otherConcrete.state,
+ PieceRequestOrderKey: otherConcrete.key,
+ },
+ ).Less()
+}
+
+func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
+ if _, ok := me.keys[key]; ok {
+ panic(key)
+ }
+ if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+ key: key,
+ state: state,
+ }) != nil {
+ panic("shouldn't already have this")
+ }
+ me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
+ if me.tree.Delete(me.existingItemForKey(key)) == nil {
+ panic(key)
+ }
+ if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+ key: key,
+ state: state,
+ }) != nil {
+ panic(key)
+ }
+ me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
+ return pieceRequestOrderItem{
+ key: key,
+ state: me.keys[key],
+ }
+}
+
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
+ if me.tree.Delete(me.existingItemForKey(key)) == nil {
+ panic(key)
+ }
+ delete(me.keys, key)
+}
input.Capacity = &cap
}
}
- if input.Capacity == nil {
- input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
- return
+ if false {
+ if input.Capacity == nil {
+ input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
+ return
+ }
}
input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
}
rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
for i := range t.pieces {
- p := &t.pieces[i]
- rst.Pieces = append(rst.Pieces, request_strategy.Piece{
- Request: !t.ignorePieceForRequests(i),
- Priority: p.purePriority(),
- Partial: t.piecePartiallyDownloaded(i),
- Availability: p.availability,
- Length: int64(p.length()),
- NumPendingChunks: int(t.pieceNumPendingChunks(i)),
- IterPendingChunks: &p.undirtiedChunksIter,
- })
+ rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
}
return rst
}
+func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
+ return request_strategy.PieceRequestOrderState{
+ Priority: t.piece(i).purePriority(),
+ Partial: t.piecePartiallyDownloaded(i),
+ Availability: t.piece(i).availability,
+ }
+}
+
+func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
+ p := &t.pieces[i]
+ return request_strategy.Piece{
+ Request: !t.ignorePieceForRequests(i),
+ Priority: p.purePriority(),
+ Partial: t.piecePartiallyDownloaded(i),
+ Availability: p.availability,
+ Length: int64(p.length()),
+ NumPendingChunks: int(t.pieceNumPendingChunks(i)),
+ IterPendingChunks: &p.undirtiedChunksIter,
+ }
+}
+
func init() {
gob.Register(peerId{})
}
}
request_strategy.GetRequestablePieces(
input,
+ p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
if t.InfoHash != p.t.infoHash {
return
--- /dev/null
+package torrent
+
+func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+ t.cl.pieceRequestOrder[t.storage.Capacity].Update(
+ t.pieceRequestOrderKey(pieceIndex),
+ t.requestStrategyPieceOrderState(pieceIndex))
+}
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
panic(p.availability)
}
p.availability--
+ t.updatePieceRequestOrder(i)
}
func (t *Torrent) incPieceAvailability(i pieceIndex) {
if t.haveInfo() {
p := t.piece(i)
p.availability++
+ t.updatePieceRequestOrder(i)
}
}
return nil
}
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+ return request_strategy.PieceRequestOrderKey{
+ InfoHash: t.infoHash,
+ Index: i,
+ }
+}
+
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ if t.cl.pieceRequestOrder == nil {
+ t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder)
+ }
+ if t.cl.pieceRequestOrder[t.storage.Capacity] == nil {
+ t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder()
+ }
for i := range t.pieces {
p := &t.pieces[i]
// Need to add availability before updating piece completion, as that may result in conns
panic(p.availability)
}
p.availability = int64(t.pieceAvailabilityFromPeers(i))
+ t.cl.pieceRequestOrder[t.storage.Capacity].Add(
+ t.pieceRequestOrderKey(i),
+ t.requestStrategyPieceOrderState(i))
t.updatePieceCompletion(pieceIndex(i))
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p)
return pieceIndex(t._completedPieces.GetCardinality())
}
+func (t *Torrent) deletePieceRequestOrder() {
+ for i := 0; i < t.numPieces(); i++ {
+ t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i))
+ }
+}
+
func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
t.closed.Set()
if t.storage != nil {
t.iterPeers(func(p *Peer) {
p.close()
})
+ if t.storage != nil {
+ t.deletePieceRequestOrder()
+ }
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
}
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+ if !t.closed.IsSet() {
+ // It would be possible to filter on pure-priority changes here to avoid churning the piece
+ // request order.
+ t.updatePieceRequestOrder(piece)
+ }
p := &t.pieces[piece]
newPrio := p.uncachedPriority()
// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
} else {
t._completedPieces.Remove(x)
}
+ p.t.updatePieceRequestOrder(piece)
t.updateComplete()
if complete && len(p.dirtiers) != 0 {
t.logger.Printf("marked piece %v complete but still has dirtiers", piece)