From: Matt Joiner Date: Mon, 1 Jun 2015 08:22:12 +0000 (+1000) Subject: Change the way piece state is exposed to give more detail X-Git-Tag: v1.0.0~1173 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=21fb4933fc95f5656be0d626ab001a8c6439d9bd;p=btrtrc.git Change the way piece state is exposed to give more detail --- diff --git a/client.go b/client.go index ee15ee0f..1b8dbe0d 100644 --- a/client.go +++ b/client.go @@ -279,18 +279,18 @@ func readaheadPieces(readahead, pieceLength int64) int { func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) { index := int(off / int64(t.usualPieceSize())) - cl.raisePiecePriority(t, index, piecePriorityNow) + cl.raisePiecePriority(t, index, PiecePriorityNow) index++ if index >= t.numPieces() { return } - cl.raisePiecePriority(t, index, piecePriorityNext) + cl.raisePiecePriority(t, index, PiecePriorityNext) for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) { index++ if index >= t.numPieces() { break } - cl.raisePiecePriority(t, index, piecePriorityReadahead) + cl.raisePiecePriority(t, index, PiecePriorityReadahead) } } @@ -1981,7 +1981,7 @@ func (t Torrent) SetRegionPriority(off, len int64) { defer t.cl.mu.Unlock() pieceSize := int64(t.usualPieceSize()) for i := off / pieceSize; i*pieceSize < off+len; i++ { - t.cl.prioritizePiece(t.torrent, int(i), piecePriorityNormal) + t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal) } } @@ -1999,12 +1999,12 @@ func (t Torrent) DownloadAll() { t.cl.mu.Lock() defer t.cl.mu.Unlock() for i := range iter.N(t.numPieces()) { - t.cl.raisePiecePriority(t.torrent, i, piecePriorityNormal) + t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal) } // Nice to have the first and last pieces sooner for various interactive // purposes. - t.cl.raisePiecePriority(t.torrent, 0, piecePriorityReadahead) - t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) + t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead) + t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, PiecePriorityReadahead) } // Returns nil metainfo if it isn't in the cache. Checks that the retrieved @@ -2543,7 +2543,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) p := t.Pieces[piece] if correct { - p.Priority = piecePriorityNone + p.Priority = PiecePriorityNone p.PendingChunkSpecs = nil for req := range t.urgent { if int(req.Index) == piece { diff --git a/connection.go b/connection.go index 1bf69b15..51034e93 100644 --- a/connection.go +++ b/connection.go @@ -102,7 +102,7 @@ func (cn *connection) localAddr() net.Addr { // Adjust piece position in the request order for this connection based on the // given piece priority. func (cn *connection) pendPiece(piece int, priority piecePriority) { - if priority == piecePriorityNone { + if priority == PiecePriorityNone { cn.pieceRequestOrder.DeletePiece(piece) return } @@ -117,13 +117,13 @@ func (cn *connection) pendPiece(piece int, priority piecePriority) { // [ Normal ] key := func() int { switch priority { - case piecePriorityNow: + case PiecePriorityNow: return -3*len(cn.piecePriorities) + 3*pp - case piecePriorityNext: + case PiecePriorityNext: return -2*len(cn.piecePriorities) + 2*pp - case piecePriorityReadahead: + case PiecePriorityReadahead: return -len(cn.piecePriorities) + pp - case piecePriorityNormal: + case PiecePriorityNormal: return pp default: panic(priority) diff --git a/connection_test.go b/connection_test.go index d93b7d05..1ffb9aef 100644 --- a/connection_test.go +++ b/connection_test.go @@ -77,22 +77,22 @@ func (suite) TestPieceRequestOrder(t *C) { piecePriorities: []int{1, 4, 0, 3, 2}, } testRequestOrder(nil, c.pieceRequestOrder, t) - c.pendPiece(2, piecePriorityNone) + c.pendPiece(2, PiecePriorityNone) testRequestOrder(nil, c.pieceRequestOrder, t) - c.pendPiece(1, piecePriorityNormal) - c.pendPiece(2, piecePriorityNormal) + c.pendPiece(1, PiecePriorityNormal) + c.pendPiece(2, PiecePriorityNormal) testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t) - c.pendPiece(0, piecePriorityNormal) + c.pendPiece(0, PiecePriorityNormal) testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t) - c.pendPiece(1, piecePriorityReadahead) + c.pendPiece(1, PiecePriorityReadahead) testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t) - c.pendPiece(4, piecePriorityNow) + c.pendPiece(4, PiecePriorityNow) // now(4), r(1), normal(0, 2) testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t) - c.pendPiece(2, piecePriorityReadahead) + c.pendPiece(2, PiecePriorityReadahead) // N(4), R(1, 2), N(0) testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t) - c.pendPiece(1, piecePriorityNow) + c.pendPiece(1, PiecePriorityNow) // now(4, 1), readahead(2), normal(0) // in the same order, the keys will be: -15+6, -15+12, -5, 1 // so we test that a very low priority (for this connection), "now" @@ -100,7 +100,7 @@ func (suite) TestPieceRequestOrder(t *C) { testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t) // Note this intentially sets to None a piece that's not in the order. for i := range iter.N(5) { - c.pendPiece(i, piecePriorityNone) + c.pendPiece(i, PiecePriorityNone) } testRequestOrder(nil, c.pieceRequestOrder, t) } diff --git a/file.go b/file.go index d663a2f6..2efbaeaa 100644 --- a/file.go +++ b/file.go @@ -29,11 +29,12 @@ func (f *File) Length() int64 { } type FilePieceState struct { - Length int64 - State byte + Bytes int64 // Bytes within the piece that are part of this File. + PieceState } -func (f *File) Progress() (ret []FilePieceState) { +// Returns the state of pieces in this file. +func (f *File) State() (ret []FilePieceState) { pieceSize := int64(f.t.usualPieceSize()) off := f.offset % pieceSize remaining := f.length @@ -45,7 +46,7 @@ func (f *File) Progress() (ret []FilePieceState) { if len1 > remaining { len1 = remaining } - ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)}) + ret = append(ret, FilePieceState{len1, f.t.pieceState(i)}) off = 0 remaining -= len1 } diff --git a/piece.go b/piece.go index c45e7a42..bae74094 100644 --- a/piece.go +++ b/piece.go @@ -12,11 +12,11 @@ import ( type piecePriority byte const ( - piecePriorityNone piecePriority = iota - piecePriorityNormal - piecePriorityReadahead - piecePriorityNext - piecePriorityNow + PiecePriorityNone piecePriority = iota // Not wanted. + PiecePriorityNormal // Wanted. + PiecePriorityReadahead // May be required soon. + PiecePriorityNext // Succeeds a piece where a read occurred. + PiecePriorityNow // A read occurred in this piece. ) type piece struct { diff --git a/piecestate.go b/piecestate.go new file mode 100644 index 00000000..358ffcf2 --- /dev/null +++ b/piecestate.go @@ -0,0 +1,18 @@ +package torrent + +// The current state of a piece. +type PieceState struct { + Priority piecePriority + // The piece is available in its entirety. + Complete bool + // The piece is being hashed, or is queued for hash. + Checking bool + // Some of the piece has been obtained. + Partial bool +} + +// Represents a series of consecutive pieces with the same state. +type PieceStateRun struct { + PieceState + Length int // How many consecutive pieces have this state. +} diff --git a/t.go b/t.go index 70e6f8dd..2f0ddbee 100644 --- a/t.go +++ b/t.go @@ -30,3 +30,12 @@ func (t *Torrent) NewReader() (ret *Reader) { } return } + +// Returns the state of pieces of the torrent. They are grouped into runs of +// same state. The sum of the state run lengths is the number of pieces +// in the torrent. +func (t *Torrent) PieceStateRuns() []PieceStateRun { + t.stateMu.Lock() + defer t.stateMu.Unlock() + return t.torrent.pieceStateRuns() +} diff --git a/torrent.go b/torrent.go index a764a662..2fa69be3 100644 --- a/torrent.go +++ b/torrent.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/anacrolix/missinggo" "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" @@ -289,38 +290,19 @@ func (t *torrent) Name() string { return "" } -func (t *torrent) pieceStatusChar(index int) byte { +func (t *torrent) pieceState(index int) (ret PieceState) { p := t.Pieces[index] - switch { - case t.pieceComplete(index): - return 'C' - case p.QueuedForHash: - return 'Q' - case p.Hashing: - return 'H' - case !p.EverHashed: - return '?' - case t.piecePartiallyDownloaded(index): - switch p.Priority { - case piecePriorityNone: - return 'F' // Forgotten - default: - return 'P' - } - default: - switch p.Priority { - case piecePriorityNone: - return 'z' - case piecePriorityNow: - return '!' - case piecePriorityReadahead: - return 'R' - case piecePriorityNext: - return 'N' - default: - return '.' - } + ret.Priority = p.Priority + if t.pieceComplete(index) { + ret.Complete = true } + if p.QueuedForHash || p.Hashing { + ret.Checking = true + } + if t.piecePartiallyDownloaded(index) { + ret.Partial = true + } + return } func (t *torrent) metadataPieceSize(piece int) int { @@ -346,45 +328,45 @@ func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece } } -type PieceStatusCharSequence struct { - Char byte // The state of this sequence of pieces. - Count int // How many consecutive pieces have this state. -} - -// Returns the state of pieces of the torrent. They are grouped into runs of -// same state. The sum of the Counts of the sequences is the number of pieces -// in the torrent. See the function torrent.pieceStatusChar for the possible -// states. -func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence { - t.stateMu.Lock() - defer t.stateMu.Unlock() - return t.pieceStatusCharSequences() +func (t *torrent) pieceStateRuns() (ret []PieceStateRun) { + rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) { + ret = append(ret, PieceStateRun{ + PieceState: el.(PieceState), + Length: int(count), + }) + }) + for index := range t.Pieces { + rle.Append(t.pieceState(index), 1) + } + rle.Flush() + return } -// Returns the length of sequences of identical piece status chars. -func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) { - var ( - char byte - count int - ) - writeSequence := func() { - ret = append(ret, PieceStatusCharSequence{char, count}) - } - if len(t.Pieces) != 0 { - char = t.pieceStatusChar(0) - } - for index := range t.Pieces { - char1 := t.pieceStatusChar(index) - if char1 == char { - count++ - } else { - writeSequence() - char = char1 - count = 1 +// Produces a small string representing a PieceStateRun. +func pieceStateRunStatusChars(psr PieceStateRun) (ret string) { + ret = fmt.Sprintf("%d", psr.Length) + ret += func() string { + switch psr.Priority { + case PiecePriorityNext: + return "N" + case PiecePriorityNormal: + return "." + case PiecePriorityReadahead: + return "R" + case PiecePriorityNow: + return "!" + default: + return "" } + }() + if psr.Checking { + ret += "H" + } + if psr.Partial { + ret += "P" } - if count != 0 { - writeSequence() + if psr.Complete { + ret += "C" } return } @@ -411,9 +393,10 @@ func (t *torrent) writeStatus(w io.Writer) { } }()) if t.haveInfo() { - fmt.Fprint(w, "Pieces: ") - for _, seq := range t.pieceStatusCharSequences() { - fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char) + fmt.Fprint(w, "Pieces:") + for _, psr := range t.pieceStateRuns() { + w.Write([]byte(" ")) + w.Write([]byte(pieceStateRunStatusChars(psr))) } fmt.Fprintln(w) } @@ -494,7 +477,8 @@ func (t *torrent) bytesLeft() (left int64) { } func (t *torrent) piecePartiallyDownloaded(index int) bool { - return t.pieceNumPendingBytes(index) != t.pieceLength(index) + pendingBytes := t.pieceNumPendingBytes(index) + return pendingBytes != 0 && pendingBytes != t.pieceLength(index) } func numChunksForPiece(chunkSize int, pieceSize int) int { @@ -713,7 +697,7 @@ func (t *torrent) wantPiece(index int) bool { if p.Hashing { return false } - if p.Priority == piecePriorityNone { + if p.Priority == PiecePriorityNone { if !t.urgentChunkInPiece(index) { return false }