type Client struct {
// An aggregate of stats over all connections. First in struct to ensure
// 64-bit alignment of fields. See #262.
- stats ConnStats
- mu sync.RWMutex
+ stats ConnStats
+
+ _mu sync.RWMutex
event sync.Cond
closed missinggo.Event
type ipStr string
func (cl *Client) BadPeerIPs() []string {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
return cl.badPeerIPsLocked()
}
// Writes out a human readable status of the client, such as for writing to a
// HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
w := bufio.NewWriter(_w)
defer w.Flush()
fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
cl.Close()
}()
cl.extensionBytes = defaultPeerExtensionBytes()
- cl.event.L = &cl.mu
+ cl.event.L = cl.locker()
storageImpl := cfg.DefaultStorage
if storageImpl == nil {
// We'd use mmap but HFS+ doesn't support sparse files.
}
func (cl *Client) Closed() <-chan struct{} {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
return cl.closed.C()
}
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
func (cl *Client) Close() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.closed.Set()
cl.eachDhtServer(func(s *dht.Server) { s.Close() })
cl.closeSockets()
for {
conn, err := l.Accept()
conn = pproffd.WrapNetConn(conn)
- cl.mu.RLock()
+ cl.rLock()
closed := cl.closed.IsSet()
reject := false
if conn != nil {
reject = cl.rejectAccepted(conn)
}
- cl.mu.RUnlock()
+ cl.rUnlock()
if closed {
if conn != nil {
conn.Close()
// Returns a handle to the given torrent, if it's present in the client.
func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t, ok = cl.torrents[ih]
return
}
}()
}
func() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.eachListener(func(s socket) bool {
if peerNetworkEnabled(s.Addr().Network(), cl.config) {
dial(s.dial)
// for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
return t.dialTimeout()
}())
defer cancel()
// considered half-open.
func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
c, err := cl.establishOutgoingConn(t, addr)
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
// Don't release lock between here and addConnection, unless it's for
// failure.
cl.noLongerHalfOpen(t, addr)
// Calls f with any secret keys.
func (cl *Client) forSkeys(f func([]byte) bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
for ih := range cl.torrents {
if !f(ih[:]) {
break
if !ok {
return
}
- cl.mu.Lock()
+ cl.lock()
t = cl.torrents[ih]
- cl.mu.Unlock()
+ cl.unlock()
return
}
"network", c.remoteAddr().Network(),
).Log(cl.logger)
torrent.Add("error receiving handshake", 1)
- cl.mu.Lock()
+ cl.lock()
cl.onBadAccept(c.remoteAddr())
- cl.mu.Unlock()
+ cl.unlock()
return
}
if t == nil {
torrent.Add("received handshake for unloaded torrent", 1)
- cl.mu.Lock()
+ cl.lock()
cl.onBadAccept(c.remoteAddr())
- cl.mu.Unlock()
+ cl.unlock()
return
}
torrent.Add("received handshake for loaded torrent", 1)
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.runHandshookConn(c, t)
}
networkingEnabled: true,
requestStrategy: 3,
metadataChanged: sync.Cond{
- L: &cl.mu,
+ L: cl.locker(),
},
duplicateRequestTimeout: 1 * time.Second,
}
// If the torrent already exists then this Storage is ignored and the
// existing torrent returned with `new` set to `false`
func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t, ok := cl.torrents[infoHash]
if ok {
return
return
}
}
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
if spec.ChunkSize != 0 {
t.setChunkSize(pp.Integer(spec.ChunkSize))
}
// Returns true when all torrents are completely downloaded and false if the
// client is stopped before that.
func (cl *Client) WaitAll() bool {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
for !cl.allTorrentsCompleted() {
if cl.closed.IsSet() {
return false
// Returns handles to all the torrents loaded in the Client.
func (cl *Client) Torrents() []*Torrent {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
return cl.torrentsAsSlice()
}
PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer),
}
- c.writerCond.L = &cl.mu
+ c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
}
func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t := cl.torrent(ih)
if t == nil {
return
}
func (cl *Client) ListenAddrs() (ret []net.Addr) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.eachListener(func(l socket) bool {
ret = append(ret, l.Addr())
return true
func (cl *Client) acceptLimitClearer() {
for {
select {
- case <-cl.closed.LockedChan(&cl.mu):
+ case <-cl.closed.LockedChan(cl.locker()):
return
case <-time.After(15 * time.Minute):
- cl.mu.Lock()
+ cl.lock()
cl.clearAcceptLimits()
- cl.mu.Unlock()
+ cl.unlock()
}
}
}
}
return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
}
+
+func (cl *Client) rLock() {
+ cl._mu.RLock()
+}
+
+func (cl *Client) rUnlock() {
+ cl._mu.RUnlock()
+}
+
+func (cl *Client) lock() {
+ cl._mu.Lock()
+}
+
+func (cl *Client) unlock() {
+ cl._mu.Unlock()
+}
+
+func (cl *Client) locker() sync.Locker {
+ return clientLocker{cl}
+}
+
+type clientLocker struct {
+ *Client
+}
+
+func (cl clientLocker) Lock() {
+ cl.lock()
+}
+
+func (cl clientLocker) Unlock() {
+ cl.unlock()
+}