From 085be622a08f61cf15d1fb6d653174fe3e6b167d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 5 Aug 2015 02:40:46 +1000 Subject: [PATCH] Replacing pendingWrites WaitGroup with Mutex/Cond/int Apparently I've been using WaitGroups wrong all along. --- client.go | 20 +++++++++++++++++--- piece.go | 5 ++++- reader.go | 6 +++++- torrent.go | 7 ++++++- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index a885eb60..198c04b9 100644 --- a/client.go +++ b/client.go @@ -1440,7 +1440,12 @@ another: func (me *Client) sendChunk(t *torrent, c *connection, r request) error { b := make([]byte, r.Length) - t.Pieces[r.Index].pendingWrites.Wait() + tp := t.Pieces[r.Index] + tp.pendingWritesMutex.Lock() + for tp.pendingWrites != 0 { + tp.noPendingWrites.Wait() + } + tp.pendingWritesMutex.Unlock() p := t.Info.Piece(int(r.Index)) n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) if err != nil { @@ -2536,9 +2541,18 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er me.upload(t, c) - piece.pendingWrites.Add(1) + piece.pendingWritesMutex.Lock() + piece.pendingWrites++ + piece.pendingWritesMutex.Unlock() go func() { - defer piece.pendingWrites.Done() + defer func() { + piece.pendingWritesMutex.Lock() + piece.pendingWrites-- + if piece.pendingWrites == 0 { + piece.noPendingWrites.Broadcast() + } + piece.pendingWritesMutex.Unlock() + }() // Write the chunk out. tr := perf.NewTimer() err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) diff --git a/piece.go b/piece.go index 9a3a1729..38921552 100644 --- a/piece.go +++ b/piece.go @@ -30,7 +30,10 @@ type piece struct { EverHashed bool Event sync.Cond Priority piecePriority - pendingWrites sync.WaitGroup + + pendingWritesMutex sync.Mutex + pendingWrites int + noPendingWrites sync.Cond } func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { diff --git a/reader.go b/reader.go index c9a0675e..32dae432 100644 --- a/reader.go +++ b/reader.go @@ -128,7 +128,11 @@ again: if int64(len(b1)) > ip.Length()-po { b1 = b1[:ip.Length()-po] } - tp.pendingWrites.Wait() + tp.pendingWritesMutex.Lock() + for tp.pendingWrites != 0 { + tp.noPendingWrites.Wait() + } + tp.pendingWritesMutex.Unlock() n, err = dataReadAt(r.t.data, b1, pos) if n != 0 { err = nil diff --git a/torrent.go b/torrent.go index 66123a8e..bb8465df 100644 --- a/torrent.go +++ b/torrent.go @@ -225,6 +225,7 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s for _, hash := range infoPieceHashes(md) { piece := &piece{} piece.Event.L = eventLocker + piece.noPendingWrites.L = &piece.pendingWritesMutex missinggo.CopyExact(piece.Hash[:], hash) t.Pieces = append(t.Pieces, piece) } @@ -638,7 +639,11 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) { func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() p := t.Pieces[piece] - p.pendingWrites.Wait() + p.pendingWritesMutex.Lock() + for p.pendingWrites != 0 { + p.noPendingWrites.Wait() + } + p.pendingWritesMutex.Unlock() t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) missinggo.CopyExact(ps[:], hash.Sum(nil)) return -- 2.44.0