From f5bd377941a89e2b0bc38b30a2181f354aac256e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 12 Jul 2018 09:15:15 +1000 Subject: [PATCH] Change pieceIndex to peer_protocol.Integer --- client.go | 3 +- connection.go | 107 ++++++++++++++++----------------- file.go | 4 +- metainfo/info.go | 2 +- metainfo/piece.go | 12 ++-- metainfo/piece_key.go | 2 +- misc.go | 4 +- piece.go | 26 ++++----- reader.go | 9 +-- t.go | 14 ++--- torrent.go | 133 +++++++++++++++++++++--------------------- torrent_test.go | 2 +- 12 files changed, 162 insertions(+), 156 deletions(-) diff --git a/client.go b/client.go index e3182fad..ecd73b7b 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,7 @@ import ( "github.com/anacrolix/dht/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pproffd" "github.com/anacrolix/missinggo/pubsub" @@ -814,7 +815,7 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) { if conn.fastEnabled() { if torrent.haveAllPieces() { conn.Post(pp.Message{Type: pp.HaveAll}) - conn.sentHaves.AddRange(0, conn.t.NumPieces()) + conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces())) return } else if !torrent.haveAnyPieces() { conn.Post(pp.Message{Type: pp.HaveNone}) diff --git a/connection.go b/connection.go index c1581499..c791fb15 100644 --- a/connection.go +++ b/connection.go @@ -98,9 +98,9 @@ type connection struct { // The highest possible number of pieces the torrent could have based on // communication with the peer. Generally only useful until we have the // torrent info. - peerMinPieces int + peerMinPieces pieceIndex // Pieces we've accepted chunks for from the peer. - peerTouchedPieces map[int]struct{} + peerTouchedPieces map[pieceIndex]struct{} peerAllowedFast bitmap.Bitmap PeerMaxRequests int // Maximum pending requests the peer allows. @@ -173,7 +173,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) { if !cn.t.haveInfo() { return false, false } - return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true + return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true } func (cn *connection) mu() sync.Locker { @@ -194,7 +194,7 @@ func (cn *connection) supportsExtension(ext pp.ExtensionName) bool { } // The best guess at number of pieces in the torrent for this peer. -func (cn *connection) bestPeerNumPieces() int { +func (cn *connection) bestPeerNumPieces() pieceIndex { if cn.t.haveInfo() { return cn.t.numPieces() } @@ -202,7 +202,7 @@ func (cn *connection) bestPeerNumPieces() int { } func (cn *connection) completedString() string { - have := cn.peerPieces.Len() + have := pieceIndex(cn.peerPieces.Len()) if cn.peerSentHaveAll { have = cn.bestPeerNumPieces() } @@ -212,8 +212,8 @@ func (cn *connection) completedString() string { // Correct the PeerPieces slice length. Return false if the existing slice is // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. -func (cn *connection) setNumPieces(num int) error { - cn.peerPieces.RemoveRange(num, bitmap.ToEnd) +func (cn *connection) setNumPieces(num pieceIndex) error { + cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd) cn.peerPiecesChanged() return nil } @@ -325,8 +325,8 @@ func (cn *connection) Close() { } } -func (cn *connection) PeerHasPiece(piece int) bool { - return cn.peerSentHaveAll || cn.peerPieces.Contains(piece) +func (cn *connection) PeerHasPiece(piece pieceIndex) bool { + return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece)) } // Writes a message into the write buffer. @@ -486,7 +486,7 @@ func (cn *connection) request(r request, mw messageWriter) bool { if _, ok := cn.requests[r]; ok { panic("chunk already requested") } - if !cn.PeerHasPiece(r.Index.Int()) { + if !cn.PeerHasPiece(r.Index) { panic("requesting piece peer doesn't have") } if _, ok := cn.t.conns[cn]; !ok { @@ -554,7 +554,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { } if len(cn.requests) <= cn.requestsLowWater { filledBuffer := false - cn.iterPendingPieces(func(pieceIndex int) bool { + cn.iterPendingPieces(func(pieceIndex pieceIndex) bool { cn.iterPendingRequests(pieceIndex, func(r request) bool { if !cn.SetInterested(true, msg) { filledBuffer = true @@ -651,15 +651,15 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { } } -func (cn *connection) Have(piece int) { - if cn.sentHaves.Get(piece) { +func (cn *connection) Have(piece pieceIndex) { + if cn.sentHaves.Get(bitmap.BitIndex(piece)) { return } cn.Post(pp.Message{ Type: pp.Have, Index: pp.Integer(piece), }) - cn.sentHaves.Add(piece) + cn.sentHaves.Add(bitmap.BitIndex(piece)) } func (cn *connection) PostBitfield() { @@ -697,12 +697,12 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { } } -func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool { +func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool { now, readahead := cn.t.readerPiecePriorities() var skip bitmap.Bitmap if !cn.peerSentHaveAll { // Pieces to skip include pieces the peer doesn't have. - skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()) + skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())) } // And pieces that we already have. skip.Union(cn.t.completedPieces) @@ -711,11 +711,11 @@ func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool // pieces. return iter.All( func(_piece interface{}) bool { - i := _piece.(pieceIndex) - if cn.t.hashingPiece(i) { + i := _piece.(bitmap.BitIndex) + if cn.t.hashingPiece(pieceIndex(i)) { return true } - return f(i) + return f(pieceIndex(i)) }, iterBitmapsDistinct(&skip, now, readahead), func(cb iter.Callback) { @@ -754,7 +754,7 @@ func (cn *connection) shouldRequestWithoutBias() bool { return false } -func (cn *connection) iterPendingPieces(f func(int) bool) bool { +func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool { if !cn.t.haveInfo() { return false } @@ -764,15 +764,17 @@ func (cn *connection) iterPendingPieces(f func(int) bool) bool { if cn.shouldRequestWithoutBias() { return cn.iterUnbiasedPieceRequestOrder(f) } else { - return cn.pieceRequestOrder.IterTyped(f) + return cn.pieceRequestOrder.IterTyped(func(i int) bool { + return f(pieceIndex(i)) + }) } } func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) { - cn.iterPendingPieces(func(i int) bool { return f(i) }) + cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) } -func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool { +func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool { return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool { r := request{pp.Integer(piece), cs} if cn.t.requestStrategy == 3 { @@ -786,24 +788,24 @@ func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool }) } -func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { +func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool { chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice() // TODO: Use "math/rand".Shuffle >= Go 1.10 return iter.ForPerm(len(chunkIndices), func(i int) bool { - return f(t.chunkIndexSpec(chunkIndices[i], piece)) + return f(t.chunkIndexSpec(pieceIndex(chunkIndices[i]), piece)) }) } // check callers updaterequests -func (cn *connection) stopRequestingPiece(piece int) bool { - return cn.pieceRequestOrder.Remove(piece) +func (cn *connection) stopRequestingPiece(piece pieceIndex) bool { + return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece)) } // This is distinct from Torrent piece priority, which is the user's // preference. Connection piece priority is specific to a connection and is // used to pseudorandomly avoid connections always requesting the same pieces // and thus wasting effort. -func (cn *connection) updatePiecePriority(piece int) bool { +func (cn *connection) updatePiecePriority(piece pieceIndex) bool { tpp := cn.t.piecePriority(piece) if !cn.PeerHasPiece(piece) { tpp = PiecePriorityNone @@ -817,16 +819,16 @@ func (cn *connection) updatePiecePriority(piece int) bool { switch tpp { case PiecePriorityNormal: case PiecePriorityReadahead: - prio -= cn.t.numPieces() + prio -= int(cn.t.numPieces()) case PiecePriorityNext, PiecePriorityNow: - prio -= 2 * cn.t.numPieces() + prio -= 2 * int(cn.t.numPieces()) default: panic(tpp) } - prio += piece / 3 + prio += int(piece / 3) default: } - return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias() + return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() } func (cn *connection) getPieceInclination() []int { @@ -847,7 +849,7 @@ func (cn *connection) discardPieceInclination() { func (cn *connection) peerPiecesChanged() { if cn.t.haveInfo() { prioritiesChanged := false - for i := range iter.N(cn.t.numPieces()) { + for i := pieceIndex(0); i < cn.t.numPieces(); i++ { if cn.updatePiecePriority(i) { prioritiesChanged = true } @@ -858,13 +860,13 @@ func (cn *connection) peerPiecesChanged() { } } -func (cn *connection) raisePeerMinPieces(newMin int) { +func (cn *connection) raisePeerMinPieces(newMin pieceIndex) { if newMin > cn.peerMinPieces { cn.peerMinPieces = newMin } } -func (cn *connection) peerSentHave(piece int) error { +func (cn *connection) peerSentHave(piece pieceIndex) error { if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 { return errors.New("invalid piece") } @@ -872,7 +874,7 @@ func (cn *connection) peerSentHave(piece int) error { return nil } cn.raisePeerMinPieces(piece + 1) - cn.peerPieces.Set(piece, true) + cn.peerPieces.Set(bitmap.BitIndex(piece), true) if cn.updatePiecePriority(piece) { cn.updateRequests() } @@ -886,14 +888,14 @@ func (cn *connection) peerSentBitfield(bf []bool) error { } // We know that the last byte means that at most the last 7 bits are // wasted. - cn.raisePeerMinPieces(len(bf) - 7) - if cn.t.haveInfo() && len(bf) > cn.t.numPieces() { + cn.raisePeerMinPieces(pieceIndex(len(bf) - 7)) + if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) { // Ignore known excess pieces. bf = bf[:cn.t.numPieces()] } for i, have := range bf { if have { - cn.raisePeerMinPieces(i + 1) + cn.raisePeerMinPieces(pieceIndex(i) + 1) } cn.peerPieces.Set(i, have) } @@ -1011,7 +1013,7 @@ func (c *connection) reject(r request) { func (c *connection) onReadRequest(r request) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) - if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) { + if r.Begin+r.Length > c.t.pieceLength(r.Index) { torrent.Add("bad requests received", 1) return errors.New("bad request") } @@ -1035,7 +1037,7 @@ func (c *connection) onReadRequest(r request) error { // BEP 6 says we may close here if we choose. return nil } - if !c.t.havePiece(r.Index.Int()) { + if !c.t.havePiece(r.Index) { // This isn't necessarily them screwing up. We can drop pieces // from our storage, and can't communicate this to peers // except by reconnecting. @@ -1114,7 +1116,7 @@ func (c *connection) mainReadLoop() (err error) { // We'll probably choke them for this, which will clear them if // appropriate, and is clearly specified. case pp.Have: - err = c.peerSentHave(int(msg.Index)) + err = c.peerSentHave(msg.Index) case pp.Request: r := newRequestFromMessage(&msg) err = c.onReadRequest(r) @@ -1288,8 +1290,7 @@ func (c *connection) receiveChunk(msg *pp.Message) error { return nil } - index := int(req.Index) - piece := &t.pieces[index] + piece := &t.pieces[req.Index] c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) @@ -1328,28 +1329,28 @@ func (c *connection) receiveChunk(msg *pp.Message) error { if err != nil { log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err) t.pendRequest(req) - t.updatePieceCompletion(int(msg.Index)) + t.updatePieceCompletion(msg.Index) return nil } // It's important that the piece is potentially queued before we check if // the piece is still wanted, because if it is queued, it won't be wanted. - if t.pieceAllDirty(index) { - t.queuePieceCheck(int(req.Index)) - t.pendAllChunkSpecs(index) + if t.pieceAllDirty(req.Index) { + t.queuePieceCheck(req.Index) + t.pendAllChunkSpecs(req.Index) } - c.onDirtiedPiece(index) + c.onDirtiedPiece(req.Index) cl.event.Broadcast() - t.publishPieceChange(int(req.Index)) + t.publishPieceChange(req.Index) return nil } -func (c *connection) onDirtiedPiece(piece int) { +func (c *connection) onDirtiedPiece(piece pieceIndex) { if c.peerTouchedPieces == nil { - c.peerTouchedPieces = make(map[int]struct{}) + c.peerTouchedPieces = make(map[pieceIndex]struct{}) } c.peerTouchedPieces[piece] = struct{}{} ds := &c.t.pieces[piece].dirtiers @@ -1408,7 +1409,7 @@ another: } more, err := c.sendChunk(r, msg) if err != nil { - i := int(r.Index) + i := r.Index if c.t.pieceComplete(i) { c.t.updatePieceCompletion(i) if !c.t.pieceComplete(i) { diff --git a/file.go b/file.go index 5983b855..5fd1babb 100644 --- a/file.go +++ b/file.go @@ -65,7 +65,7 @@ func (f *File) State() (ret []FilePieceState) { pieceSize := int64(f.t.usualPieceSize()) off := f.offset % pieceSize remaining := f.length - for i := int(f.offset / pieceSize); ; i++ { + for i := pieceIndex(f.offset / pieceSize); ; i++ { if remaining == 0 { break } @@ -121,7 +121,7 @@ func (f *File) SetPriority(prio piecePriority) { return } f.prio = prio - f.t.updatePiecePriorities(f.firstPieceIndex().Int(), f.endPieceIndex().Int()) + f.t.updatePiecePriorities(f.firstPieceIndex(), f.endPieceIndex()) } // Returns the priority per File.SetPriority. diff --git a/metainfo/info.go b/metainfo/info.go index 22aca56f..53223130 100644 --- a/metainfo/info.go +++ b/metainfo/info.go @@ -152,5 +152,5 @@ func (info *Info) UpvertedFiles() []FileInfo { } func (info *Info) Piece(index int) Piece { - return Piece{info, index} + return Piece{info, pieceIndex(index)} } diff --git a/metainfo/piece.go b/metainfo/piece.go index 55cdc43c..8f50fa45 100644 --- a/metainfo/piece.go +++ b/metainfo/piece.go @@ -1,14 +1,18 @@ package metainfo -import "github.com/anacrolix/missinggo" +import ( + "github.com/anacrolix/missinggo" +) type Piece struct { Info *Info - i int + i pieceIndex } +type pieceIndex = int + func (p Piece) Length() int64 { - if p.i == p.Info.NumPieces()-1 { + if int(p.i) == p.Info.NumPieces()-1 { return p.Info.TotalLength() - int64(p.i)*p.Info.PieceLength } return p.Info.PieceLength @@ -23,6 +27,6 @@ func (p Piece) Hash() (ret Hash) { return } -func (p Piece) Index() int { +func (p Piece) Index() pieceIndex { return p.i } diff --git a/metainfo/piece_key.go b/metainfo/piece_key.go index c476da61..6ddf065c 100644 --- a/metainfo/piece_key.go +++ b/metainfo/piece_key.go @@ -3,5 +3,5 @@ package metainfo // Uniquely identifies a piece. type PieceKey struct { InfoHash Hash - Index int + Index pieceIndex } diff --git a/misc.go b/misc.go index 915ed34d..3ca4d77b 100644 --- a/misc.go +++ b/misc.go @@ -98,7 +98,7 @@ func validateInfo(info *metainfo.Info) error { return nil } -func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec { +func chunkIndexSpec(index pieceIndex, pieceLength, chunkSize pp.Integer) chunkSpec { ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize} if ret.Begin+ret.Length > pieceLength { ret.Length = pieceLength - ret.Begin @@ -154,6 +154,6 @@ func min(as ...int64) int64 { var unlimited = rate.NewLimiter(rate.Inf, 0) type ( - pieceIndex = int + pieceIndex = pp.Integer InfoHash = metainfo.Hash ) diff --git a/piece.go b/piece.go index 5e0c82a8..9729c44c 100644 --- a/piece.go +++ b/piece.go @@ -42,7 +42,7 @@ type Piece struct { // The completed piece SHA1 hash, from the metainfo "pieces" field. hash metainfo.Hash t *Torrent - index int + index pieceIndex files []*File // Chunks we've written to since the last check. The chunk offset and // length can be determined by the request chunkSize in use. @@ -69,7 +69,7 @@ func (p *Piece) String() string { } func (p *Piece) Info() metainfo.Piece { - return p.t.info.Piece(p.index) + return p.t.info.Piece(int(p.index)) } func (p *Piece) Storage() storage.Piece { @@ -88,8 +88,8 @@ func (p *Piece) hasDirtyChunks() bool { return p.dirtyChunks.Len() != 0 } -func (p *Piece) numDirtyChunks() (ret int) { - return p.dirtyChunks.Len() +func (p *Piece) numDirtyChunks() pp.Integer { + return pp.Integer(p.dirtyChunks.Len()) } func (p *Piece) unpendChunkIndex(i int) { @@ -101,13 +101,13 @@ func (p *Piece) pendChunkIndex(i int) { p.dirtyChunks.Remove(i) } -func (p *Piece) numChunks() int { +func (p *Piece) numChunks() pp.Integer { return p.t.pieceNumChunks(p.index) } func (p *Piece) undirtiedChunkIndices() (ret bitmap.Bitmap) { ret = p.dirtyChunks.Copy() - ret.FlipRange(0, p.numChunks()) + ret.FlipRange(0, bitmap.BitIndex(p.numChunks())) return } @@ -137,11 +137,11 @@ func (p *Piece) waitNoPendingWrites() { p.pendingWritesMutex.Unlock() } -func (p *Piece) chunkIndexDirty(chunk int) bool { - return p.dirtyChunks.Contains(chunk) +func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool { + return p.dirtyChunks.Contains(bitmap.BitIndex(chunk)) } -func (p *Piece) chunkIndexSpec(chunk int) chunkSpec { +func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec { return chunkIndexSpec(chunk, p.length(), p.chunkSize()) } @@ -168,7 +168,7 @@ func (p *Piece) chunkSize() pp.Integer { return p.t.chunkSize } -func (p *Piece) lastChunkIndex() int { +func (p *Piece) lastChunkIndex() pp.Integer { return p.numChunks() - 1 } @@ -196,7 +196,7 @@ func (p *Piece) VerifyData() { } func (p *Piece) queuedForHash() bool { - return p.t.piecesQueuedForHash.Get(p.index) + return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index)) } func (p *Piece) torrentBeginOffset() int64 { @@ -221,13 +221,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) { for _, f := range p.files { ret.Raise(f.prio) } - if p.t.readerNowPieces.Contains(p.index) { + if p.t.readerNowPieces.Contains(int(p.index)) { ret.Raise(PiecePriorityNow) } // if t.readerNowPieces.Contains(piece - 1) { // return PiecePriorityNext // } - if p.t.readerReadaheadPieces.Contains(p.index) { + if p.t.readerReadaheadPieces.Contains(bitmap.BitIndex(p.index)) { ret.Raise(PiecePriorityReadahead) } ret.Raise(p.priority) diff --git a/reader.go b/reader.go index c8eae097..38d4406c 100644 --- a/reader.go +++ b/reader.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent/peer_protocol" ) type Reader interface { @@ -21,7 +22,7 @@ type Reader interface { // Piece range by piece index, [begin, end). type pieceRange struct { - begin, end int + begin, end pieceIndex } // Accesses Torrent data via a Client. Reads block until the data is @@ -84,7 +85,7 @@ func (r *reader) readable(off int64) (ret bool) { if r.responsive { return r.t.haveChunk(req) } - return r.t.pieceComplete(int(req.Index)) + return r.t.pieceComplete(req.Index) } // How many bytes are available to read. Max is the most we could require. @@ -212,8 +213,8 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro return } } - pi := int(r.torrentOffset(pos) / r.t.info.PieceLength) - ip := r.t.info.Piece(pi) + pi := peer_protocol.Integer(r.torrentOffset(pos) / r.t.info.PieceLength) + ip := r.t.info.Piece(int(pi)) po := r.torrentOffset(pos) % r.t.info.PieceLength b1 := missinggo.LimitLen(b, ip.Length()-po, avail) n, err = r.t.readAt(b1, r.torrentOffset(pos)) diff --git a/t.go b/t.go index 516a074a..c9886e5c 100644 --- a/t.go +++ b/t.go @@ -51,7 +51,7 @@ func (t *Torrent) PieceStateRuns() []PieceStateRun { return t.pieceStateRuns() } -func (t *Torrent) PieceState(piece int) PieceState { +func (t *Torrent) PieceState(piece pieceIndex) PieceState { t.cl.mu.Lock() defer t.cl.mu.Unlock() return t.pieceState(piece) @@ -59,7 +59,7 @@ func (t *Torrent) PieceState(piece int) PieceState { // The number of pieces in the torrent. This requires that the info has been // obtained first. -func (t *Torrent) NumPieces() int { +func (t *Torrent) NumPieces() pieceIndex { return t.numPieces() } @@ -152,13 +152,13 @@ func (t *Torrent) deleteReader(r *reader) { // Raise the priorities of pieces in the range [begin, end) to at least Normal // priority. Piece indexes are not the same as bytes. Requires that the info // has been obtained, see Torrent.Info and Torrent.GotInfo. -func (t *Torrent) DownloadPieces(begin, end int) { +func (t *Torrent) DownloadPieces(begin, end pieceIndex) { t.cl.mu.Lock() defer t.cl.mu.Unlock() t.downloadPiecesLocked(begin, end) } -func (t *Torrent) downloadPiecesLocked(begin, end int) { +func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) { for i := begin; i < end; i++ { if t.pieces[i].priority.Raise(PiecePriorityNormal) { t.updatePiecePriority(i) @@ -166,13 +166,13 @@ func (t *Torrent) downloadPiecesLocked(begin, end int) { } } -func (t *Torrent) CancelPieces(begin, end int) { +func (t *Torrent) CancelPieces(begin, end pieceIndex) { t.cl.mu.Lock() defer t.cl.mu.Unlock() t.cancelPiecesLocked(begin, end) } -func (t *Torrent) cancelPiecesLocked(begin, end int) { +func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex) { for i := begin; i < end; i++ { p := &t.pieces[i] if p.priority == PiecePriorityNone { @@ -233,7 +233,7 @@ func (t *Torrent) AddTrackers(announceList [][]string) { t.addTrackers(announceList) } -func (t *Torrent) Piece(i int) *Piece { +func (t *Torrent) Piece(i pieceIndex) *Piece { t.cl.mu.Lock() defer t.cl.mu.Unlock() return &t.pieces[i] diff --git a/torrent.go b/torrent.go index 0360c514..6c63a3c4 100644 --- a/torrent.go +++ b/torrent.go @@ -24,7 +24,6 @@ import ( "github.com/anacrolix/missinggo/prioritybitmap" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" - "github.com/bradfitz/iter" "github.com/davecgh/go-spew/spew" "github.com/anacrolix/torrent/bencode" @@ -34,7 +33,7 @@ import ( "github.com/anacrolix/torrent/tracker" ) -func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec { +func (t *Torrent) chunkIndexSpec(chunkIndex, piece pieceIndex) chunkSpec { return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize) } @@ -220,11 +219,11 @@ func (t *Torrent) setDisplayName(dn string) { t.displayName = dn } -func (t *Torrent) pieceComplete(piece int) bool { - return t.completedPieces.Get(piece) +func (t *Torrent) pieceComplete(piece pieceIndex) bool { + return t.completedPieces.Get(bitmap.BitIndex(piece)) } -func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion { +func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion { return t.pieces[piece].Storage().Completion() } @@ -329,7 +328,7 @@ func (t *Torrent) makePieces() { for i, hash := range hashes { piece := &t.pieces[i] piece.t = t - piece.index = i + piece.index = pieceIndex(i) piece.noPendingWrites.L = &piece.pendingWritesMutex missinggo.CopyExact(piece.hash[:], hash) files := *t.files @@ -396,11 +395,11 @@ func (t *Torrent) onSetInfo() { } } for i := range t.pieces { - t.updatePieceCompletion(i) + t.updatePieceCompletion(pieceIndex(i)) p := &t.pieces[i] if !p.storageCompletionOk { // log.Printf("piece %s completion unknown, queueing check", p) - t.queuePieceCheck(i) + t.queuePieceCheck(pieceIndex(i)) } } t.cl.event.Broadcast() @@ -473,7 +472,7 @@ func (t *Torrent) name() string { return t.displayName } -func (t *Torrent) pieceState(index int) (ret PieceState) { +func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) { p := &t.pieces[index] ret.Priority = t.piecePriority(index) ret.Completion = p.completion() @@ -514,7 +513,7 @@ func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) { }) }) for index := range t.pieces { - rle.Append(t.pieceState(index), 1) + rle.Append(t.pieceState(pieceIndex(index)), 1) } rle.Flush() return @@ -587,7 +586,7 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintln(w) } fmt.Fprintf(w, "Reader Pieces:") - t.forReaderOffsetPieces(func(begin, end int) (again bool) { + t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) { fmt.Fprintf(w, " %d:%d", begin, end) return true }) @@ -651,7 +650,7 @@ func (t *Torrent) bytesMissingLocked() int64 { } func (t *Torrent) bytesLeft() (left int64) { - bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool { + bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool { p := &t.pieces[piece] left += int64(p.length() - p.numDirtyBytes()) return true @@ -668,7 +667,7 @@ func (t *Torrent) bytesLeftAnnounce() uint64 { } } -func (t *Torrent) piecePartiallyDownloaded(piece int) bool { +func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool { if t.pieceComplete(piece) { return false } @@ -682,8 +681,8 @@ func (t *Torrent) usualPieceSize() int { return int(t.info.PieceLength) } -func (t *Torrent) numPieces() int { - return t.info.NumPieces() +func (t *Torrent) numPieces() pieceIndex { + return pieceIndex(t.info.NumPieces()) } func (t *Torrent) numPiecesCompleted() (num int) { @@ -735,15 +734,15 @@ func (t *Torrent) bitfield() (bf []bool) { return } -func (t *Torrent) pieceNumChunks(piece int) int { - return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) +func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer { + return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize } -func (t *Torrent) pendAllChunkSpecs(pieceIndex int) { +func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { t.pieces[pieceIndex].dirtyChunks.Clear() } -func (t *Torrent) pieceLength(piece int) pp.Integer { +func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { if t.info.PieceLength == 0 { // There will be no variance amongst pieces. Only pain. return 0 @@ -757,11 +756,11 @@ func (t *Torrent) pieceLength(piece int) pp.Integer { return pp.Integer(t.info.PieceLength) } -func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) { +func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) { hash := pieceHash.New() p := &t.pieces[piece] p.waitNoPendingWrites() - ip := t.info.Piece(piece) + ip := t.info.Piece(int(piece)) pl := ip.Length() n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)) if n == pl { @@ -782,10 +781,10 @@ func (t *Torrent) haveAllPieces() bool { if !t.haveInfo() { return false } - return t.completedPieces.Len() == t.numPieces() + return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces()) } -func (t *Torrent) havePiece(index int) bool { +func (t *Torrent) havePiece(index pieceIndex) bool { return t.haveInfo() && t.pieceComplete(index) } @@ -796,7 +795,7 @@ func (t *Torrent) haveChunk(r request) (ret bool) { if !t.haveInfo() { return false } - if t.pieceComplete(int(r.Index)) { + if t.pieceComplete(r.Index) { return true } p := &t.pieces[r.Index] @@ -808,7 +807,7 @@ func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int { } func (t *Torrent) wantPiece(r request) bool { - if !t.wantPieceIndex(int(r.Index)) { + if !t.wantPieceIndex(r.Index) { return false } if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) { @@ -819,7 +818,7 @@ func (t *Torrent) wantPiece(r request) bool { return false } -func (t *Torrent) wantPieceIndex(index int) bool { +func (t *Torrent) wantPieceIndex(index pieceIndex) bool { if !t.haveInfo() { return false } @@ -836,11 +835,11 @@ func (t *Torrent) wantPieceIndex(index int) bool { if t.pieceComplete(index) { return false } - if t.pendingPieces.Contains(index) { + if t.pendingPieces.Contains(bitmap.BitIndex(index)) { return true } // log.Printf("piece %d not pending", index) - return !t.forReaderOffsetPieces(func(begin, end int) bool { + return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { return index < begin || index >= end }) } @@ -874,27 +873,27 @@ type PieceStateChange struct { PieceState } -func (t *Torrent) publishPieceChange(piece int) { +func (t *Torrent) publishPieceChange(piece pieceIndex) { cur := t.pieceState(piece) p := &t.pieces[piece] if cur != p.publicPieceState { p.publicPieceState = cur t.pieceStateChanges.Publish(PieceStateChange{ - piece, + int(piece), cur, }) } } -func (t *Torrent) pieceNumPendingChunks(piece int) int { +func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer { if t.pieceComplete(piece) { return 0 } return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks() } -func (t *Torrent) pieceAllDirty(piece int) bool { - return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece) +func (t *Torrent) pieceAllDirty(piece pieceIndex) bool { + return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece)) } func (t *Torrent) readersChanged() { @@ -936,7 +935,7 @@ func (t *Torrent) maybeNewConns() { t.openNewConns() } -func (t *Torrent) piecePriorityChanged(piece int) { +func (t *Torrent) piecePriorityChanged(piece pieceIndex) { // log.Printf("piece %d priority changed", piece) for c := range t.conns { if c.updatePiecePriority(piece) { @@ -948,16 +947,16 @@ func (t *Torrent) piecePriorityChanged(piece int) { t.publishPieceChange(piece) } -func (t *Torrent) updatePiecePriority(piece int) { +func (t *Torrent) updatePiecePriority(piece pieceIndex) { p := &t.pieces[piece] newPrio := p.uncachedPriority() // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone { - if !t.pendingPieces.Remove(piece) { + if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) { return } } else { - if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) { + if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) { return } } @@ -965,19 +964,19 @@ func (t *Torrent) updatePiecePriority(piece int) { } func (t *Torrent) updateAllPiecePriorities() { - t.updatePiecePriorities(0, len(t.pieces)) + t.updatePiecePriorities(0, t.numPieces()) } // Update all piece priorities in one hit. This function should have the same // output as updatePiecePriority, but across all pieces. -func (t *Torrent) updatePiecePriorities(begin, end int) { +func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) { for i := begin; i < end; i++ { t.updatePiecePriority(i) } } // Returns the range of pieces [begin, end) that contains the extent of bytes. -func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { +func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) { if off >= *t.length { return } @@ -988,10 +987,10 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { if size <= 0 { return } - begin = int(off / t.info.PieceLength) - end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength) - if end > t.info.NumPieces() { - end = t.info.NumPieces() + begin = pieceIndex(off / t.info.PieceLength) + end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength) + if end > pieceIndex(t.info.NumPieces()) { + end = pieceIndex(t.info.NumPieces()) } return } @@ -999,7 +998,7 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) { // Returns true if all iterations complete without breaking. Returns the read // regions for all readers. The reader regions should not be merged as some // callers depend on this method to enumerate readers. -func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) { +func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) { for r := range t.readers { p := r.pieces if p.begin >= p.end { @@ -1012,8 +1011,8 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all return true } -func (t *Torrent) piecePriority(piece int) piecePriority { - prio, ok := t.pendingPieces.GetPriority(piece) +func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { + prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece)) if !ok { return PiecePriorityNone } @@ -1032,7 +1031,7 @@ func (t *Torrent) pendRequest(req request) { t.pieces[req.Index].pendChunkIndex(ci) } -func (t *Torrent) pieceCompletionChanged(piece int) { +func (t *Torrent) pieceCompletionChanged(piece pieceIndex) { log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger) t.cl.event.Broadcast() if t.pieceComplete(piece) { @@ -1080,7 +1079,7 @@ func (t *Torrent) getConnPieceInclination() []int { _ret := t.connPieceInclinationPool.Get() if _ret == nil { pieceInclinationsNew.Add(1) - return rand.Perm(t.numPieces()) + return rand.Perm(int(t.numPieces())) } pieceInclinationsReused.Add(1) return *_ret.(*[]int) @@ -1091,13 +1090,13 @@ func (t *Torrent) putPieceInclination(pi []int) { pieceInclinationsPut.Add(1) } -func (t *Torrent) updatePieceCompletion(piece int) { +func (t *Torrent) updatePieceCompletion(piece pieceIndex) { pcu := t.pieceCompleteUncached(piece) p := &t.pieces[piece] - changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok + changed := t.completedPieces.Get(bitmap.BitIndex(piece)) != pcu.Complete || p.storageCompletionOk != pcu.Ok log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger) p.storageCompletionOk = pcu.Ok - t.completedPieces.Set(piece, pcu.Complete) + t.completedPieces.Set(bitmap.BitIndex(piece), pcu.Complete) t.tickleReaders() // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete) // log.Printf("piece %d changed: %v", piece, changed) @@ -1114,7 +1113,7 @@ func (t *Torrent) readAt(b []byte, off int64) (n int, err error) { } func (t *Torrent) updateAllPieceCompletions() { - for i := range iter.N(t.numPieces()) { + for i := pieceIndex(0); i < t.numPieces(); i++ { t.updatePieceCompletion(i) } } @@ -1142,18 +1141,18 @@ func (t *Torrent) maybeCompleteMetadata() error { } func (t *Torrent) readerPieces() (ret bitmap.Bitmap) { - t.forReaderOffsetPieces(func(begin, end int) bool { - ret.AddRange(begin, end) + t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { + ret.AddRange(bitmap.BitIndex(begin), bitmap.BitIndex(end)) return true }) return } func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) { - t.forReaderOffsetPieces(func(begin, end int) bool { + t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { if end > begin { - now.Add(begin) - readahead.AddRange(begin+1, end) + now.Add(bitmap.BitIndex(begin)) + readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end)) } return true }) @@ -1578,7 +1577,7 @@ func (t *Torrent) mu() missinggo.RWLocker { return &t.cl.mu } -func (t *Torrent) pieceHashed(piece int, correct bool) { +func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger) if t.closed.IsSet() { return @@ -1636,14 +1635,14 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { t.updatePieceCompletion(piece) } -func (t *Torrent) cancelRequestsForPiece(piece int) { +func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { // TODO: Make faster for cn := range t.conns { cn.tickleWriter() } } -func (t *Torrent) onPieceCompleted(piece int) { +func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.pendAllChunkSpecs(piece) t.cancelRequestsForPiece(piece) for conn := range t.conns { @@ -1652,7 +1651,7 @@ func (t *Torrent) onPieceCompleted(piece int) { } // Called when a piece is found to be not complete. -func (t *Torrent) onIncompletePiece(piece int) { +func (t *Torrent) onIncompletePiece(piece pieceIndex) { if t.pieceAllDirty(piece) { t.pendAllChunkSpecs(piece) } @@ -1690,7 +1689,7 @@ func (t *Torrent) verifyPiece(piece pieceIndex) { for p.hashing || t.storage == nil { cl.event.Wait() } - if !p.t.piecesQueuedForHash.Remove(piece) { + if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) { panic("piece was not queued") } t.updatePiecePriority(piece) @@ -1713,7 +1712,7 @@ func (t *Torrent) verifyPiece(piece pieceIndex) { // Return the connections that touched a piece, and clear the entries while // doing it. -func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) { +func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) { for c := range t.pieces[piece].dirtiers { delete(c.peerTouchedPieces, piece) ret = append(ret, c) @@ -1730,19 +1729,19 @@ func (t *Torrent) connsAsSlice() (ret []*connection) { } // Currently doesn't really queue, but should in the future. -func (t *Torrent) queuePieceCheck(pieceIndex int) { +func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { piece := &t.pieces[pieceIndex] if piece.queuedForHash() { return } - t.piecesQueuedForHash.Add(pieceIndex) + t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) t.publishPieceChange(pieceIndex) t.updatePiecePriority(pieceIndex) go t.verifyPiece(pieceIndex) } func (t *Torrent) VerifyData() { - for i := range iter.N(t.NumPieces()) { + for i := pieceIndex(0); i < t.NumPieces(); i++ { t.Piece(i).VerifyData() } } diff --git a/torrent_test.go b/torrent_test.go index 2e28f96a..34e5e225 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -95,7 +95,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) { r.Seek(3500000, 0) } assert.Len(b, t.readers, 7) - for i := 0; i < t.numPieces(); i += 3 { + for i := 0; i < int(t.numPieces()); i += 3 { t.completedPieces.Set(i, true) } t.DownloadPieces(0, t.numPieces()) -- 2.44.0