]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Rework Reader waiting
[btrtrc.git] / torrent.go
index 8e2b39174a7be8e2980c9ce3c6412ec12e7b5c97..35d9a42914a05e7d2d11932f267ac30c328c57b6 100644 (file)
@@ -13,12 +13,12 @@ import (
        "net/url"
        "sort"
        "strings"
-       "sync"
        "text/tabwriter"
        "time"
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/chansync"
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/iter"
@@ -29,6 +29,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/sync"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -52,12 +53,12 @@ type Torrent struct {
        cl     *Client
        logger log.Logger
 
-       networkingEnabled      bool
-       dataDownloadDisallowed bool
+       networkingEnabled      chansync.Flag
+       dataDownloadDisallowed chansync.Flag
        dataUploadDisallowed   bool
        userOnWriteChunkErr    func(error)
 
-       closed   missinggo.Event
+       closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
        // Values are the piece indices that changed.
@@ -192,13 +193,9 @@ func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
        return &t._pendingPieces
 }
 
-func (t *Torrent) tickleReaders() {
-       t.cl.event.Broadcast()
-}
-
 // Returns a channel that is closed when the Torrent is closed.
-func (t *Torrent) Closed() <-chan struct{} {
-       return t.closed.LockedChan(t.cl.locker())
+func (t *Torrent) Closed() chansync.Done {
+       return t.closed.Done()
 }
 
 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
@@ -794,7 +791,6 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
 
 func (t *Torrent) close() (err error) {
        t.closed.Set()
-       t.tickleReaders()
        if t.storage != nil {
                go func() {
                        t.storageLock.Lock()
@@ -1162,7 +1158,6 @@ func (t *Torrent) pendRequest(req Request) {
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
-       t.tickleReaders()
        t.cl.event.Broadcast()
        if t.pieceComplete(piece) {
                t.onPieceCompleted(piece)
@@ -1501,7 +1496,7 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
        wtc, release := t.cl.websocketTrackers.Get(u.String())
        go func() {
-               <-t.closed.LockedChan(t.cl.locker())
+               <-t.closed.Done()
                release()
        }()
        wst := websocketTrackerStatus{u, wtc}
@@ -1664,7 +1659,7 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
                return err
        }
        select {
-       case <-t.closed.LockedChan(t.cl.locker()):
+       case <-t.closed.Done():
        case <-time.After(5 * time.Minute):
        }
        stop()
@@ -1815,7 +1810,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
 }
 
 func (t *Torrent) wantConns() bool {
-       if !t.networkingEnabled {
+       if !t.networkingEnabled.Bool() {
                return false
        }
        if t.closed.IsSet() {
@@ -1949,6 +1944,7 @@ func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
        t.pendAllChunkSpecs(piece)
        t.cancelRequestsForPiece(piece)
+       t.piece(piece).readerCond.Broadcast()
        for conn := range t.conns {
                conn.have(piece)
                t.maybeDropMutuallyCompletePeer(&conn.Peer)
@@ -2137,27 +2133,15 @@ func (t *Torrent) onWriteChunkErr(err error) {
 }
 
 func (t *Torrent) DisallowDataDownload() {
-       t.cl.lock()
-       defer t.cl.unlock()
        t.disallowDataDownloadLocked()
 }
 
 func (t *Torrent) disallowDataDownloadLocked() {
-       t.dataDownloadDisallowed = true
-       t.iterPeers(func(c *Peer) {
-               c.updateRequests()
-       })
-       t.tickleReaders()
+       t.dataDownloadDisallowed.Set()
 }
 
 func (t *Torrent) AllowDataDownload() {
-       t.cl.lock()
-       defer t.cl.unlock()
-       t.dataDownloadDisallowed = false
-       t.tickleReaders()
-       t.iterPeers(func(c *Peer) {
-               c.updateRequests()
-       })
+       t.dataDownloadDisallowed.Clear()
 }
 
 // Enables uploading data, if it was disabled.