From 528836ab4c945b9aeb53245c2ad16f20e0225451 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 11 May 2016 21:44:55 +1000 Subject: [PATCH] Get rid of obsoleted Torrent.ceaseNetworking, and clean up Torrent.closing --- client.go | 99 ++++++-------------------------------------------- reader.go | 9 +---- t.go | 2 +- torrent.go | 93 ++++++++++++++++++++++++++++++----------------- worst_conns.go | 2 +- 5 files changed, 75 insertions(+), 130 deletions(-) diff --git a/client.go b/client.go index 242e83a5..79e9680b 100644 --- a/client.go +++ b/client.go @@ -921,7 +921,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) { if !cl.addConnection(t, c) { return } - defer cl.dropConnection(t, c) + defer t.dropConnection(c) go c.writer(time.Minute) cl.sendInitialMessages(c, t) err = cl.connectionLoop(t, c) @@ -1080,7 +1080,7 @@ func (cl *Client) upload(t *Torrent, c *connection) { if !c.PeerInterested { return } - seeding := cl.seeding(t) + seeding := t.seeding() if !seeding && !t.connHasWantedPieces(c) { return } @@ -1336,40 +1336,11 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error { } } -// Returns true if connection is removed from torrent.Conns. -func (cl *Client) deleteConnection(t *Torrent, c *connection) bool { - for i0, _c := range t.conns { - if _c != c { - continue - } - i1 := len(t.conns) - 1 - if i0 != i1 { - t.conns[i0] = t.conns[i1] - } - t.conns = t.conns[:i1] - return true - } - return false -} - -func (cl *Client) dropConnection(t *Torrent, c *connection) { - cl.event.Broadcast() - c.Close() - if cl.deleteConnection(t, c) { - cl.openNewConns(t) - } -} - // Returns true if the connection is added. func (cl *Client) addConnection(t *Torrent, c *connection) bool { if cl.closed.IsSet() { return false } - select { - case <-t.ceasingNetworking: - return false - default: - } if !cl.wantConns(t) { return false } @@ -1389,7 +1360,7 @@ func (cl *Client) addConnection(t *Torrent, c *connection) bool { log.Printf("%s: dropping connection to make room for new one:\n %s", t, c) } c.Close() - cl.deleteConnection(t, c) + t.deleteConnection(c) } if len(t.conns) >= socketsPerTorrent { panic(len(t.conns)) @@ -1406,14 +1377,14 @@ func (cl *Client) usefulConn(t *Torrent, c *connection) bool { if !t.haveInfo() { return c.supportsExtension("ut_metadata") } - if cl.seeding(t) { + if t.seeding() { return c.PeerInterested } return t.connHasWantedPieces(c) } func (cl *Client) wantConns(t *Torrent) bool { - if !cl.seeding(t) && !t.needData() { + if !t.seeding() && !t.needData() { return false } if len(t.conns) < socketsPerTorrent { @@ -1423,11 +1394,6 @@ func (cl *Client) wantConns(t *Torrent) bool { } func (cl *Client) openNewConns(t *Torrent) { - select { - case <-t.ceasingNetworking: - return - default: - } for len(t.peers) != 0 { if !cl.wantConns(t) { return @@ -1477,9 +1443,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { chunkSize: defaultChunkSize, peers: make(map[peersKey]Peer), - closing: make(chan struct{}), - ceasingNetworking: make(chan struct{}), - halfOpen: make(map[string]struct{}), pieceStateChanges: pubsub.NewPubSub(), @@ -1635,43 +1598,8 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) { return } -// Returns true when peers are required, or false if the torrent is closing. -func (cl *Client) waitWantPeers(t *Torrent) bool { - cl.mu.Lock() - defer cl.mu.Unlock() - for { - select { - case <-t.ceasingNetworking: - return false - default: - } - if len(t.peers) > torrentPeersLowWater { - goto wait - } - if t.needData() || cl.seeding(t) { - return true - } - wait: - t.wantPeers.Wait() - } -} - -// Returns whether the client should make effort to seed the torrent. -func (cl *Client) seeding(t *Torrent) bool { - if cl.config.NoUpload { - return false - } - if !cl.config.Seed { - return false - } - if t.needData() { - return false - } - return true -} - func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) { - for cl.waitWantPeers(t) { + for t.waitWantPeers() { // log.Printf("getting peers for %q from DHT", t) ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort) if err != nil { @@ -1711,7 +1639,7 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) { if numPeers >= torrentPeersHighWater { break getPeers } - case <-t.ceasingNetworking: + case <-t.closed.LockedChan(&cl.mu): ps.Close() return } @@ -1800,7 +1728,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) { PeerId: cl.peerID, InfoHash: t.infoHash, } - if !cl.waitWantPeers(t) { + if !t.waitWantPeers() { return } cl.mu.RLock() @@ -1811,7 +1739,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) { req.Event = tracker.None } newAnnounce: - for cl.waitWantPeers(t) { + for t.waitWantPeers() { cl.mu.RLock() req.Left = t.bytesLeftAnnounce() trackers = t.trackers @@ -1975,7 +1903,7 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) { } else if len(touchers) != 0 { log.Printf("dropping %d conns that touched piece", len(touchers)) for _, c := range touchers { - cl.dropConnection(t, c) + t.dropConnection(c) } } cl.pieceChanged(t, piece) @@ -2034,7 +1962,7 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) { cl.event.Wait() } p.QueuedForHash = false - if t.isClosed() || t.pieceComplete(piece) { + if t.closed.IsSet() || t.pieceComplete(piece) { t.updatePiecePriority(piece) t.publishPieceChange(piece) return @@ -2044,11 +1972,6 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) { cl.mu.Unlock() sum := t.hashPiece(piece) cl.mu.Lock() - select { - case <-t.closing: - return - default: - } p.Hashing = false cl.pieceHashed(t, piece, sum == p.Hash) } diff --git a/reader.go b/reader.go index 4fd9af97..f14bc0c3 100644 --- a/reader.go +++ b/reader.go @@ -45,7 +45,7 @@ func (r *Reader) SetReadahead(readahead int64) { } func (r *Reader) readable(off int64) (ret bool) { - if r.torrentClosed() { + if r.t.closed.IsSet() { return true } req, ok := r.t.offsetRequest(off) @@ -137,11 +137,6 @@ func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) { return } -// Safe to call with or without client lock. -func (r *Reader) torrentClosed() bool { - return r.t.isClosed() -} - // Wait until some data should be available to read. Tickles the client if it // isn't. Returns how much should be readable without blocking. func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) { @@ -162,7 +157,7 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro for { avail := r.waitAvailable(pos, int64(len(b)), ctxErr) if avail == 0 { - if r.torrentClosed() { + if r.t.closed.IsSet() { err = errors.New("torrent closed") return } diff --git a/t.go b/t.go index 4a0f65ad..9a726a77 100644 --- a/t.go +++ b/t.go @@ -87,7 +87,7 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription { func (t *Torrent) Seeding() bool { t.cl.mu.Lock() defer t.cl.mu.Unlock() - return t.cl.seeding(t) + return t.seeding() } // Clobbers the torrent display name. The display name is used as the torrent diff --git a/torrent.go b/torrent.go index 056f4b47..f8dfea2c 100644 --- a/torrent.go +++ b/torrent.go @@ -38,12 +38,7 @@ type peersKey struct { type Torrent struct { cl *Client - closing chan struct{} - - // Closed when no more network activity is desired. This includes - // announcing, and communicating with peers. - ceasingNetworking chan struct{} - + closed missinggo.Event infoHash metainfo.Hash pieces []piece // Values are the piece indices that changed. @@ -142,18 +137,6 @@ func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) { return } -func (t *Torrent) ceaseNetworking() { - select { - case <-t.ceasingNetworking: - return - default: - } - close(t.ceasingNetworking) - for _, c := range t.conns { - c.Close() - } -} - func (t *Torrent) addPeer(p Peer, cl *Client) { cl.openNewConns(t) if len(t.peers) >= torrentPeersHighWater { @@ -527,22 +510,8 @@ func (t *Torrent) numPiecesCompleted() (num int) { return t.completedPieces.Len() } -// Safe to call with or without client lock. -func (t *Torrent) isClosed() bool { - select { - case <-t.closing: - return true - default: - return false - } -} - func (t *Torrent) close() (err error) { - if t.isClosed() { - return - } - t.ceaseNetworking() - close(t.closing) + t.closed.Set() if c, ok := t.storage.(io.Closer); ok { c.Close() } @@ -1107,3 +1076,61 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) { } return t.setInfoBytes(b) } + +// Returns true if connection is removed from torrent.Conns. +func (t *Torrent) deleteConnection(c *connection) bool { + for i0, _c := range t.conns { + if _c != c { + continue + } + i1 := len(t.conns) - 1 + if i0 != i1 { + t.conns[i0] = t.conns[i1] + } + t.conns = t.conns[:i1] + return true + } + return false +} + +func (t *Torrent) dropConnection(c *connection) { + t.cl.event.Broadcast() + c.Close() + if t.deleteConnection(c) { + t.openNewConns() + } +} + +// Returns true when peers are required, or false if the torrent is closing. +func (t *Torrent) waitWantPeers() bool { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + for { + if t.closed.IsSet() { + return false + } + if len(t.peers) > torrentPeersLowWater { + goto wait + } + if t.needData() || t.seeding() { + return true + } + wait: + t.wantPeers.Wait() + } +} + +// Returns whether the client should make effort to seed the torrent. +func (t *Torrent) seeding() bool { + cl := t.cl + if cl.config.NoUpload { + return false + } + if !cl.config.Seed { + return false + } + if t.needData() { + return false + } + return true +} diff --git a/worst_conns.go b/worst_conns.go index 1f41c561..181b9ccc 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -45,7 +45,7 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool { func (wc *worstConns) key(i int) (key worstConnsSortKey) { c := wc.c[i] key.useful = wc.cl.usefulConn(wc.t, c) - if wc.cl.seeding(wc.t) { + if wc.t.seeding() { key.lastHelpful = c.lastChunkSent } // Intentionally consider the last time a chunk was received when seeding, -- 2.44.0