From 6dd3b9c12c523818b35ef513942bb828a1b18dbe Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 25 Jul 2018 13:41:50 +1000 Subject: [PATCH] Law of Demeter Client.mu --- client.go | 129 ++++++++++++++++++++++++++++----------------- client_test.go | 16 +++--- connection.go | 10 ++-- connection_test.go | 6 +-- file.go | 14 ++--- piece.go | 8 +-- portfwd.go | 12 ++--- reader.go | 20 +++---- t.go | 70 ++++++++++++------------ torrent.go | 46 ++++++++-------- torrent_test.go | 8 +-- tracker_scraper.go | 18 +++---- 12 files changed, 193 insertions(+), 164 deletions(-) diff --git a/client.go b/client.go index ec388cea..09226f54 100644 --- a/client.go +++ b/client.go @@ -41,8 +41,9 @@ import ( type Client struct { // An aggregate of stats over all connections. First in struct to ensure // 64-bit alignment of fields. See #262. - stats ConnStats - mu sync.RWMutex + stats ConnStats + + _mu sync.RWMutex event sync.Cond closed missinggo.Event @@ -71,8 +72,8 @@ type Client struct { type ipStr string func (cl *Client) BadPeerIPs() []string { - cl.mu.RLock() - defer cl.mu.RUnlock() + cl.rLock() + defer cl.rUnlock() return cl.badPeerIPsLocked() } @@ -117,8 +118,8 @@ func writeDhtServerStatus(w io.Writer, s *dht.Server) { // Writes out a human readable status of the client, such as for writing to a // HTTP status page. func (cl *Client) WriteStatus(_w io.Writer) { - cl.mu.RLock() - defer cl.mu.RUnlock() + cl.rLock() + defer cl.rUnlock() w := bufio.NewWriter(_w) defer w.Flush() fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort()) @@ -193,7 +194,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { cl.Close() }() cl.extensionBytes = defaultPeerExtensionBytes() - cl.event.L = &cl.mu + cl.event.L = cl.locker() storageImpl := cfg.DefaultStorage if storageImpl == nil { // We'd use mmap but HFS+ doesn't support sparse files. @@ -291,8 +292,8 @@ func firstNonEmptyString(ss ...string) string { } func (cl *Client) Closed() <-chan struct{} { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() return cl.closed.C() } @@ -313,8 +314,8 @@ func (cl *Client) closeSockets() { // Stops the client. All connections to peers are closed and all activity will // come to a halt. func (cl *Client) Close() { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() cl.closed.Set() cl.eachDhtServer(func(s *dht.Server) { s.Close() }) cl.closeSockets() @@ -375,13 +376,13 @@ func (cl *Client) acceptConnections(l net.Listener) { for { conn, err := l.Accept() conn = pproffd.WrapNetConn(conn) - cl.mu.RLock() + cl.rLock() closed := cl.closed.IsSet() reject := false if conn != nil { reject = cl.rejectAccepted(conn) } - cl.mu.RUnlock() + cl.rUnlock() if closed { if conn != nil { conn.Close() @@ -418,8 +419,8 @@ func (cl *Client) incomingConnection(nc net.Conn) { // Returns a handle to the given torrent, if it's present in the client. func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() t, ok = cl.torrents[ih] return } @@ -513,8 +514,8 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn { }() } func() { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() cl.eachListener(func(s socket) bool { if peerNetworkEnabled(s.Addr().Network(), cl.config) { dial(s.dial) @@ -594,8 +595,8 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C // for valid reasons. func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) { ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration { - cl.mu.RLock() - defer cl.mu.RUnlock() + cl.rLock() + defer cl.rUnlock() return t.dialTimeout() }()) defer cancel() @@ -628,8 +629,8 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, // considered half-open. func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) { c, err := cl.establishOutgoingConn(t, addr) - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() // Don't release lock between here and addConnection, unless it's for // failure. cl.noLongerHalfOpen(t, addr) @@ -688,8 +689,8 @@ func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err er // Calls f with any secret keys. func (cl *Client) forSkeys(f func([]byte) bool) { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() for ih := range cl.torrents { if !f(ih[:]) { break @@ -721,9 +722,9 @@ func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) { if !ok { return } - cl.mu.Lock() + cl.lock() t = cl.torrents[ih] - cl.mu.Unlock() + cl.unlock() return } @@ -755,21 +756,21 @@ func (cl *Client) runReceivedConn(c *connection) { "network", c.remoteAddr().Network(), ).Log(cl.logger) torrent.Add("error receiving handshake", 1) - cl.mu.Lock() + cl.lock() cl.onBadAccept(c.remoteAddr()) - cl.mu.Unlock() + cl.unlock() return } if t == nil { torrent.Add("received handshake for unloaded torrent", 1) - cl.mu.Lock() + cl.lock() cl.onBadAccept(c.remoteAddr()) - cl.mu.Unlock() + cl.unlock() return } torrent.Add("received handshake for loaded torrent", 1) - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() cl.runHandshookConn(c, t) } @@ -958,7 +959,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( networkingEnabled: true, requestStrategy: 3, metadataChanged: sync.Cond{ - L: &cl.mu, + L: cl.locker(), }, duplicateRequestTimeout: 1 * time.Second, } @@ -983,8 +984,8 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo // If the torrent already exists then this Storage is ignored and the // existing torrent returned with `new` set to `false` func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() t, ok := cl.torrents[infoHash] if ok { return @@ -1020,8 +1021,8 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e return } } - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() if spec.ChunkSize != 0 { t.setChunkSize(pp.Integer(spec.ChunkSize)) } @@ -1059,8 +1060,8 @@ func (cl *Client) allTorrentsCompleted() bool { // Returns true when all torrents are completely downloaded and false if the // client is stopped before that. func (cl *Client) WaitAll() bool { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() for !cl.allTorrentsCompleted() { if cl.closed.IsSet() { return false @@ -1072,8 +1073,8 @@ func (cl *Client) WaitAll() bool { // Returns handles to all the torrents loaded in the Client. func (cl *Client) Torrents() []*Torrent { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() return cl.torrentsAsSlice() } @@ -1149,7 +1150,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { PeerMaxRequests: 250, writeBuffer: new(bytes.Buffer), } - c.writerCond.L = &cl.mu + c.writerCond.L = cl.locker() c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, @@ -1159,8 +1160,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { } func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() t := cl.torrent(ih) if t == nil { return @@ -1224,8 +1225,8 @@ func (cl *Client) publicAddr(peer net.IP) ipPort { } func (cl *Client) ListenAddrs() (ret []net.Addr) { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() cl.eachListener(func(l socket) bool { ret = append(ret, l.Addr()) return true @@ -1255,12 +1256,12 @@ func (cl *Client) clearAcceptLimits() { func (cl *Client) acceptLimitClearer() { for { select { - case <-cl.closed.LockedChan(&cl.mu): + case <-cl.closed.LockedChan(cl.locker()): return case <-time.After(15 * time.Minute): - cl.mu.Lock() + cl.lock() cl.clearAcceptLimits() - cl.mu.Unlock() + cl.unlock() } } } @@ -1271,3 +1272,35 @@ func (cl *Client) rateLimitAccept(ip net.IP) bool { } return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0 } + +func (cl *Client) rLock() { + cl._mu.RLock() +} + +func (cl *Client) rUnlock() { + cl._mu.RUnlock() +} + +func (cl *Client) lock() { + cl._mu.Lock() +} + +func (cl *Client) unlock() { + cl._mu.Unlock() +} + +func (cl *Client) locker() sync.Locker { + return clientLocker{cl} +} + +type clientLocker struct { + *Client +} + +func (cl clientLocker) Lock() { + cl.lock() +} + +func (cl clientLocker) Unlock() { + cl.unlock() +} diff --git a/client_test.go b/client_test.go index d8ada22e..f36db552 100644 --- a/client_test.go +++ b/client_test.go @@ -112,15 +112,15 @@ func TestTorrentInitialState(t *testing.T) { storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()), ) tor.setChunkSize(2) - tor.cl.mu.Lock() + tor.cl.lock() err := tor.setInfoBytes(mi.InfoBytes) - tor.cl.mu.Unlock() + tor.cl.unlock() require.NoError(t, err) require.Len(t, tor.pieces, 3) tor.pendAllChunkSpecs(0) - tor.cl.mu.Lock() + tor.cl.lock() assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0)) - tor.cl.mu.Unlock() + tor.cl.unlock() assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } @@ -782,12 +782,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { psc := leecherGreeting.SubscribePieceStateChanges() defer psc.Close() - leecherGreeting.cl.mu.Lock() + leecherGreeting.cl.lock() leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces()) if ps.Cancel { leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces()) } - leecherGreeting.cl.mu.Unlock() + leecherGreeting.cl.unlock() done := make(chan struct{}) defer close(done) go leecherGreeting.AddClientPeer(seeder) @@ -909,9 +909,9 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) { func totalConns(tts []*Torrent) (ret int) { for _, tt := range tts { - tt.cl.mu.Lock() + tt.cl.lock() ret += len(tt.conns) - tt.cl.mu.Unlock() + tt.cl.unlock() } return } diff --git a/connection.go b/connection.go index 883d6c8e..563961f5 100644 --- a/connection.go +++ b/connection.go @@ -177,7 +177,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) { } func (cn *connection) mu() sync.Locker { - return &cn.t.cl.mu + return cn.t.cl.locker() } func (cn *connection) remoteAddr() net.Addr { @@ -1084,8 +1084,8 @@ func (c *connection) mainReadLoop() (err error) { for { var msg pp.Message func() { - cl.mu.Unlock() - defer cl.mu.Lock() + cl.unlock() + defer cl.lock() err = decoder.Decode(&msg) }() if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF { @@ -1324,8 +1324,8 @@ func (c *connection) receiveChunk(msg *pp.Message) error { } err := func() error { - cl.mu.Unlock() - defer cl.mu.Lock() + cl.unlock() + defer cl.lock() // Write the chunk out. Note that the upper bound on chunk writing // concurrency will be the number of connections. We write inline with // receiving the chunk (with this lock dance), because we want to diff --git a/connection_test.go b/connection_test.go index a6c3f318..949c55f4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -114,7 +114,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { Piece: make([]byte, defaultChunkSize), } go func() { - cl.mu.Lock() + cl.lock() err := cn.mainReadLoop() if err != nil { mrlErr <- err @@ -127,12 +127,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { defer w.Close() ts.writeSem.Lock() for range iter.N(b.N) { - cl.mu.Lock() + cl.lock() // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. t.pieces[0].dirtyChunks.Clear() cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}} - cl.mu.Unlock() + cl.unlock() n, err := w.Write(wb) require.NoError(b, err) require.EqualValues(b, len(wb), n) diff --git a/file.go b/file.go index 6c728d15..4074700a 100644 --- a/file.go +++ b/file.go @@ -59,8 +59,8 @@ type FilePieceState struct { // Returns the state of pieces in this file. func (f *File) State() (ret []FilePieceState) { - f.t.cl.mu.RLock() - defer f.t.cl.mu.RUnlock() + f.t.cl.rLock() + defer f.t.cl.rUnlock() pieceSize := int64(f.t.usualPieceSize()) off := f.offset % pieceSize remaining := f.length @@ -102,7 +102,7 @@ func (f *File) Cancel() { func (f *File) NewReader() Reader { tr := reader{ - mu: &f.t.cl.mu, + mu: f.t.cl.locker(), t: f.t, readahead: 5 * 1024 * 1024, offset: f.Offset(), @@ -114,8 +114,8 @@ func (f *File) NewReader() Reader { // Sets the minimum priority for pieces in the File. func (f *File) SetPriority(prio piecePriority) { - f.t.cl.mu.Lock() - defer f.t.cl.mu.Unlock() + f.t.cl.lock() + defer f.t.cl.unlock() if prio == f.prio { return } @@ -125,8 +125,8 @@ func (f *File) SetPriority(prio piecePriority) { // Returns the priority per File.SetPriority. func (f *File) Priority() piecePriority { - f.t.cl.mu.Lock() - defer f.t.cl.mu.Unlock() + f.t.cl.lock() + defer f.t.cl.unlock() return f.prio } diff --git a/piece.go b/piece.go index 9729c44c..6eef0cc1 100644 --- a/piece.go +++ b/piece.go @@ -180,8 +180,8 @@ func (p *Piece) bytesLeft() (ret pp.Integer) { } func (p *Piece) VerifyData() { - p.t.cl.mu.Lock() - defer p.t.cl.mu.Unlock() + p.t.cl.lock() + defer p.t.cl.unlock() target := p.numVerifies + 1 if p.hashing { target++ @@ -208,8 +208,8 @@ func (p *Piece) torrentEndOffset() int64 { } func (p *Piece) SetPriority(prio piecePriority) { - p.t.cl.mu.Lock() - defer p.t.cl.mu.Unlock() + p.t.cl.lock() + defer p.t.cl.unlock() p.priority = prio p.t.updatePiecePriority(p.index) } diff --git a/portfwd.go b/portfwd.go index 03a5c7bc..27f8549e 100644 --- a/portfwd.go +++ b/portfwd.go @@ -20,20 +20,20 @@ func addPortMapping(d upnp.Device, proto upnp.Protocol, internalPort int, debug } func (cl *Client) forwardPort() { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() if cl.config.NoDefaultPortForwarding { return } - cl.mu.Unlock() + cl.unlock() ds := upnp.Discover(0, 2*time.Second) - cl.mu.Lock() + cl.lock() flog.Default.Handle(flog.Fmsg("discovered %d upnp devices", len(ds))) port := cl.incomingPeerPort() - cl.mu.Unlock() + cl.unlock() for _, d := range ds { go addPortMapping(d, upnp.TCP, port, cl.config.Debug) go addPortMapping(d, upnp.UDP, port, cl.config.Debug) } - cl.mu.Lock() + cl.lock() } diff --git a/reader.go b/reader.go index d65a18f5..07eefc87 100644 --- a/reader.go +++ b/reader.go @@ -69,8 +69,8 @@ func (r *reader) SetReadahead(readahead int64) { r.mu.Lock() r.readahead = readahead r.mu.Unlock() - r.t.cl.mu.Lock() - defer r.t.cl.mu.Unlock() + r.t.cl.lock() + defer r.t.cl.unlock() r.posChanged() } @@ -146,10 +146,10 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { defer cancel() go func() { <-ctx.Done() - r.t.cl.mu.Lock() + r.t.cl.lock() ctxErr = ctx.Err() r.t.tickleReaders() - r.t.cl.mu.Unlock() + r.t.cl.unlock() }() } // Hmmm, if a Read gets stuck, this means you can't change position for @@ -183,8 +183,8 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // Wait until some data should be available to read. Tickles the client if it // isn't. Returns how much should be readable without blocking. func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { - r.t.cl.mu.Lock() - defer r.t.cl.mu.Unlock() + r.t.cl.lock() + defer r.t.cl.unlock() for !r.readable(pos) && *ctxErr == nil { r.waitReadable(pos) } @@ -222,19 +222,19 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro err = nil return } - r.t.cl.mu.Lock() + r.t.cl.lock() // TODO: Just reset pieces in the readahead window. This might help // prevent thrashing with small caches and file and piece priorities. log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err) r.t.updateAllPieceCompletions() r.t.updateAllPiecePriorities() - r.t.cl.mu.Unlock() + r.t.cl.unlock() } } func (r *reader) Close() error { - r.t.cl.mu.Lock() - defer r.t.cl.mu.Unlock() + r.t.cl.lock() + defer r.t.cl.unlock() r.t.deleteReader(r) return nil } diff --git a/t.go b/t.go index c9886e5c..c66935c2 100644 --- a/t.go +++ b/t.go @@ -17,15 +17,15 @@ func (t *Torrent) InfoHash() metainfo.Hash { // Returns a channel that is closed when the info (.Info()) for the torrent // has become available. func (t *Torrent) GotInfo() <-chan struct{} { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.gotMetainfo.C() } // Returns the metainfo info dictionary, or nil if it's not yet available. func (t *Torrent) Info() *metainfo.Info { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.info } @@ -33,7 +33,7 @@ func (t *Torrent) Info() *metainfo.Info { // the data requested is actually available. func (t *Torrent) NewReader() Reader { r := reader{ - mu: &t.cl.mu, + mu: t.cl.locker(), t: t, readahead: 5 * 1024 * 1024, length: *t.length, @@ -46,14 +46,14 @@ func (t *Torrent) NewReader() Reader { // same state. The sum of the state run lengths is the number of pieces // in the torrent. func (t *Torrent) PieceStateRuns() []PieceStateRun { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.pieceStateRuns() } func (t *Torrent) PieceState(piece pieceIndex) PieceState { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.pieceState(piece) } @@ -65,8 +65,8 @@ func (t *Torrent) NumPieces() pieceIndex { // Get missing bytes count for specific piece. func (t *Torrent) PieceBytesMissing(piece int) int64 { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return int64(t.pieces[piece].bytesLeft()) } @@ -75,9 +75,9 @@ func (t *Torrent) PieceBytesMissing(piece int) int64 { // this. No data corruption can, or should occur to either the torrent's data, // or connected peers. func (t *Torrent) Drop() { - t.cl.mu.Lock() + t.cl.lock() t.cl.dropTorrent(t.infoHash) - t.cl.mu.Unlock() + t.cl.unlock() } // Number of bytes of the entire torrent we have completed. This is the sum of @@ -85,8 +85,8 @@ func (t *Torrent) Drop() { // for download rate, as it can go down when pieces are lost or fail checks. // Sample Torrent.Stats.DataBytesRead for actual file data download rate. func (t *Torrent) BytesCompleted() int64 { - t.cl.mu.RLock() - defer t.cl.mu.RUnlock() + t.cl.rLock() + defer t.cl.rUnlock() return t.bytesCompleted() } @@ -99,24 +99,24 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription { // Returns true if the torrent is currently being seeded. This occurs when the // client is willing to upload without wanting anything in return. func (t *Torrent) Seeding() bool { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.seeding() } // Clobbers the torrent display name. The display name is used as the torrent // name if the metainfo is not available. func (t *Torrent) SetDisplayName(dn string) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() t.setDisplayName(dn) } // The current working name for the torrent. Either the name in the info dict, // or a display name given such as by the dn value in a magnet link, or "". func (t *Torrent) Name() string { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.name() } @@ -129,14 +129,14 @@ func (t *Torrent) Length() int64 { // Returns a run-time generated metainfo for the torrent that includes the // info bytes and announce-list as currently known to the client. func (t *Torrent) Metainfo() metainfo.MetaInfo { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.newMetaInfo() } func (t *Torrent) addReader(r *reader) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() if t.readers == nil { t.readers = make(map[*reader]struct{}) } @@ -153,8 +153,8 @@ func (t *Torrent) deleteReader(r *reader) { // priority. Piece indexes are not the same as bytes. Requires that the info // has been obtained, see Torrent.Info and Torrent.GotInfo. func (t *Torrent) DownloadPieces(begin, end pieceIndex) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() t.downloadPiecesLocked(begin, end) } @@ -167,8 +167,8 @@ func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) { } func (t *Torrent) CancelPieces(begin, end pieceIndex) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() t.cancelPiecesLocked(begin, end) } @@ -208,8 +208,8 @@ func (t *Torrent) Files() []*File { func (t *Torrent) AddPeers(pp []Peer) { cl := t.cl - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() t.addPeers(pp) } @@ -228,13 +228,13 @@ func (t *Torrent) String() string { } func (t *Torrent) AddTrackers(announceList [][]string) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() t.addTrackers(announceList) } func (t *Torrent) Piece(i pieceIndex) *Piece { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return &t.pieces[i] } diff --git a/torrent.go b/torrent.go index 9aebad90..0b13fe25 100644 --- a/torrent.go +++ b/torrent.go @@ -155,7 +155,7 @@ func (t *Torrent) tickleReaders() { // Returns a channel that is closed when the Torrent is closed. func (t *Torrent) Closed() <-chan struct{} { - return t.closed.LockedChan(&t.cl.mu) + return t.closed.LockedChan(t.cl.locker()) } // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active, @@ -640,8 +640,8 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { } func (t *Torrent) BytesMissing() int64 { - t.mu().RLock() - defer t.mu().RUnlock() + t.cl.rLock() + defer t.cl.rUnlock() return t.bytesMissingLocked() } @@ -1210,8 +1210,8 @@ func (t *Torrent) bytesCompleted() int64 { } func (t *Torrent) SetInfoBytes(b []byte) (err error) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() return t.setInfoBytes(b) } @@ -1388,14 +1388,14 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) { }).String() allAddrs[key] = struct{}{} } - cl.mu.Lock() + cl.lock() t.addPeers(addPeers) numPeers := t.peers.Len() - cl.mu.Unlock() + cl.unlock() if numPeers >= cl.config.TorrentPeersHighWater { return } - case <-t.closed.LockedChan(&cl.mu): + case <-t.closed.LockedChan(cl.locker()): return } } @@ -1416,14 +1416,14 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { cl := t.cl for { select { - case <-t.wantPeersEvent.LockedChan(&cl.mu): - case <-t.closed.LockedChan(&cl.mu): + case <-t.wantPeersEvent.LockedChan(cl.locker()): + case <-t.closed.LockedChan(cl.locker()): return } err := t.announceDHT(true, s) func() { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() if err == nil { t.numDHTAnnounces++ } else { @@ -1431,7 +1431,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) { } }() select { - case <-t.closed.LockedChan(&cl.mu): + case <-t.closed.LockedChan(cl.locker()): return case <-time.After(5 * time.Minute): } @@ -1445,8 +1445,8 @@ func (t *Torrent) addPeers(peers []Peer) { } func (t *Torrent) Stats() TorrentStats { - t.cl.mu.RLock() - defer t.cl.mu.RUnlock() + t.cl.rLock() + defer t.cl.rUnlock() return t.statsLocked() } @@ -1561,8 +1561,8 @@ func (t *Torrent) wantConns() bool { } func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() + t.cl.lock() + defer t.cl.unlock() oldMax = t.maxEstablishedConns t.maxEstablishedConns = max wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn) @@ -1573,10 +1573,6 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { return oldMax } -func (t *Torrent) mu() missinggo.RWLocker { - return &t.cl.mu -} - func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger) if t.closed.IsSet() { @@ -1679,8 +1675,8 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { func (t *Torrent) verifyPiece(piece pieceIndex) { cl := t.cl - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() p := &t.pieces[piece] defer func() { p.numVerifies++ @@ -1700,10 +1696,10 @@ func (t *Torrent) verifyPiece(piece pieceIndex) { t.publishPieceChange(piece) t.updatePiecePriority(piece) t.storageLock.RLock() - cl.mu.Unlock() + cl.unlock() sum := t.hashPiece(piece) t.storageLock.RUnlock() - cl.mu.Lock() + cl.lock() p.hashing = false t.updatePiecePriority(piece) t.pieceHashed(piece, sum == p.hash) diff --git a/torrent_test.go b/torrent_test.go index e10cc528..22053794 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -154,13 +154,13 @@ func TestPieceHashFailed(t *testing.T) { tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{}) tt.setChunkSize(2) require.NoError(t, tt.setInfoBytes(mi.InfoBytes)) - tt.cl.mu.Lock() + tt.cl.lock() tt.pieces[1].dirtyChunks.AddRange(0, 3) require.True(t, tt.pieceAllDirty(1)) tt.pieceHashed(1, false) // Dirty chunks should be cleared so we can try again. require.False(t, tt.pieceAllDirty(1)) - tt.cl.mu.Unlock() + tt.cl.unlock() } // Check the behaviour of Torrent.Metainfo when metadata is not completed. @@ -194,8 +194,8 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) { assert.EqualValues(t, 0, tt.metadataSize()) func() { - cl.mu.Lock() - defer cl.mu.Unlock() + cl.lock() + defer cl.unlock() go func() { _, err = nc.Write(pp.Message{ Type: pp.Extended, diff --git a/tracker_scraper.go b/tracker_scraper.go index 7b2b5137..ed20a7fd 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -107,9 +107,9 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) { ret.Err = fmt.Errorf("error getting ip: %s", err) return } - me.t.cl.mu.Lock() + me.t.cl.lock() req := me.t.announceRequest() - me.t.cl.mu.Unlock() + me.t.cl.unlock() res, err := tracker.Announce{ HttpClient: me.t.cl.config.TrackerHttpClient, UserAgent: me.t.cl.config.HTTPUserAgent, @@ -133,24 +133,24 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) { func (me *trackerScraper) Run() { for { select { - case <-me.t.closed.LockedChan(&me.t.cl.mu): + case <-me.t.closed.LockedChan(me.t.cl.locker()): return - case <-me.stop.LockedChan(&me.t.cl.mu): + case <-me.stop.LockedChan(me.t.cl.locker()): return - case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu): + case <-me.t.wantPeersEvent.LockedChan(me.t.cl.locker()): } ar := me.announce() - me.t.cl.mu.Lock() + me.t.cl.lock() me.lastAnnounce = ar - me.t.cl.mu.Unlock() + me.t.cl.unlock() intervalChan := time.After(time.Until(ar.Completed.Add(ar.Interval))) select { - case <-me.t.closed.LockedChan(&me.t.cl.mu): + case <-me.t.closed.LockedChan(me.t.cl.locker()): return - case <-me.stop.LockedChan(&me.t.cl.mu): + case <-me.stop.LockedChan(me.t.cl.locker()): return case <-intervalChan: } -- 2.44.0