From: Matt Joiner Date: Thu, 1 Jun 2017 12:57:08 +0000 (+1000) Subject: Close implicit Client default storage on Client.Close X-Git-Tag: v1.0.0~464 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=49648b9ae1a9d61f24df3c4c5ca6831b554c3d50;p=btrtrc.git Close implicit Client default storage on Client.Close Fixes #158 --- diff --git a/client.go b/client.go index a24e0aeb..d444fe7e 100644 --- a/client.go +++ b/client.go @@ -46,6 +46,7 @@ type Client struct { halfOpenLimit int peerID [20]byte defaultStorage *storage.Client + onClose []func() tcpListener net.Listener utpSock *utp.Socket dHT *dht.Server @@ -238,6 +239,12 @@ func NewClient(cfg *Config) (cl *Client, err error) { dopplegangerAddrs: make(map[string]struct{}), torrents: make(map[metainfo.Hash]*Torrent), } + defer func() { + if err == nil { + return + } + cl.Close() + }() if cfg.UploadRateLimiter == nil { cl.uploadLimit = rate.NewLimiter(rate.Inf, 0) } else { @@ -253,6 +260,11 @@ func NewClient(cfg *Config) (cl *Client, err error) { storageImpl := cfg.DefaultStorage if storageImpl == nil { storageImpl = storage.NewFile(cfg.DataDir) + cl.onClose = append(cl.onClose, func() { + if err := storageImpl.Close(); err != nil { + log.Printf("error closing default storage: %s", err) + } + }) } cl.defaultStorage = storage.NewClient(storageImpl) if cfg.IPBlocklist != nil { @@ -337,6 +349,9 @@ func (cl *Client) Close() { for _, t := range cl.torrents { t.close() } + for _, f := range cl.onClose { + f() + } cl.event.Broadcast() } diff --git a/client_test.go b/client_test.go index b70e2897..68553e6c 100644 --- a/client_test.go +++ b/client_test.go @@ -38,25 +38,49 @@ func init() { log.SetFlags(log.LstdFlags | log.Llongfile) } -var TestingConfig = Config{ - ListenAddr: "localhost:0", - NoDHT: true, - DisableTrackers: true, - DataDir: "/tmp/anacrolix", - DHTConfig: dht.ServerConfig{ - NoDefaultBootstrap: true, - }, - Debug: true, +func TestingConfig() *Config { + return &Config{ + ListenAddr: "localhost:0", + NoDHT: true, + DisableTrackers: true, + DataDir: func() string { + ret, err := ioutil.TempDir("", "") + if err != nil { + panic(err) + } + return ret + }(), + DHTConfig: dht.ServerConfig{ + NoDefaultBootstrap: true, + }, + Debug: true, + } } func TestClientDefault(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) + require.NoError(t, err) + cl.Close() +} + +func TestBoltPieceCompletionClosedWhenClientClosed(t *testing.T) { + cfg := TestingConfig() + pc, err := storage.NewBoltPieceCompletion(cfg.DataDir) + require.NoError(t, err) + ci := storage.NewFileWithCompletion(cfg.DataDir, pc) + defer ci.Close() + cfg.DefaultStorage = ci + cl, err := NewClient(cfg) + require.NoError(t, err) + cl.Close() + // And again, https://github.com/anacrolix/torrent/issues/158 + cl, err = NewClient(cfg) require.NoError(t, err) cl.Close() } func TestAddDropTorrent(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() dir, mi := testutil.GreetingTestTorrent() @@ -95,7 +119,7 @@ func TestTorrentInitialState(t *testing.T) { pieceStateChanges: pubsub.NewPubSub(), } tor.chunkSize = 2 - tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null")) + tor.storageOpener = storage.NewClient(storage.NewFileWithCompletion("/dev/null", storage.NewMapPieceCompletion())) // Needed to lock for asynchronous piece verification. tor.cl = new(Client) err := tor.setInfoBytes(mi.InfoBytes) @@ -219,7 +243,7 @@ func TestUTPRawConn(t *testing.T) { func TestTwoClientsArbitraryPorts(t *testing.T) { for i := 0; i < 2; i++ { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) if err != nil { t.Fatal(err) } @@ -228,7 +252,7 @@ func TestTwoClientsArbitraryPorts(t *testing.T) { } func TestAddDropManyTorrents(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() for i := range iter.N(1000) { @@ -359,16 +383,17 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) // Create seeder and a Torrent. - cfg := TestingConfig + cfg := TestingConfig() cfg.Seed = true cfg.UploadRateLimiter = ps.SeederUploadRateLimiter // cfg.ListenAddr = "localhost:4000" if ps.SeederStorage != nil { cfg.DefaultStorage = ps.SeederStorage(greetingTempDir) + defer cfg.DefaultStorage.Close() } else { cfg.DataDir = greetingTempDir } - seeder, err := NewClient(&cfg) + seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() if ps.ExportClientStatus { @@ -389,7 +414,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { } cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter // cfg.ListenAddr = "localhost:4001" - leecher, err := NewClient(&cfg) + leecher, err := NewClient(cfg) require.NoError(t, err) defer leecher.Close() if ps.ExportClientStatus { @@ -398,7 +423,6 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { leecherGreeting, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { ret = TorrentSpecFromMetaInfo(mi) ret.ChunkSize = 2 - ret.Storage = storage.NewFile(leecherDataDir) return }()) require.NoError(t, err) @@ -441,10 +465,10 @@ func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { func TestSeedAfterDownloading(t *testing.T) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig + cfg := TestingConfig() cfg.Seed = true cfg.DataDir = greetingTempDir - seeder, err := NewClient(&cfg) + seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() testutil.ExportStatusWriter(seeder, "s") @@ -452,7 +476,7 @@ func TestSeedAfterDownloading(t *testing.T) { cfg.DataDir, err = ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(cfg.DataDir) - leecher, err := NewClient(&cfg) + leecher, err := NewClient(cfg) require.NoError(t, err) defer leecher.Close() testutil.ExportStatusWriter(leecher, "l") @@ -461,7 +485,7 @@ func TestSeedAfterDownloading(t *testing.T) { cfg.DataDir, err = ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(cfg.DataDir) - leecherLeecher, _ := NewClient(&cfg) + leecherLeecher, _ := NewClient(cfg) defer leecherLeecher.Close() testutil.ExportStatusWriter(leecherLeecher, "ll") leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { @@ -499,7 +523,7 @@ func TestSeedAfterDownloading(t *testing.T) { } func TestMergingTrackersByAddingSpecs(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() spec := TorrentSpec{} @@ -560,9 +584,9 @@ func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) { // We read from a piece which is marked completed, but is missing data. func TestCompletedPieceWrongSize(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DefaultStorage = badStorage{} - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() info := metainfo.Info{ @@ -588,11 +612,11 @@ func TestCompletedPieceWrongSize(t *testing.T) { } func BenchmarkAddLargeTorrent(b *testing.B) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DisableTCP = true cfg.DisableUTP = true cfg.ListenAddr = "redonk" - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(b, err) defer cl.Close() for range iter.N(b.N) { @@ -607,19 +631,19 @@ func BenchmarkAddLargeTorrent(b *testing.B) { func TestResponsive(t *testing.T) { seederDataDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) - cfg := TestingConfig + cfg := TestingConfig() cfg.Seed = true cfg.DataDir = seederDataDir - seeder, err := NewClient(&cfg) + seeder, err := NewClient(cfg) require.Nil(t, err) defer seeder.Close() seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) leecherDataDir, err := ioutil.TempDir("", "") require.Nil(t, err) defer os.RemoveAll(leecherDataDir) - cfg = TestingConfig + cfg = TestingConfig() cfg.DataDir = leecherDataDir - leecher, err := NewClient(&cfg) + leecher, err := NewClient(cfg) require.Nil(t, err) defer leecher.Close() leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { @@ -649,19 +673,19 @@ func TestResponsive(t *testing.T) { func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { seederDataDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) - cfg := TestingConfig + cfg := TestingConfig() cfg.Seed = true cfg.DataDir = seederDataDir - seeder, err := NewClient(&cfg) + seeder, err := NewClient(cfg) require.Nil(t, err) defer seeder.Close() seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) leecherDataDir, err := ioutil.TempDir("", "") require.Nil(t, err) defer os.RemoveAll(leecherDataDir) - cfg = TestingConfig + cfg = TestingConfig() cfg.DataDir = leecherDataDir - leecher, err := NewClient(&cfg) + leecher, err := NewClient(cfg) require.Nil(t, err) defer leecher.Close() leecherTorrent, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { @@ -691,10 +715,10 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { func TestDHTInheritBlocklist(t *testing.T) { ipl := iplist.New(nil) require.NotNil(t, ipl) - cfg := TestingConfig + cfg := TestingConfig() cfg.IPBlocklist = ipl cfg.NoDHT = false - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() require.Equal(t, ipl, cl.DHT().IPBlocklist()) @@ -703,7 +727,7 @@ func TestDHTInheritBlocklist(t *testing.T) { // Check that stuff is merged in subsequent AddTorrentSpec for the same // infohash. func TestAddTorrentSpecMerging(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() dir, mi := testutil.GreetingTestTorrent() @@ -723,7 +747,7 @@ func TestAddTorrentSpecMerging(t *testing.T) { func TestTorrentDroppedBeforeGotInfo(t *testing.T) { dir, mi := testutil.GreetingTestTorrent() os.RemoveAll(dir) - cl, _ := NewClient(&TestingConfig) + cl, _ := NewClient(TestingConfig()) defer cl.Close() tt, _, _ := cl.AddTorrentSpec(&TorrentSpec{ InfoHash: mi.HashInfoBytes(), @@ -768,12 +792,12 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf assert.NoError(t, err) } } - cfg := TestingConfig + cfg := TestingConfig() // TODO: Disable network option? cfg.DisableTCP = true cfg.DisableUTP = true cfg.DefaultStorage = filePieceStore - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() tt, err := cl.AddTorrent(greetingMetainfo) @@ -799,13 +823,13 @@ func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) { } func TestAddMetainfoWithNodes(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.NoDHT = false // For now, we want to just jam the nodes into the table, without // verifying them first. Also the DHT code doesn't support mixing secure // and insecure nodes if security is enabled (yet). cfg.DHTConfig.NoSecurity = true - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() assert.EqualValues(t, cl.DHT().NumNodes(), 0) @@ -825,10 +849,10 @@ type testDownloadCancelParams struct { func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { greetingTempDir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig + cfg := TestingConfig() cfg.Seed = true cfg.DataDir = greetingTempDir - seeder, err := NewClient(&cfg) + seeder, err := NewClient(cfg) require.NoError(t, err) defer seeder.Close() if ps.ExportClientStatus { @@ -845,7 +869,7 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { } cfg.DefaultStorage = storage.NewResourcePieces(fc.AsResourceProvider()) cfg.DataDir = leecherDataDir - leecher, _ := NewClient(&cfg) + leecher, _ := NewClient(cfg) defer leecher.Close() if ps.ExportClientStatus { testutil.ExportStatusWriter(leecher, "l") @@ -897,7 +921,7 @@ func TestTorrentDownloadAllThenCancel(t *testing.T) { // Ensure that it's an error for a peer to send an invalid have message. func TestPeerInvalidHave(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() info := metainfo.Info{ @@ -924,9 +948,9 @@ func TestPeerInvalidHave(t *testing.T) { func TestPieceCompletedInStorageButNotClient(t *testing.T) { greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) - cfg := TestingConfig + cfg := TestingConfig() cfg.DataDir = greetingTempDir - seeder, err := NewClient(&TestingConfig) + seeder, err := NewClient(TestingConfig()) require.NoError(t, err) seeder.AddTorrentSpec(&TorrentSpec{ InfoBytes: greetingMetainfo.InfoBytes, @@ -945,7 +969,7 @@ func TestPrepareTrackerAnnounce(t *testing.T) { // Check that when the listen port is 0, all the protocols listened on have // the same port, and it isn't zero. func TestClientDynamicListenPortAllProtocols(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) @@ -953,9 +977,9 @@ func TestClientDynamicListenPortAllProtocols(t *testing.T) { } func TestClientDynamicListenTCPOnly(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DisableUTP = true - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) @@ -963,9 +987,9 @@ func TestClientDynamicListenTCPOnly(t *testing.T) { } func TestClientDynamicListenUTPOnly(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DisableTCP = true - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() assert.NotEqual(t, 0, missinggo.AddrPort(cl.ListenAddr())) @@ -973,10 +997,10 @@ func TestClientDynamicListenUTPOnly(t *testing.T) { } func TestClientDynamicListenPortNoProtocols(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DisableTCP = true cfg.DisableUTP = true - cl, err := NewClient(&cfg) + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() assert.Nil(t, cl.ListenAddr()) @@ -1009,9 +1033,8 @@ func totalConns(tts []*Torrent) (ret int) { func TestSetMaxEstablishedConn(t *testing.T) { var tts []*Torrent ih := testutil.GreetingMetaInfo().HashInfoBytes() - cfg := TestingConfig for i := range iter.N(3) { - cl, err := NewClient(&cfg) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() tt, _ := cl.AddTorrentInfoHash(ih) @@ -1067,25 +1090,25 @@ func makeMagnet(t *testing.T, cl *Client, dir string, name string) string { // https://github.com/anacrolix/torrent/issues/114 func TestMultipleTorrentsWithEncryption(t *testing.T) { - cfg := TestingConfig + cfg := TestingConfig() cfg.DisableUTP = true cfg.Seed = true cfg.DataDir = filepath.Join(cfg.DataDir, "server") cfg.Debug = true cfg.ForceEncryption = true os.Mkdir(cfg.DataDir, 0755) - server, err := NewClient(&cfg) + server, err := NewClient(cfg) require.NoError(t, err) defer server.Close() testutil.ExportStatusWriter(server, "s") magnet1 := makeMagnet(t, server, cfg.DataDir, "test1") makeMagnet(t, server, cfg.DataDir, "test2") - cfg = TestingConfig + cfg = TestingConfig() cfg.DisableUTP = true cfg.DataDir = filepath.Join(cfg.DataDir, "client") cfg.Debug = true cfg.ForceEncryption = true - client, err := NewClient(&cfg) + client, err := NewClient(cfg) require.NoError(t, err) defer client.Close() testutil.ExportStatusWriter(client, "c") diff --git a/reader_test.go b/reader_test.go index 6bde870a..dcda4152 100644 --- a/reader_test.go +++ b/reader_test.go @@ -11,7 +11,7 @@ import ( ) func TestReaderReadContext(t *testing.T) { - cl, err := NewClient(&TestingConfig) + cl, err := NewClient(TestingConfig()) require.NoError(t, err) defer cl.Close() tt, err := cl.AddTorrent(testutil.GreetingMetaInfo()) diff --git a/storage/wrappers.go b/storage/wrappers.go index 8e90f0f0..b4137e5b 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -11,7 +11,7 @@ import ( ) type Client struct { - ClientImpl + ci ClientImpl } func NewClient(cl ClientImpl) *Client { @@ -19,7 +19,7 @@ func NewClient(cl ClientImpl) *Client { } func (cl Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (*Torrent, error) { - t, err := cl.ClientImpl.OpenTorrent(info, infoHash) + t, err := cl.ci.OpenTorrent(info, infoHash) return &Torrent{t}, err } diff --git a/torrent_test.go b/torrent_test.go index 5811b359..edda574d 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -100,10 +100,8 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) { // Check that a torrent containing zero-length file(s) will start, and that // they're created in the filesystem. The client storage is assumed to be // file-based on the native filesystem based. -func testEmptyFilesAndZeroPieceLength(t *testing.T, cs storage.ClientImpl) { - cfg := TestingConfig - cfg.DefaultStorage = cs - cl, err := NewClient(&TestingConfig) +func testEmptyFilesAndZeroPieceLength(t *testing.T, cfg *Config) { + cl, err := NewClient(cfg) require.NoError(t, err) defer cl.Close() ib, err := bencode.Marshal(metainfo.Info{ @@ -112,7 +110,7 @@ func testEmptyFilesAndZeroPieceLength(t *testing.T, cs storage.ClientImpl) { PieceLength: 0, }) require.NoError(t, err) - fp := filepath.Join(TestingConfig.DataDir, "empty") + fp := filepath.Join(cfg.DataDir, "empty") os.Remove(fp) assert.False(t, missinggo.FilePathExists(fp)) tt, err := cl.AddTorrent(&metainfo.MetaInfo{ @@ -126,11 +124,19 @@ func testEmptyFilesAndZeroPieceLength(t *testing.T, cs storage.ClientImpl) { } func TestEmptyFilesAndZeroPieceLengthWithFileStorage(t *testing.T) { - testEmptyFilesAndZeroPieceLength(t, storage.NewFile(TestingConfig.DataDir)) + cfg := TestingConfig() + ci := storage.NewFile(cfg.DataDir) + defer ci.Close() + cfg.DefaultStorage = ci + testEmptyFilesAndZeroPieceLength(t, cfg) } func TestEmptyFilesAndZeroPieceLengthWithMMapStorage(t *testing.T) { - testEmptyFilesAndZeroPieceLength(t, storage.NewMMap(TestingConfig.DataDir)) + cfg := TestingConfig() + ci := storage.NewMMap(cfg.DataDir) + defer ci.Close() + cfg.DefaultStorage = ci + testEmptyFilesAndZeroPieceLength(t, cfg) } func TestPieceHashFailed(t *testing.T) {