]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Implement piece request ordering with retained state
authorMatt Joiner <anacrolix@gmail.com>
Wed, 1 Dec 2021 03:38:47 +0000 (14:38 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 12 Dec 2021 07:35:01 +0000 (18:35 +1100)
client.go
piece.go
request-strategy/order.go
request-strategy/piece-request-order.go [new file with mode: 0644]
requesting.go
torrent-piece-request-order.go [new file with mode: 0644]
torrent.go

index c491118b04d905413e2ec08556641d56ac774072..8a686767c7f6a8358116156b9df12a91dd51e900 100644 (file)
--- a/client.go
+++ b/client.go
@@ -27,6 +27,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        "github.com/google/btree"
@@ -74,6 +75,7 @@ type Client struct {
        dopplegangerAddrs map[string]struct{}
        badPeerIPs        map[string]struct{}
        torrents          map[InfoHash]*Torrent
+       pieceRequestOrder map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder
 
        acceptLimiter   map[ipStr]int
        dialRateLimiter *rate.Limiter
index bef5f59e3d676b92fa36bc2eedafebbe5c9f37d5..6caa76284d90f2982226149c32942d9d89c2ffc6 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
        p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+       p.t.updatePieceRequestOrder(p.index)
        p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i RequestIndex) {
        p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+       p.t.updatePieceRequestOrder(p.index)
 }
 
 func (p *Piece) numChunks() chunkIndexType {
index 91dfc74d8f5603f045a406754798131e32c6069c..cacd7f967ae5bb802e8ee30b98c113447f8a3ed9 100644 (file)
@@ -3,12 +3,14 @@ package request_strategy
 import (
        "bytes"
        "expvar"
+       "log"
        "runtime"
        "sort"
        "sync"
 
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/metainfo"
+       "github.com/google/btree"
 
        "github.com/anacrolix/torrent/types"
 )
@@ -60,6 +62,15 @@ func (me pieceSorter) Swap(i, j int) {
 func (me pieceSorter) Less(_i, _j int) bool {
        i := me.get(_i)
        j := me.get(_j)
+       return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
+}
+
+type pieceOrderInput struct {
+       PieceRequestOrderState
+       PieceRequestOrderKey
+}
+
+func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
        return multiless.New().Int(
                int(j.Priority), int(i.Priority),
        ).Bool(
@@ -67,13 +78,13 @@ func (me pieceSorter) Less(_i, _j int) bool {
        ).Int64(
                i.Availability, j.Availability,
        ).Int(
-               i.index, j.index,
+               i.Index, j.Index,
        ).Lazy(func() multiless.Computation {
                return multiless.New().Cmp(bytes.Compare(
-                       i.t.InfoHash[:],
-                       j.t.InfoHash[:],
+                       i.InfoHash[:],
+                       j.InfoHash[:],
                ))
-       }).MustLess()
+       })
 }
 
 type requestsPeer struct {
@@ -120,6 +131,15 @@ type filterPiece struct {
        *Piece
 }
 
+func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
+       ret.Partial = fp.Partial
+       ret.InfoHash = fp.t.InfoHash
+       ret.Availability = fp.Availability
+       ret.Priority = fp.Priority
+       ret.Index = fp.index
+       return
+}
+
 var (
        sortsMu sync.Mutex
        sorts   = map[*[]filterPiece][]int{}
@@ -186,12 +206,45 @@ type pieceOrderingFinalizer struct {
 }
 
 // Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
-       maxPieces := 0
-       for i := range input.Torrents {
-               maxPieces += len(input.Torrents[i].Pieces)
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
+       if false {
+               maxPieces := 0
+               for i := range input.Torrents {
+                       maxPieces += len(input.Torrents[i].Pieces)
+               }
+               pieces := make([]filterPiece, 0, maxPieces)
+               for _t := range input.Torrents {
+                       // TODO: We could do metainfo requests here.
+                       t := &input.Torrents[_t]
+                       for i := range t.Pieces {
+                               pieces = append(pieces, filterPiece{
+                                       t:     &input.Torrents[_t],
+                                       index: i,
+                                       Piece: &t.Pieces[i],
+                               })
+                       }
+               }
+               pieces = getSortedFilterPieces(pieces)
+               {
+                       if len(pieces) != pro.tree.Len() {
+                               panic("length doesn't match")
+                       }
+                       pieces := pieces
+                       pro.tree.Ascend(func(i btree.Item) bool {
+                               _i := i.(pieceRequestOrderItem)
+                               ii := pieceOrderInput{
+                                       _i.state,
+                                       _i.key,
+                               }
+                               if pieces[0].toPieceOrderInput() != ii {
+                                       panic(_i)
+                               }
+                               pieces = pieces[1:]
+                               return true
+                       })
+               }
+               log.Printf("%v pieces passed", len(pieces))
        }
-       pieces := make([]filterPiece, 0, maxPieces)
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
        var storageLeft *int64
@@ -199,42 +252,41 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
                storageLeft = new(int64)
                *storageLeft = *input.Capacity
        }
-       for _t := range input.Torrents {
-               // TODO: We could do metainfo requests here.
-               t := &input.Torrents[_t]
-               for i := range t.Pieces {
-                       pieces = append(pieces, filterPiece{
-                               t:     &input.Torrents[_t],
-                               index: i,
-                               Piece: &t.Pieces[i],
-                       })
-               }
-       }
-       pieces = getSortedFilterPieces(pieces)
        var allTorrentsUnverifiedBytes int64
        torrentUnverifiedBytes := map[metainfo.Hash]int64{}
-       for _, piece := range pieces {
+       pro.tree.Ascend(func(i btree.Item) bool {
+               _i := i.(pieceRequestOrderItem)
+               var piece *Piece
+               var t Torrent
+               for _, t = range input.Torrents {
+                       if t.InfoHash == _i.key.InfoHash {
+                               piece = &t.Pieces[_i.key.Index]
+                               break
+                       }
+               }
                if left := storageLeft; left != nil {
                        if *left < piece.Length {
-                               continue
+                               return true
                        }
                        *left -= piece.Length
                }
                if !piece.Request || piece.NumPendingChunks == 0 {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
-                       continue
+
+                       return true
                }
-               if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
-                       continue
+               if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
+                       return true
                }
                if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
-                       continue
+                       return true
                }
-               torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
+               torrentUnverifiedBytes[t.InfoHash] += piece.Length
                allTorrentsUnverifiedBytes += piece.Length
-               f(piece.t, piece.Piece, piece.index)
-       }
+               f(&t, piece, _i.key.Index)
+               return true
+       })
        return
 }
 
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
new file mode 100644 (file)
index 0000000..efe7560
--- /dev/null
@@ -0,0 +1,88 @@
+package request_strategy
+
+import (
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/google/btree"
+)
+
+func NewPieceOrder() *PieceRequestOrder {
+       return &PieceRequestOrder{
+               tree: btree.New(32),
+               keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
+       }
+}
+
+type PieceRequestOrder struct {
+       tree *btree.BTree
+       keys map[PieceRequestOrderKey]PieceRequestOrderState
+}
+
+type PieceRequestOrderKey struct {
+       InfoHash metainfo.Hash
+       Index    int
+}
+
+type PieceRequestOrderState struct {
+       Priority     piecePriority
+       Partial      bool
+       Availability int64
+}
+
+type pieceRequestOrderItem struct {
+       key   PieceRequestOrderKey
+       state PieceRequestOrderState
+}
+
+func (me pieceRequestOrderItem) Less(other btree.Item) bool {
+       otherConcrete := other.(pieceRequestOrderItem)
+       return pieceOrderLess(
+               pieceOrderInput{
+                       PieceRequestOrderState: me.state,
+                       PieceRequestOrderKey:   me.key,
+               },
+               pieceOrderInput{
+                       PieceRequestOrderState: otherConcrete.state,
+                       PieceRequestOrderKey:   otherConcrete.key,
+               },
+       ).Less()
+}
+
+func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
+       if _, ok := me.keys[key]; ok {
+               panic(key)
+       }
+       if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+               key:   key,
+               state: state,
+       }) != nil {
+               panic("shouldn't already have this")
+       }
+       me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
+       if me.tree.Delete(me.existingItemForKey(key)) == nil {
+               panic(key)
+       }
+       if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
+               key:   key,
+               state: state,
+       }) != nil {
+               panic(key)
+       }
+       me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
+       return pieceRequestOrderItem{
+               key:   key,
+               state: me.keys[key],
+       }
+}
+
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
+       if me.tree.Delete(me.existingItemForKey(key)) == nil {
+               panic(key)
+       }
+       delete(me.keys, key)
+}
index 5ad98ea86b68ffae9eec34ef05f075357536cee7..147e574d2ac42589da4a3f93858e4d3908de6248 100644 (file)
@@ -27,9 +27,11 @@ func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input reques
                        input.Capacity = &cap
                }
        }
-       if input.Capacity == nil {
-               input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
-               return
+       if false {
+               if input.Capacity == nil {
+                       input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
+                       return
+               }
        }
        input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
        for _, t := range cl.torrents {
@@ -58,20 +60,32 @@ func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
        }
        rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
        for i := range t.pieces {
-               p := &t.pieces[i]
-               rst.Pieces = append(rst.Pieces, request_strategy.Piece{
-                       Request:           !t.ignorePieceForRequests(i),
-                       Priority:          p.purePriority(),
-                       Partial:           t.piecePartiallyDownloaded(i),
-                       Availability:      p.availability,
-                       Length:            int64(p.length()),
-                       NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
-                       IterPendingChunks: &p.undirtiedChunksIter,
-               })
+               rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
        }
        return rst
 }
 
+func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
+       return request_strategy.PieceRequestOrderState{
+               Priority:     t.piece(i).purePriority(),
+               Partial:      t.piecePartiallyDownloaded(i),
+               Availability: t.piece(i).availability,
+       }
+}
+
+func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
+       p := &t.pieces[i]
+       return request_strategy.Piece{
+               Request:           !t.ignorePieceForRequests(i),
+               Priority:          p.purePriority(),
+               Partial:           t.piecePartiallyDownloaded(i),
+               Availability:      p.availability,
+               Length:            int64(p.length()),
+               NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
+               IterPendingChunks: &p.undirtiedChunksIter,
+       }
+}
+
 func init() {
        gob.Register(peerId{})
 }
@@ -205,6 +219,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
        }
        request_strategy.GetRequestablePieces(
                input,
+               p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
                func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
                        if t.InfoHash != p.t.infoHash {
                                return
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
new file mode 100644 (file)
index 0000000..e1ab967
--- /dev/null
@@ -0,0 +1,7 @@
+package torrent
+
+func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+       t.cl.pieceRequestOrder[t.storage.Capacity].Update(
+               t.pieceRequestOrderKey(pieceIndex),
+               t.requestStrategyPieceOrderState(pieceIndex))
+}
index 7ccf13f680444b8fea5ba1d0a77813b1eb4e6aaf..c4ecf07f4cfb98e69af861b28c3eb6bad23a1f1c 100644 (file)
@@ -27,6 +27,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -165,6 +166,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                panic(p.availability)
        }
        p.availability--
+       t.updatePieceRequestOrder(i)
 }
 
 func (t *Torrent) incPieceAvailability(i pieceIndex) {
@@ -172,6 +174,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        if t.haveInfo() {
                p := t.piece(i)
                p.availability++
+               t.updatePieceRequestOrder(i)
        }
 }
 
@@ -424,8 +427,21 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        return nil
 }
 
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+       return request_strategy.PieceRequestOrderKey{
+               InfoHash: t.infoHash,
+               Index:    i,
+       }
+}
+
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       if t.cl.pieceRequestOrder == nil {
+               t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder)
+       }
+       if t.cl.pieceRequestOrder[t.storage.Capacity] == nil {
+               t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder()
+       }
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add availability before updating piece completion, as that may result in conns
@@ -434,6 +450,9 @@ func (t *Torrent) onSetInfo() {
                        panic(p.availability)
                }
                p.availability = int64(t.pieceAvailabilityFromPeers(i))
+               t.cl.pieceRequestOrder[t.storage.Capacity].Add(
+                       t.pieceRequestOrderKey(i),
+                       t.requestStrategyPieceOrderState(i))
                t.updatePieceCompletion(pieceIndex(i))
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
                        // t.logger.Printf("piece %s completion unknown, queueing check", p)
@@ -797,6 +816,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
        return pieceIndex(t._completedPieces.GetCardinality())
 }
 
+func (t *Torrent) deletePieceRequestOrder() {
+       for i := 0; i < t.numPieces(); i++ {
+               t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i))
+       }
+}
+
 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        t.closed.Set()
        if t.storage != nil {
@@ -816,6 +841,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        t.iterPeers(func(p *Peer) {
                p.close()
        })
+       if t.storage != nil {
+               t.deletePieceRequestOrder()
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -1102,6 +1130,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
 }
 
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+       if !t.closed.IsSet() {
+               // It would be possible to filter on pure-priority changes here to avoid churning the piece
+               // request order.
+               t.updatePieceRequestOrder(piece)
+       }
        p := &t.pieces[piece]
        newPrio := p.uncachedPriority()
        // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
@@ -1238,6 +1271,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        } else {
                t._completedPieces.Remove(x)
        }
+       p.t.updatePieceRequestOrder(piece)
        t.updateComplete()
        if complete && len(p.dirtiers) != 0 {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)