]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Make Client.Close more responsive
authorMatt Joiner <anacrolix@gmail.com>
Fri, 1 Aug 2025 03:04:30 +0000 (13:04 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 1 Aug 2025 03:04:30 +0000 (13:04 +1000)
Stop updating webseed requests ASAP.
Set the Client closed event outside of the Client lock.
Make Client initialization more consistent for Close.

client.go
config.go
issue97_test.go
justfile
peerconn_test.go
piece.go
storage/file-piece.go
test_test.go
torrent.go
torrent_test.go
webseed-requesting.go

index 3065e5d19493d3a65f7c4e5c5e11def7a896ce7c..e4daa4609cfd589662c2f215f5737c178e959ab0 100644 (file)
--- a/client.go
+++ b/client.go
@@ -279,39 +279,81 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
        }
        cl = &Client{}
        cl.init(cfg)
-       go cl.acceptLimitClearer()
-       cl.initLogger()
-       //cl.logger.Levelf(log.Critical, "test after init")
+       // Belongs after infallible init
        defer func() {
                if err != nil {
                        cl.Close()
                        cl = nil
                }
        }()
+       // Infallible init. Belongs in separate function.
+       {
+               go cl.acceptLimitClearer()
+               cl.initLogger()
+               //cl.logger.Levelf(log.Critical, "test after init")
+
+               storageImpl := cfg.DefaultStorage
+               if storageImpl == nil {
+                       // We'd use mmap by default but HFS+ doesn't support sparse files.
+                       storageImplCloser := storage.NewFile(cfg.DataDir)
+                       cl.onClose = append(cl.onClose, func() {
+                               if err := storageImplCloser.Close(); err != nil {
+                                       cl.logger.Printf("error closing default storage: %s", err)
+                               }
+                       })
+                       storageImpl = storageImplCloser
+               }
+               cl.defaultStorage = storage.NewClient(storageImpl)
 
-       storageImpl := cfg.DefaultStorage
-       if storageImpl == nil {
-               // We'd use mmap by default but HFS+ doesn't support sparse files.
-               storageImplCloser := storage.NewFile(cfg.DataDir)
-               cl.onClose = append(cl.onClose, func() {
-                       if err := storageImplCloser.Close(); err != nil {
-                               cl.logger.Printf("error closing default storage: %s", err)
+               if cfg.PeerID != "" {
+                       missinggo.CopyExact(&cl.peerID, cfg.PeerID)
+               } else {
+                       o := copy(cl.peerID[:], cfg.Bep20)
+                       _, err = rand.Read(cl.peerID[o:])
+                       if err != nil {
+                               panic("error generating peer id")
                        }
-               })
-               storageImpl = storageImplCloser
-       }
-       cl.defaultStorage = storage.NewClient(storageImpl)
+               }
 
-       if cfg.PeerID != "" {
-               missinggo.CopyExact(&cl.peerID, cfg.PeerID)
-       } else {
-               o := copy(cl.peerID[:], cfg.Bep20)
-               _, err = rand.Read(cl.peerID[o:])
-               if err != nil {
-                       panic("error generating peer id")
+               cl.websocketTrackers = websocketTrackers{
+                       PeerId: cl.peerID,
+                       Logger: cl.logger.WithNames("websocketTrackers"),
+                       GetAnnounceRequest: func(
+                               event tracker.AnnounceEvent, infoHash [20]byte,
+                       ) (
+                               tracker.AnnounceRequest, error,
+                       ) {
+                               cl.lock()
+                               defer cl.unlock()
+                               t, ok := cl.torrentsByShortHash[infoHash]
+                               if !ok {
+                                       return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
+                               }
+                               return t.announceRequest(event, infoHash), nil
+                       },
+                       Proxy:                      cl.config.HTTPProxy,
+                       WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
+                       ICEServers:                 cl.ICEServers(),
+                       DialContext:                cl.config.TrackerDialContext,
+                       callbacks:                  &cl.config.Callbacks,
+                       OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
+                               cl.lock()
+                               defer cl.unlock()
+                               t, ok := cl.torrentsByShortHash[dcc.InfoHash]
+                               if !ok {
+                                       cl.logger.WithDefaultLevel(log.Warning).Printf(
+                                               "got webrtc conn for unloaded torrent with infohash %x",
+                                               dcc.InfoHash,
+                                       )
+                                       dc.Close()
+                                       return
+                               }
+                               go t.onWebRtcConn(dc, dcc)
+                       },
                }
-       }
 
+               cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
+       }
        builtinListenNetworks := cl.listenNetworks()
        sockets, err := listenAll(
                builtinListenNetworks,
@@ -358,45 +400,6 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                }
        }
 
-       cl.websocketTrackers = websocketTrackers{
-               PeerId: cl.peerID,
-               Logger: cl.logger.WithNames("websocketTrackers"),
-               GetAnnounceRequest: func(
-                       event tracker.AnnounceEvent, infoHash [20]byte,
-               ) (
-                       tracker.AnnounceRequest, error,
-               ) {
-                       cl.lock()
-                       defer cl.unlock()
-                       t, ok := cl.torrentsByShortHash[infoHash]
-                       if !ok {
-                               return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
-                       }
-                       return t.announceRequest(event, infoHash), nil
-               },
-               Proxy:                      cl.config.HTTPProxy,
-               WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
-               ICEServers:                 cl.ICEServers(),
-               DialContext:                cl.config.TrackerDialContext,
-               callbacks:                  &cl.config.Callbacks,
-               OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
-                       cl.lock()
-                       defer cl.unlock()
-                       t, ok := cl.torrentsByShortHash[dcc.InfoHash]
-                       if !ok {
-                               cl.logger.WithDefaultLevel(log.Warning).Printf(
-                                       "got webrtc conn for unloaded torrent with infohash %x",
-                                       dcc.InfoHash,
-                               )
-                               dc.Close()
-                               return
-                       }
-                       go t.onWebRtcConn(dc, dcc)
-               },
-       }
-
-       cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
-
        err = cl.checkConfig()
        return
 }
@@ -505,6 +508,9 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
 
 // Stops the client. All connections to peers are closed and all activity will come to a halt.
 func (cl *Client) Close() (errs []error) {
+       // Close atomically, allow systems to break early if we're contended on the Client lock.
+       cl.closed.Set()
+       cl.webseedRequestTimer.Stop()
        var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
        cl.lock()
        for t := range cl.torrents {
@@ -520,7 +526,6 @@ func (cl *Client) Close() (errs []error) {
        for i := range cl.onClose {
                cl.onClose[len(cl.onClose)-1-i]()
        }
-       cl.closed.Set()
        cl.unlock()
        cl.event.Broadcast()
        closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
index 817856c4aaec997e7cbc2633e35887fd8b14a7c7..a0b1e0822f926355e6e5b0425252ac1cd5766a96 100644 (file)
--- a/config.go
+++ b/config.go
@@ -51,7 +51,7 @@ type ClientConfig struct {
        ClientTrackerConfig
        ClientDhtConfig
 
-       // Store torrent file data in this directory unless .DefaultStorage is
+       // Store torrent file data in this directory unless DefaultStorage is
        // specified.
        DataDir string `long:"data-dir" description:"directory to store downloaded torrent data"`
        // The address to listen for new uTP and TCP BitTorrent protocol connections. DHT shares a UDP
index 8453fad1cdc48efa7cf6a872c88d2cf9f4e6fb2b..b933784638c9df8b9f3a7d9bc75b7dae1eea9cd9 100644 (file)
@@ -17,9 +17,7 @@ func TestHashPieceAfterStorageClosed(t *testing.T) {
        defer cs.Close()
        tt := cl.newTorrent(metainfo.Hash{1}, cs)
        mi := testutil.GreetingMetaInfo()
-       info, err := mi.UnmarshalInfo()
-       require.NoError(t, err)
-       require.NoError(t, tt.setInfo(&info))
+       require.NoError(t, tt.SetInfoBytes(mi.InfoBytes))
+       go tt.piece(0).VerifyDataContext(t.Context())
        require.NoError(t, tt.storage.Close())
-       tt.hashPiece(0)
 }
index 9a52ffbaedb67892ff9b91e183b19f3a2ea62e51..9484a96e8f88f4944a21a42665f80a8d1578ac62 100644 (file)
--- a/justfile
+++ b/justfile
@@ -5,7 +5,7 @@ act:
     act -j test --matrix go-version:'1.24' --env-file .empty.env
 
 test: build-possum
-    go test -race ./...
+    go test -race -failfast ./...
 
 build-possum:
     cd storage/possum/lib && cargo build
index 396948538842230508e3be5baaadd989c6ae6b58..7c324a1f6a748cda95fb1b529d073ef782833639 100644 (file)
@@ -11,7 +11,7 @@ import (
        "testing"
 
        g "github.com/anacrolix/generics"
-       qt "github.com/go-quicktest/qt"
+       "github.com/go-quicktest/qt"
        "github.com/stretchr/testify/require"
        "golang.org/x/time/rate"
 
@@ -279,11 +279,12 @@ func TestHaveAllThenBitfield(t *testing.T) {
        pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
        qt.Check(t, qt.Equals(pc.peerMinPieces, 6))
        qt.Check(t, qt.HasLen(pc.t.connsWithAllPieces, 0))
-       qt.Assert(t, qt.IsNil(pc.t.setInfo(&metainfo.Info{
-               PieceLength: 0,
+       qt.Assert(t, qt.IsNil(pc.t.setInfoUnlocked(&metainfo.Info{
+               Name:        "herp",
+               Length:      7,
+               PieceLength: 1,
                Pieces:      make([]byte, pieceHash.Size()*7),
        })))
-       pc.t.onSetInfo()
        qt.Check(t, qt.Equals(tt.numPieces(), 7))
        qt.Check(t, qt.DeepEquals(tt.pieceAvailabilityRuns(), []pieceAvailabilityRun{
                // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
index 12d28be2d55043ae82eb9219ea8a75ab0ec0103b..113e934568d8eba9f7b3cb1fb377b7e4bbc2a03a 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -228,6 +228,9 @@ func (p *Piece) VerifyData() error {
 func (p *Piece) VerifyDataContext(ctx context.Context) error {
        locker := p.t.cl.locker()
        locker.Lock()
+       if p.t.closed.IsSet() {
+               return errTorrentClosed
+       }
        target, err := p.t.queuePieceCheck(p.index)
        locker.Unlock()
        if err != nil {
index f24bbfc4a5d678ab6582b86e3f27e890fff0c054..cad8596e0c7ee6ab3dda80e7986d7a63ee0598d2 100644 (file)
@@ -75,16 +75,15 @@ func (me *filePieceImpl) Completion() (c Completion) {
 
 func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] {
        return func(yield func(int, segments.Extent) bool) {
+               pieceExtent := me.extent()
                noFiles := true
-               for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) {
+               for i, extent := range me.t.segmentLocater.LocateIter(pieceExtent) {
                        noFiles = false
                        if !yield(i, extent) {
                                return
                        }
                }
-               if noFiles {
-                       panic("files do not cover piece extent")
-               }
+               panicif.NotEq(noFiles, pieceExtent.Length == 0)
        }
 }
 
index 11ebb3b3d11fe65f69966f28499de59ab389e6d1..c65039004288269f155c43a7ea6a5837975d4da8 100644 (file)
@@ -5,16 +5,17 @@ package torrent
 import (
        "testing"
 
+       "github.com/go-quicktest/qt"
+
        "github.com/anacrolix/torrent/metainfo"
 )
 
 func newTestingClient(t testing.TB) *Client {
-       cl := new(Client)
-       cl.init(TestingConfig(t))
+       cl, err := NewClient(TestingConfig(t))
+       qt.Assert(t, qt.IsNil(err))
        t.Cleanup(func() {
                cl.Close()
        })
-       cl.initLogger()
        return cl
 }
 
index e684360b71cc39a7f7af2f1061fc245584313704..014a443e2f16333f8682319a33446022e3dc9ab0 100644 (file)
@@ -659,6 +659,18 @@ func (t *Torrent) setInfoBytesLocked(b []byte) (err error) {
        return nil
 }
 
+// Used in tests.
+func (t *Torrent) setInfoUnlocked(info *metainfo.Info) (err error) {
+       t.cl.lock()
+       defer t.cl.unlock()
+       err = t.setInfo(info)
+       if err != nil {
+               return
+       }
+       t.onSetInfo()
+       return
+}
+
 func (t *Torrent) haveAllMetadataPieces() bool {
        if t.haveInfo() {
                return true
@@ -1700,9 +1712,12 @@ func (t *Torrent) openNewConns() (initiated int) {
 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        p := t.piece(piece)
        uncached := t.pieceCompleteUncached(piece)
-       // This isn't being handled. Here we should probably be storing Option[bool] for completion and
-       // filtering out errors. Also, errors should probably disable downloading here too.
-       panicif.Err(uncached.Err)
+       if uncached.Err != nil {
+               t.slogger().Error("error getting piece completion", "err", uncached.Err)
+               t.disallowDataDownloadLocked()
+       }
+       // TODO: Here we should probably be storing Option[bool] for completion and filtering out
+       // errors.
        cached := p.completion()
        changed := cached != uncached
        complete := uncached.Ok && uncached.Complete
index 6a3d7ca1f395971d096ea8c93c9fd5c253f47fcf..9854f800f4cfcaa99281c508cabbc860e5eee753 100644 (file)
@@ -13,7 +13,7 @@ import (
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
-       qt "github.com/go-quicktest/qt"
+       "github.com/go-quicktest/qt"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
@@ -227,7 +227,7 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
 func TestRelativeAvailabilityHaveNone(t *testing.T) {
        var err error
        cl := newTestingClient(t)
-       mi, info := testutil.Greeting.Generate(5)
+       mi, _ := testutil.Greeting.Generate(5)
        tt := cl.newTorrentOpt(AddTorrentOpts{InfoHash: mi.HashInfoBytes()})
        tt.setChunkSize(2)
        g.MakeMapIfNil(&tt.conns)
@@ -236,13 +236,16 @@ func TestRelativeAvailabilityHaveNone(t *testing.T) {
        pc.legacyPeerImpl = &pc
        pc.initRequestState()
        g.InitNew(&pc.callbacks)
+       tt.cl.lock()
        tt.conns[&pc] = struct{}{}
        err = pc.peerSentHave(0)
+       tt.cl.unlock()
        qt.Assert(t, qt.IsNil(err))
-       err = tt.setInfo(&info)
+       err = tt.SetInfoBytes(mi.InfoBytes)
        qt.Assert(t, qt.IsNil(err))
-       tt.onSetInfo()
+       tt.cl.lock()
        err = pc.peerSentHaveNone()
+       tt.cl.unlock()
        qt.Assert(t, qt.IsNil(err))
        var wg sync.WaitGroup
        tt.close(&wg)
index 4628658e8da7acd664b88d90a7fee01446b680be..c980e29681a87e68a8f853cd0959c8b631a844d9 100644 (file)
@@ -398,6 +398,9 @@ func (cl *Client) scheduleImmediateWebseedRequestUpdate() {
 }
 
 func (cl *Client) updateWebseedRequestsTimerFunc() {
+       if cl.closed.IsSet() {
+               return
+       }
        cl.lock()
        defer cl.unlock()
        cl.updateWebseedRequestsAndResetTimer()