]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework Reader waiting
authorMatt Joiner <anacrolix@gmail.com>
Thu, 2 Sep 2021 10:53:49 +0000 (20:53 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 4 Sep 2021 13:07:32 +0000 (23:07 +1000)
client.go
misc.go
piece.go
reader.go
torrent.go
tracker_scraper.go

index a04650594b906eb3601d9d8ebe2294d23bbb0351..aea3677e9b4dd1c3d639511dfcacc5e712e511fe 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1097,12 +1097,12 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                storageOpener:       storageClient,
                maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
 
-               networkingEnabled: true,
                metadataChanged: sync.Cond{
                        L: cl.locker(),
                },
                webSeeds: make(map[string]*Peer),
        }
+       t.networkingEnabled.Set()
        t._pendingPieces.NewSet = priorityBitmapStableNewSet
        t.logger = cl.logger.WithContextValue(t)
        t.setChunkSize(defaultChunkSize)
@@ -1198,7 +1198,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        }
        t.addTrackers(spec.Trackers)
        t.maybeNewConns()
-       t.dataDownloadDisallowed = spec.DisallowDataDownload
+       t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
        t.dataUploadDisallowed = spec.DisallowDataUpload
        return nil
 }
diff --git a/misc.go b/misc.go
index 508f0a63441ca090547670598296b202a6fd10e2..4a6e1debe3d21634ca2a6f5d40590d8132183a66 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -52,8 +52,11 @@ func metadataPieceSize(totalSize int, piece int) int {
 }
 
 // Return the request that would include the given offset into the torrent data.
-func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
-       r Request, ok bool) {
+func torrentOffsetRequest(
+       torrentLength, pieceSize, chunkSize, offset int64,
+) (
+       r Request, ok bool,
+) {
        if offset < 0 || offset >= torrentLength {
                return
        }
index 398e7a70b6975445a4438916b1af6116472c86d6..e1b6ff90aa8a504330da2095e48593c62bd557f6 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -4,6 +4,7 @@ import (
        "fmt"
        "sync"
 
+       "github.com/anacrolix/chansync"
        "github.com/anacrolix/missinggo/v2/bitmap"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -21,6 +22,8 @@ type Piece struct {
        // length can be determined by the request chunkSize in use.
        _dirtyChunks bitmap.Bitmap
 
+       readerCond chansync.BroadcastCond
+
        numVerifies         int64
        hashing             bool
        marking             bool
@@ -70,7 +73,7 @@ func (p *Piece) numDirtyChunks() pp.Integer {
 
 func (p *Piece) unpendChunkIndex(i int) {
        p._dirtyChunks.Add(bitmap.BitIndex(i))
-       p.t.tickleReaders()
+       p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i int) {
index ca2f9f6c31595f81f87b4281475ba62a0256a1b1..97ae0ed6f6b077b544484fdd5eb5828f584fa185 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -97,11 +97,6 @@ func (r *reader) available(off, max int64) (ret int64) {
        return
 }
 
-func (r *reader) waitReadable(off int64) {
-       // We may have been sent back here because we were told we could read but it failed.
-       r.t.cl.event.Wait()
-}
-
 // Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces.
 func (r *reader) piecesUncached() (ret pieceRange) {
        ra := r.readahead
@@ -122,27 +117,11 @@ func (r *reader) Read(b []byte) (n int, err error) {
 }
 
 func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
-       // This is set under the Client lock if the Context is canceled. I think we coordinate on a
-       // separate variable so as to avoid false negatives with race conditions due to Contexts being
-       // synchronized.
-       var ctxErr error
-       if ctx.Done() != nil {
-               ctx, cancel := context.WithCancel(ctx)
-               // Abort the goroutine when the function returns.
-               defer cancel()
-               go func() {
-                       <-ctx.Done()
-                       r.t.cl.lock()
-                       ctxErr = ctx.Err()
-                       r.t.tickleReaders()
-                       r.t.cl.unlock()
-               }()
-       }
        // Hmmm, if a Read gets stuck, this means you can't change position for other purposes. That
        // seems reasonable, but unusual.
        r.opMu.Lock()
        defer r.opMu.Unlock()
-       n, err = r.readOnceAt(b, r.pos, &ctxErr)
+       n, err = r.readOnceAt(ctx, b, r.pos)
        if n == 0 {
                if err == nil && len(b) > 0 {
                        panic("expected error")
@@ -163,32 +142,44 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
        return
 }
 
+var closedChan = make(chan struct{})
+
+func init() {
+       close(closedChan)
+}
+
 // Wait until some data should be available to read. Tickles the client if it isn't. Returns how
 // much should be readable without blocking.
-func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
-       r.t.cl.lock()
-       defer r.t.cl.unlock()
+func (r *reader) waitAvailable(ctx context.Context, pos, wanted int64, wait bool) (avail int64, err error) {
+       t := r.t
        for {
+               r.t.cl.rLock()
                avail = r.available(pos, wanted)
+               readerCond := t.piece(int((r.offset + pos) / t.info.PieceLength)).readerCond.Signaled()
+               r.t.cl.rUnlock()
                if avail != 0 {
                        return
                }
-               if r.t.closed.IsSet() {
+               var dontWait <-chan struct{}
+               if !wait || wanted == 0 {
+                       dontWait = closedChan
+               }
+               select {
+               case <-r.t.closed.Done():
                        err = errors.New("torrent closed")
                        return
-               }
-               if *ctxErr != nil {
-                       err = *ctxErr
+               case <-ctx.Done():
+                       err = ctx.Err()
                        return
-               }
-               if r.t.dataDownloadDisallowed || !r.t.networkingEnabled {
-                       err = errors.New("downloading disabled and data not already available")
+               case <-r.t.dataDownloadDisallowed.On():
+                       err = errors.New("torrent data downloading disabled")
+               case <-r.t.networkingEnabled.Off():
+                       err = errors.New("torrent networking disabled")
                        return
-               }
-               if !wait || wanted == 0 {
+               case <-dontWait:
                        return
+               case <-readerCond:
                }
-               r.waitReadable(pos)
        }
 }
 
@@ -199,14 +190,14 @@ func (r *reader) torrentOffset(readerPos int64) int64 {
 }
 
 // Performs at most one successful read to torrent storage.
-func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
+func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
        if pos >= r.length {
                err = io.EOF
                return
        }
        for {
                var avail int64
-               avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
+               avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
                if avail == 0 {
                        return
                }
index 8e2b39174a7be8e2980c9ce3c6412ec12e7b5c97..35d9a42914a05e7d2d11932f267ac30c328c57b6 100644 (file)
@@ -13,12 +13,12 @@ import (
        "net/url"
        "sort"
        "strings"
-       "sync"
        "text/tabwriter"
        "time"
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/chansync"
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/iter"
@@ -29,6 +29,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/sync"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -52,12 +53,12 @@ type Torrent struct {
        cl     *Client
        logger log.Logger
 
-       networkingEnabled      bool
-       dataDownloadDisallowed bool
+       networkingEnabled      chansync.Flag
+       dataDownloadDisallowed chansync.Flag
        dataUploadDisallowed   bool
        userOnWriteChunkErr    func(error)
 
-       closed   missinggo.Event
+       closed   chansync.SetOnce
        infoHash metainfo.Hash
        pieces   []Piece
        // Values are the piece indices that changed.
@@ -192,13 +193,9 @@ func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
        return &t._pendingPieces
 }
 
-func (t *Torrent) tickleReaders() {
-       t.cl.event.Broadcast()
-}
-
 // Returns a channel that is closed when the Torrent is closed.
-func (t *Torrent) Closed() <-chan struct{} {
-       return t.closed.LockedChan(t.cl.locker())
+func (t *Torrent) Closed() chansync.Done {
+       return t.closed.Done()
 }
 
 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
@@ -794,7 +791,6 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
 
 func (t *Torrent) close() (err error) {
        t.closed.Set()
-       t.tickleReaders()
        if t.storage != nil {
                go func() {
                        t.storageLock.Lock()
@@ -1162,7 +1158,6 @@ func (t *Torrent) pendRequest(req Request) {
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
-       t.tickleReaders()
        t.cl.event.Broadcast()
        if t.pieceComplete(piece) {
                t.onPieceCompleted(piece)
@@ -1501,7 +1496,7 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
        wtc, release := t.cl.websocketTrackers.Get(u.String())
        go func() {
-               <-t.closed.LockedChan(t.cl.locker())
+               <-t.closed.Done()
                release()
        }()
        wst := websocketTrackerStatus{u, wtc}
@@ -1664,7 +1659,7 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
                return err
        }
        select {
-       case <-t.closed.LockedChan(t.cl.locker()):
+       case <-t.closed.Done():
        case <-time.After(5 * time.Minute):
        }
        stop()
@@ -1815,7 +1810,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
 }
 
 func (t *Torrent) wantConns() bool {
-       if !t.networkingEnabled {
+       if !t.networkingEnabled.Bool() {
                return false
        }
        if t.closed.IsSet() {
@@ -1949,6 +1944,7 @@ func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
        t.pendAllChunkSpecs(piece)
        t.cancelRequestsForPiece(piece)
+       t.piece(piece).readerCond.Broadcast()
        for conn := range t.conns {
                conn.have(piece)
                t.maybeDropMutuallyCompletePeer(&conn.Peer)
@@ -2137,27 +2133,15 @@ func (t *Torrent) onWriteChunkErr(err error) {
 }
 
 func (t *Torrent) DisallowDataDownload() {
-       t.cl.lock()
-       defer t.cl.unlock()
        t.disallowDataDownloadLocked()
 }
 
 func (t *Torrent) disallowDataDownloadLocked() {
-       t.dataDownloadDisallowed = true
-       t.iterPeers(func(c *Peer) {
-               c.updateRequests()
-       })
-       t.tickleReaders()
+       t.dataDownloadDisallowed.Set()
 }
 
 func (t *Torrent) AllowDataDownload() {
-       t.cl.lock()
-       defer t.cl.unlock()
-       t.dataDownloadDisallowed = false
-       t.tickleReaders()
-       t.iterPeers(func(c *Peer) {
-               c.updateRequests()
-       })
+       t.dataDownloadDisallowed.Clear()
 }
 
 // Enables uploading data, if it was disabled.
index 21c4111be1d9813e074b56db3da612feb0ca1938..c339c2b4415f1e865a8337b0600756120c8293d5 100644 (file)
@@ -216,7 +216,6 @@ func (me *trackerScraper) Run() {
 
                me.t.cl.lock()
                wantPeers := me.t.wantPeersEvent.C()
-               closed := me.t.closed.C()
                me.t.cl.unlock()
 
                // If we want peers, reduce the interval to the minimum if it's appropriate.
@@ -234,7 +233,7 @@ func (me *trackerScraper) Run() {
                }
 
                select {
-               case <-closed:
+               case <-me.t.closed.Done():
                        return
                case <-reconsider:
                        // Recalculate the interval.