type Client struct {
// An aggregate of stats over all connections. First in struct to ensure
// 64-bit alignment of fields. See #262.
- stats ConnStats
- mu sync.RWMutex
+ stats ConnStats
+
+ _mu sync.RWMutex
event sync.Cond
closed missinggo.Event
type ipStr string
func (cl *Client) BadPeerIPs() []string {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
return cl.badPeerIPsLocked()
}
// Writes out a human readable status of the client, such as for writing to a
// HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
w := bufio.NewWriter(_w)
defer w.Flush()
fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
cl.Close()
}()
cl.extensionBytes = defaultPeerExtensionBytes()
- cl.event.L = &cl.mu
+ cl.event.L = cl.locker()
storageImpl := cfg.DefaultStorage
if storageImpl == nil {
// We'd use mmap but HFS+ doesn't support sparse files.
}
func (cl *Client) Closed() <-chan struct{} {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
return cl.closed.C()
}
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
func (cl *Client) Close() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.closed.Set()
cl.eachDhtServer(func(s *dht.Server) { s.Close() })
cl.closeSockets()
for {
conn, err := l.Accept()
conn = pproffd.WrapNetConn(conn)
- cl.mu.RLock()
+ cl.rLock()
closed := cl.closed.IsSet()
reject := false
if conn != nil {
reject = cl.rejectAccepted(conn)
}
- cl.mu.RUnlock()
+ cl.rUnlock()
if closed {
if conn != nil {
conn.Close()
// Returns a handle to the given torrent, if it's present in the client.
func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t, ok = cl.torrents[ih]
return
}
}()
}
func() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.eachListener(func(s socket) bool {
if peerNetworkEnabled(s.Addr().Network(), cl.config) {
dial(s.dial)
// for valid reasons.
func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
+ cl.rLock()
+ defer cl.rUnlock()
return t.dialTimeout()
}())
defer cancel()
// considered half-open.
func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
c, err := cl.establishOutgoingConn(t, addr)
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
// Don't release lock between here and addConnection, unless it's for
// failure.
cl.noLongerHalfOpen(t, addr)
// Calls f with any secret keys.
func (cl *Client) forSkeys(f func([]byte) bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
for ih := range cl.torrents {
if !f(ih[:]) {
break
if !ok {
return
}
- cl.mu.Lock()
+ cl.lock()
t = cl.torrents[ih]
- cl.mu.Unlock()
+ cl.unlock()
return
}
"network", c.remoteAddr().Network(),
).Log(cl.logger)
torrent.Add("error receiving handshake", 1)
- cl.mu.Lock()
+ cl.lock()
cl.onBadAccept(c.remoteAddr())
- cl.mu.Unlock()
+ cl.unlock()
return
}
if t == nil {
torrent.Add("received handshake for unloaded torrent", 1)
- cl.mu.Lock()
+ cl.lock()
cl.onBadAccept(c.remoteAddr())
- cl.mu.Unlock()
+ cl.unlock()
return
}
torrent.Add("received handshake for loaded torrent", 1)
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.runHandshookConn(c, t)
}
networkingEnabled: true,
requestStrategy: 3,
metadataChanged: sync.Cond{
- L: &cl.mu,
+ L: cl.locker(),
},
duplicateRequestTimeout: 1 * time.Second,
}
// If the torrent already exists then this Storage is ignored and the
// existing torrent returned with `new` set to `false`
func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t, ok := cl.torrents[infoHash]
if ok {
return
return
}
}
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
if spec.ChunkSize != 0 {
t.setChunkSize(pp.Integer(spec.ChunkSize))
}
// Returns true when all torrents are completely downloaded and false if the
// client is stopped before that.
func (cl *Client) WaitAll() bool {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
for !cl.allTorrentsCompleted() {
if cl.closed.IsSet() {
return false
// Returns handles to all the torrents loaded in the Client.
func (cl *Client) Torrents() []*Torrent {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
return cl.torrentsAsSlice()
}
PeerMaxRequests: 250,
writeBuffer: new(bytes.Buffer),
}
- c.writerCond.L = &cl.mu
+ c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
}
func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t := cl.torrent(ih)
if t == nil {
return
}
func (cl *Client) ListenAddrs() (ret []net.Addr) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
cl.eachListener(func(l socket) bool {
ret = append(ret, l.Addr())
return true
func (cl *Client) acceptLimitClearer() {
for {
select {
- case <-cl.closed.LockedChan(&cl.mu):
+ case <-cl.closed.LockedChan(cl.locker()):
return
case <-time.After(15 * time.Minute):
- cl.mu.Lock()
+ cl.lock()
cl.clearAcceptLimits()
- cl.mu.Unlock()
+ cl.unlock()
}
}
}
}
return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
}
+
+func (cl *Client) rLock() {
+ cl._mu.RLock()
+}
+
+func (cl *Client) rUnlock() {
+ cl._mu.RUnlock()
+}
+
+func (cl *Client) lock() {
+ cl._mu.Lock()
+}
+
+func (cl *Client) unlock() {
+ cl._mu.Unlock()
+}
+
+func (cl *Client) locker() sync.Locker {
+ return clientLocker{cl}
+}
+
+type clientLocker struct {
+ *Client
+}
+
+func (cl clientLocker) Lock() {
+ cl.lock()
+}
+
+func (cl clientLocker) Unlock() {
+ cl.unlock()
+}
storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
)
tor.setChunkSize(2)
- tor.cl.mu.Lock()
+ tor.cl.lock()
err := tor.setInfoBytes(mi.InfoBytes)
- tor.cl.mu.Unlock()
+ tor.cl.unlock()
require.NoError(t, err)
require.Len(t, tor.pieces, 3)
tor.pendAllChunkSpecs(0)
- tor.cl.mu.Lock()
+ tor.cl.lock()
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
- tor.cl.mu.Unlock()
+ tor.cl.unlock()
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}
psc := leecherGreeting.SubscribePieceStateChanges()
defer psc.Close()
- leecherGreeting.cl.mu.Lock()
+ leecherGreeting.cl.lock()
leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
if ps.Cancel {
leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
}
- leecherGreeting.cl.mu.Unlock()
+ leecherGreeting.cl.unlock()
done := make(chan struct{})
defer close(done)
go leecherGreeting.AddClientPeer(seeder)
func totalConns(tts []*Torrent) (ret int) {
for _, tt := range tts {
- tt.cl.mu.Lock()
+ tt.cl.lock()
ret += len(tt.conns)
- tt.cl.mu.Unlock()
+ tt.cl.unlock()
}
return
}
}
func (cn *connection) mu() sync.Locker {
- return &cn.t.cl.mu
+ return cn.t.cl.locker()
}
func (cn *connection) remoteAddr() net.Addr {
for {
var msg pp.Message
func() {
- cl.mu.Unlock()
- defer cl.mu.Lock()
+ cl.unlock()
+ defer cl.lock()
err = decoder.Decode(&msg)
}()
if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
}
err := func() error {
- cl.mu.Unlock()
- defer cl.mu.Lock()
+ cl.unlock()
+ defer cl.lock()
// Write the chunk out. Note that the upper bound on chunk writing
// concurrency will be the number of connections. We write inline with
// receiving the chunk (with this lock dance), because we want to
Piece: make([]byte, defaultChunkSize),
}
go func() {
- cl.mu.Lock()
+ cl.lock()
err := cn.mainReadLoop()
if err != nil {
mrlErr <- err
defer w.Close()
ts.writeSem.Lock()
for range iter.N(b.N) {
- cl.mu.Lock()
+ cl.lock()
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0].dirtyChunks.Clear()
cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}}
- cl.mu.Unlock()
+ cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)
require.EqualValues(b, len(wb), n)
// Returns the state of pieces in this file.
func (f *File) State() (ret []FilePieceState) {
- f.t.cl.mu.RLock()
- defer f.t.cl.mu.RUnlock()
+ f.t.cl.rLock()
+ defer f.t.cl.rUnlock()
pieceSize := int64(f.t.usualPieceSize())
off := f.offset % pieceSize
remaining := f.length
func (f *File) NewReader() Reader {
tr := reader{
- mu: &f.t.cl.mu,
+ mu: f.t.cl.locker(),
t: f.t,
readahead: 5 * 1024 * 1024,
offset: f.Offset(),
// Sets the minimum priority for pieces in the File.
func (f *File) SetPriority(prio piecePriority) {
- f.t.cl.mu.Lock()
- defer f.t.cl.mu.Unlock()
+ f.t.cl.lock()
+ defer f.t.cl.unlock()
if prio == f.prio {
return
}
// Returns the priority per File.SetPriority.
func (f *File) Priority() piecePriority {
- f.t.cl.mu.Lock()
- defer f.t.cl.mu.Unlock()
+ f.t.cl.lock()
+ defer f.t.cl.unlock()
return f.prio
}
}
func (p *Piece) VerifyData() {
- p.t.cl.mu.Lock()
- defer p.t.cl.mu.Unlock()
+ p.t.cl.lock()
+ defer p.t.cl.unlock()
target := p.numVerifies + 1
if p.hashing {
target++
}
func (p *Piece) SetPriority(prio piecePriority) {
- p.t.cl.mu.Lock()
- defer p.t.cl.mu.Unlock()
+ p.t.cl.lock()
+ defer p.t.cl.unlock()
p.priority = prio
p.t.updatePiecePriority(p.index)
}
}
func (cl *Client) forwardPort() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
if cl.config.NoDefaultPortForwarding {
return
}
- cl.mu.Unlock()
+ cl.unlock()
ds := upnp.Discover(0, 2*time.Second)
- cl.mu.Lock()
+ cl.lock()
flog.Default.Handle(flog.Fmsg("discovered %d upnp devices", len(ds)))
port := cl.incomingPeerPort()
- cl.mu.Unlock()
+ cl.unlock()
for _, d := range ds {
go addPortMapping(d, upnp.TCP, port, cl.config.Debug)
go addPortMapping(d, upnp.UDP, port, cl.config.Debug)
}
- cl.mu.Lock()
+ cl.lock()
}
r.mu.Lock()
r.readahead = readahead
r.mu.Unlock()
- r.t.cl.mu.Lock()
- defer r.t.cl.mu.Unlock()
+ r.t.cl.lock()
+ defer r.t.cl.unlock()
r.posChanged()
}
defer cancel()
go func() {
<-ctx.Done()
- r.t.cl.mu.Lock()
+ r.t.cl.lock()
ctxErr = ctx.Err()
r.t.tickleReaders()
- r.t.cl.mu.Unlock()
+ r.t.cl.unlock()
}()
}
// Hmmm, if a Read gets stuck, this means you can't change position for
// Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking.
func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
- r.t.cl.mu.Lock()
- defer r.t.cl.mu.Unlock()
+ r.t.cl.lock()
+ defer r.t.cl.unlock()
for !r.readable(pos) && *ctxErr == nil {
r.waitReadable(pos)
}
err = nil
return
}
- r.t.cl.mu.Lock()
+ r.t.cl.lock()
// TODO: Just reset pieces in the readahead window. This might help
// prevent thrashing with small caches and file and piece priorities.
log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
r.t.updateAllPieceCompletions()
r.t.updateAllPiecePriorities()
- r.t.cl.mu.Unlock()
+ r.t.cl.unlock()
}
}
func (r *reader) Close() error {
- r.t.cl.mu.Lock()
- defer r.t.cl.mu.Unlock()
+ r.t.cl.lock()
+ defer r.t.cl.unlock()
r.t.deleteReader(r)
return nil
}
// Returns a channel that is closed when the info (.Info()) for the torrent
// has become available.
func (t *Torrent) GotInfo() <-chan struct{} {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.gotMetainfo.C()
}
// Returns the metainfo info dictionary, or nil if it's not yet available.
func (t *Torrent) Info() *metainfo.Info {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.info
}
// the data requested is actually available.
func (t *Torrent) NewReader() Reader {
r := reader{
- mu: &t.cl.mu,
+ mu: t.cl.locker(),
t: t,
readahead: 5 * 1024 * 1024,
length: *t.length,
// same state. The sum of the state run lengths is the number of pieces
// in the torrent.
func (t *Torrent) PieceStateRuns() []PieceStateRun {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.pieceStateRuns()
}
func (t *Torrent) PieceState(piece pieceIndex) PieceState {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.pieceState(piece)
}
// Get missing bytes count for specific piece.
func (t *Torrent) PieceBytesMissing(piece int) int64 {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return int64(t.pieces[piece].bytesLeft())
}
// this. No data corruption can, or should occur to either the torrent's data,
// or connected peers.
func (t *Torrent) Drop() {
- t.cl.mu.Lock()
+ t.cl.lock()
t.cl.dropTorrent(t.infoHash)
- t.cl.mu.Unlock()
+ t.cl.unlock()
}
// Number of bytes of the entire torrent we have completed. This is the sum of
// for download rate, as it can go down when pieces are lost or fail checks.
// Sample Torrent.Stats.DataBytesRead for actual file data download rate.
func (t *Torrent) BytesCompleted() int64 {
- t.cl.mu.RLock()
- defer t.cl.mu.RUnlock()
+ t.cl.rLock()
+ defer t.cl.rUnlock()
return t.bytesCompleted()
}
// Returns true if the torrent is currently being seeded. This occurs when the
// client is willing to upload without wanting anything in return.
func (t *Torrent) Seeding() bool {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.seeding()
}
// Clobbers the torrent display name. The display name is used as the torrent
// name if the metainfo is not available.
func (t *Torrent) SetDisplayName(dn string) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
t.setDisplayName(dn)
}
// The current working name for the torrent. Either the name in the info dict,
// or a display name given such as by the dn value in a magnet link, or "".
func (t *Torrent) Name() string {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.name()
}
// Returns a run-time generated metainfo for the torrent that includes the
// info bytes and announce-list as currently known to the client.
func (t *Torrent) Metainfo() metainfo.MetaInfo {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.newMetaInfo()
}
func (t *Torrent) addReader(r *reader) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
if t.readers == nil {
t.readers = make(map[*reader]struct{})
}
// priority. Piece indexes are not the same as bytes. Requires that the info
// has been obtained, see Torrent.Info and Torrent.GotInfo.
func (t *Torrent) DownloadPieces(begin, end pieceIndex) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
t.downloadPiecesLocked(begin, end)
}
}
func (t *Torrent) CancelPieces(begin, end pieceIndex) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
t.cancelPiecesLocked(begin, end)
}
func (t *Torrent) AddPeers(pp []Peer) {
cl := t.cl
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
t.addPeers(pp)
}
}
func (t *Torrent) AddTrackers(announceList [][]string) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
t.addTrackers(announceList)
}
func (t *Torrent) Piece(i pieceIndex) *Piece {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return &t.pieces[i]
}
// Returns a channel that is closed when the Torrent is closed.
func (t *Torrent) Closed() <-chan struct{} {
- return t.closed.LockedChan(&t.cl.mu)
+ return t.closed.LockedChan(t.cl.locker())
}
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
}
func (t *Torrent) BytesMissing() int64 {
- t.mu().RLock()
- defer t.mu().RUnlock()
+ t.cl.rLock()
+ defer t.cl.rUnlock()
return t.bytesMissingLocked()
}
}
func (t *Torrent) SetInfoBytes(b []byte) (err error) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
return t.setInfoBytes(b)
}
}).String()
allAddrs[key] = struct{}{}
}
- cl.mu.Lock()
+ cl.lock()
t.addPeers(addPeers)
numPeers := t.peers.Len()
- cl.mu.Unlock()
+ cl.unlock()
if numPeers >= cl.config.TorrentPeersHighWater {
return
}
- case <-t.closed.LockedChan(&cl.mu):
+ case <-t.closed.LockedChan(cl.locker()):
return
}
}
cl := t.cl
for {
select {
- case <-t.wantPeersEvent.LockedChan(&cl.mu):
- case <-t.closed.LockedChan(&cl.mu):
+ case <-t.wantPeersEvent.LockedChan(cl.locker()):
+ case <-t.closed.LockedChan(cl.locker()):
return
}
err := t.announceDHT(true, s)
func() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
if err == nil {
t.numDHTAnnounces++
} else {
}
}()
select {
- case <-t.closed.LockedChan(&cl.mu):
+ case <-t.closed.LockedChan(cl.locker()):
return
case <-time.After(5 * time.Minute):
}
}
func (t *Torrent) Stats() TorrentStats {
- t.cl.mu.RLock()
- defer t.cl.mu.RUnlock()
+ t.cl.rLock()
+ defer t.cl.rUnlock()
return t.statsLocked()
}
}
func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
+ t.cl.lock()
+ defer t.cl.unlock()
oldMax = t.maxEstablishedConns
t.maxEstablishedConns = max
wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
return oldMax
}
-func (t *Torrent) mu() missinggo.RWLocker {
- return &t.cl.mu
-}
-
func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
if t.closed.IsSet() {
func (t *Torrent) verifyPiece(piece pieceIndex) {
cl := t.cl
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
p := &t.pieces[piece]
defer func() {
p.numVerifies++
t.publishPieceChange(piece)
t.updatePiecePriority(piece)
t.storageLock.RLock()
- cl.mu.Unlock()
+ cl.unlock()
sum := t.hashPiece(piece)
t.storageLock.RUnlock()
- cl.mu.Lock()
+ cl.lock()
p.hashing = false
t.updatePiecePriority(piece)
t.pieceHashed(piece, sum == p.hash)
tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
tt.setChunkSize(2)
require.NoError(t, tt.setInfoBytes(mi.InfoBytes))
- tt.cl.mu.Lock()
+ tt.cl.lock()
tt.pieces[1].dirtyChunks.AddRange(0, 3)
require.True(t, tt.pieceAllDirty(1))
tt.pieceHashed(1, false)
// Dirty chunks should be cleared so we can try again.
require.False(t, tt.pieceAllDirty(1))
- tt.cl.mu.Unlock()
+ tt.cl.unlock()
}
// Check the behaviour of Torrent.Metainfo when metadata is not completed.
assert.EqualValues(t, 0, tt.metadataSize())
func() {
- cl.mu.Lock()
- defer cl.mu.Unlock()
+ cl.lock()
+ defer cl.unlock()
go func() {
_, err = nc.Write(pp.Message{
Type: pp.Extended,
ret.Err = fmt.Errorf("error getting ip: %s", err)
return
}
- me.t.cl.mu.Lock()
+ me.t.cl.lock()
req := me.t.announceRequest()
- me.t.cl.mu.Unlock()
+ me.t.cl.unlock()
res, err := tracker.Announce{
HttpClient: me.t.cl.config.TrackerHttpClient,
UserAgent: me.t.cl.config.HTTPUserAgent,
func (me *trackerScraper) Run() {
for {
select {
- case <-me.t.closed.LockedChan(&me.t.cl.mu):
+ case <-me.t.closed.LockedChan(me.t.cl.locker()):
return
- case <-me.stop.LockedChan(&me.t.cl.mu):
+ case <-me.stop.LockedChan(me.t.cl.locker()):
return
- case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+ case <-me.t.wantPeersEvent.LockedChan(me.t.cl.locker()):
}
ar := me.announce()
- me.t.cl.mu.Lock()
+ me.t.cl.lock()
me.lastAnnounce = ar
- me.t.cl.mu.Unlock()
+ me.t.cl.unlock()
intervalChan := time.After(time.Until(ar.Completed.Add(ar.Interval)))
select {
- case <-me.t.closed.LockedChan(&me.t.cl.mu):
+ case <-me.t.closed.LockedChan(me.t.cl.locker()):
return
- case <-me.stop.LockedChan(&me.t.cl.mu):
+ case <-me.stop.LockedChan(me.t.cl.locker()):
return
case <-intervalChan:
}