]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Track dirty chunks, instead of pending chunk specs
authorMatt Joiner <anacrolix@gmail.com>
Wed, 13 Jan 2016 06:11:59 +0000 (17:11 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 13 Jan 2016 06:11:59 +0000 (17:11 +1100)
This is the first step toward having purely Reader-based priorities. If a chunk is pending, that currently implies that we want to download it. I want to move that kind of state out to the readers.

client.go
client_test.go
piece.go
torrent.go

index 322c7e704b138531fab1188465086016247b4254..0c383cafd4de9e1195828b76dd01937de4f454b0 100644 (file)
--- a/client.go
+++ b/client.go
@@ -2509,7 +2509,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
                        continue
                }
                piece := &t.Pieces[pieceIndex]
-               for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) {
+               for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
                        r := request{pp.Integer(pieceIndex), cs}
                        if !addRequest(r) {
                                return
@@ -2546,7 +2546,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                unexpectedChunksReceived.Add(1)
        }
 
-       piece := &t.Pieces[req.Index]
+       index := int(req.Index)
+       piece := &t.Pieces[index]
 
        // Do we actually want this chunk?
        if !t.wantChunk(req) {
@@ -2584,7 +2585,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                if c.peerTouchedPieces == nil {
                        c.peerTouchedPieces = make(map[int]struct{})
                }
-               c.peerTouchedPieces[int(req.Index)] = struct{}{}
+               c.peerTouchedPieces[index] = struct{}{}
                me.mu.Unlock()
        }()
 
@@ -2596,7 +2597,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        delete(t.urgent, req)
        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
-       if piece.numPendingChunks() == 0 {
+       if t.pieceAllDirty(index) {
                me.queuePieceCheck(t, int(req.Index))
        }
        if !t.wantPiece(int(req.Index)) {
@@ -2663,15 +2664,14 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
        defer me.event.Broadcast()
        if correct {
                p.Priority = PiecePriorityNone
-               p.PendingChunkSpecs = nil
                for req := range t.urgent {
                        if int(req.Index) == piece {
                                delete(t.urgent, req)
                        }
                }
        } else {
-               if p.numPendingChunks() == 0 {
-                       t.pendAllChunkSpecs(int(piece))
+               if t.pieceAllDirty(piece) {
+                       t.pendAllChunkSpecs(piece)
                }
                if t.wantPiece(piece) {
                        me.openNewConns(t)
index 80d1535c18e69becb7129568f99cbc74565f4abe..f073f5eebdba2536f38f8e83655129727efeb34d 100644 (file)
@@ -106,9 +106,8 @@ func TestTorrentInitialState(t *testing.T) {
        if len(tor.Pieces) != 3 {
                t.Fatal("wrong number of pieces")
        }
-       p := &tor.Pieces[0]
        tor.pendAllChunkSpecs(0)
-       assert.EqualValues(t, 3, p.numPendingChunks())
+       assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
        assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }
 
index 35c8a7b2ded8619f180889ba638006312bad900d..c25bd2e41362cf6de9bd2c8a6e371a66d85e0b0d 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -4,6 +4,8 @@ import (
        "math/rand"
        "sync"
 
+       "github.com/bradfitz/iter"
+
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
@@ -22,14 +24,14 @@ const (
 type piece struct {
        // The completed piece SHA1 hash, from the metainfo "pieces" field.
        Hash pieceSum
-       // Chunks we don't have. The offset and length can be determined by the
-       // request chunkSize in use.
-       PendingChunkSpecs []bool
-       Hashing           bool
-       QueuedForHash     bool
-       EverHashed        bool
-       Priority          piecePriority
-       PublicPieceState  PieceState
+       // Chunks we've written to since the last check. The chunk offset and
+       // length can be determined by the request chunkSize in use.
+       DirtyChunks      []bool
+       Hashing          bool
+       QueuedForHash    bool
+       EverHashed       bool
+       Priority         piecePriority
+       PublicPieceState PieceState
 
        pendingWritesMutex sync.Mutex
        pendingWrites      int
@@ -37,15 +39,16 @@ type piece struct {
 }
 
 func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
-       if p.PendingChunkSpecs == nil {
-               return false
+       ci := chunkIndex(cs, chunkSize)
+       if ci >= len(p.DirtyChunks) {
+               return true
        }
-       return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)]
+       return !p.DirtyChunks[ci]
 }
 
-func (p *piece) numPendingChunks() (ret int) {
-       for _, pending := range p.PendingChunkSpecs {
-               if pending {
+func (p *piece) numDirtyChunks() (ret int) {
+       for _, dirty := range p.DirtyChunks {
+               if dirty {
                        ret++
                }
        }
@@ -53,10 +56,10 @@ func (p *piece) numPendingChunks() (ret int) {
 }
 
 func (p *piece) unpendChunkIndex(i int) {
-       if p.PendingChunkSpecs == nil {
-               return
+       for i >= len(p.DirtyChunks) {
+               p.DirtyChunks = append(p.DirtyChunks, false)
        }
-       p.PendingChunkSpecs[i] = false
+       p.DirtyChunks[i] = true
 }
 
 func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
@@ -67,14 +70,18 @@ func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
        return ret
 }
 
-func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) {
-       if p.numPendingChunks() == 0 {
+func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpec) {
+       // defer func() {
+       //      log.Println(piece, css)
+       // }()
+       numPending := t.pieceNumPendingChunks(piece)
+       if numPending == 0 {
                return
        }
-       css = make([]chunkSpec, 0, p.numPendingChunks())
-       for i, pending := range p.PendingChunkSpecs {
-               if pending {
-                       css = append(css, chunkIndexSpec(i, pieceLength, chunkSize))
+       css = make([]chunkSpec, 0, numPending)
+       for ci := range iter.N(t.pieceNumChunks(piece)) {
+               if ci >= len(p.DirtyChunks) || !p.DirtyChunks[ci] {
+                       css = append(css, t.chunkIndexSpec(ci, piece))
                }
        }
        if len(css) <= 1 {
index b2478656cc72d5fe7f4cf4d19dc662dc5956bf6b..928f5ed95849040d9bc44f94b2375460cdcf2078 100644 (file)
@@ -22,18 +22,22 @@ import (
        "github.com/anacrolix/torrent/tracker"
 )
 
+func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
+       return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
+}
+
 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
        if t.pieceComplete(index) {
-               return 0
+               return
        }
        piece := &t.Pieces[index]
-       pieceLength := t.pieceLength(index)
+       count = t.pieceLength(index)
        if !piece.EverHashed {
-               return pieceLength
+               return
        }
-       for i, pending := range piece.PendingChunkSpecs {
-               if pending {
-                       count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length
+       for i, dirty := range piece.DirtyChunks {
+               if dirty {
+                       count -= t.chunkIndexSpec(i, index).Length
                }
        }
        return
@@ -588,9 +592,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
 
 func (t *torrent) bitfield() (bf []bool) {
        for i := range t.Pieces {
-               p := &t.Pieces[i]
-               // TODO: Check this logic.
-               bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
+               bf = append(bf, t.havePiece(i))
        }
        return
 }
@@ -626,18 +628,12 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
        return
 }
 
+func (t *torrent) pieceNumChunks(piece int) int {
+       return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
+}
+
 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
-       piece := &t.Pieces[pieceIndex]
-       if piece.PendingChunkSpecs == nil {
-               // Allocate to exact size.
-               piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize)
-       }
-       // Pend all the chunks.
-       pcss := piece.PendingChunkSpecs
-       for i := range pcss {
-               pcss[i] = true
-       }
-       return
+       t.Pieces[pieceIndex].DirtyChunks = nil
 }
 
 type Peer struct {
@@ -650,6 +646,9 @@ type Peer struct {
 }
 
 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
+       if piece < 0 || piece > t.Info.NumPieces() {
+               return
+       }
        if int(piece) == t.numPieces()-1 {
                len_ = pp.Integer(t.Length() % t.Info.PieceLength)
        }
@@ -707,7 +706,10 @@ func (t *torrent) havePiece(index int) bool {
        return t.haveInfo() && t.pieceComplete(index)
 }
 
-func (t *torrent) haveChunk(r request) bool {
+func (t *torrent) haveChunk(r request) (ret bool) {
+       // defer func() {
+       //      log.Println("have chunk", r, ret)
+       // }()
        if !t.haveInfo() {
                return false
        }
@@ -715,9 +717,6 @@ func (t *torrent) haveChunk(r request) bool {
                return true
        }
        p := &t.Pieces[r.Index]
-       if p.PendingChunkSpecs == nil {
-               return false
-       }
        return !p.pendingChunk(r.chunkSpec, t.chunkSize)
 }
 
@@ -806,3 +805,20 @@ func (t *torrent) publishPieceChange(piece int) {
        }
        p.PublicPieceState = cur
 }
+
+func (t *torrent) pieceNumPendingChunks(piece int) int {
+       return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
+}
+
+func (t *torrent) pieceAllDirty(piece int) bool {
+       p := &t.Pieces[piece]
+       if len(p.DirtyChunks) != t.pieceNumChunks(piece) {
+               return false
+       }
+       for _, dirty := range p.DirtyChunks {
+               if !dirty {
+                       return false
+               }
+       }
+       return true
+}