Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Rework storage.TorrentImpl to support shared capacity key
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "runtime"
10         "sync"
11         "testing"
12         "testing/iotest"
13         "time"
14
15         "github.com/anacrolix/missinggo/v2/filecache"
16         "github.com/anacrolix/torrent"
17         "github.com/anacrolix/torrent/internal/testutil"
18         "github.com/anacrolix/torrent/storage"
19         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
20         "github.com/frankban/quicktest"
21         "golang.org/x/time/rate"
22
23         "github.com/stretchr/testify/assert"
24         "github.com/stretchr/testify/require"
25 )
26
27 type testClientTransferParams struct {
28         Responsive                 bool
29         Readahead                  int64
30         SetReadahead               bool
31         LeecherStorage             func(string) storage.ClientImplCloser
32         SeederStorage              func(string) storage.ClientImplCloser
33         SeederUploadRateLimiter    *rate.Limiter
34         LeecherDownloadRateLimiter *rate.Limiter
35         ConfigureSeeder            ConfigureClient
36         ConfigureLeecher           ConfigureClient
37         GOMAXPROCS                 int
38
39         LeecherStartsWithoutMetadata bool
40 }
41
42 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
43         pos, err := r.Seek(0, io.SeekStart)
44         assert.NoError(t, err)
45         assert.EqualValues(t, 0, pos)
46         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
47 }
48
49 // Creates a seeder and a leecher, and ensures the data transfers when a read
50 // is attempted on the leecher.
51 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
52
53         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
54         newGOMAXPROCS := prevGOMAXPROCS
55         if ps.GOMAXPROCS > 0 {
56                 newGOMAXPROCS = ps.GOMAXPROCS
57         }
58         defer func() {
59                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
60         }()
61
62         greetingTempDir, mi := testutil.GreetingTestTorrent()
63         defer os.RemoveAll(greetingTempDir)
64         // Create seeder and a Torrent.
65         cfg := torrent.TestingConfig(t)
66         cfg.Seed = true
67         // Some test instances don't like this being on, even when there's no cache involved.
68         cfg.DropMutuallyCompletePeers = false
69         if ps.SeederUploadRateLimiter != nil {
70                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
71         }
72         // cfg.ListenAddr = "localhost:4000"
73         if ps.SeederStorage != nil {
74                 storage := ps.SeederStorage(greetingTempDir)
75                 defer storage.Close()
76                 cfg.DefaultStorage = storage
77         } else {
78                 cfg.DataDir = greetingTempDir
79         }
80         if ps.ConfigureSeeder.Config != nil {
81                 ps.ConfigureSeeder.Config(cfg)
82         }
83         seeder, err := torrent.NewClient(cfg)
84         require.NoError(t, err)
85         if ps.ConfigureSeeder.Client != nil {
86                 ps.ConfigureSeeder.Client(seeder)
87         }
88         defer testutil.ExportStatusWriter(seeder, "s", t)()
89         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
90         // Run a Stats right after Closing the Client. This will trigger the Stats
91         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
92         defer seederTorrent.Stats()
93         defer seeder.Close()
94         seederTorrent.VerifyData()
95         // Create leecher and a Torrent.
96         leecherDataDir, err := ioutil.TempDir("", "")
97         require.NoError(t, err)
98         defer os.RemoveAll(leecherDataDir)
99         cfg = torrent.TestingConfig(t)
100         // See the seeder client config comment.
101         cfg.DropMutuallyCompletePeers = false
102         if ps.LeecherStorage == nil {
103                 cfg.DataDir = leecherDataDir
104         } else {
105                 storage := ps.LeecherStorage(leecherDataDir)
106                 defer storage.Close()
107                 cfg.DefaultStorage = storage
108         }
109         if ps.LeecherDownloadRateLimiter != nil {
110                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
111         }
112         cfg.Seed = false
113         //cfg.Debug = true
114         if ps.ConfigureLeecher.Config != nil {
115                 ps.ConfigureLeecher.Config(cfg)
116         }
117         leecher, err := torrent.NewClient(cfg)
118         require.NoError(t, err)
119         defer leecher.Close()
120         if ps.ConfigureLeecher.Client != nil {
121                 ps.ConfigureLeecher.Client(leecher)
122         }
123         defer testutil.ExportStatusWriter(leecher, "l", t)()
124         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
125                 ret = torrent.TorrentSpecFromMetaInfo(mi)
126                 ret.ChunkSize = 2
127                 if ps.LeecherStartsWithoutMetadata {
128                         ret.InfoBytes = nil
129                 }
130                 return
131         }())
132         require.NoError(t, err)
133         assert.True(t, new)
134
135         //// This was used when observing coalescing of piece state changes.
136         //logPieceStateChanges(leecherTorrent)
137
138         // Now do some things with leecher and seeder.
139         added := leecherTorrent.AddClientPeer(seeder)
140         assert.False(t, leecherTorrent.Seeding())
141         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
142         // should be sitting idle until we demand data.
143         if !ps.LeecherStartsWithoutMetadata {
144                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
145         }
146         if ps.LeecherStartsWithoutMetadata {
147                 <-leecherTorrent.GotInfo()
148         }
149         r := leecherTorrent.NewReader()
150         defer r.Close()
151         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
152         if ps.Responsive {
153                 r.SetResponsive()
154         }
155         if ps.SetReadahead {
156                 r.SetReadahead(ps.Readahead)
157         }
158         assertReadAllGreeting(t, r)
159         assert.NotEmpty(t, seederTorrent.PeerConns())
160         leecherPeerConns := leecherTorrent.PeerConns()
161         if cfg.DropMutuallyCompletePeers {
162                 // I don't think we can assume it will be empty already, due to timing.
163                 //assert.Empty(t, leecherPeerConns)
164         } else {
165                 assert.NotEmpty(t, leecherPeerConns)
166         }
167         foundSeeder := false
168         for _, pc := range leecherPeerConns {
169                 completed := pc.PeerPieces().Len()
170                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
171                 if completed == leecherTorrent.Info().NumPieces() {
172                         foundSeeder = true
173                 }
174         }
175         if !foundSeeder {
176                 t.Errorf("didn't find seeder amongst leecher peer conns")
177         }
178
179         seederStats := seederTorrent.Stats()
180         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
181         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
182
183         leecherStats := leecherTorrent.Stats()
184         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
185         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
186
187         // Try reading through again for the cases where the torrent data size
188         // exceeds the size of the cache.
189         assertReadAllGreeting(t, r)
190 }
191
// fileCacheClientStorageFactoryParams configures the storage factories built
// by newFileCacheClientStorageFactory.
type fileCacheClientStorageFactoryParams struct {
	// Capacity is applied to the file cache and to the resource pieces
	// options, but only when SetCapacity is true.
	Capacity int64
	// SetCapacity gates the use of Capacity; when false no capacity is set.
	SetCapacity bool
}
196
197 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
198         return func(dataDir string) storage.ClientImplCloser {
199                 fc, err := filecache.NewCache(dataDir)
200                 if err != nil {
201                         panic(err)
202                 }
203                 var sharedCapacity *int64
204                 if ps.SetCapacity {
205                         sharedCapacity = &ps.Capacity
206                         fc.SetCapacity(ps.Capacity)
207                 }
208                 return struct {
209                         storage.ClientImpl
210                         io.Closer
211                 }{
212                         storage.NewResourcePiecesOpts(
213                                 fc.AsResourceProvider(),
214                                 storage.ResourcePiecesOpts{
215                                         Capacity: sharedCapacity,
216                                 }),
217                         ioutil.NopCloser(nil),
218                 }
219         }
220 }
221
222 type storageFactory func(string) storage.ClientImplCloser
223
224 func TestClientTransferDefault(t *testing.T) {
225         testClientTransfer(t, testClientTransferParams{
226                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
227         })
228 }
229
230 func TestClientTransferDefaultNoMetadata(t *testing.T) {
231         testClientTransfer(t, testClientTransferParams{
232                 LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
233                 LeecherStartsWithoutMetadata: true,
234         })
235 }
236
237 func TestClientTransferRateLimitedUpload(t *testing.T) {
238         started := time.Now()
239         testClientTransfer(t, testClientTransferParams{
240                 // We are uploading 13 bytes (the length of the greeting torrent). The
241                 // chunks are 2 bytes in length. Then the smallest burst we can run
242                 // with is 2. Time taken is (13-burst)/rate.
243                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
244         })
245         require.True(t, time.Since(started) > time.Second)
246 }
247
248 func TestClientTransferRateLimitedDownload(t *testing.T) {
249         testClientTransfer(t, testClientTransferParams{
250                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
251         })
252 }
253
254 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
255         testClientTransfer(t, testClientTransferParams{
256                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
257                         SetCapacity: true,
258                         // Going below the piece length means it can't complete a piece so
259                         // that it can be hashed.
260                         Capacity: 5,
261                 }),
262                 SetReadahead: setReadahead,
263                 // Can't readahead too far or the cache will thrash and drop data we
264                 // thought we had.
265                 Readahead: readahead,
266
267                 // These tests don't work well with more than 1 connection to the seeder.
268                 ConfigureLeecher: ConfigureClient{
269                         Config: func(cfg *torrent.ClientConfig) {
270                                 cfg.DropDuplicatePeerIds = true
271                                 //cfg.DisableIPv6 = true
272                                 //cfg.DisableUTP = true
273                         },
274                 },
275         })
276 }
277
278 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
279         testClientTransferSmallCache(t, true, 5)
280 }
281
282 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
283         testClientTransferSmallCache(t, true, 15)
284 }
285
286 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
287         testClientTransferSmallCache(t, false, -1)
288 }
289
290 func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory {
291         return func(dataDir string) storage.ClientImplCloser {
292                 opts := optsMaker(dataDir)
293                 //log.Printf("opening sqlite db: %#v", opts)
294                 ret, err := sqliteStorage.NewPiecesStorage(opts)
295                 if err != nil {
296                         panic(err)
297                 }
298                 return ret
299         }
300 }
301
302 type leecherStorageTestCase struct {
303         name       string
304         f          storageFactory
305         gomaxprocs int
306 }
307
308 func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
309         return leecherStorageTestCase{
310                 fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
311                 sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
312                         opts.Path = filepath.Join(dataDir, "sqlite.db")
313                         opts.NumConns = numConns
314                         return
315                 }),
316                 numConns,
317         }
318 }
319
320 func TestClientTransferVarious(t *testing.T) {
321         // Leecher storage
322         for _, ls := range []leecherStorageTestCase{
323                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
324                 {"Boltdb", storage.NewBoltDB, 0},
325                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
326                         path := filepath.Join(s, "sqlite3.db")
327                         var opts sqliteStorage.NewDirectStorageOpts
328                         opts.Path = path
329                         cl, err := sqliteStorage.NewDirectStorage(opts)
330                         if err != nil {
331                                 panic(err)
332                         }
333                         return cl
334                 }, 0},
335                 sqliteLeecherStorageTestCase(1),
336                 sqliteLeecherStorageTestCase(2),
337                 // This should use a number of connections equal to the number of CPUs
338                 sqliteLeecherStorageTestCase(0),
339                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
340                         opts.Memory = true
341                         return
342                 }), 0},
343         } {
344                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
345                         // Seeder storage
346                         for _, ss := range []struct {
347                                 name string
348                                 f    storageFactory
349                         }{
350                                 {"File", storage.NewFile},
351                                 {"Mmap", storage.NewMMap},
352                         } {
353                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
354                                         for _, responsive := range []bool{false, true} {
355                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
356                                                         t.Run("NoReadahead", func(t *testing.T) {
357                                                                 testClientTransfer(t, testClientTransferParams{
358                                                                         Responsive:     responsive,
359                                                                         SeederStorage:  ss.f,
360                                                                         LeecherStorage: ls.f,
361                                                                         GOMAXPROCS:     ls.gomaxprocs,
362                                                                 })
363                                                         })
364                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
365                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
366                                                                         testClientTransfer(t, testClientTransferParams{
367                                                                                 SeederStorage:  ss.f,
368                                                                                 Responsive:     responsive,
369                                                                                 SetReadahead:   true,
370                                                                                 Readahead:      readahead,
371                                                                                 LeecherStorage: ls.f,
372                                                                                 GOMAXPROCS:     ls.gomaxprocs,
373                                                                         })
374                                                                 })
375                                                         }
376                                                 })
377                                         }
378                                 })
379                         }
380                 })
381         }
382 }
383
384 // Check that after completing leeching, a leecher transitions to a seeding
385 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
386 func TestSeedAfterDownloading(t *testing.T) {
387         greetingTempDir, mi := testutil.GreetingTestTorrent()
388         defer os.RemoveAll(greetingTempDir)
389
390         cfg := torrent.TestingConfig(t)
391         cfg.Seed = true
392         cfg.DataDir = greetingTempDir
393         seeder, err := torrent.NewClient(cfg)
394         require.NoError(t, err)
395         defer seeder.Close()
396         defer testutil.ExportStatusWriter(seeder, "s", t)()
397         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
398         require.NoError(t, err)
399         assert.True(t, ok)
400         seederTorrent.VerifyData()
401
402         cfg = torrent.TestingConfig(t)
403         cfg.Seed = true
404         cfg.DataDir, err = ioutil.TempDir("", "")
405         require.NoError(t, err)
406         defer os.RemoveAll(cfg.DataDir)
407         leecher, err := torrent.NewClient(cfg)
408         require.NoError(t, err)
409         defer leecher.Close()
410         defer testutil.ExportStatusWriter(leecher, "l", t)()
411
412         cfg = torrent.TestingConfig(t)
413         cfg.Seed = false
414         cfg.DataDir, err = ioutil.TempDir("", "")
415         require.NoError(t, err)
416         defer os.RemoveAll(cfg.DataDir)
417         leecherLeecher, _ := torrent.NewClient(cfg)
418         require.NoError(t, err)
419         defer leecherLeecher.Close()
420         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
421         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
422                 ret = torrent.TorrentSpecFromMetaInfo(mi)
423                 ret.ChunkSize = 2
424                 return
425         }())
426         require.NoError(t, err)
427         assert.True(t, ok)
428         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
429                 ret = torrent.TorrentSpecFromMetaInfo(mi)
430                 ret.ChunkSize = 3
431                 return
432         }())
433         require.NoError(t, err)
434         assert.True(t, ok)
435         // Simultaneously DownloadAll in Leecher, and read the contents
436         // consecutively in LeecherLeecher. This non-deterministically triggered a
437         // case where the leecher wouldn't unchoke the LeecherLeecher.
438         var wg sync.WaitGroup
439         wg.Add(1)
440         go func() {
441                 defer wg.Done()
442                 r := llg.NewReader()
443                 defer r.Close()
444                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
445         }()
446         done := make(chan struct{})
447         defer close(done)
448         go leecherGreeting.AddClientPeer(seeder)
449         go leecherGreeting.AddClientPeer(leecherLeecher)
450         wg.Add(1)
451         go func() {
452                 defer wg.Done()
453                 leecherGreeting.DownloadAll()
454                 leecher.WaitAll()
455         }()
456         wg.Wait()
457 }
458
459 type ConfigureClient struct {
460         Config func(*torrent.ClientConfig)
461         Client func(*torrent.Client)
462 }