client.go | 20 +++++++++++++++++--- piece.go | 5 ++++- reader.go | 6 +++++- torrent.go | 7 ++++++- diff --git a/client.go b/client.go index a885eb6013c0bb3f7dd7a17868f594b0182f423b..198c04b9d779e80fac365c669210c3430839e071 100644 --- a/client.go +++ b/client.go @@ -1440,7 +1440,12 @@ } func (me *Client) sendChunk(t *torrent, c *connection, r request) error { b := make([]byte, r.Length) - t.Pieces[r.Index].pendingWrites.Wait() + tp := t.Pieces[r.Index] + tp.pendingWritesMutex.Lock() + for tp.pendingWrites != 0 { + tp.noPendingWrites.Wait() + } + tp.pendingWritesMutex.Unlock() p := t.Info.Piece(int(r.Index)) n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin)) if err != nil { @@ -2536,9 +2541,18 @@ c.lastUsefulChunkReceived = time.Now() me.upload(t, c) - piece.pendingWrites.Add(1) + piece.pendingWritesMutex.Lock() + piece.pendingWrites++ + piece.pendingWritesMutex.Unlock() go func() { - defer piece.pendingWrites.Done() + defer func() { + piece.pendingWritesMutex.Lock() + piece.pendingWrites-- + if piece.pendingWrites == 0 { + piece.noPendingWrites.Broadcast() + } + piece.pendingWritesMutex.Unlock() + }() // Write the chunk out. tr := perf.NewTimer() err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) diff --git a/piece.go b/piece.go index 9a3a172917e55a2943958ecf0d67ce8e69c85352..3892155235485925def628def99d7be14142ae0d 100644 --- a/piece.go +++ b/piece.go @@ -30,7 +30,10 @@ QueuedForHash bool EverHashed bool Event sync.Cond Priority piecePriority - pendingWrites sync.WaitGroup + + pendingWritesMutex sync.Mutex + pendingWrites int + noPendingWrites sync.Cond } func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { diff --git a/reader.go b/reader.go index c9a0675eb21cf910ac24ef1c9c8695e978efb41a..32dae43270c9fce90988535d55f9a12d395d4e1b 100644 --- a/reader.go +++ b/reader.go @@ -128,7 +128,11 @@ po := pos % ip.Length() if int64(len(b1)) > ip.Length()-po { b1 = b1[:ip.Length()-po] } - tp.pendingWrites.Wait() + tp.pendingWritesMutex.Lock() + for tp.pendingWrites != 0 { + tp.noPendingWrites.Wait() + } + tp.pendingWritesMutex.Unlock() n, err = dataReadAt(r.t.data, b1, pos) if n != 0 { err = nil diff --git a/torrent.go b/torrent.go index 66123a8e95e95820ac90593e12dc54d060bfd4a5..bb8465df591144ba5d77bb2acb3768e59232ae1f 100644 --- a/torrent.go +++ b/torrent.go @@ -225,6 +225,7 @@ t.metadataHave = nil for _, hash := range infoPieceHashes(md) { piece := &piece{} piece.Event.L = eventLocker + piece.noPendingWrites.L = &piece.pendingWritesMutex missinggo.CopyExact(piece.Hash[:], hash) t.Pieces = append(t.Pieces, piece) } @@ -638,7 +639,11 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() p := t.Pieces[piece] - p.pendingWrites.Wait() + p.pendingWritesMutex.Lock() + for p.pendingWrites != 0 { + p.noPendingWrites.Wait() + } + p.pendingWritesMutex.Unlock() t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) missinggo.CopyExact(ps[:], hash.Sum(nil)) return