storageOpener: storageClient,
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
- networkingEnabled: true,
metadataChanged: sync.Cond{
L: cl.locker(),
},
webSeeds: make(map[string]*Peer),
}
+ t.networkingEnabled.Set()
t._pendingPieces.NewSet = priorityBitmapStableNewSet
t.logger = cl.logger.WithContextValue(t)
t.setChunkSize(defaultChunkSize)
}
t.addTrackers(spec.Trackers)
t.maybeNewConns()
- t.dataDownloadDisallowed = spec.DisallowDataDownload
+ t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
t.dataUploadDisallowed = spec.DisallowDataUpload
return nil
}
}
// Return the request that would include the given offset into the torrent data.
-func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
- r Request, ok bool) {
+func torrentOffsetRequest(
+ torrentLength, pieceSize, chunkSize, offset int64,
+) (
+ r Request, ok bool,
+) {
if offset < 0 || offset >= torrentLength {
return
}
"fmt"
"sync"
+ "github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/torrent/metainfo"
// length can be determined by the request chunkSize in use.
_dirtyChunks bitmap.Bitmap
+ readerCond chansync.BroadcastCond
+
numVerifies int64
hashing bool
marking bool
func (p *Piece) unpendChunkIndex(i int) {
p._dirtyChunks.Add(bitmap.BitIndex(i))
- p.t.tickleReaders()
+ p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i int) {
return
}
-func (r *reader) waitReadable(off int64) {
- // We may have been sent back here because we were told we could read but it failed.
- r.t.cl.event.Wait()
-}
-
// Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces.
func (r *reader) piecesUncached() (ret pieceRange) {
ra := r.readahead
}
func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
- // This is set under the Client lock if the Context is canceled. I think we coordinate on a
- // separate variable so as to avoid false negatives with race conditions due to Contexts being
- // synchronized.
- var ctxErr error
- if ctx.Done() != nil {
- ctx, cancel := context.WithCancel(ctx)
- // Abort the goroutine when the function returns.
- defer cancel()
- go func() {
- <-ctx.Done()
- r.t.cl.lock()
- ctxErr = ctx.Err()
- r.t.tickleReaders()
- r.t.cl.unlock()
- }()
- }
// Hmmm, if a Read gets stuck, this means you can't change position for other purposes. That
// seems reasonable, but unusual.
r.opMu.Lock()
defer r.opMu.Unlock()
- n, err = r.readOnceAt(b, r.pos, &ctxErr)
+ n, err = r.readOnceAt(ctx, b, r.pos)
if n == 0 {
if err == nil && len(b) > 0 {
panic("expected error")
return
}
+var closedChan = make(chan struct{})
+
+func init() {
+ close(closedChan)
+}
+
// Wait until some data should be available to read. Tickles the client if it isn't. Returns how
// much should be readable without blocking.
-func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
- r.t.cl.lock()
- defer r.t.cl.unlock()
+func (r *reader) waitAvailable(ctx context.Context, pos, wanted int64, wait bool) (avail int64, err error) {
+ t := r.t
for {
+ r.t.cl.rLock()
avail = r.available(pos, wanted)
+ readerCond := t.piece(int((r.offset + pos) / t.info.PieceLength)).readerCond.Signaled()
+ r.t.cl.rUnlock()
if avail != 0 {
return
}
- if r.t.closed.IsSet() {
+ var dontWait <-chan struct{}
+ if !wait || wanted == 0 {
+ dontWait = closedChan
+ }
+ select {
+ case <-r.t.closed.Done():
err = errors.New("torrent closed")
return
- }
- if *ctxErr != nil {
- err = *ctxErr
+ case <-ctx.Done():
+ err = ctx.Err()
return
- }
- if r.t.dataDownloadDisallowed || !r.t.networkingEnabled {
- err = errors.New("downloading disabled and data not already available")
+ case <-r.t.dataDownloadDisallowed.On():
+ err = errors.New("torrent data downloading disabled")
+ case <-r.t.networkingEnabled.Off():
+ err = errors.New("torrent networking disabled")
return
- }
- if !wait || wanted == 0 {
+ case <-dontWait:
return
+ case <-readerCond:
}
- r.waitReadable(pos)
}
}
}
// Performs at most one successful read to torrent storage.
-func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
+func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
if pos >= r.length {
err = io.EOF
return
}
for {
var avail int64
- avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
+ avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
if avail == 0 {
return
}
"net/url"
"sort"
"strings"
- "sync"
"text/tabwriter"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/chansync"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
cl *Client
logger log.Logger
- networkingEnabled bool
- dataDownloadDisallowed bool
+ networkingEnabled chansync.Flag
+ dataDownloadDisallowed chansync.Flag
dataUploadDisallowed bool
userOnWriteChunkErr func(error)
- closed missinggo.Event
+ closed chansync.SetOnce
infoHash metainfo.Hash
pieces []Piece
// Values are the piece indices that changed.
return &t._pendingPieces
}
-func (t *Torrent) tickleReaders() {
- t.cl.event.Broadcast()
-}
-
// Returns a channel that is closed when the Torrent is closed.
-func (t *Torrent) Closed() <-chan struct{} {
- return t.closed.LockedChan(t.cl.locker())
+func (t *Torrent) Closed() chansync.Done {
+ return t.closed.Done()
}
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
func (t *Torrent) close() (err error) {
t.closed.Set()
- t.tickleReaders()
if t.storage != nil {
go func() {
t.storageLock.Lock()
}
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
- t.tickleReaders()
t.cl.event.Broadcast()
if t.pieceComplete(piece) {
t.onPieceCompleted(piece)
func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
wtc, release := t.cl.websocketTrackers.Get(u.String())
go func() {
- <-t.closed.LockedChan(t.cl.locker())
+ <-t.closed.Done()
release()
}()
wst := websocketTrackerStatus{u, wtc}
return err
}
select {
- case <-t.closed.LockedChan(t.cl.locker()):
+ case <-t.closed.Done():
case <-time.After(5 * time.Minute):
}
stop()
}
func (t *Torrent) wantConns() bool {
- if !t.networkingEnabled {
+ if !t.networkingEnabled.Bool() {
return false
}
if t.closed.IsSet() {
func (t *Torrent) onPieceCompleted(piece pieceIndex) {
t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
+ t.piece(piece).readerCond.Broadcast()
for conn := range t.conns {
conn.have(piece)
t.maybeDropMutuallyCompletePeer(&conn.Peer)
}
func (t *Torrent) DisallowDataDownload() {
- t.cl.lock()
- defer t.cl.unlock()
t.disallowDataDownloadLocked()
}
func (t *Torrent) disallowDataDownloadLocked() {
- t.dataDownloadDisallowed = true
- t.iterPeers(func(c *Peer) {
- c.updateRequests()
- })
- t.tickleReaders()
+ t.dataDownloadDisallowed.Set()
}
func (t *Torrent) AllowDataDownload() {
- t.cl.lock()
- defer t.cl.unlock()
- t.dataDownloadDisallowed = false
- t.tickleReaders()
- t.iterPeers(func(c *Peer) {
- c.updateRequests()
- })
+ t.dataDownloadDisallowed.Clear()
}
// Enables uploading data, if it was disabled.
me.t.cl.lock()
wantPeers := me.t.wantPeersEvent.C()
- closed := me.t.closed.C()
me.t.cl.unlock()
// If we want peers, reduce the interval to the minimum if it's appropriate.
}
select {
- case <-closed:
+ case <-me.t.closed.Done():
return
case <-reconsider:
// Recalculate the interval.