Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Use iotest.TestReader
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "sync"
10         "testing"
11         "testing/iotest"
12         "time"
13
14         "github.com/anacrolix/missinggo/v2/filecache"
15         "github.com/anacrolix/torrent"
16         "github.com/anacrolix/torrent/internal/testutil"
17         "github.com/anacrolix/torrent/storage"
18         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
19         "github.com/frankban/quicktest"
20         "golang.org/x/time/rate"
21
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24 )
25
26 type testClientTransferParams struct {
27         Responsive                 bool
28         Readahead                  int64
29         SetReadahead               bool
30         LeecherStorage             func(string) storage.ClientImplCloser
31         SeederStorage              func(string) storage.ClientImplCloser
32         SeederUploadRateLimiter    *rate.Limiter
33         LeecherDownloadRateLimiter *rate.Limiter
34         ConfigureSeeder            ConfigureClient
35         ConfigureLeecher           ConfigureClient
36
37         LeecherStartsWithoutMetadata bool
38 }
39
40 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
41         pos, err := r.Seek(0, io.SeekStart)
42         assert.NoError(t, err)
43         assert.EqualValues(t, 0, pos)
44         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
45 }
46
47 // Creates a seeder and a leecher, and ensures the data transfers when a read
48 // is attempted on the leecher.
49 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
50         greetingTempDir, mi := testutil.GreetingTestTorrent()
51         defer os.RemoveAll(greetingTempDir)
52         // Create seeder and a Torrent.
53         cfg := torrent.TestingConfig()
54         cfg.Seed = true
55         // Some test instances don't like this being on, even when there's no cache involved.
56         cfg.DropMutuallyCompletePeers = false
57         if ps.SeederUploadRateLimiter != nil {
58                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
59         }
60         // cfg.ListenAddr = "localhost:4000"
61         if ps.SeederStorage != nil {
62                 storage := ps.SeederStorage(greetingTempDir)
63                 defer storage.Close()
64                 cfg.DefaultStorage = storage
65         } else {
66                 cfg.DataDir = greetingTempDir
67         }
68         if ps.ConfigureSeeder.Config != nil {
69                 ps.ConfigureSeeder.Config(cfg)
70         }
71         seeder, err := torrent.NewClient(cfg)
72         require.NoError(t, err)
73         if ps.ConfigureSeeder.Client != nil {
74                 ps.ConfigureSeeder.Client(seeder)
75         }
76         defer testutil.ExportStatusWriter(seeder, "s", t)()
77         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
78         // Run a Stats right after Closing the Client. This will trigger the Stats
79         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
80         defer seederTorrent.Stats()
81         defer seeder.Close()
82         seederTorrent.VerifyData()
83         // Create leecher and a Torrent.
84         leecherDataDir, err := ioutil.TempDir("", "")
85         require.NoError(t, err)
86         defer os.RemoveAll(leecherDataDir)
87         cfg = torrent.TestingConfig()
88         // See the seeder client config comment.
89         cfg.DropMutuallyCompletePeers = false
90         if ps.LeecherStorage == nil {
91                 cfg.DataDir = leecherDataDir
92         } else {
93                 storage := ps.LeecherStorage(leecherDataDir)
94                 defer storage.Close()
95                 cfg.DefaultStorage = storage
96         }
97         if ps.LeecherDownloadRateLimiter != nil {
98                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
99         }
100         cfg.Seed = false
101         if ps.ConfigureLeecher.Config != nil {
102                 ps.ConfigureLeecher.Config(cfg)
103         }
104         leecher, err := torrent.NewClient(cfg)
105         require.NoError(t, err)
106         defer leecher.Close()
107         if ps.ConfigureLeecher.Client != nil {
108                 ps.ConfigureLeecher.Client(leecher)
109         }
110         defer testutil.ExportStatusWriter(leecher, "l", t)()
111         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
112                 ret = torrent.TorrentSpecFromMetaInfo(mi)
113                 ret.ChunkSize = 2
114                 if ps.LeecherStartsWithoutMetadata {
115                         ret.InfoBytes = nil
116                 }
117                 return
118         }())
119         require.NoError(t, err)
120         assert.True(t, new)
121
122         //// This was used when observing coalescing of piece state changes.
123         //logPieceStateChanges(leecherTorrent)
124
125         // Now do some things with leecher and seeder.
126         added := leecherTorrent.AddClientPeer(seeder)
127         assert.False(t, leecherTorrent.Seeding())
128         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
129         // should be sitting idle until we demand data.
130         if !ps.LeecherStartsWithoutMetadata {
131                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
132         }
133         if ps.LeecherStartsWithoutMetadata {
134                 <-leecherTorrent.GotInfo()
135         }
136         r := leecherTorrent.NewReader()
137         defer r.Close()
138         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
139         if ps.Responsive {
140                 r.SetResponsive()
141         }
142         if ps.SetReadahead {
143                 r.SetReadahead(ps.Readahead)
144         }
145         assertReadAllGreeting(t, r)
146         assert.NotEmpty(t, seederTorrent.PeerConns())
147         leecherPeerConns := leecherTorrent.PeerConns()
148         if cfg.DropMutuallyCompletePeers {
149                 // I don't think we can assume it will be empty already, due to timing.
150                 //assert.Empty(t, leecherPeerConns)
151         } else {
152                 assert.NotEmpty(t, leecherPeerConns)
153         }
154         foundSeeder := false
155         for _, pc := range leecherPeerConns {
156                 completed := pc.PeerPieces().Len()
157                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
158                 if completed == leecherTorrent.Info().NumPieces() {
159                         foundSeeder = true
160                 }
161         }
162         if !foundSeeder {
163                 t.Errorf("didn't find seeder amongst leecher peer conns")
164         }
165
166         seederStats := seederTorrent.Stats()
167         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
168         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
169
170         leecherStats := leecherTorrent.Stats()
171         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
172         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
173
174         // Try reading through again for the cases where the torrent data size
175         // exceeds the size of the cache.
176         assertReadAllGreeting(t, r)
177 }
178
179 type fileCacheClientStorageFactoryParams struct {
180         Capacity    int64
181         SetCapacity bool
182         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
183 }
184
185 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
186         return func(dataDir string) storage.ClientImplCloser {
187                 fc, err := filecache.NewCache(dataDir)
188                 if err != nil {
189                         panic(err)
190                 }
191                 if ps.SetCapacity {
192                         fc.SetCapacity(ps.Capacity)
193                 }
194                 return ps.Wrapper(fc)
195         }
196 }
197
198 type storageFactory func(string) storage.ClientImplCloser
199
200 func TestClientTransferDefault(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
203                         Wrapper: fileCachePieceResourceStorage,
204                 }),
205         })
206 }
207
208 func TestClientTransferDefaultNoMetadata(t *testing.T) {
209         testClientTransfer(t, testClientTransferParams{
210                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
211                         Wrapper: fileCachePieceResourceStorage,
212                 }),
213                 LeecherStartsWithoutMetadata: true,
214         })
215 }
216
217 func TestClientTransferRateLimitedUpload(t *testing.T) {
218         started := time.Now()
219         testClientTransfer(t, testClientTransferParams{
220                 // We are uploading 13 bytes (the length of the greeting torrent). The
221                 // chunks are 2 bytes in length. Then the smallest burst we can run
222                 // with is 2. Time taken is (13-burst)/rate.
223                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
224         })
225         require.True(t, time.Since(started) > time.Second)
226 }
227
228 func TestClientTransferRateLimitedDownload(t *testing.T) {
229         testClientTransfer(t, testClientTransferParams{
230                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
231         })
232 }
233
234 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
235         return struct {
236                 storage.ClientImpl
237                 io.Closer
238         }{
239                 storage.NewResourcePieces(fc.AsResourceProvider()),
240                 ioutil.NopCloser(nil),
241         }
242 }
243
244 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
245         testClientTransfer(t, testClientTransferParams{
246                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
247                         SetCapacity: true,
248                         // Going below the piece length means it can't complete a piece so
249                         // that it can be hashed.
250                         Capacity: 5,
251                         Wrapper:  fileCachePieceResourceStorage,
252                 }),
253                 SetReadahead: setReadahead,
254                 // Can't readahead too far or the cache will thrash and drop data we
255                 // thought we had.
256                 Readahead: readahead,
257
258                 // These tests don't work well with more than 1 connection to the seeder.
259                 ConfigureLeecher: ConfigureClient{
260                         Config: func(cfg *torrent.ClientConfig) {
261                                 cfg.DropDuplicatePeerIds = true
262                                 //cfg.DisableIPv6 = true
263                                 //cfg.DisableUTP = true
264                         },
265                 },
266         })
267 }
268
269 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
270         testClientTransferSmallCache(t, true, 5)
271 }
272
273 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
274         testClientTransferSmallCache(t, true, 15)
275 }
276
277 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
278         testClientTransferSmallCache(t, false, -1)
279 }
280
281 func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory {
282         return func(dataDir string) storage.ClientImplCloser {
283                 opts := optsMaker(dataDir)
284                 //log.Printf("opening sqlite db: %#v", opts)
285                 ret, err := sqliteStorage.NewPiecesStorage(opts)
286                 if err != nil {
287                         panic(err)
288                 }
289                 return ret
290         }
291 }
292
293 func TestClientTransferVarious(t *testing.T) {
294         // Leecher storage
295         for _, ls := range []struct {
296                 name string
297                 f    storageFactory
298         }{
299                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
300                         Wrapper: fileCachePieceResourceStorage,
301                 })},
302                 {"Boltdb", storage.NewBoltDB},
303                 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
304                         return sqliteStorage.NewPiecesStorageOpts{
305                                 NewPoolOpts: sqliteStorage.NewPoolOpts{
306                                         Path: filepath.Join(dataDir, "sqlite.db"),
307                                 },
308                         }
309                 })},
310                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
311                         return sqliteStorage.NewPiecesStorageOpts{
312                                 NewPoolOpts: sqliteStorage.NewPoolOpts{
313                                         Memory: true,
314                                 },
315                         }
316                 })},
317         } {
318                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
319                         // Seeder storage
320                         for _, ss := range []struct {
321                                 name string
322                                 f    storageFactory
323                         }{
324                                 {"File", storage.NewFile},
325                                 {"Mmap", storage.NewMMap},
326                         } {
327                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
328                                         for _, responsive := range []bool{false, true} {
329                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
330                                                         t.Run("NoReadahead", func(t *testing.T) {
331                                                                 testClientTransfer(t, testClientTransferParams{
332                                                                         Responsive:     responsive,
333                                                                         SeederStorage:  ss.f,
334                                                                         LeecherStorage: ls.f,
335                                                                 })
336                                                         })
337                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
338                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
339                                                                         testClientTransfer(t, testClientTransferParams{
340                                                                                 SeederStorage:  ss.f,
341                                                                                 Responsive:     responsive,
342                                                                                 SetReadahead:   true,
343                                                                                 Readahead:      readahead,
344                                                                                 LeecherStorage: ls.f,
345                                                                         })
346                                                                 })
347                                                         }
348                                                 })
349                                         }
350                                 })
351                         }
352                 })
353         }
354 }
355
356 // Check that after completing leeching, a leecher transitions to a seeding
357 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
358 func TestSeedAfterDownloading(t *testing.T) {
359         greetingTempDir, mi := testutil.GreetingTestTorrent()
360         defer os.RemoveAll(greetingTempDir)
361
362         cfg := torrent.TestingConfig()
363         cfg.Seed = true
364         cfg.DataDir = greetingTempDir
365         seeder, err := torrent.NewClient(cfg)
366         require.NoError(t, err)
367         defer seeder.Close()
368         defer testutil.ExportStatusWriter(seeder, "s", t)()
369         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
370         require.NoError(t, err)
371         assert.True(t, ok)
372         seederTorrent.VerifyData()
373
374         cfg = torrent.TestingConfig()
375         cfg.Seed = true
376         cfg.DataDir, err = ioutil.TempDir("", "")
377         require.NoError(t, err)
378         defer os.RemoveAll(cfg.DataDir)
379         leecher, err := torrent.NewClient(cfg)
380         require.NoError(t, err)
381         defer leecher.Close()
382         defer testutil.ExportStatusWriter(leecher, "l", t)()
383
384         cfg = torrent.TestingConfig()
385         cfg.Seed = false
386         cfg.DataDir, err = ioutil.TempDir("", "")
387         require.NoError(t, err)
388         defer os.RemoveAll(cfg.DataDir)
389         leecherLeecher, _ := torrent.NewClient(cfg)
390         require.NoError(t, err)
391         defer leecherLeecher.Close()
392         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
393         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
394                 ret = torrent.TorrentSpecFromMetaInfo(mi)
395                 ret.ChunkSize = 2
396                 return
397         }())
398         require.NoError(t, err)
399         assert.True(t, ok)
400         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
401                 ret = torrent.TorrentSpecFromMetaInfo(mi)
402                 ret.ChunkSize = 3
403                 return
404         }())
405         require.NoError(t, err)
406         assert.True(t, ok)
407         // Simultaneously DownloadAll in Leecher, and read the contents
408         // consecutively in LeecherLeecher. This non-deterministically triggered a
409         // case where the leecher wouldn't unchoke the LeecherLeecher.
410         var wg sync.WaitGroup
411         wg.Add(1)
412         go func() {
413                 defer wg.Done()
414                 r := llg.NewReader()
415                 defer r.Close()
416                 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
417         }()
418         done := make(chan struct{})
419         defer close(done)
420         go leecherGreeting.AddClientPeer(seeder)
421         go leecherGreeting.AddClientPeer(leecherLeecher)
422         wg.Add(1)
423         go func() {
424                 defer wg.Done()
425                 leecherGreeting.DownloadAll()
426                 leecher.WaitAll()
427         }()
428         wg.Wait()
429 }
430
431 type ConfigureClient struct {
432         Config func(*torrent.ClientConfig)
433         Client func(*torrent.Client)
434 }