]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rip out the pieces by bytes left and responsive download strategy stuff
authorMatt Joiner <anacrolix@gmail.com>
Fri, 26 Dec 2014 06:15:17 +0000 (17:15 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 26 Dec 2014 06:15:17 +0000 (17:15 +1100)
client.go
download_strategies.go
fs/torrentfs_test.go
torrent.go

index 59b85b7cd76123d231fadbf9d9b9d3dcacdf68bd..470a5f97dbc4432de1ea9f430eb04fcafeecd08e 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1969,14 +1969,7 @@ func (cl *Client) allTorrentsCompleted() bool {
                if !t.haveInfo() {
                        return false
                }
-               for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-                       i := e.Value.(int)
-                       if t.Pieces[i].Complete() {
-                               continue
-                       }
-                       // If the piece isn't complete, make sure it's not because it's
-                       // never been hashed.
-                       cl.queueFirstHash(t, i)
+               if t.NumPiecesCompleted() != t.NumPieces() {
                        return false
                }
        }
@@ -2043,7 +2036,6 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                }
                me.queuePieceCheck(t, req.Index)
        }
-       t.PieceBytesLeftChanged(int(req.Index))
 
        // Unprioritize the chunk.
        me.downloadStrategy.TorrentGotChunk(t, req)
@@ -2124,7 +2116,6 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                        me.openNewConns(t)
                }
        }
-       t.PieceBytesLeftChanged(int(piece))
        for _, conn := range t.Conns {
                if correct {
                        conn.Post(pp.Message{
index d3f38e98e4a70ad931936a81bcac53e22c65969d..275b57e3fb510d427e219d6667f693cf189b0dfb 100644 (file)
@@ -1,10 +1,7 @@
 package torrent
 
 import (
-       "container/heap"
-       "fmt"
        "io"
-       "math/rand"
 
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
 )
@@ -75,266 +72,3 @@ func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
 func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {}
 func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int)      {}
 func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
-
-func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy {
-       return &responsiveDownloadStrategy{
-               Readahead:      readahead,
-               lastReadOffset: make(map[*torrent]int64),
-               priorities:     make(map[*torrent]map[request]struct{}),
-               requestHeat:    make(map[*torrent]map[request]int),
-               rand:           rand.New(rand.NewSource(1337)),
-       }
-}
-
-type responsiveDownloadStrategy struct {
-       // How many bytes to preemptively download starting at the beginning of
-       // the last piece read for a given torrent.
-       Readahead      int64
-       lastReadOffset map[*torrent]int64
-       priorities     map[*torrent]map[request]struct{}
-       requestHeat    map[*torrent]map[request]int
-       rand           *rand.Rand // Avoid global lock
-       dummyConn      *connection
-}
-
-func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
-       fmt.Fprintf(w, "Priorities:\n")
-       for t, pp := range me.priorities {
-               fmt.Fprintf(w, "\t%s:", t.Name())
-               for r := range pp {
-                       fmt.Fprintf(w, " %v", r)
-               }
-               fmt.Fprintln(w)
-       }
-}
-
-func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
-       me.priorities[t] = make(map[request]struct{})
-       me.requestHeat[t] = make(map[request]int)
-       me.dummyConn = &connection{}
-}
-
-func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
-       delete(me.lastReadOffset, t)
-       delete(me.priorities, t)
-}
-func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
-       rh := me.requestHeat[t]
-       if rh[r] <= 0 {
-               panic("request heat invariant broken")
-       }
-       rh[r]--
-}
-
-type requestFiller struct {
-       c *connection
-       t *torrent
-       s *responsiveDownloadStrategy
-}
-
-// Wrapper around connection.request that tracks request heat.
-func (me *requestFiller) request(req request) bool {
-       if me.c.RequestPending(req) {
-               return true
-       }
-       if !me.t.wantChunk(req) {
-               return true
-       }
-       again := me.c.Request(req)
-       if me.c.RequestPending(req) {
-               me.s.requestHeat[me.t][req]++
-       }
-       return again
-}
-
-// Adds additional constraints around the request heat wrapper.
-func (me *requestFiller) conservativelyRequest(req request) bool {
-       again := me.request(req)
-       if len(me.c.Requests) >= 50 {
-               return false
-       }
-       return again
-}
-
-// Fill priority requests.
-func (me *requestFiller) priorities() bool {
-       for req := range me.s.priorities[me.t] {
-               // TODO: Perhaps this filter should be applied to every request?
-               if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
-                       panic(req)
-               }
-               if !me.request(req) {
-                       return false
-               }
-       }
-       return true
-}
-
-// Fill requests, with all contextual information available in the receiver.
-func (me *requestFiller) Run() {
-       if !me.priorities() {
-               return
-       }
-       if len(me.c.Requests) > 25 {
-               return
-       }
-       if !me.readahead() {
-               return
-       }
-       if len(me.c.Requests) > 0 {
-               return
-       }
-       me.completePartial()
-}
-
-// Request partial pieces that aren't in the readahead zone.
-func (me *requestFiller) completePartial() bool {
-       t := me.t
-       th := me.s.requestHeat[t]
-       lro, lroOk := me.s.lastReadOffset[t]
-       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-               p := e.Value.(int)
-               // Stop when we reach pieces that aren't partial and aren't smaller
-               // than usual.
-               if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
-                       break
-               }
-               // Skip pieces that are entirely inside the readahead zone.
-               if lroOk {
-                       pieceOff := int64(p) * int64(t.UsualPieceSize())
-                       pieceEndOff := pieceOff + int64(t.PieceLength(pp.Integer(p)))
-                       if pieceOff >= lro && pieceEndOff < lro+me.s.Readahead {
-                               continue
-                       }
-               }
-               for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
-                       r := request{pp.Integer(p), chunkSpec}
-                       if th[r] >= 1 {
-                               continue
-                       }
-                       if lroOk {
-                               off := me.t.requestOffset(r)
-                               if off >= lro && off < lro+me.s.Readahead {
-                                       continue
-                               }
-                       }
-                       if !me.conservativelyRequest(r) {
-                               return false
-                       }
-               }
-       }
-       return true
-}
-
-// Returns all wanted chunk specs in the readahead zone.
-func (me *requestFiller) pendingReadaheadChunks() (ret []request) {
-       t := me.t
-       lastReadOffset, ok := me.s.lastReadOffset[t]
-       if !ok {
-               return
-       }
-       ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize)
-       for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ {
-               if t.havePiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) {
-                       continue
-               }
-               for cs := range t.Pieces[pi].PendingChunkSpecs {
-                       r := request{pp.Integer(pi), cs}
-                       if _, ok := me.c.Requests[r]; ok {
-                               continue
-                       }
-                       if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead {
-                               continue
-                       }
-                       ret = append(ret, r)
-               }
-       }
-       return
-}
-
-// Min-heap of int.
-type intHeap []int
-
-func (h intHeap) Len() int            { return len(h) }
-func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
-func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
-func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
-func (h *intHeap) Pop() interface{} {
-       old := *h
-       n := len(old)
-       x := old[n-1]
-       *h = old[0 : n-1]
-       return x
-}
-
-func (me *requestFiller) readahead() bool {
-       rr := me.pendingReadaheadChunks()
-       if len(rr) == 0 {
-               return true
-       }
-       // Produce a partially sorted random permutation into the readahead chunks
-       // to somewhat preserve order but reducing wasted chunks due to overlap
-       // with other peers.
-       ii := new(intHeap)
-       *ii = me.s.rand.Perm(len(rr))
-       heap.Init(ii)
-       for _, i := range *ii {
-               if !me.conservativelyRequest(rr[i]) {
-                       return false
-               }
-       }
-       return true
-}
-
-func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
-       rf := requestFiller{c: c, t: t, s: me}
-       rf.Run()
-       return
-}
-
-func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
-       delete(me.priorities[t], req)
-}
-
-func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
-       for _, cs := range t.pieceChunks(piece) {
-               delete(me.priorities[t], request{pp.Integer(piece), cs})
-       }
-}
-
-func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
-       s.lastReadOffset[t] = off
-       for _len > 0 {
-               req, ok := t.offsetRequest(off)
-               if !ok {
-                       panic("bad offset")
-               }
-               reqOff := t.requestOffset(req)
-               // Gain the alignment adjustment.
-               _len += off - reqOff
-               // Lose the length of this block.
-               _len -= int64(req.Length)
-               off = reqOff + int64(req.Length)
-               if !t.haveChunk(req) {
-                       s.priorities[t][req] = struct{}{}
-               }
-       }
-}
-
-func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
-       if s.requestHeat[t][r] != 0 {
-               panic("outstanding requests invariant broken")
-       }
-}
-
-func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool {
-       if len(me.priorities[t]) != 0 {
-               return true
-       }
-       for index := range t.Pieces {
-               if t.wantPiece(index) {
-                       return true
-               }
-       }
-       return false
-}
index 52591aedd1ea396a860d5c7972cae7cadd8cfde3..a3297455627b3ed54923c48a13843357cd691948 100644 (file)
@@ -182,11 +182,10 @@ func TestDownloadOnDemand(t *testing.T) {
                t.Fatal(err)
        }
        leecher, err := torrent.NewClient(&torrent.Config{
-               DataDir:          filepath.Join(layout.BaseDir, "download"),
-               DownloadStrategy: torrent.NewResponsiveDownloadStrategy(0),
-               DisableTrackers:  true,
-               NoDHT:            true,
-               ListenAddr:       ":0",
+               DataDir:         filepath.Join(layout.BaseDir, "download"),
+               DisableTrackers: true,
+               NoDHT:           true,
+               ListenAddr:      ":0",
 
                NoDefaultBlocklist: true,
 
index 469d2b15f11ed9fd3e5d165a90573a45a8112d82..f6ced0e78ce67e392821e14189add9d20278ab2c 100644 (file)
@@ -58,10 +58,9 @@ type torrent struct {
        // announcing, and communicating with peers.
        ceasingNetworking chan struct{}
 
-       InfoHash                    InfoHash
-       Pieces                      []*torrentPiece
-       IncompletePiecesByBytesLeft *OrderedList
-       length                      int64
+       InfoHash InfoHash
+       Pieces   []*torrentPiece
+       length   int64
        // Prevent mutations to Data memory maps while in use as they're not safe.
        dataLock sync.RWMutex
        Data     *mmap_span.MMapSpan
@@ -124,34 +123,6 @@ func (t *torrent) CeaseNetworking() {
        }
 }
 
-func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() {
-       allIndexes := make(map[int]struct{}, t.NumPieces())
-       for i := 0; i < t.NumPieces(); i++ {
-               allIndexes[i] = struct{}{}
-       }
-       var lastBytesLeft int
-       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-               i := e.Value.(int)
-               if _, ok := allIndexes[i]; !ok {
-                       panic("duplicate entry")
-               }
-               delete(allIndexes, i)
-               if t.Pieces[i].Complete() {
-                       panic("complete piece")
-               }
-               bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i)))
-               if bytesLeft < lastBytesLeft {
-                       panic("ordering broken")
-               }
-               lastBytesLeft = bytesLeft
-       }
-       for i := range allIndexes {
-               if !t.Pieces[i].Complete() {
-                       panic("leaked incomplete piece")
-               }
-       }
-}
-
 func (t *torrent) AddPeers(pp []Peer) {
        for _, p := range pp {
                t.Peers[peersKey{string(p.IP), p.Port}] = p
@@ -214,25 +185,12 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
                return
        }
        t.length = t.Data.Size()
-       t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool {
-               apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
-               bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
-               if apb < bpb {
-                       return true
-               }
-               if apb > bpb {
-                       return false
-               }
-               return a.(int) < b.(int)
-       })
-       for index, hash := range infoPieceHashes(&md) {
+       for _, hash := range infoPieceHashes(&md) {
                piece := &torrentPiece{}
                piece.Event.L = eventLocker
                util.CopyExact(piece.Hash[:], hash)
                t.Pieces = append(t.Pieces, piece)
-               piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
        }
-       t.assertIncompletePiecesByBytesLeftOrdering()
        for _, conn := range t.Conns {
                t.initRequestOrdering(conn)
                if err := conn.setNumPieces(t.NumPieces()); err != nil {
@@ -578,19 +536,9 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
        for _, cs := range t.pieceChunks(int(index)) {
                pcss[cs] = struct{}{}
        }
-       t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
        return
 }
 
-func (t *torrent) PieceBytesLeftChanged(index int) {
-       p := t.Pieces[index]
-       if p.Complete() {
-               t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement)
-       } else {
-               t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement)
-       }
-}
-
 type Peer struct {
        Id     [20]byte
        IP     net.IP