]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Law of Demeter Client.mu
[btrtrc.git] / client.go
index ec388ceacb9c605c074af17756741e5c4ecba271..09226f5452e611ecdee716b96a0e362b2844b053 100644 (file)
--- a/client.go
+++ b/client.go
@@ -41,8 +41,9 @@ import (
 type Client struct {
        // An aggregate of stats over all connections. First in struct to ensure
        // 64-bit alignment of fields. See #262.
-       stats  ConnStats
-       mu     sync.RWMutex
+       stats ConnStats
+
+       _mu    sync.RWMutex
        event  sync.Cond
        closed missinggo.Event
 
@@ -71,8 +72,8 @@ type Client struct {
 type ipStr string
 
 func (cl *Client) BadPeerIPs() []string {
-       cl.mu.RLock()
-       defer cl.mu.RUnlock()
+       cl.rLock()
+       defer cl.rUnlock()
        return cl.badPeerIPsLocked()
 }
 
@@ -117,8 +118,8 @@ func writeDhtServerStatus(w io.Writer, s *dht.Server) {
 // Writes out a human readable status of the client, such as for writing to a
 // HTTP status page.
 func (cl *Client) WriteStatus(_w io.Writer) {
-       cl.mu.RLock()
-       defer cl.mu.RUnlock()
+       cl.rLock()
+       defer cl.rUnlock()
        w := bufio.NewWriter(_w)
        defer w.Flush()
        fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
@@ -193,7 +194,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                cl.Close()
        }()
        cl.extensionBytes = defaultPeerExtensionBytes()
-       cl.event.L = &cl.mu
+       cl.event.L = cl.locker()
        storageImpl := cfg.DefaultStorage
        if storageImpl == nil {
                // We'd use mmap but HFS+ doesn't support sparse files.
@@ -291,8 +292,8 @@ func firstNonEmptyString(ss ...string) string {
 }
 
 func (cl *Client) Closed() <-chan struct{} {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        return cl.closed.C()
 }
 
@@ -313,8 +314,8 @@ func (cl *Client) closeSockets() {
 // Stops the client. All connections to peers are closed and all activity will
 // come to a halt.
 func (cl *Client) Close() {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.closed.Set()
        cl.eachDhtServer(func(s *dht.Server) { s.Close() })
        cl.closeSockets()
@@ -375,13 +376,13 @@ func (cl *Client) acceptConnections(l net.Listener) {
        for {
                conn, err := l.Accept()
                conn = pproffd.WrapNetConn(conn)
-               cl.mu.RLock()
+               cl.rLock()
                closed := cl.closed.IsSet()
                reject := false
                if conn != nil {
                        reject = cl.rejectAccepted(conn)
                }
-               cl.mu.RUnlock()
+               cl.rUnlock()
                if closed {
                        if conn != nil {
                                conn.Close()
@@ -418,8 +419,8 @@ func (cl *Client) incomingConnection(nc net.Conn) {
 
 // Returns a handle to the given torrent, if it's present in the client.
 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t, ok = cl.torrents[ih]
        return
 }
@@ -513,8 +514,8 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
                }()
        }
        func() {
-               cl.mu.Lock()
-               defer cl.mu.Unlock()
+               cl.lock()
+               defer cl.unlock()
                cl.eachListener(func(s socket) bool {
                        if peerNetworkEnabled(s.Addr().Network(), cl.config) {
                                dial(s.dial)
@@ -594,8 +595,8 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C
 // for valid reasons.
 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
        ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
-               cl.mu.RLock()
-               defer cl.mu.RUnlock()
+               cl.rLock()
+               defer cl.rUnlock()
                return t.dialTimeout()
        }())
        defer cancel()
@@ -628,8 +629,8 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
 // considered half-open.
 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
        c, err := cl.establishOutgoingConn(t, addr)
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        // Don't release lock between here and addConnection, unless it's for
        // failure.
        cl.noLongerHalfOpen(t, addr)
@@ -688,8 +689,8 @@ func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err er
 
 // Calls f with any secret keys.
 func (cl *Client) forSkeys(f func([]byte) bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        for ih := range cl.torrents {
                if !f(ih[:]) {
                        break
@@ -721,9 +722,9 @@ func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
        if !ok {
                return
        }
-       cl.mu.Lock()
+       cl.lock()
        t = cl.torrents[ih]
-       cl.mu.Unlock()
+       cl.unlock()
        return
 }
 
@@ -755,21 +756,21 @@ func (cl *Client) runReceivedConn(c *connection) {
                        "network", c.remoteAddr().Network(),
                ).Log(cl.logger)
                torrent.Add("error receiving handshake", 1)
-               cl.mu.Lock()
+               cl.lock()
                cl.onBadAccept(c.remoteAddr())
-               cl.mu.Unlock()
+               cl.unlock()
                return
        }
        if t == nil {
                torrent.Add("received handshake for unloaded torrent", 1)
-               cl.mu.Lock()
+               cl.lock()
                cl.onBadAccept(c.remoteAddr())
-               cl.mu.Unlock()
+               cl.unlock()
                return
        }
        torrent.Add("received handshake for loaded torrent", 1)
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.runHandshookConn(c, t)
 }
 
@@ -958,7 +959,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                networkingEnabled: true,
                requestStrategy:   3,
                metadataChanged: sync.Cond{
-                       L: &cl.mu,
+                       L: cl.locker(),
                },
                duplicateRequestTimeout: 1 * time.Second,
        }
@@ -983,8 +984,8 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
 // If the torrent already exists then this Storage is ignored and the
 // existing torrent returned with `new` set to `false`
 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t, ok := cl.torrents[infoHash]
        if ok {
                return
@@ -1020,8 +1021,8 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e
                        return
                }
        }
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        if spec.ChunkSize != 0 {
                t.setChunkSize(pp.Integer(spec.ChunkSize))
        }
@@ -1059,8 +1060,8 @@ func (cl *Client) allTorrentsCompleted() bool {
 // Returns true when all torrents are completely downloaded and false if the
 // client is stopped before that.
 func (cl *Client) WaitAll() bool {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        for !cl.allTorrentsCompleted() {
                if cl.closed.IsSet() {
                        return false
@@ -1072,8 +1073,8 @@ func (cl *Client) WaitAll() bool {
 
 // Returns handles to all the torrents loaded in the Client.
 func (cl *Client) Torrents() []*Torrent {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        return cl.torrentsAsSlice()
 }
 
@@ -1149,7 +1150,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
                PeerMaxRequests: 250,
                writeBuffer:     new(bytes.Buffer),
        }
-       c.writerCond.L = &cl.mu
+       c.writerCond.L = cl.locker()
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
@@ -1159,8 +1160,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
 }
 
 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        t := cl.torrent(ih)
        if t == nil {
                return
@@ -1224,8 +1225,8 @@ func (cl *Client) publicAddr(peer net.IP) ipPort {
 }
 
 func (cl *Client) ListenAddrs() (ret []net.Addr) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
+       cl.lock()
+       defer cl.unlock()
        cl.eachListener(func(l socket) bool {
                ret = append(ret, l.Addr())
                return true
@@ -1255,12 +1256,12 @@ func (cl *Client) clearAcceptLimits() {
 func (cl *Client) acceptLimitClearer() {
        for {
                select {
-               case <-cl.closed.LockedChan(&cl.mu):
+               case <-cl.closed.LockedChan(cl.locker()):
                        return
                case <-time.After(15 * time.Minute):
-                       cl.mu.Lock()
+                       cl.lock()
                        cl.clearAcceptLimits()
-                       cl.mu.Unlock()
+                       cl.unlock()
                }
        }
 }
@@ -1271,3 +1272,35 @@ func (cl *Client) rateLimitAccept(ip net.IP) bool {
        }
        return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
 }
+
+func (cl *Client) rLock() {
+       cl._mu.RLock()
+}
+
+func (cl *Client) rUnlock() {
+       cl._mu.RUnlock()
+}
+
+func (cl *Client) lock() {
+       cl._mu.Lock()
+}
+
+func (cl *Client) unlock() {
+       cl._mu.Unlock()
+}
+
+func (cl *Client) locker() sync.Locker {
+       return clientLocker{cl}
+}
+
+type clientLocker struct {
+       *Client
+}
+
+func (cl clientLocker) Lock() {
+       cl.lock()
+}
+
+func (cl clientLocker) Unlock() {
+       cl.unlock()
+}