]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Law of Demeter Client.mu
authorMatt Joiner <anacrolix@gmail.com>
Wed, 25 Jul 2018 03:41:50 +0000 (13:41 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 25 Jul 2018 03:42:28 +0000 (13:42 +1000)
12 files changed:
client.go
client_test.go
connection.go
connection_test.go
file.go
piece.go
portfwd.go
reader.go
t.go
torrent.go
torrent_test.go
tracker_scraper.go

index ec388ceacb9c605c074af17756741e5c4ecba271..09226f5452e611ecdee716b96a0e362b2844b053 100644 (file)
--- a/client.go
+++ b/client.go
@@ -41,8 +41,9 @@ import (
 type Client struct {
        // An aggregate of stats over all connections. First in struct to ensure
        // 64-bit alignment of fields. See #262.
-       stats  ConnStats
-       mu     sync.RWMutex
+       stats ConnStats
+
+       _mu    sync.RWMutex
        event  sync.Cond
        closed missinggo.Event
 
@@ -71,8 +72,8 @@ type Client struct {
 type ipStr string
 
 func (cl *Client) BadPeerIPs() []string {
-       cl.mu.RLock()
-       defer cl.mu.RUnlock()
+       cl.rLock()
+       defer cl.rUnlock()
        return cl.badPeerIPsLocked()
 }
 
@@ -117,8 +118,8 @@ func writeDhtServerStatus(w io.Writer, s *dht.Server) {
 // Writes out a human readable status of the client, such as for writing to a
 // HTTP status page.
 func (cl *Client) WriteStatus(_w io.Writer) {
-       cl.mu.RLock()
-       defer cl.mu.RUnlock()
+       cl.rLock()
+       defer cl.rUnlock()
        w := bufio.NewWriter(_w)
        defer w.Flush()
        fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
@@ -193,7 +194,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                cl.Close()
        }()
        cl.extensionBytes = defaultPeerExtensionBytes()
-       cl.event.L = &cl.mu
+       cl.event.L = cl.locker()
        storageImpl := cfg.DefaultStorage
        if storageImpl == nil {
                // We'd use mmap but HFS+ doesn't support sparse files.
@@ -291,8 +292,8 @@ func firstNonEmptyString(ss ...string) string {
 }
 
 func (cl *Client) Closed() <-chan struct{} {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        return cl.closed.C()
 }
 
@@ -313,8 +314,8 @@ func (cl *Client) closeSockets() {
 // Stops the client. All connections to peers are closed and all activity will
 // come to a halt.
 func (cl *Client) Close() {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.closed.Set()
        cl.eachDhtServer(func(s *dht.Server) { s.Close() })
        cl.closeSockets()
@@ -375,13 +376,13 @@ func (cl *Client) acceptConnections(l net.Listener) {
        for {
                conn, err := l.Accept()
                conn = pproffd.WrapNetConn(conn)
-               cl.mu.RLock()
+               cl.rLock()
                closed := cl.closed.IsSet()
                reject := false
                if conn != nil {
                        reject = cl.rejectAccepted(conn)
                }
-               cl.mu.RUnlock()
+               cl.rUnlock()
                if closed {
                        if conn != nil {
                                conn.Close()
@@ -418,8 +419,8 @@ func (cl *Client) incomingConnection(nc net.Conn) {
 
 // Returns a handle to the given torrent, if it's present in the client.
 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t, ok = cl.torrents[ih]
        return
 }
@@ -513,8 +514,8 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
                }()
        }
        func() {
-               cl.mu.Lock()
-               defer cl.mu.Unlock()
+               cl.lock()
+               defer cl.unlock()
                cl.eachListener(func(s socket) bool {
                        if peerNetworkEnabled(s.Addr().Network(), cl.config) {
                                dial(s.dial)
@@ -594,8 +595,8 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C
 // for valid reasons.
 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
        ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
-               cl.mu.RLock()
-               defer cl.mu.RUnlock()
+               cl.rLock()
+               defer cl.rUnlock()
                return t.dialTimeout()
        }())
        defer cancel()
@@ -628,8 +629,8 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
 // considered half-open.
 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
        c, err := cl.establishOutgoingConn(t, addr)
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        // Don't release lock between here and addConnection, unless it's for
        // failure.
        cl.noLongerHalfOpen(t, addr)
@@ -688,8 +689,8 @@ func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err er
 
 // Calls f with any secret keys.
 func (cl *Client) forSkeys(f func([]byte) bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        for ih := range cl.torrents {
                if !f(ih[:]) {
                        break
@@ -721,9 +722,9 @@ func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
        if !ok {
                return
        }
-       cl.mu.Lock()
+       cl.lock()
        t = cl.torrents[ih]
-       cl.mu.Unlock()
+       cl.unlock()
        return
 }
 
@@ -755,21 +756,21 @@ func (cl *Client) runReceivedConn(c *connection) {
                        "network", c.remoteAddr().Network(),
                ).Log(cl.logger)
                torrent.Add("error receiving handshake", 1)
-               cl.mu.Lock()
+               cl.lock()
                cl.onBadAccept(c.remoteAddr())
-               cl.mu.Unlock()
+               cl.unlock()
                return
        }
        if t == nil {
                torrent.Add("received handshake for unloaded torrent", 1)
-               cl.mu.Lock()
+               cl.lock()
                cl.onBadAccept(c.remoteAddr())
-               cl.mu.Unlock()
+               cl.unlock()
                return
        }
        torrent.Add("received handshake for loaded torrent", 1)
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.runHandshookConn(c, t)
 }
 
@@ -958,7 +959,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                networkingEnabled: true,
                requestStrategy:   3,
                metadataChanged: sync.Cond{
-                       L: &cl.mu,
+                       L: cl.locker(),
                },
                duplicateRequestTimeout: 1 * time.Second,
        }
@@ -983,8 +984,8 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
 // If the torrent already exists then this Storage is ignored and the
 // existing torrent returned with `new` set to `false`
 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t, ok := cl.torrents[infoHash]
        if ok {
                return
@@ -1020,8 +1021,8 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e
                        return
                }
        }
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        if spec.ChunkSize != 0 {
                t.setChunkSize(pp.Integer(spec.ChunkSize))
        }
@@ -1059,8 +1060,8 @@ func (cl *Client) allTorrentsCompleted() bool {
 // Returns true when all torrents are completely downloaded and false if the
 // client is stopped before that.
 func (cl *Client) WaitAll() bool {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        for !cl.allTorrentsCompleted() {
                if cl.closed.IsSet() {
                        return false
@@ -1072,8 +1073,8 @@ func (cl *Client) WaitAll() bool {
 
 // Returns handles to all the torrents loaded in the Client.
 func (cl *Client) Torrents() []*Torrent {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        return cl.torrentsAsSlice()
 }
 
@@ -1149,7 +1150,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
                PeerMaxRequests: 250,
                writeBuffer:     new(bytes.Buffer),
        }
-       c.writerCond.L = &cl.mu
+       c.writerCond.L = cl.locker()
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
@@ -1159,8 +1160,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
 }
 
 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t := cl.torrent(ih)
        if t == nil {
                return
@@ -1224,8 +1225,8 @@ func (cl *Client) publicAddr(peer net.IP) ipPort {
 }
 
 func (cl *Client) ListenAddrs() (ret []net.Addr) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.eachListener(func(l socket) bool {
                ret = append(ret, l.Addr())
                return true
@@ -1255,12 +1256,12 @@ func (cl *Client) clearAcceptLimits() {
 func (cl *Client) acceptLimitClearer() {
        for {
                select {
-               case <-cl.closed.LockedChan(&cl.mu):
+               case <-cl.closed.LockedChan(cl.locker()):
                        return
                case <-time.After(15 * time.Minute):
-                       cl.mu.Lock()
+                       cl.lock()
                        cl.clearAcceptLimits()
-                       cl.mu.Unlock()
+                       cl.unlock()
                }
        }
 }
@@ -1271,3 +1272,35 @@ func (cl *Client) rateLimitAccept(ip net.IP) bool {
        }
        return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
 }
+
+func (cl *Client) rLock() {
+       cl._mu.RLock()
+}
+
+func (cl *Client) rUnlock() {
+       cl._mu.RUnlock()
+}
+
+func (cl *Client) lock() {
+       cl._mu.Lock()
+}
+
+func (cl *Client) unlock() {
+       cl._mu.Unlock()
+}
+
+func (cl *Client) locker() sync.Locker {
+       return clientLocker{cl}
+}
+
+type clientLocker struct {
+       *Client
+}
+
+func (cl clientLocker) Lock() {
+       cl.lock()
+}
+
+func (cl clientLocker) Unlock() {
+       cl.unlock()
+}
index d8ada22e007b6103eadefd93f62c41ae9a5c537f..f36db55254b4e3416d5b278674cd395855dea5a6 100644 (file)
@@ -112,15 +112,15 @@ func TestTorrentInitialState(t *testing.T) {
                storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
        )
        tor.setChunkSize(2)
-       tor.cl.mu.Lock()
+       tor.cl.lock()
        err := tor.setInfoBytes(mi.InfoBytes)
-       tor.cl.mu.Unlock()
+       tor.cl.unlock()
        require.NoError(t, err)
        require.Len(t, tor.pieces, 3)
        tor.pendAllChunkSpecs(0)
-       tor.cl.mu.Lock()
+       tor.cl.lock()
        assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
-       tor.cl.mu.Unlock()
+       tor.cl.unlock()
        assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }
 
@@ -782,12 +782,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
        psc := leecherGreeting.SubscribePieceStateChanges()
        defer psc.Close()
 
-       leecherGreeting.cl.mu.Lock()
+       leecherGreeting.cl.lock()
        leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
        if ps.Cancel {
                leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
        }
-       leecherGreeting.cl.mu.Unlock()
+       leecherGreeting.cl.unlock()
        done := make(chan struct{})
        defer close(done)
        go leecherGreeting.AddClientPeer(seeder)
@@ -909,9 +909,9 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) {
 
 func totalConns(tts []*Torrent) (ret int) {
        for _, tt := range tts {
-               tt.cl.mu.Lock()
+               tt.cl.lock()
                ret += len(tt.conns)
-               tt.cl.mu.Unlock()
+               tt.cl.unlock()
        }
        return
 }
index 883d6c8ecb5da60714a7ef4ce41684ffa5913256..563961f52def67706a587f606b4e54d774183b6d 100644 (file)
@@ -177,7 +177,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) {
 }
 
 func (cn *connection) mu() sync.Locker {
-       return &cn.t.cl.mu
+       return cn.t.cl.locker()
 }
 
 func (cn *connection) remoteAddr() net.Addr {
@@ -1084,8 +1084,8 @@ func (c *connection) mainReadLoop() (err error) {
        for {
                var msg pp.Message
                func() {
-                       cl.mu.Unlock()
-                       defer cl.mu.Lock()
+                       cl.unlock()
+                       defer cl.lock()
                        err = decoder.Decode(&msg)
                }()
                if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
@@ -1324,8 +1324,8 @@ func (c *connection) receiveChunk(msg *pp.Message) error {
        }
 
        err := func() error {
-               cl.mu.Unlock()
-               defer cl.mu.Lock()
+               cl.unlock()
+               defer cl.lock()
                // Write the chunk out. Note that the upper bound on chunk writing
                // concurrency will be the number of connections. We write inline with
                // receiving the chunk (with this lock dance), because we want to
index a6c3f318bd16c93ff1efb4e17696c0074029c4f3..949c55f44ecc8cb82a3b825a8627bfc2c7224b81 100644 (file)
@@ -114,7 +114,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                Piece: make([]byte, defaultChunkSize),
        }
        go func() {
-               cl.mu.Lock()
+               cl.lock()
                err := cn.mainReadLoop()
                if err != nil {
                        mrlErr <- err
@@ -127,12 +127,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                defer w.Close()
                ts.writeSem.Lock()
                for range iter.N(b.N) {
-                       cl.mu.Lock()
+                       cl.lock()
                        // The chunk must be written to storage everytime, to ensure the
                        // writeSem is unlocked.
                        t.pieces[0].dirtyChunks.Clear()
                        cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}}
-                       cl.mu.Unlock()
+                       cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)
                        require.EqualValues(b, len(wb), n)
diff --git a/file.go b/file.go
index 6c728d15e3d5de5662a431091993cf34b9f4e61a..4074700a53d31ca6afc88c008c808c408c559464 100644 (file)
--- a/file.go
+++ b/file.go
@@ -59,8 +59,8 @@ type FilePieceState struct {
 
 // Returns the state of pieces in this file.
 func (f *File) State() (ret []FilePieceState) {
-       f.t.cl.mu.RLock()
-       defer f.t.cl.mu.RUnlock()
+       f.t.cl.rLock()
+       defer f.t.cl.rUnlock()
        pieceSize := int64(f.t.usualPieceSize())
        off := f.offset % pieceSize
        remaining := f.length
@@ -102,7 +102,7 @@ func (f *File) Cancel() {
 
 func (f *File) NewReader() Reader {
        tr := reader{
-               mu:        &f.t.cl.mu,
+               mu:        f.t.cl.locker(),
                t:         f.t,
                readahead: 5 * 1024 * 1024,
                offset:    f.Offset(),
@@ -114,8 +114,8 @@ func (f *File) NewReader() Reader {
 
 // Sets the minimum priority for pieces in the File.
 func (f *File) SetPriority(prio piecePriority) {
-       f.t.cl.mu.Lock()
-       defer f.t.cl.mu.Unlock()
+       f.t.cl.lock()
+       defer f.t.cl.unlock()
        if prio == f.prio {
                return
        }
@@ -125,8 +125,8 @@ func (f *File) SetPriority(prio piecePriority) {
 
 // Returns the priority per File.SetPriority.
 func (f *File) Priority() piecePriority {
-       f.t.cl.mu.Lock()
-       defer f.t.cl.mu.Unlock()
+       f.t.cl.lock()
+       defer f.t.cl.unlock()
        return f.prio
 }
 
index 9729c44c2d71a9a21c2bb5f42c8b14c11295bd3d..6eef0cc13b199b37df647bb9bed5042fef01a3bb 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -180,8 +180,8 @@ func (p *Piece) bytesLeft() (ret pp.Integer) {
 }
 
 func (p *Piece) VerifyData() {
-       p.t.cl.mu.Lock()
-       defer p.t.cl.mu.Unlock()
+       p.t.cl.lock()
+       defer p.t.cl.unlock()
        target := p.numVerifies + 1
        if p.hashing {
                target++
@@ -208,8 +208,8 @@ func (p *Piece) torrentEndOffset() int64 {
 }
 
 func (p *Piece) SetPriority(prio piecePriority) {
-       p.t.cl.mu.Lock()
-       defer p.t.cl.mu.Unlock()
+       p.t.cl.lock()
+       defer p.t.cl.unlock()
        p.priority = prio
        p.t.updatePiecePriority(p.index)
 }
index 03a5c7bc144106400f5a667f2afe2bcfaf5df2cc..27f8549e7f7b87697671332ce497e6cc8a2f8513 100644 (file)
@@ -20,20 +20,20 @@ func addPortMapping(d upnp.Device, proto upnp.Protocol, internalPort int, debug
 }
 
 func (cl *Client) forwardPort() {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        if cl.config.NoDefaultPortForwarding {
                return
        }
-       cl.mu.Unlock()
+       cl.unlock()
        ds := upnp.Discover(0, 2*time.Second)
-       cl.mu.Lock()
+       cl.lock()
        flog.Default.Handle(flog.Fmsg("discovered %d upnp devices", len(ds)))
        port := cl.incomingPeerPort()
-       cl.mu.Unlock()
+       cl.unlock()
        for _, d := range ds {
                go addPortMapping(d, upnp.TCP, port, cl.config.Debug)
                go addPortMapping(d, upnp.UDP, port, cl.config.Debug)
        }
-       cl.mu.Lock()
+       cl.lock()
 }
index d65a18f5cd0cbdf331b1cf0cc4543787770b6348..07eefc87db05e67dccc952f24f245e9dc81966aa 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -69,8 +69,8 @@ func (r *reader) SetReadahead(readahead int64) {
        r.mu.Lock()
        r.readahead = readahead
        r.mu.Unlock()
-       r.t.cl.mu.Lock()
-       defer r.t.cl.mu.Unlock()
+       r.t.cl.lock()
+       defer r.t.cl.unlock()
        r.posChanged()
 }
 
@@ -146,10 +146,10 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
                defer cancel()
                go func() {
                        <-ctx.Done()
-                       r.t.cl.mu.Lock()
+                       r.t.cl.lock()
                        ctxErr = ctx.Err()
                        r.t.tickleReaders()
-                       r.t.cl.mu.Unlock()
+                       r.t.cl.unlock()
                }()
        }
        // Hmmm, if a Read gets stuck, this means you can't change position for
@@ -183,8 +183,8 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
 func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
-       r.t.cl.mu.Lock()
-       defer r.t.cl.mu.Unlock()
+       r.t.cl.lock()
+       defer r.t.cl.unlock()
        for !r.readable(pos) && *ctxErr == nil {
                r.waitReadable(pos)
        }
@@ -222,19 +222,19 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
                        err = nil
                        return
                }
-               r.t.cl.mu.Lock()
+               r.t.cl.lock()
                // TODO: Just reset pieces in the readahead window. This might help
                // prevent thrashing with small caches and file and piece priorities.
                log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
                r.t.updateAllPieceCompletions()
                r.t.updateAllPiecePriorities()
-               r.t.cl.mu.Unlock()
+               r.t.cl.unlock()
        }
 }
 
 func (r *reader) Close() error {
-       r.t.cl.mu.Lock()
-       defer r.t.cl.mu.Unlock()
+       r.t.cl.lock()
+       defer r.t.cl.unlock()
        r.t.deleteReader(r)
        return nil
 }
diff --git a/t.go b/t.go
index c9886e5c918d4b3f2abbdc93e71e64accbae03cf..c66935c20c11a2ad6253b63d78de480db8c50eeb 100644 (file)
--- a/t.go
+++ b/t.go
@@ -17,15 +17,15 @@ func (t *Torrent) InfoHash() metainfo.Hash {
 // Returns a channel that is closed when the info (.Info()) for the torrent
 // has become available.
 func (t *Torrent) GotInfo() <-chan struct{} {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.gotMetainfo.C()
 }
 
 // Returns the metainfo info dictionary, or nil if it's not yet available.
 func (t *Torrent) Info() *metainfo.Info {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.info
 }
 
@@ -33,7 +33,7 @@ func (t *Torrent) Info() *metainfo.Info {
 // the data requested is actually available.
 func (t *Torrent) NewReader() Reader {
        r := reader{
-               mu:        &t.cl.mu,
+               mu:        t.cl.locker(),
                t:         t,
                readahead: 5 * 1024 * 1024,
                length:    *t.length,
@@ -46,14 +46,14 @@ func (t *Torrent) NewReader() Reader {
 // same state. The sum of the state run lengths is the number of pieces
 // in the torrent.
 func (t *Torrent) PieceStateRuns() []PieceStateRun {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.pieceStateRuns()
 }
 
 func (t *Torrent) PieceState(piece pieceIndex) PieceState {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.pieceState(piece)
 }
 
@@ -65,8 +65,8 @@ func (t *Torrent) NumPieces() pieceIndex {
 
 // Get missing bytes count for specific piece.
 func (t *Torrent) PieceBytesMissing(piece int) int64 {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
 
        return int64(t.pieces[piece].bytesLeft())
 }
@@ -75,9 +75,9 @@ func (t *Torrent) PieceBytesMissing(piece int) int64 {
 // this. No data corruption can, or should occur to either the torrent's data,
 // or connected peers.
 func (t *Torrent) Drop() {
-       t.cl.mu.Lock()
+       t.cl.lock()
        t.cl.dropTorrent(t.infoHash)
-       t.cl.mu.Unlock()
+       t.cl.unlock()
 }
 
 // Number of bytes of the entire torrent we have completed. This is the sum of
@@ -85,8 +85,8 @@ func (t *Torrent) Drop() {
 // for download rate, as it can go down when pieces are lost or fail checks.
 // Sample Torrent.Stats.DataBytesRead for actual file data download rate.
 func (t *Torrent) BytesCompleted() int64 {
-       t.cl.mu.RLock()
-       defer t.cl.mu.RUnlock()
+       t.cl.rLock()
+       defer t.cl.rUnlock()
        return t.bytesCompleted()
 }
 
@@ -99,24 +99,24 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
 // Returns true if the torrent is currently being seeded. This occurs when the
 // client is willing to upload without wanting anything in return.
 func (t *Torrent) Seeding() bool {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.seeding()
 }
 
 // Clobbers the torrent display name. The display name is used as the torrent
 // name if the metainfo is not available.
 func (t *Torrent) SetDisplayName(dn string) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        t.setDisplayName(dn)
 }
 
 // The current working name for the torrent. Either the name in the info dict,
 // or a display name given such as by the dn value in a magnet link, or "".
 func (t *Torrent) Name() string {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.name()
 }
 
@@ -129,14 +129,14 @@ func (t *Torrent) Length() int64 {
 // Returns a run-time generated metainfo for the torrent that includes the
 // info bytes and announce-list as currently known to the client.
 func (t *Torrent) Metainfo() metainfo.MetaInfo {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.newMetaInfo()
 }
 
 func (t *Torrent) addReader(r *reader) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        if t.readers == nil {
                t.readers = make(map[*reader]struct{})
        }
@@ -153,8 +153,8 @@ func (t *Torrent) deleteReader(r *reader) {
 // priority. Piece indexes are not the same as bytes. Requires that the info
 // has been obtained, see Torrent.Info and Torrent.GotInfo.
 func (t *Torrent) DownloadPieces(begin, end pieceIndex) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        t.downloadPiecesLocked(begin, end)
 }
 
@@ -167,8 +167,8 @@ func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) {
 }
 
 func (t *Torrent) CancelPieces(begin, end pieceIndex) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        t.cancelPiecesLocked(begin, end)
 }
 
@@ -208,8 +208,8 @@ func (t *Torrent) Files() []*File {
 
 func (t *Torrent) AddPeers(pp []Peer) {
        cl := t.cl
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t.addPeers(pp)
 }
 
@@ -228,13 +228,13 @@ func (t *Torrent) String() string {
 }
 
 func (t *Torrent) AddTrackers(announceList [][]string) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        t.addTrackers(announceList)
 }
 
 func (t *Torrent) Piece(i pieceIndex) *Piece {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return &t.pieces[i]
 }
index 9aebad9030b9d51c6ead022182adb4f591053536..0b13fe257608753b4581e3962c35fabc8c70d5b7 100644 (file)
@@ -155,7 +155,7 @@ func (t *Torrent) tickleReaders() {
 
 // Returns a channel that is closed when the Torrent is closed.
 func (t *Torrent) Closed() <-chan struct{} {
-       return t.closed.LockedChan(&t.cl.mu)
+       return t.closed.LockedChan(t.cl.locker())
 }
 
 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
@@ -640,8 +640,8 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
 }
 
 func (t *Torrent) BytesMissing() int64 {
-       t.mu().RLock()
-       defer t.mu().RUnlock()
+       t.cl.rLock()
+       defer t.cl.rUnlock()
        return t.bytesMissingLocked()
 }
 
@@ -1210,8 +1210,8 @@ func (t *Torrent) bytesCompleted() int64 {
 }
 
 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        return t.setInfoBytes(b)
 }
 
@@ -1388,14 +1388,14 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
                                }).String()
                                allAddrs[key] = struct{}{}
                        }
-                       cl.mu.Lock()
+                       cl.lock()
                        t.addPeers(addPeers)
                        numPeers := t.peers.Len()
-                       cl.mu.Unlock()
+                       cl.unlock()
                        if numPeers >= cl.config.TorrentPeersHighWater {
                                return
                        }
-               case <-t.closed.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(cl.locker()):
                        return
                }
        }
@@ -1416,14 +1416,14 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
        cl := t.cl
        for {
                select {
-               case <-t.wantPeersEvent.LockedChan(&cl.mu):
-               case <-t.closed.LockedChan(&cl.mu):
+               case <-t.wantPeersEvent.LockedChan(cl.locker()):
+               case <-t.closed.LockedChan(cl.locker()):
                        return
                }
                err := t.announceDHT(true, s)
                func() {
-                       cl.mu.Lock()
-                       defer cl.mu.Unlock()
+                       cl.lock()
+                       defer cl.unlock()
                        if err == nil {
                                t.numDHTAnnounces++
                        } else {
@@ -1431,7 +1431,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
                        }
                }()
                select {
-               case <-t.closed.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(cl.locker()):
                        return
                case <-time.After(5 * time.Minute):
                }
@@ -1445,8 +1445,8 @@ func (t *Torrent) addPeers(peers []Peer) {
 }
 
 func (t *Torrent) Stats() TorrentStats {
-       t.cl.mu.RLock()
-       defer t.cl.mu.RUnlock()
+       t.cl.rLock()
+       defer t.cl.rUnlock()
        return t.statsLocked()
 }
 
@@ -1561,8 +1561,8 @@ func (t *Torrent) wantConns() bool {
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
+       t.cl.lock()
+       defer t.cl.unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
        wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
@@ -1573,10 +1573,6 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        return oldMax
 }
 
-func (t *Torrent) mu() missinggo.RWLocker {
-       return &t.cl.mu
-}
-
 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
        log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
        if t.closed.IsSet() {
@@ -1679,8 +1675,8 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
 
 func (t *Torrent) verifyPiece(piece pieceIndex) {
        cl := t.cl
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        p := &t.pieces[piece]
        defer func() {
                p.numVerifies++
@@ -1700,10 +1696,10 @@ func (t *Torrent) verifyPiece(piece pieceIndex) {
        t.publishPieceChange(piece)
        t.updatePiecePriority(piece)
        t.storageLock.RLock()
-       cl.mu.Unlock()
+       cl.unlock()
        sum := t.hashPiece(piece)
        t.storageLock.RUnlock()
-       cl.mu.Lock()
+       cl.lock()
        p.hashing = false
        t.updatePiecePriority(piece)
        t.pieceHashed(piece, sum == p.hash)
index e10cc5289bd830f0245e0c296267623438eb19f2..22053794368c87b888657fefee096919ca735505 100644 (file)
@@ -154,13 +154,13 @@ func TestPieceHashFailed(t *testing.T) {
        tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
        tt.setChunkSize(2)
        require.NoError(t, tt.setInfoBytes(mi.InfoBytes))
-       tt.cl.mu.Lock()
+       tt.cl.lock()
        tt.pieces[1].dirtyChunks.AddRange(0, 3)
        require.True(t, tt.pieceAllDirty(1))
        tt.pieceHashed(1, false)
        // Dirty chunks should be cleared so we can try again.
        require.False(t, tt.pieceAllDirty(1))
-       tt.cl.mu.Unlock()
+       tt.cl.unlock()
 }
 
 // Check the behaviour of Torrent.Metainfo when metadata is not completed.
@@ -194,8 +194,8 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
        assert.EqualValues(t, 0, tt.metadataSize())
 
        func() {
-               cl.mu.Lock()
-               defer cl.mu.Unlock()
+               cl.lock()
+               defer cl.unlock()
                go func() {
                        _, err = nc.Write(pp.Message{
                                Type:       pp.Extended,
index 7b2b5137e8aa995a08ee1597680698c5b34186c9..ed20a7fdb620c74a9356a824f135710580a11003 100644 (file)
@@ -107,9 +107,9 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
                ret.Err = fmt.Errorf("error getting ip: %s", err)
                return
        }
-       me.t.cl.mu.Lock()
+       me.t.cl.lock()
        req := me.t.announceRequest()
-       me.t.cl.mu.Unlock()
+       me.t.cl.unlock()
        res, err := tracker.Announce{
                HttpClient: me.t.cl.config.TrackerHttpClient,
                UserAgent:  me.t.cl.config.HTTPUserAgent,
@@ -133,24 +133,24 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
 func (me *trackerScraper) Run() {
        for {
                select {
-               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+               case <-me.t.closed.LockedChan(me.t.cl.locker()):
                        return
-               case <-me.stop.LockedChan(&me.t.cl.mu):
+               case <-me.stop.LockedChan(me.t.cl.locker()):
                        return
-               case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+               case <-me.t.wantPeersEvent.LockedChan(me.t.cl.locker()):
                }
 
                ar := me.announce()
-               me.t.cl.mu.Lock()
+               me.t.cl.lock()
                me.lastAnnounce = ar
-               me.t.cl.mu.Unlock()
+               me.t.cl.unlock()
 
                intervalChan := time.After(time.Until(ar.Completed.Add(ar.Interval)))
 
                select {
-               case <-me.t.closed.LockedChan(&me.t.cl.mu):
+               case <-me.t.closed.LockedChan(me.t.cl.locker()):
                        return
-               case <-me.stop.LockedChan(&me.t.cl.mu):
+               case <-me.stop.LockedChan(me.t.cl.locker()):
                        return
                case <-intervalChan:
                }