]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Limit keep alives for rate limited download test
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "runtime"
10         "sync"
11         "testing"
12         "testing/iotest"
13         "time"
14
15         "github.com/anacrolix/missinggo/v2/bitmap"
16         "github.com/anacrolix/missinggo/v2/filecache"
17         "github.com/anacrolix/torrent"
18         "github.com/anacrolix/torrent/internal/testutil"
19         "github.com/anacrolix/torrent/storage"
20         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
21         "github.com/frankban/quicktest"
22         "golang.org/x/time/rate"
23
24         "github.com/stretchr/testify/assert"
25         "github.com/stretchr/testify/require"
26 )
27
28 type testClientTransferParams struct {
29         Responsive                 bool
30         Readahead                  int64
31         SetReadahead               bool
32         LeecherStorage             func(string) storage.ClientImplCloser
33         SeederStorage              func(string) storage.ClientImplCloser
34         SeederUploadRateLimiter    *rate.Limiter
35         LeecherDownloadRateLimiter *rate.Limiter
36         ConfigureSeeder            ConfigureClient
37         ConfigureLeecher           ConfigureClient
38         GOMAXPROCS                 int
39
40         LeecherStartsWithoutMetadata bool
41 }
42
43 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
44         pos, err := r.Seek(0, io.SeekStart)
45         assert.NoError(t, err)
46         assert.EqualValues(t, 0, pos)
47         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
48 }
49
50 // Creates a seeder and a leecher, and ensures the data transfers when a read
51 // is attempted on the leecher.
52 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
53         t.Parallel()
54
55         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
56         newGOMAXPROCS := prevGOMAXPROCS
57         if ps.GOMAXPROCS > 0 {
58                 newGOMAXPROCS = ps.GOMAXPROCS
59         }
60         defer func() {
61                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
62         }()
63
64         greetingTempDir, mi := testutil.GreetingTestTorrent()
65         defer os.RemoveAll(greetingTempDir)
66         // Create seeder and a Torrent.
67         cfg := torrent.TestingConfig(t)
68         cfg.Seed = true
69         // Some test instances don't like this being on, even when there's no cache involved.
70         cfg.DropMutuallyCompletePeers = false
71         if ps.SeederUploadRateLimiter != nil {
72                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
73         }
74         // cfg.ListenAddr = "localhost:4000"
75         if ps.SeederStorage != nil {
76                 storage := ps.SeederStorage(greetingTempDir)
77                 defer storage.Close()
78                 cfg.DefaultStorage = storage
79         } else {
80                 cfg.DataDir = greetingTempDir
81         }
82         if ps.ConfigureSeeder.Config != nil {
83                 ps.ConfigureSeeder.Config(cfg)
84         }
85         seeder, err := torrent.NewClient(cfg)
86         require.NoError(t, err)
87         if ps.ConfigureSeeder.Client != nil {
88                 ps.ConfigureSeeder.Client(seeder)
89         }
90         defer testutil.ExportStatusWriter(seeder, "s", t)()
91         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
92         // Run a Stats right after Closing the Client. This will trigger the Stats
93         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
94         defer seederTorrent.Stats()
95         defer seeder.Close()
96         seederTorrent.VerifyData()
97         // Create leecher and a Torrent.
98         leecherDataDir, err := ioutil.TempDir("", "")
99         require.NoError(t, err)
100         defer os.RemoveAll(leecherDataDir)
101         cfg = torrent.TestingConfig(t)
102         // See the seeder client config comment.
103         cfg.DropMutuallyCompletePeers = false
104         if ps.LeecherStorage == nil {
105                 cfg.DataDir = leecherDataDir
106         } else {
107                 storage := ps.LeecherStorage(leecherDataDir)
108                 defer storage.Close()
109                 cfg.DefaultStorage = storage
110         }
111         if ps.LeecherDownloadRateLimiter != nil {
112                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
113         }
114         cfg.Seed = false
115         //cfg.Debug = true
116         if ps.ConfigureLeecher.Config != nil {
117                 ps.ConfigureLeecher.Config(cfg)
118         }
119         leecher, err := torrent.NewClient(cfg)
120         require.NoError(t, err)
121         defer leecher.Close()
122         if ps.ConfigureLeecher.Client != nil {
123                 ps.ConfigureLeecher.Client(leecher)
124         }
125         defer testutil.ExportStatusWriter(leecher, "l", t)()
126         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
127                 ret = torrent.TorrentSpecFromMetaInfo(mi)
128                 ret.ChunkSize = 2
129                 if ps.LeecherStartsWithoutMetadata {
130                         ret.InfoBytes = nil
131                 }
132                 return
133         }())
134         require.NoError(t, err)
135         assert.True(t, new)
136
137         //// This was used when observing coalescing of piece state changes.
138         //logPieceStateChanges(leecherTorrent)
139
140         // Now do some things with leecher and seeder.
141         added := leecherTorrent.AddClientPeer(seeder)
142         assert.False(t, leecherTorrent.Seeding())
143         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
144         // should be sitting idle until we demand data.
145         if !ps.LeecherStartsWithoutMetadata {
146                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
147         }
148         if ps.LeecherStartsWithoutMetadata {
149                 <-leecherTorrent.GotInfo()
150         }
151         r := leecherTorrent.NewReader()
152         defer r.Close()
153         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
154         if ps.Responsive {
155                 r.SetResponsive()
156         }
157         if ps.SetReadahead {
158                 r.SetReadahead(ps.Readahead)
159         }
160         assertReadAllGreeting(t, r)
161         assert.NotEmpty(t, seederTorrent.PeerConns())
162         leecherPeerConns := leecherTorrent.PeerConns()
163         if cfg.DropMutuallyCompletePeers {
164                 // I don't think we can assume it will be empty already, due to timing.
165                 //assert.Empty(t, leecherPeerConns)
166         } else {
167                 assert.NotEmpty(t, leecherPeerConns)
168         }
169         foundSeeder := false
170         for _, pc := range leecherPeerConns {
171                 completed := pc.PeerPieces().Len()
172                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
173                 if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
174                         foundSeeder = true
175                 }
176         }
177         if !foundSeeder {
178                 t.Errorf("didn't find seeder amongst leecher peer conns")
179         }
180
181         seederStats := seederTorrent.Stats()
182         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
183         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
184
185         leecherStats := leecherTorrent.Stats()
186         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
187         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
188
189         // Try reading through again for the cases where the torrent data size
190         // exceeds the size of the cache.
191         assertReadAllGreeting(t, r)
192 }
193
194 type fileCacheClientStorageFactoryParams struct {
195         Capacity    int64
196         SetCapacity bool
197 }
198
199 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
200         return func(dataDir string) storage.ClientImplCloser {
201                 fc, err := filecache.NewCache(dataDir)
202                 if err != nil {
203                         panic(err)
204                 }
205                 var sharedCapacity *int64
206                 if ps.SetCapacity {
207                         sharedCapacity = &ps.Capacity
208                         fc.SetCapacity(ps.Capacity)
209                 }
210                 return struct {
211                         storage.ClientImpl
212                         io.Closer
213                 }{
214                         storage.NewResourcePiecesOpts(
215                                 fc.AsResourceProvider(),
216                                 storage.ResourcePiecesOpts{
217                                         Capacity: sharedCapacity,
218                                 }),
219                         ioutil.NopCloser(nil),
220                 }
221         }
222 }
223
224 type storageFactory func(string) storage.ClientImplCloser
225
226 func TestClientTransferDefault(t *testing.T) {
227         testClientTransfer(t, testClientTransferParams{
228                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
229         })
230 }
231
232 func TestClientTransferDefaultNoMetadata(t *testing.T) {
233         testClientTransfer(t, testClientTransferParams{
234                 LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
235                 LeecherStartsWithoutMetadata: true,
236         })
237 }
238
239 func TestClientTransferRateLimitedUpload(t *testing.T) {
240         started := time.Now()
241         testClientTransfer(t, testClientTransferParams{
242                 // We are uploading 13 bytes (the length of the greeting torrent). The
243                 // chunks are 2 bytes in length. Then the smallest burst we can run
244                 // with is 2. Time taken is (13-burst)/rate.
245                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
246         })
247         require.True(t, time.Since(started) > time.Second)
248 }
249
250 func TestClientTransferRateLimitedDownload(t *testing.T) {
251         testClientTransfer(t, testClientTransferParams{
252                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
253                 ConfigureSeeder: ConfigureClient{
254                         Config: func(cfg *torrent.ClientConfig) {
255                                 // If we send too many keep alives, we consume all the leechers available download
256                                 // rate. The default isn't exposed, but a minute is pretty reasonable.
257                                 cfg.KeepAliveTimeout = time.Minute
258                         },
259                 },
260         })
261 }
262
263 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
264         testClientTransfer(t, testClientTransferParams{
265                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
266                         SetCapacity: true,
267                         // Going below the piece length means it can't complete a piece so
268                         // that it can be hashed.
269                         Capacity: 5,
270                 }),
271                 SetReadahead: setReadahead,
272                 // Can't readahead too far or the cache will thrash and drop data we
273                 // thought we had.
274                 Readahead: readahead,
275
276                 // These tests don't work well with more than 1 connection to the seeder.
277                 ConfigureLeecher: ConfigureClient{
278                         Config: func(cfg *torrent.ClientConfig) {
279                                 cfg.DropDuplicatePeerIds = true
280                                 //cfg.DisableIPv6 = true
281                                 //cfg.DisableUTP = true
282                         },
283                 },
284         })
285 }
286
287 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
288         testClientTransferSmallCache(t, true, 5)
289 }
290
291 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
292         testClientTransferSmallCache(t, true, 15)
293 }
294
295 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
296         testClientTransferSmallCache(t, false, -1)
297 }
298
299 type leecherStorageTestCase struct {
300         name       string
301         f          storageFactory
302         gomaxprocs int
303 }
304
305 func TestClientTransferVarious(t *testing.T) {
306         // Leecher storage
307         for _, ls := range []leecherStorageTestCase{
308                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
309                 {"Boltdb", storage.NewBoltDB, 0},
310                 {"SqliteDirect", func(s string) storage.ClientImplCloser {
311                         path := filepath.Join(s, "sqlite3.db")
312                         var opts sqliteStorage.NewDirectStorageOpts
313                         opts.Path = path
314                         cl, err := sqliteStorage.NewDirectStorage(opts)
315                         if err != nil {
316                                 panic(err)
317                         }
318                         return cl
319                 }, 0},
320         } {
321                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
322                         // Seeder storage
323                         for _, ss := range []struct {
324                                 name string
325                                 f    storageFactory
326                         }{
327                                 {"File", storage.NewFile},
328                                 {"Mmap", storage.NewMMap},
329                         } {
330                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
331                                         for _, responsive := range []bool{false, true} {
332                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
333                                                         t.Run("NoReadahead", func(t *testing.T) {
334                                                                 testClientTransfer(t, testClientTransferParams{
335                                                                         Responsive:     responsive,
336                                                                         SeederStorage:  ss.f,
337                                                                         LeecherStorage: ls.f,
338                                                                         GOMAXPROCS:     ls.gomaxprocs,
339                                                                 })
340                                                         })
341                                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
342                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
343                                                                         testClientTransfer(t, testClientTransferParams{
344                                                                                 SeederStorage:  ss.f,
345                                                                                 Responsive:     responsive,
346                                                                                 SetReadahead:   true,
347                                                                                 Readahead:      readahead,
348                                                                                 LeecherStorage: ls.f,
349                                                                                 GOMAXPROCS:     ls.gomaxprocs,
350                                                                         })
351                                                                 })
352                                                         }
353                                                 })
354                                         }
355                                 })
356                         }
357                 })
358         }
359 }
360
361 // Check that after completing leeching, a leecher transitions to a seeding
362 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
363 func TestSeedAfterDownloading(t *testing.T) {
364         greetingTempDir, mi := testutil.GreetingTestTorrent()
365         defer os.RemoveAll(greetingTempDir)
366
367         cfg := torrent.TestingConfig(t)
368         cfg.Seed = true
369         cfg.DataDir = greetingTempDir
370         seeder, err := torrent.NewClient(cfg)
371         require.NoError(t, err)
372         defer seeder.Close()
373         defer testutil.ExportStatusWriter(seeder, "s", t)()
374         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
375         require.NoError(t, err)
376         assert.True(t, ok)
377         seederTorrent.VerifyData()
378
379         cfg = torrent.TestingConfig(t)
380         cfg.Seed = true
381         cfg.DataDir, err = ioutil.TempDir("", "")
382         require.NoError(t, err)
383         defer os.RemoveAll(cfg.DataDir)
384         leecher, err := torrent.NewClient(cfg)
385         require.NoError(t, err)
386         defer leecher.Close()
387         defer testutil.ExportStatusWriter(leecher, "l", t)()
388
389         cfg = torrent.TestingConfig(t)
390         cfg.Seed = false
391         cfg.DataDir, err = ioutil.TempDir("", "")
392         require.NoError(t, err)
393         defer os.RemoveAll(cfg.DataDir)
394         leecherLeecher, _ := torrent.NewClient(cfg)
395         require.NoError(t, err)
396         defer leecherLeecher.Close()
397         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
398         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
399                 ret = torrent.TorrentSpecFromMetaInfo(mi)
400                 ret.ChunkSize = 2
401                 return
402         }())
403         require.NoError(t, err)
404         assert.True(t, ok)
405         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
406                 ret = torrent.TorrentSpecFromMetaInfo(mi)
407                 ret.ChunkSize = 3
408                 return
409         }())
410         require.NoError(t, err)
411         assert.True(t, ok)
412         // Simultaneously DownloadAll in Leecher, and read the contents
413         // consecutively in LeecherLeecher. This non-deterministically triggered a
414         // case where the leecher wouldn't unchoke the LeecherLeecher.
415         var wg sync.WaitGroup
416         wg.Add(1)
417         go func() {
418                 defer wg.Done()
419                 r := llg.NewReader()
420                 defer r.Close()
421                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
422         }()
423         done := make(chan struct{})
424         defer close(done)
425         go leecherGreeting.AddClientPeer(seeder)
426         go leecherGreeting.AddClientPeer(leecherLeecher)
427         wg.Add(1)
428         go func() {
429                 defer wg.Done()
430                 leecherGreeting.DownloadAll()
431                 leecher.WaitAll()
432         }()
433         wg.Wait()
434 }
435
436 type ConfigureClient struct {
437         Config func(cfg *torrent.ClientConfig)
438         Client func(cl *torrent.Client)
439 }