From b3a8020401bb5ac1772693234dc92c83105b2796 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 16 Oct 2015 22:10:03 +1100 Subject: [PATCH] Store pieces inplace in torrent.Pieces This should save some allocation overhead, especially for torrents that have 20k+ pieces. --- client.go | 22 +++++++++++----------- client_test.go | 2 +- reader.go | 2 +- torrent.go | 26 ++++++++++++++------------ 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/client.go b/client.go index a5139307..121b0849 100644 --- a/client.go +++ b/client.go @@ -110,7 +110,7 @@ const ( // Currently doesn't really queue, but should in the future. func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { - piece := t.Pieces[pieceIndex] + piece := &t.Pieces[pieceIndex] if piece.QueuedForHash { return } @@ -122,7 +122,7 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { // Queue a piece check if one isn't already queued, and the piece has never // been checked before. func (cl *Client) queueFirstHash(t *torrent, piece int) { - p := t.Pieces[piece] + p := &t.Pieces[piece] if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) { return } @@ -347,7 +347,7 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) if priority != PiecePriorityNone { cl.queueFirstHash(t, piece) } - p := t.Pieces[piece] + p := &t.Pieces[piece] if p.Priority != priority { p.Priority = priority cl.pieceChanged(t, piece) @@ -1448,7 +1448,7 @@ func (me *Client) sendChunk(t *torrent, c *connection, r request) error { // Count the chunk being sent, even if it isn't. c.chunksSent++ b := make([]byte, r.Length) - tp := t.Pieces[r.Index] + tp := &t.Pieces[r.Index] tp.pendingWritesMutex.Lock() for tp.pendingWrites != 0 { tp.noPendingWrites.Wait() @@ -1895,8 +1895,8 @@ func (cl *Client) startTorrent(t *torrent) { if !cl.config.NoUpload { // Queue all pieces for hashing. This is done sequentially to avoid // spamming goroutines. - for _, p := range t.Pieces { - p.QueuedForHash = true + for i := range t.Pieces { + t.Pieces[i].QueuedForHash = true } go func() { for i := range t.Pieces { @@ -2511,7 +2511,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) { c.pieceRequestOrder.DeletePiece(pieceIndex) continue } - piece := t.Pieces[pieceIndex] + piece := &t.Pieces[pieceIndex] for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) { r := request{pp.Integer(pieceIndex), cs} if !addRequest(r) { @@ -2549,7 +2549,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er unexpectedChunksReceived.Add(1) } - piece := t.Pieces[req.Index] + piece := &t.Pieces[req.Index] // Do we actually want this chunk? if !t.wantChunk(req) { @@ -2630,7 +2630,7 @@ func (me *Client) reapPieceTouches(t *torrent, piece int) (ret []*connection) { } func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { - p := t.Pieces[piece] + p := &t.Pieces[piece] if p.EverHashed { // Don't score the first time a piece is hashed, it could be an // initial check. @@ -2661,7 +2661,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { // TODO: Check this isn't called more than once for each piece being correct. func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) - p := t.Pieces[piece] + p := &t.Pieces[piece] defer t.publishPieceChange(piece) defer p.Event.Broadcast() if correct { @@ -2705,7 +2705,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) { func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { cl.mu.Lock() defer cl.mu.Unlock() - p := t.Pieces[index] + p := &t.Pieces[index] for p.Hashing || t.data == nil { cl.event.Wait() } diff --git a/client_test.go b/client_test.go index d8707a9f..b7e4238a 100644 --- a/client_test.go +++ b/client_test.go @@ -106,7 +106,7 @@ func TestTorrentInitialState(t *testing.T) { if len(tor.Pieces) != 3 { t.Fatal("wrong number of pieces") } - p := tor.Pieces[0] + p := &tor.Pieces[0] tor.pendAllChunkSpecs(0) assert.EqualValues(t, 3, p.numPendingChunks()) assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) diff --git a/reader.go b/reader.go index 32dae432..0d2561d3 100644 --- a/reader.go +++ b/reader.go @@ -122,7 +122,7 @@ again: r.t.cl.mu.Unlock() b1 := b[:avail] pi := int(pos / r.t.Info().PieceLength) - tp := r.t.torrent.Pieces[pi] + tp := &r.t.torrent.Pieces[pi] ip := r.t.Info().Piece(pi) po := pos % ip.Length() if int64(len(b1)) > ip.Length()-po { diff --git a/torrent.go b/torrent.go index e1faf0ae..e1ad41ca 100644 --- a/torrent.go +++ b/torrent.go @@ -26,7 +26,7 @@ func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) { if t.pieceComplete(index) { return 0 } - piece := t.Pieces[index] + piece := &t.Pieces[index] pieceLength := t.pieceLength(index) if !piece.EverHashed { return pieceLength @@ -54,7 +54,7 @@ type torrent struct { ceasingNetworking chan struct{} InfoHash InfoHash - Pieces []*piece + Pieces []piece // Values are the piece indices that changed. pieceStateChanges *pubsub.PubSub chunkSize pp.Integer @@ -245,12 +245,13 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s } t.MetaData = infoBytes t.metadataHave = nil - for _, hash := range infoPieceHashes(md) { - piece := &piece{} + hashes := infoPieceHashes(md) + t.Pieces = make([]piece, len(hashes)) + for i, hash := range hashes { + piece := &t.Pieces[i] piece.Event.L = eventLocker piece.noPendingWrites.L = &piece.pendingWritesMutex missinggo.CopyExact(piece.Hash[:], hash) - t.Pieces = append(t.Pieces, piece) } for _, conn := range t.Conns { t.initRequestOrdering(conn) @@ -315,7 +316,7 @@ func (t *torrent) Name() string { } func (t *torrent) pieceState(index int) (ret PieceState) { - p := t.Pieces[index] + p := &t.Pieces[index] ret.Priority = p.Priority if t.pieceComplete(index) { ret.Complete = true @@ -583,7 +584,8 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { } func (t *torrent) bitfield() (bf []bool) { - for _, p := range t.Pieces { + for i := range t.Pieces { + p := &t.Pieces[i] // TODO: Check this logic. bf = append(bf, p.EverHashed && p.numPendingChunks() == 0) } @@ -622,7 +624,7 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) { } func (t *torrent) pendAllChunkSpecs(pieceIndex int) { - piece := t.Pieces[pieceIndex] + piece := &t.Pieces[pieceIndex] if piece.PendingChunkSpecs == nil { // Allocate to exact size. piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize) @@ -656,7 +658,7 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) { func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() - p := t.Pieces[piece] + p := &t.Pieces[piece] p.pendingWritesMutex.Lock() for p.pendingWrites != 0 { p.noPendingWrites.Wait() @@ -709,7 +711,7 @@ func (t *torrent) haveChunk(r request) bool { if t.pieceComplete(int(r.Index)) { return true } - p := t.Pieces[r.Index] + p := &t.Pieces[r.Index] if p.PendingChunkSpecs == nil { return false } @@ -747,7 +749,7 @@ func (t *torrent) wantPiece(index int) bool { if !t.haveInfo() { return false } - p := t.Pieces[index] + p := &t.Pieces[index] if p.QueuedForHash { return false } @@ -795,7 +797,7 @@ func (t *torrent) worstBadConn(cl *Client) *connection { func (t *torrent) publishPieceChange(piece int) { cur := t.pieceState(piece) - p := t.Pieces[piece] + p := &t.Pieces[piece] if cur != p.PublicPieceState { t.pieceStateChanges.Publish(piece) } -- 2.48.1