From 065c62e6a81e9ea2eb103062c7e6b6cadb145815 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 1 Aug 2025 13:04:30 +1000 Subject: [PATCH] Make Client.Close more responsive Stop updating webseed requests ASAP. Set the Client closed event outside of the Client lock. Make Client initialization more consistent for Close. --- client.go | 129 ++++++++++++++++++++++-------------------- config.go | 2 +- issue97_test.go | 6 +- justfile | 2 +- peerconn_test.go | 9 +-- piece.go | 3 + storage/file-piece.go | 7 +-- test_test.go | 7 ++- torrent.go | 21 ++++++- torrent_test.go | 11 ++-- webseed-requesting.go | 3 + 11 files changed, 114 insertions(+), 86 deletions(-) diff --git a/client.go b/client.go index 3065e5d1..e4daa460 100644 --- a/client.go +++ b/client.go @@ -279,39 +279,81 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } cl = &Client{} cl.init(cfg) - go cl.acceptLimitClearer() - cl.initLogger() - //cl.logger.Levelf(log.Critical, "test after init") + // Belongs after infallible init defer func() { if err != nil { cl.Close() cl = nil } }() + // Infallible init. Belongs in separate function. + { + go cl.acceptLimitClearer() + cl.initLogger() + //cl.logger.Levelf(log.Critical, "test after init") + + storageImpl := cfg.DefaultStorage + if storageImpl == nil { + // We'd use mmap by default but HFS+ doesn't support sparse files. + storageImplCloser := storage.NewFile(cfg.DataDir) + cl.onClose = append(cl.onClose, func() { + if err := storageImplCloser.Close(); err != nil { + cl.logger.Printf("error closing default storage: %s", err) + } + }) + storageImpl = storageImplCloser + } + cl.defaultStorage = storage.NewClient(storageImpl) - storageImpl := cfg.DefaultStorage - if storageImpl == nil { - // We'd use mmap by default but HFS+ doesn't support sparse files. - storageImplCloser := storage.NewFile(cfg.DataDir) - cl.onClose = append(cl.onClose, func() { - if err := storageImplCloser.Close(); err != nil { - cl.logger.Printf("error closing default storage: %s", err) + if cfg.PeerID != "" { + missinggo.CopyExact(&cl.peerID, cfg.PeerID) + } else { + o := copy(cl.peerID[:], cfg.Bep20) + _, err = rand.Read(cl.peerID[o:]) + if err != nil { + panic("error generating peer id") } - }) - storageImpl = storageImplCloser - } - cl.defaultStorage = storage.NewClient(storageImpl) + } - if cfg.PeerID != "" { - missinggo.CopyExact(&cl.peerID, cfg.PeerID) - } else { - o := copy(cl.peerID[:], cfg.Bep20) - _, err = rand.Read(cl.peerID[o:]) - if err != nil { - panic("error generating peer id") + cl.websocketTrackers = websocketTrackers{ + PeerId: cl.peerID, + Logger: cl.logger.WithNames("websocketTrackers"), + GetAnnounceRequest: func( + event tracker.AnnounceEvent, infoHash [20]byte, + ) ( + tracker.AnnounceRequest, error, + ) { + cl.lock() + defer cl.unlock() + t, ok := cl.torrentsByShortHash[infoHash] + if !ok { + return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") + } + return t.announceRequest(event, infoHash), nil + }, + Proxy: cl.config.HTTPProxy, + WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, + ICEServers: cl.ICEServers(), + DialContext: cl.config.TrackerDialContext, + callbacks: &cl.config.Callbacks, + OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { + cl.lock() + defer cl.unlock() + t, ok := cl.torrentsByShortHash[dcc.InfoHash] + if !ok { + cl.logger.WithDefaultLevel(log.Warning).Printf( + "got webrtc conn for unloaded torrent with infohash %x", + dcc.InfoHash, + ) + dc.Close() + return + } + go t.onWebRtcConn(dc, dcc) + }, } - } + cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc) + } builtinListenNetworks := cl.listenNetworks() sockets, err := listenAll( builtinListenNetworks, @@ -358,45 +400,6 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } } - cl.websocketTrackers = websocketTrackers{ - PeerId: cl.peerID, - Logger: cl.logger.WithNames("websocketTrackers"), - GetAnnounceRequest: func( - event tracker.AnnounceEvent, infoHash [20]byte, - ) ( - tracker.AnnounceRequest, error, - ) { - cl.lock() - defer cl.unlock() - t, ok := cl.torrentsByShortHash[infoHash] - if !ok { - return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") - } - return t.announceRequest(event, infoHash), nil - }, - Proxy: cl.config.HTTPProxy, - WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, - ICEServers: cl.ICEServers(), - DialContext: cl.config.TrackerDialContext, - callbacks: &cl.config.Callbacks, - OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { - cl.lock() - defer cl.unlock() - t, ok := cl.torrentsByShortHash[dcc.InfoHash] - if !ok { - cl.logger.WithDefaultLevel(log.Warning).Printf( - "got webrtc conn for unloaded torrent with infohash %x", - dcc.InfoHash, - ) - dc.Close() - return - } - go t.onWebRtcConn(dc, dcc) - }, - } - - cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc) - err = cl.checkConfig() return } @@ -505,6 +508,9 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) { // Stops the client. All connections to peers are closed and all activity will come to a halt. func (cl *Client) Close() (errs []error) { + // Close atomically, allow systems to break early if we're contended on the Client lock. + cl.closed.Set() + cl.webseedRequestTimer.Stop() var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() for t := range cl.torrents { @@ -520,7 +526,6 @@ func (cl *Client) Close() (errs []error) { for i := range cl.onClose { cl.onClose[len(cl.onClose)-1-i]() } - cl.closed.Set() cl.unlock() cl.event.Broadcast() closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock() diff --git a/config.go b/config.go index 817856c4..a0b1e082 100644 --- a/config.go +++ b/config.go @@ -51,7 +51,7 @@ type ClientConfig struct { ClientTrackerConfig ClientDhtConfig - // Store torrent file data in this directory unless .DefaultStorage is + // Store torrent file data in this directory unless DefaultStorage is // specified. DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"` // The address to listen for new uTP and TCP BitTorrent protocol connections. DHT shares a UDP diff --git a/issue97_test.go b/issue97_test.go index 8453fad1..b9337846 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -17,9 +17,7 @@ func TestHashPieceAfterStorageClosed(t *testing.T) { defer cs.Close() tt := cl.newTorrent(metainfo.Hash{1}, cs) mi := testutil.GreetingMetaInfo() - info, err := mi.UnmarshalInfo() - require.NoError(t, err) - require.NoError(t, tt.setInfo(&info)) + require.NoError(t, tt.SetInfoBytes(mi.InfoBytes)) + go tt.piece(0).VerifyDataContext(t.Context()) require.NoError(t, tt.storage.Close()) - tt.hashPiece(0) } diff --git a/justfile b/justfile index 9a52ffba..9484a96e 100644 --- a/justfile +++ b/justfile @@ -5,7 +5,7 @@ act: act -j test --matrix go-version:'1.24' --env-file .empty.env test: build-possum - go test -race ./... + go test -race -failfast ./... build-possum: cd storage/possum/lib && cargo build diff --git a/peerconn_test.go b/peerconn_test.go index 39694853..7c324a1f 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -11,7 +11,7 @@ import ( "testing" g "github.com/anacrolix/generics" - qt "github.com/go-quicktest/qt" + "github.com/go-quicktest/qt" "github.com/stretchr/testify/require" "golang.org/x/time/rate" @@ -279,11 +279,12 @@ func TestHaveAllThenBitfield(t *testing.T) { pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) qt.Check(t, qt.Equals(pc.peerMinPieces, 6)) qt.Check(t, qt.HasLen(pc.t.connsWithAllPieces, 0)) - qt.Assert(t, qt.IsNil(pc.t.setInfo(&metainfo.Info{ - PieceLength: 0, + qt.Assert(t, qt.IsNil(pc.t.setInfoUnlocked(&metainfo.Info{ + Name: "herp", + Length: 7, + PieceLength: 1, Pieces: make([]byte, pieceHash.Size()*7), }))) - pc.t.onSetInfo() qt.Check(t, qt.Equals(tt.numPieces(), 7)) qt.Check(t, qt.DeepEquals(tt.pieceAvailabilityRuns(), []pieceAvailabilityRun{ // The last element of the bitfield is irrelevant, as the Torrent actually only has 7 diff --git a/piece.go b/piece.go index 12d28be2..113e9345 100644 --- a/piece.go +++ b/piece.go @@ -228,6 +228,9 @@ func (p *Piece) VerifyData() error { func (p *Piece) VerifyDataContext(ctx context.Context) error { locker := p.t.cl.locker() locker.Lock() + if p.t.closed.IsSet() { + return errTorrentClosed + } target, err := p.t.queuePieceCheck(p.index) locker.Unlock() if err != nil { diff --git a/storage/file-piece.go b/storage/file-piece.go index f24bbfc4..cad8596e 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -75,16 +75,15 @@ func (me *filePieceImpl) Completion() (c Completion) { func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] { return func(yield func(int, segments.Extent) bool) { + pieceExtent := me.extent() noFiles := true - for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) { + for i, extent := range me.t.segmentLocater.LocateIter(pieceExtent) { noFiles = false if !yield(i, extent) { return } } - if noFiles { - panic("files do not cover piece extent") - } + panicif.NotEq(noFiles, pieceExtent.Length == 0) } } diff --git a/test_test.go b/test_test.go index 11ebb3b3..c6503900 100644 --- a/test_test.go +++ b/test_test.go @@ -5,16 +5,17 @@ package torrent import ( "testing" + "github.com/go-quicktest/qt" + "github.com/anacrolix/torrent/metainfo" ) func newTestingClient(t testing.TB) *Client { - cl := new(Client) - cl.init(TestingConfig(t)) + cl, err := NewClient(TestingConfig(t)) + qt.Assert(t, qt.IsNil(err)) t.Cleanup(func() { cl.Close() }) - cl.initLogger() return cl } diff --git a/torrent.go b/torrent.go index e684360b..014a443e 100644 --- a/torrent.go +++ b/torrent.go @@ -659,6 +659,18 @@ func (t *Torrent) setInfoBytesLocked(b []byte) (err error) { return nil } +// Used in tests. +func (t *Torrent) setInfoUnlocked(info *metainfo.Info) (err error) { + t.cl.lock() + defer t.cl.unlock() + err = t.setInfo(info) + if err != nil { + return + } + t.onSetInfo() + return +} + func (t *Torrent) haveAllMetadataPieces() bool { if t.haveInfo() { return true @@ -1700,9 +1712,12 @@ func (t *Torrent) openNewConns() (initiated int) { func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { p := t.piece(piece) uncached := t.pieceCompleteUncached(piece) - // This isn't being handled. Here we should probably be storing Option[bool] for completion and - // filtering out errors. Also, errors should probably disable downloading here too. - panicif.Err(uncached.Err) + if uncached.Err != nil { + t.slogger().Error("error getting piece completion", "err", uncached.Err) + t.disallowDataDownloadLocked() + } + // TODO: Here we should probably be storing Option[bool] for completion and filtering out + // errors. cached := p.completion() changed := cached != uncached complete := uncached.Ok && uncached.Complete diff --git a/torrent_test.go b/torrent_test.go index 6a3d7ca1..9854f800 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -13,7 +13,7 @@ import ( g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" - qt "github.com/go-quicktest/qt" + "github.com/go-quicktest/qt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -227,7 +227,7 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) { func TestRelativeAvailabilityHaveNone(t *testing.T) { var err error cl := newTestingClient(t) - mi, info := testutil.Greeting.Generate(5) + mi, _ := testutil.Greeting.Generate(5) tt := cl.newTorrentOpt(AddTorrentOpts{InfoHash: mi.HashInfoBytes()}) tt.setChunkSize(2) g.MakeMapIfNil(&tt.conns) @@ -236,13 +236,16 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) { pc.legacyPeerImpl = &pc pc.initRequestState() g.InitNew(&pc.callbacks) + tt.cl.lock() tt.conns[&pc] = struct{}{} err = pc.peerSentHave(0) + tt.cl.unlock() qt.Assert(t, qt.IsNil(err)) - err = tt.setInfo(&info) + err = tt.SetInfoBytes(mi.InfoBytes) qt.Assert(t, qt.IsNil(err)) - tt.onSetInfo() + tt.cl.lock() err = pc.peerSentHaveNone() + tt.cl.unlock() qt.Assert(t, qt.IsNil(err)) var wg sync.WaitGroup tt.close(&wg) diff --git a/webseed-requesting.go b/webseed-requesting.go index 4628658e..c980e296 100644 --- a/webseed-requesting.go +++ b/webseed-requesting.go @@ -398,6 +398,9 @@ func (cl *Client) scheduleImmediateWebseedRequestUpdate() { } func (cl *Client) updateWebseedRequestsTimerFunc() { + if cl.closed.IsSet() { + return + } cl.lock() defer cl.unlock() cl.updateWebseedRequestsAndResetTimer() -- 2.51.0