From: Matt Joiner Date: Thu, 2 Sep 2021 10:53:49 +0000 (+1000) Subject: Rework Reader waiting X-Git-Tag: v1.32.0~99 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=175b826e738c76d1a96e435bc5396e054ca5796a;p=btrtrc.git Rework Reader waiting --- diff --git a/client.go b/client.go index a0465059..aea3677e 100644 --- a/client.go +++ b/client.go @@ -1097,12 +1097,12 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( storageOpener: storageClient, maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, - networkingEnabled: true, metadataChanged: sync.Cond{ L: cl.locker(), }, webSeeds: make(map[string]*Peer), } + t.networkingEnabled.Set() t._pendingPieces.NewSet = priorityBitmapStableNewSet t.logger = cl.logger.WithContextValue(t) t.setChunkSize(defaultChunkSize) @@ -1198,7 +1198,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { } t.addTrackers(spec.Trackers) t.maybeNewConns() - t.dataDownloadDisallowed = spec.DisallowDataDownload + t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload) t.dataUploadDisallowed = spec.DisallowDataUpload return nil } diff --git a/misc.go b/misc.go index 508f0a63..4a6e1deb 100644 --- a/misc.go +++ b/misc.go @@ -52,8 +52,11 @@ func metadataPieceSize(totalSize int, piece int) int { } // Return the request that would include the given offset into the torrent data. -func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) ( - r Request, ok bool) { +func torrentOffsetRequest( + torrentLength, pieceSize, chunkSize, offset int64, +) ( + r Request, ok bool, +) { if offset < 0 || offset >= torrentLength { return } diff --git a/piece.go b/piece.go index 398e7a70..e1b6ff90 100644 --- a/piece.go +++ b/piece.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/anacrolix/chansync" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/torrent/metainfo" @@ -21,6 +22,8 @@ type Piece struct { // length can be determined by the request chunkSize in use. _dirtyChunks bitmap.Bitmap + readerCond chansync.BroadcastCond + numVerifies int64 hashing bool marking bool @@ -70,7 +73,7 @@ func (p *Piece) numDirtyChunks() pp.Integer { func (p *Piece) unpendChunkIndex(i int) { p._dirtyChunks.Add(bitmap.BitIndex(i)) - p.t.tickleReaders() + p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i int) { diff --git a/reader.go b/reader.go index ca2f9f6c..97ae0ed6 100644 --- a/reader.go +++ b/reader.go @@ -97,11 +97,6 @@ func (r *reader) available(off, max int64) (ret int64) { return } -func (r *reader) waitReadable(off int64) { - // We may have been sent back here because we were told we could read but it failed. - r.t.cl.event.Wait() -} - // Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces. func (r *reader) piecesUncached() (ret pieceRange) { ra := r.readahead @@ -122,27 +117,11 @@ func (r *reader) Read(b []byte) (n int, err error) { } func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { - // This is set under the Client lock if the Context is canceled. I think we coordinate on a - // separate variable so as to avoid false negatives with race conditions due to Contexts being - // synchronized. - var ctxErr error - if ctx.Done() != nil { - ctx, cancel := context.WithCancel(ctx) - // Abort the goroutine when the function returns. - defer cancel() - go func() { - <-ctx.Done() - r.t.cl.lock() - ctxErr = ctx.Err() - r.t.tickleReaders() - r.t.cl.unlock() - }() - } // Hmmm, if a Read gets stuck, this means you can't change position for other purposes. That // seems reasonable, but unusual. r.opMu.Lock() defer r.opMu.Unlock() - n, err = r.readOnceAt(b, r.pos, &ctxErr) + n, err = r.readOnceAt(ctx, b, r.pos) if n == 0 { if err == nil && len(b) > 0 { panic("expected error") @@ -163,32 +142,44 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { return } +var closedChan = make(chan struct{}) + +func init() { + close(closedChan) +} + // Wait until some data should be available to read. Tickles the client if it isn't. Returns how // much should be readable without blocking. -func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) { - r.t.cl.lock() - defer r.t.cl.unlock() +func (r *reader) waitAvailable(ctx context.Context, pos, wanted int64, wait bool) (avail int64, err error) { + t := r.t for { + r.t.cl.rLock() avail = r.available(pos, wanted) + readerCond := t.piece(int((r.offset + pos) / t.info.PieceLength)).readerCond.Signaled() + r.t.cl.rUnlock() if avail != 0 { return } - if r.t.closed.IsSet() { + var dontWait <-chan struct{} + if !wait || wanted == 0 { + dontWait = closedChan + } + select { + case <-r.t.closed.Done(): err = errors.New("torrent closed") return - } - if *ctxErr != nil { - err = *ctxErr + case <-ctx.Done(): + err = ctx.Err() return - } - if r.t.dataDownloadDisallowed || !r.t.networkingEnabled { - err = errors.New("downloading disabled and data not already available") + case <-r.t.dataDownloadDisallowed.On(): + err = errors.New("torrent data downloading disabled") + case <-r.t.networkingEnabled.Off(): + err = errors.New("torrent networking disabled") return - } - if !wait || wanted == 0 { + case <-dontWait: return + case <-readerCond: } - r.waitReadable(pos) } } @@ -199,14 +190,14 @@ func (r *reader) torrentOffset(readerPos int64) int64 { } // Performs at most one successful read to torrent storage. -func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) { +func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) { if pos >= r.length { err = io.EOF return } for { var avail int64 - avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0) + avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0) if avail == 0 { return } diff --git a/torrent.go b/torrent.go index 8e2b3917..35d9a429 100644 --- a/torrent.go +++ b/torrent.go @@ -13,12 +13,12 @@ import ( "net/url" "sort" "strings" - "sync" "text/tabwriter" "time" "unsafe" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/chansync" "github.com/anacrolix/dht/v2" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/iter" @@ -29,6 +29,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" + "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" @@ -52,12 +53,12 @@ type Torrent struct { cl *Client logger log.Logger - networkingEnabled bool - dataDownloadDisallowed bool + networkingEnabled chansync.Flag + dataDownloadDisallowed chansync.Flag dataUploadDisallowed bool userOnWriteChunkErr func(error) - closed missinggo.Event + closed chansync.SetOnce infoHash metainfo.Hash pieces []Piece // Values are the piece indices that changed. @@ -192,13 +193,9 @@ func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { return &t._pendingPieces } -func (t *Torrent) tickleReaders() { - t.cl.event.Broadcast() -} - // Returns a channel that is closed when the Torrent is closed. -func (t *Torrent) Closed() <-chan struct{} { - return t.closed.LockedChan(t.cl.locker()) +func (t *Torrent) Closed() chansync.Done { + return t.closed.Done() } // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active, @@ -794,7 +791,6 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) { func (t *Torrent) close() (err error) { t.closed.Set() - t.tickleReaders() if t.storage != nil { go func() { t.storageLock.Lock() @@ -1162,7 +1158,6 @@ func (t *Torrent) pendRequest(req Request) { } func (t *Torrent) pieceCompletionChanged(piece pieceIndex) { - t.tickleReaders() t.cl.event.Broadcast() if t.pieceComplete(piece) { t.onPieceCompleted(piece) @@ -1501,7 +1496,7 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) { func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer { wtc, release := t.cl.websocketTrackers.Get(u.String()) go func() { - <-t.closed.LockedChan(t.cl.locker()) + <-t.closed.Done() release() }() wst := websocketTrackerStatus{u, wtc} @@ -1664,7 +1659,7 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error { return err } select { - case <-t.closed.LockedChan(t.cl.locker()): + case <-t.closed.Done(): case <-time.After(5 * time.Minute): } stop() @@ -1815,7 +1810,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } func (t *Torrent) wantConns() bool { - if !t.networkingEnabled { + if !t.networkingEnabled.Bool() { return false } if t.closed.IsSet() { @@ -1949,6 +1944,7 @@ func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) { func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.pendAllChunkSpecs(piece) t.cancelRequestsForPiece(piece) + t.piece(piece).readerCond.Broadcast() for conn := range t.conns { conn.have(piece) t.maybeDropMutuallyCompletePeer(&conn.Peer) @@ -2137,27 +2133,15 @@ func (t *Torrent) onWriteChunkErr(err error) { } func (t *Torrent) DisallowDataDownload() { - t.cl.lock() - defer t.cl.unlock() t.disallowDataDownloadLocked() } func (t *Torrent) disallowDataDownloadLocked() { - t.dataDownloadDisallowed = true - t.iterPeers(func(c *Peer) { - c.updateRequests() - }) - t.tickleReaders() + t.dataDownloadDisallowed.Set() } func (t *Torrent) AllowDataDownload() { - t.cl.lock() - defer t.cl.unlock() - t.dataDownloadDisallowed = false - t.tickleReaders() - t.iterPeers(func(c *Peer) { - c.updateRequests() - }) + t.dataDownloadDisallowed.Clear() } // Enables uploading data, if it was disabled. diff --git a/tracker_scraper.go b/tracker_scraper.go index 21c4111b..c339c2b4 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -216,7 +216,6 @@ func (me *trackerScraper) Run() { me.t.cl.lock() wantPeers := me.t.wantPeersEvent.C() - closed := me.t.closed.C() me.t.cl.unlock() // If we want peers, reduce the interval to the minimum if it's appropriate. @@ -234,7 +233,7 @@ func (me *trackerScraper) Run() { } select { - case <-closed: + case <-me.t.closed.Done(): return case <-reconsider: // Recalculate the interval.