]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Replacing pendingWrites WaitGroup with Mutex/Cond/int
authorMatt Joiner <anacrolix@gmail.com>
Tue, 4 Aug 2015 16:40:46 +0000 (02:40 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 4 Aug 2015 16:40:46 +0000 (02:40 +1000)
Apparently I've been using WaitGroups wrong all along.

client.go
piece.go
reader.go
torrent.go

index a885eb6013c0bb3f7dd7a17868f594b0182f423b..198c04b9d779e80fac365c669210c3430839e071 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1440,7 +1440,12 @@ another:
 
 func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
        b := make([]byte, r.Length)
-       t.Pieces[r.Index].pendingWrites.Wait()
+       tp := t.Pieces[r.Index]
+       tp.pendingWritesMutex.Lock()
+       for tp.pendingWrites != 0 {
+               tp.noPendingWrites.Wait()
+       }
+       tp.pendingWritesMutex.Unlock()
        p := t.Info.Piece(int(r.Index))
        n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
        if err != nil {
@@ -2536,9 +2541,18 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        me.upload(t, c)
 
-       piece.pendingWrites.Add(1)
+       piece.pendingWritesMutex.Lock()
+       piece.pendingWrites++
+       piece.pendingWritesMutex.Unlock()
        go func() {
-               defer piece.pendingWrites.Done()
+               defer func() {
+                       piece.pendingWritesMutex.Lock()
+                       piece.pendingWrites--
+                       if piece.pendingWrites == 0 {
+                               piece.noPendingWrites.Broadcast()
+                       }
+                       piece.pendingWritesMutex.Unlock()
+               }()
                // Write the chunk out.
                tr := perf.NewTimer()
                err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
index 9a3a172917e55a2943958ecf0d67ce8e69c85352..3892155235485925def628def99d7be14142ae0d 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -30,7 +30,10 @@ type piece struct {
        EverHashed        bool
        Event             sync.Cond
        Priority          piecePriority
-       pendingWrites     sync.WaitGroup
+
+       pendingWritesMutex sync.Mutex
+       pendingWrites      int
+       noPendingWrites    sync.Cond
 }
 
 func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
index c9a0675eb21cf910ac24ef1c9c8695e978efb41a..32dae43270c9fce90988535d55f9a12d395d4e1b 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -128,7 +128,11 @@ again:
        if int64(len(b1)) > ip.Length()-po {
                b1 = b1[:ip.Length()-po]
        }
-       tp.pendingWrites.Wait()
+       tp.pendingWritesMutex.Lock()
+       for tp.pendingWrites != 0 {
+               tp.noPendingWrites.Wait()
+       }
+       tp.pendingWritesMutex.Unlock()
        n, err = dataReadAt(r.t.data, b1, pos)
        if n != 0 {
                err = nil
index 66123a8e95e95820ac90593e12dc54d060bfd4a5..bb8465df591144ba5d77bb2acb3768e59232ae1f 100644 (file)
@@ -225,6 +225,7 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker s
        for _, hash := range infoPieceHashes(md) {
                piece := &piece{}
                piece.Event.L = eventLocker
+               piece.noPendingWrites.L = &piece.pendingWritesMutex
                missinggo.CopyExact(piece.Hash[:], hash)
                t.Pieces = append(t.Pieces, piece)
        }
@@ -638,7 +639,11 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
        hash := pieceHash.New()
        p := t.Pieces[piece]
-       p.pendingWrites.Wait()
+       p.pendingWritesMutex.Lock()
+       for p.pendingWrites != 0 {
+               p.noPendingWrites.Wait()
+       }
+       p.pendingWritesMutex.Unlock()
        t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
        missinggo.CopyExact(ps[:], hash.Sum(nil))
        return