]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Get rid of obsoleted Torrent.ceaseNetworking, and clean up Torrent.closing
authorMatt Joiner <anacrolix@gmail.com>
Wed, 11 May 2016 11:44:55 +0000 (21:44 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 11 May 2016 11:44:55 +0000 (21:44 +1000)
client.go
reader.go
t.go
torrent.go
worst_conns.go

index 242e83a5495b3655d5a75cb34c4ce8b53f2972d5..79e9680b9628a51436d5c08913053a7245a947d4 100644 (file)
--- a/client.go
+++ b/client.go
@@ -921,7 +921,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
        if !cl.addConnection(t, c) {
                return
        }
        if !cl.addConnection(t, c) {
                return
        }
-       defer cl.dropConnection(t, c)
+       defer t.dropConnection(c)
        go c.writer(time.Minute)
        cl.sendInitialMessages(c, t)
        err = cl.connectionLoop(t, c)
        go c.writer(time.Minute)
        cl.sendInitialMessages(c, t)
        err = cl.connectionLoop(t, c)
@@ -1080,7 +1080,7 @@ func (cl *Client) upload(t *Torrent, c *connection) {
        if !c.PeerInterested {
                return
        }
        if !c.PeerInterested {
                return
        }
-       seeding := cl.seeding(t)
+       seeding := t.seeding()
        if !seeding && !t.connHasWantedPieces(c) {
                return
        }
        if !seeding && !t.connHasWantedPieces(c) {
                return
        }
@@ -1336,40 +1336,11 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
        }
 }
 
        }
 }
 
-// Returns true if connection is removed from torrent.Conns.
-func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
-       for i0, _c := range t.conns {
-               if _c != c {
-                       continue
-               }
-               i1 := len(t.conns) - 1
-               if i0 != i1 {
-                       t.conns[i0] = t.conns[i1]
-               }
-               t.conns = t.conns[:i1]
-               return true
-       }
-       return false
-}
-
-func (cl *Client) dropConnection(t *Torrent, c *connection) {
-       cl.event.Broadcast()
-       c.Close()
-       if cl.deleteConnection(t, c) {
-               cl.openNewConns(t)
-       }
-}
-
 // Returns true if the connection is added.
 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
        if cl.closed.IsSet() {
                return false
        }
 // Returns true if the connection is added.
 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
        if cl.closed.IsSet() {
                return false
        }
-       select {
-       case <-t.ceasingNetworking:
-               return false
-       default:
-       }
        if !cl.wantConns(t) {
                return false
        }
        if !cl.wantConns(t) {
                return false
        }
@@ -1389,7 +1360,7 @@ func (cl *Client) addConnection(t *Torrent, c *connection) bool {
                        log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
                }
                c.Close()
                        log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
                }
                c.Close()
-               cl.deleteConnection(t, c)
+               t.deleteConnection(c)
        }
        if len(t.conns) >= socketsPerTorrent {
                panic(len(t.conns))
        }
        if len(t.conns) >= socketsPerTorrent {
                panic(len(t.conns))
@@ -1406,14 +1377,14 @@ func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
-       if cl.seeding(t) {
+       if t.seeding() {
                return c.PeerInterested
        }
        return t.connHasWantedPieces(c)
 }
 
 func (cl *Client) wantConns(t *Torrent) bool {
                return c.PeerInterested
        }
        return t.connHasWantedPieces(c)
 }
 
 func (cl *Client) wantConns(t *Torrent) bool {
-       if !cl.seeding(t) && !t.needData() {
+       if !t.seeding() && !t.needData() {
                return false
        }
        if len(t.conns) < socketsPerTorrent {
                return false
        }
        if len(t.conns) < socketsPerTorrent {
@@ -1423,11 +1394,6 @@ func (cl *Client) wantConns(t *Torrent) bool {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
-       select {
-       case <-t.ceasingNetworking:
-               return
-       default:
-       }
        for len(t.peers) != 0 {
                if !cl.wantConns(t) {
                        return
        for len(t.peers) != 0 {
                if !cl.wantConns(t) {
                        return
@@ -1477,9 +1443,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
                chunkSize: defaultChunkSize,
                peers:     make(map[peersKey]Peer),
 
                chunkSize: defaultChunkSize,
                peers:     make(map[peersKey]Peer),
 
-               closing:           make(chan struct{}),
-               ceasingNetworking: make(chan struct{}),
-
                halfOpen:          make(map[string]struct{}),
                pieceStateChanges: pubsub.NewPubSub(),
 
                halfOpen:          make(map[string]struct{}),
                pieceStateChanges: pubsub.NewPubSub(),
 
@@ -1635,43 +1598,8 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
        return
 }
 
        return
 }
 
-// Returns true when peers are required, or false if the torrent is closing.
-func (cl *Client) waitWantPeers(t *Torrent) bool {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       for {
-               select {
-               case <-t.ceasingNetworking:
-                       return false
-               default:
-               }
-               if len(t.peers) > torrentPeersLowWater {
-                       goto wait
-               }
-               if t.needData() || cl.seeding(t) {
-                       return true
-               }
-       wait:
-               t.wantPeers.Wait()
-       }
-}
-
-// Returns whether the client should make effort to seed the torrent.
-func (cl *Client) seeding(t *Torrent) bool {
-       if cl.config.NoUpload {
-               return false
-       }
-       if !cl.config.Seed {
-               return false
-       }
-       if t.needData() {
-               return false
-       }
-       return true
-}
-
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
-       for cl.waitWantPeers(t) {
+       for t.waitWantPeers() {
                // log.Printf("getting peers for %q from DHT", t)
                ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
                // log.Printf("getting peers for %q from DHT", t)
                ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
@@ -1711,7 +1639,7 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
                                if numPeers >= torrentPeersHighWater {
                                        break getPeers
                                }
                                if numPeers >= torrentPeersHighWater {
                                        break getPeers
                                }
-                       case <-t.ceasingNetworking:
+                       case <-t.closed.LockedChan(&cl.mu):
                                ps.Close()
                                return
                        }
                                ps.Close()
                                return
                        }
@@ -1800,7 +1728,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) {
                PeerId:   cl.peerID,
                InfoHash: t.infoHash,
        }
                PeerId:   cl.peerID,
                InfoHash: t.infoHash,
        }
-       if !cl.waitWantPeers(t) {
+       if !t.waitWantPeers() {
                return
        }
        cl.mu.RLock()
                return
        }
        cl.mu.RLock()
@@ -1811,7 +1739,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) {
                req.Event = tracker.None
        }
 newAnnounce:
                req.Event = tracker.None
        }
 newAnnounce:
-       for cl.waitWantPeers(t) {
+       for t.waitWantPeers() {
                cl.mu.RLock()
                req.Left = t.bytesLeftAnnounce()
                trackers = t.trackers
                cl.mu.RLock()
                req.Left = t.bytesLeftAnnounce()
                trackers = t.trackers
@@ -1975,7 +1903,7 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
        } else if len(touchers) != 0 {
                log.Printf("dropping %d conns that touched piece", len(touchers))
                for _, c := range touchers {
        } else if len(touchers) != 0 {
                log.Printf("dropping %d conns that touched piece", len(touchers))
                for _, c := range touchers {
-                       cl.dropConnection(t, c)
+                       t.dropConnection(c)
                }
        }
        cl.pieceChanged(t, piece)
                }
        }
        cl.pieceChanged(t, piece)
@@ -2034,7 +1962,7 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) {
                cl.event.Wait()
        }
        p.QueuedForHash = false
                cl.event.Wait()
        }
        p.QueuedForHash = false
-       if t.isClosed() || t.pieceComplete(piece) {
+       if t.closed.IsSet() || t.pieceComplete(piece) {
                t.updatePiecePriority(piece)
                t.publishPieceChange(piece)
                return
                t.updatePiecePriority(piece)
                t.publishPieceChange(piece)
                return
@@ -2044,11 +1972,6 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) {
        cl.mu.Unlock()
        sum := t.hashPiece(piece)
        cl.mu.Lock()
        cl.mu.Unlock()
        sum := t.hashPiece(piece)
        cl.mu.Lock()
-       select {
-       case <-t.closing:
-               return
-       default:
-       }
        p.Hashing = false
        cl.pieceHashed(t, piece, sum == p.Hash)
 }
        p.Hashing = false
        cl.pieceHashed(t, piece, sum == p.Hash)
 }
index 4fd9af977e886c6f2c6f6e9d33a2bc6c9617e36e..f14bc0c354b0ab40cbfa9e0350909780f50b99bb 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -45,7 +45,7 @@ func (r *Reader) SetReadahead(readahead int64) {
 }
 
 func (r *Reader) readable(off int64) (ret bool) {
 }
 
 func (r *Reader) readable(off int64) (ret bool) {
-       if r.torrentClosed() {
+       if r.t.closed.IsSet() {
                return true
        }
        req, ok := r.t.offsetRequest(off)
                return true
        }
        req, ok := r.t.offsetRequest(off)
@@ -137,11 +137,6 @@ func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
        return
 }
 
        return
 }
 
-// Safe to call with or without client lock.
-func (r *Reader) torrentClosed() bool {
-       return r.t.isClosed()
-}
-
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
@@ -162,7 +157,7 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
        for {
                avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
                if avail == 0 {
        for {
                avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
                if avail == 0 {
-                       if r.torrentClosed() {
+                       if r.t.closed.IsSet() {
                                err = errors.New("torrent closed")
                                return
                        }
                                err = errors.New("torrent closed")
                                return
                        }
diff --git a/t.go b/t.go
index 4a0f65adcf0836e35e4f65f34497ba220132699e..9a726a7713d58f6e22d386d66afb174f74eefb74 100644 (file)
--- a/t.go
+++ b/t.go
@@ -87,7 +87,7 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
 func (t *Torrent) Seeding() bool {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
 func (t *Torrent) Seeding() bool {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
-       return t.cl.seeding(t)
+       return t.seeding()
 }
 
 // Clobbers the torrent display name. The display name is used as the torrent
 }
 
 // Clobbers the torrent display name. The display name is used as the torrent
index 056f4b4747d0237e7a4169c6f3404e646b1f9b32..f8dfea2c59d6e6ffe51d34dcd12ca5ff90be0c1f 100644 (file)
@@ -38,12 +38,7 @@ type peersKey struct {
 type Torrent struct {
        cl *Client
 
 type Torrent struct {
        cl *Client
 
-       closing chan struct{}
-
-       // Closed when no more network activity is desired. This includes
-       // announcing, and communicating with peers.
-       ceasingNetworking chan struct{}
-
+       closed   missinggo.Event
        infoHash metainfo.Hash
        pieces   []piece
        // Values are the piece indices that changed.
        infoHash metainfo.Hash
        pieces   []piece
        // Values are the piece indices that changed.
@@ -142,18 +137,6 @@ func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
        return
 }
 
        return
 }
 
-func (t *Torrent) ceaseNetworking() {
-       select {
-       case <-t.ceasingNetworking:
-               return
-       default:
-       }
-       close(t.ceasingNetworking)
-       for _, c := range t.conns {
-               c.Close()
-       }
-}
-
 func (t *Torrent) addPeer(p Peer, cl *Client) {
        cl.openNewConns(t)
        if len(t.peers) >= torrentPeersHighWater {
 func (t *Torrent) addPeer(p Peer, cl *Client) {
        cl.openNewConns(t)
        if len(t.peers) >= torrentPeersHighWater {
@@ -527,22 +510,8 @@ func (t *Torrent) numPiecesCompleted() (num int) {
        return t.completedPieces.Len()
 }
 
        return t.completedPieces.Len()
 }
 
-// Safe to call with or without client lock.
-func (t *Torrent) isClosed() bool {
-       select {
-       case <-t.closing:
-               return true
-       default:
-               return false
-       }
-}
-
 func (t *Torrent) close() (err error) {
 func (t *Torrent) close() (err error) {
-       if t.isClosed() {
-               return
-       }
-       t.ceaseNetworking()
-       close(t.closing)
+       t.closed.Set()
        if c, ok := t.storage.(io.Closer); ok {
                c.Close()
        }
        if c, ok := t.storage.(io.Closer); ok {
                c.Close()
        }
@@ -1107,3 +1076,61 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) {
        }
        return t.setInfoBytes(b)
 }
        }
        return t.setInfoBytes(b)
 }
+
+// Returns true if connection is removed from torrent.Conns.
+func (t *Torrent) deleteConnection(c *connection) bool {
+       for i0, _c := range t.conns {
+               if _c != c {
+                       continue
+               }
+               i1 := len(t.conns) - 1
+               if i0 != i1 {
+                       t.conns[i0] = t.conns[i1]
+               }
+               t.conns = t.conns[:i1]
+               return true
+       }
+       return false
+}
+
+func (t *Torrent) dropConnection(c *connection) {
+       t.cl.event.Broadcast()
+       c.Close()
+       if t.deleteConnection(c) {
+               t.openNewConns()
+       }
+}
+
+// Returns true when peers are required, or false if the torrent is closing.
+func (t *Torrent) waitWantPeers() bool {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
+       for {
+               if t.closed.IsSet() {
+                       return false
+               }
+               if len(t.peers) > torrentPeersLowWater {
+                       goto wait
+               }
+               if t.needData() || t.seeding() {
+                       return true
+               }
+       wait:
+               t.wantPeers.Wait()
+       }
+}
+
+// Returns whether the client should make effort to seed the torrent.
+func (t *Torrent) seeding() bool {
+       cl := t.cl
+       if cl.config.NoUpload {
+               return false
+       }
+       if !cl.config.Seed {
+               return false
+       }
+       if t.needData() {
+               return false
+       }
+       return true
+}
index 1f41c561d8eff0115b50c6b46d4982c7540c1473..181b9ccc1b935ff2ddba6e744e4e76d31f7e12d0 100644 (file)
@@ -45,7 +45,7 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {
 func (wc *worstConns) key(i int) (key worstConnsSortKey) {
        c := wc.c[i]
        key.useful = wc.cl.usefulConn(wc.t, c)
 func (wc *worstConns) key(i int) (key worstConnsSortKey) {
        c := wc.c[i]
        key.useful = wc.cl.usefulConn(wc.t, c)
-       if wc.cl.seeding(wc.t) {
+       if wc.t.seeding() {
                key.lastHelpful = c.lastChunkSent
        }
        // Intentionally consider the last time a chunk was received when seeding,
                key.lastHelpful = c.lastChunkSent
        }
        // Intentionally consider the last time a chunk was received when seeding,