Reported by Craig Campbell <iamcraigcampbell@gmail.com>.
        // 64-bit alignment of fields. See #262.
        stats ConnStats
 
-       _mu    sync.RWMutex
+       _mu    lockWithDeferreds
        event  sync.Cond
        closed missinggo.Event
 
 
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
        "path/filepath"
        "reflect"
        LeecherDownloadRateLimiter *rate.Limiter
 }
 
+func logPieceStateChanges(t *Torrent) {
+       sub := t.SubscribePieceStateChanges()
+       go func() {
+               defer sub.Close()
+               for e := range sub.Values {
+                       log.Printf("%p %#v", t, e)
+               }
+       }()
+}
+
 // Creates a seeder and a leecher, and ensures the data transfers when a read
 // is attempted on the leecher.
 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        }())
        require.NoError(t, err)
        assert.True(t, new)
+
+       //// This was used when observing coalescing of piece state changes.
+       //logPieceStateChanges(leecherTorrent)
+
        // Now do some things with leecher and seeder.
        leecherTorrent.AddClientPeer(seeder)
        // The Torrent should not be interested in obtaining peers, so the one we
 
--- /dev/null
+package torrent
+
+import "sync"
+
+// Runs deferred actions on Unlock. Note that actions are assumed to be the results of changes that
+// would only occur with a write lock at present. The race detector should catch instances of defers
+// without the write lock being held.
+type lockWithDeferreds struct {
+       internal      sync.RWMutex
+       unlockActions []func()
+}
+
+func (me *lockWithDeferreds) Lock() {
+       me.internal.Lock()
+}
+
+func (me *lockWithDeferreds) Unlock() {
+       for _, a := range me.unlockActions {
+               a()
+       }
+       me.unlockActions = me.unlockActions[:0]
+       me.internal.Unlock()
+}
+
+func (me *lockWithDeferreds) RLock() {
+       me.internal.RLock()
+}
+
+func (me *lockWithDeferreds) RUnlock() {
+       me.internal.RUnlock()
+}
+
+func (me *lockWithDeferreds) Defer(action func()) {
+       me.unlockActions = append(me.unlockActions, action)
+}
 
 }
 
 func (t *Torrent) publishPieceChange(piece pieceIndex) {
-       cur := t.pieceState(piece)
-       p := &t.pieces[piece]
-       if cur != p.publicPieceState {
-               p.publicPieceState = cur
-               t.pieceStateChanges.Publish(PieceStateChange{
-                       int(piece),
-                       cur,
-               })
-       }
+       t.cl._mu.Defer(func() {
+               cur := t.pieceState(piece)
+               p := &t.pieces[piece]
+               if cur != p.publicPieceState {
+                       p.publicPieceState = cur
+                       t.pieceStateChanges.Publish(PieceStateChange{
+                               int(piece),
+                               cur,
+                       })
+               }
+       })
 }
 
 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
        return
 }
 
-// Currently doesn't really queue, but should in the future.
 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
        piece := t.piece(pieceIndex)
        if piece.queuedForHash() {