"net/url"
"sort"
"strings"
- "sync"
"text/tabwriter"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/chansync"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
cl *Client
logger log.Logger
- networkingEnabled bool
- dataDownloadDisallowed bool
+ networkingEnabled chansync.Flag
+ dataDownloadDisallowed chansync.Flag
dataUploadDisallowed bool
userOnWriteChunkErr func(error)
- closed missinggo.Event
+ closed chansync.SetOnce
infoHash metainfo.Hash
pieces []Piece
// Values are the piece indices that changed.
return &t._pendingPieces
}
-func (t *Torrent) tickleReaders() {
- t.cl.event.Broadcast()
-}
-
// Returns a channel that is closed when the Torrent is closed.
-func (t *Torrent) Closed() <-chan struct{} {
- return t.closed.LockedChan(t.cl.locker())
+func (t *Torrent) Closed() chansync.Done {
+ return t.closed.Done()
}
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
func (t *Torrent) close() (err error) {
t.closed.Set()
- t.tickleReaders()
if t.storage != nil {
go func() {
t.storageLock.Lock()
}
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
- t.tickleReaders()
t.cl.event.Broadcast()
if t.pieceComplete(piece) {
t.onPieceCompleted(piece)
func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
wtc, release := t.cl.websocketTrackers.Get(u.String())
go func() {
- <-t.closed.LockedChan(t.cl.locker())
+ <-t.closed.Done()
release()
}()
wst := websocketTrackerStatus{u, wtc}
return err
}
select {
- case <-t.closed.LockedChan(t.cl.locker()):
+ case <-t.closed.Done():
case <-time.After(5 * time.Minute):
}
stop()
}
func (t *Torrent) wantConns() bool {
- if !t.networkingEnabled {
+ if !t.networkingEnabled.Bool() {
return false
}
if t.closed.IsSet() {
func (t *Torrent) onPieceCompleted(piece pieceIndex) {
t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
+ t.piece(piece).readerCond.Broadcast()
for conn := range t.conns {
conn.have(piece)
t.maybeDropMutuallyCompletePeer(&conn.Peer)
}
func (t *Torrent) DisallowDataDownload() {
- t.cl.lock()
- defer t.cl.unlock()
t.disallowDataDownloadLocked()
}
func (t *Torrent) disallowDataDownloadLocked() {
- t.dataDownloadDisallowed = true
- t.iterPeers(func(c *Peer) {
- c.updateRequests()
- })
- t.tickleReaders()
+ t.dataDownloadDisallowed.Set()
}
func (t *Torrent) AllowDataDownload() {
- t.cl.lock()
- defer t.cl.unlock()
- t.dataDownloadDisallowed = false
- t.tickleReaders()
- t.iterPeers(func(c *Peer) {
- c.updateRequests()
- })
+ t.dataDownloadDisallowed.Clear()
}
// Enables uploading data, if it was disabled.