Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Test both sqlite file and in-memory leecher storages
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "log"
8         "os"
9         "path/filepath"
10         "sync"
11         "testing"
12         "time"
13
14         "crawshaw.io/sqlite"
15         "github.com/anacrolix/missinggo/v2/filecache"
16         "github.com/anacrolix/torrent"
17         "github.com/anacrolix/torrent/internal/testutil"
18         "github.com/anacrolix/torrent/storage"
19         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
20         "golang.org/x/time/rate"
21
22         "github.com/stretchr/testify/assert"
23         "github.com/stretchr/testify/require"
24 )
25
26 type testClientTransferParams struct {
27         Responsive                 bool
28         Readahead                  int64
29         SetReadahead               bool
30         LeecherStorage             func(string) storage.ClientImplCloser
31         SeederStorage              func(string) storage.ClientImplCloser
32         SeederUploadRateLimiter    *rate.Limiter
33         LeecherDownloadRateLimiter *rate.Limiter
34         ConfigureSeeder            ConfigureClient
35         ConfigureLeecher           ConfigureClient
36
37         LeecherStartsWithoutMetadata bool
38 }
39
40 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
41         pos, err := r.Seek(0, io.SeekStart)
42         assert.NoError(t, err)
43         assert.EqualValues(t, 0, pos)
44         _greeting, err := ioutil.ReadAll(r)
45         assert.NoError(t, err)
46         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
47 }
48
49 // Creates a seeder and a leecher, and ensures the data transfers when a read
50 // is attempted on the leecher.
51 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
52         greetingTempDir, mi := testutil.GreetingTestTorrent()
53         defer os.RemoveAll(greetingTempDir)
54         // Create seeder and a Torrent.
55         cfg := torrent.TestingConfig()
56         cfg.Seed = true
57         if ps.SeederUploadRateLimiter != nil {
58                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
59         }
60         // cfg.ListenAddr = "localhost:4000"
61         if ps.SeederStorage != nil {
62                 storage := ps.SeederStorage(greetingTempDir)
63                 defer storage.Close()
64                 cfg.DefaultStorage = storage
65         } else {
66                 cfg.DataDir = greetingTempDir
67         }
68         if ps.ConfigureSeeder.Config != nil {
69                 ps.ConfigureSeeder.Config(cfg)
70         }
71         seeder, err := torrent.NewClient(cfg)
72         require.NoError(t, err)
73         if ps.ConfigureSeeder.Client != nil {
74                 ps.ConfigureSeeder.Client(seeder)
75         }
76         defer testutil.ExportStatusWriter(seeder, "s", t)()
77         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
78         // Run a Stats right after Closing the Client. This will trigger the Stats
79         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
80         defer seederTorrent.Stats()
81         defer seeder.Close()
82         seederTorrent.VerifyData()
83         // Create leecher and a Torrent.
84         leecherDataDir, err := ioutil.TempDir("", "")
85         require.NoError(t, err)
86         defer os.RemoveAll(leecherDataDir)
87         cfg = torrent.TestingConfig()
88         if ps.LeecherStorage == nil {
89                 cfg.DataDir = leecherDataDir
90         } else {
91                 storage := ps.LeecherStorage(leecherDataDir)
92                 defer storage.Close()
93                 cfg.DefaultStorage = storage
94         }
95         if ps.LeecherDownloadRateLimiter != nil {
96                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
97         }
98         cfg.Seed = false
99         if ps.ConfigureLeecher.Config != nil {
100                 ps.ConfigureLeecher.Config(cfg)
101         }
102         leecher, err := torrent.NewClient(cfg)
103         require.NoError(t, err)
104         defer leecher.Close()
105         if ps.ConfigureLeecher.Client != nil {
106                 ps.ConfigureLeecher.Client(leecher)
107         }
108         defer testutil.ExportStatusWriter(leecher, "l", t)()
109         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
110                 ret = torrent.TorrentSpecFromMetaInfo(mi)
111                 ret.ChunkSize = 2
112                 if ps.LeecherStartsWithoutMetadata {
113                         ret.InfoBytes = nil
114                 }
115                 return
116         }())
117         require.NoError(t, err)
118         assert.True(t, new)
119
120         //// This was used when observing coalescing of piece state changes.
121         //logPieceStateChanges(leecherTorrent)
122
123         // Now do some things with leecher and seeder.
124         added := leecherTorrent.AddClientPeer(seeder)
125         assert.False(t, leecherTorrent.Seeding())
126         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
127         // should be sitting idle until we demand data.
128         if !ps.LeecherStartsWithoutMetadata {
129                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
130         }
131         if ps.LeecherStartsWithoutMetadata {
132                 <-leecherTorrent.GotInfo()
133         }
134         r := leecherTorrent.NewReader()
135         defer r.Close()
136         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
137         if ps.Responsive {
138                 r.SetResponsive()
139         }
140         if ps.SetReadahead {
141                 r.SetReadahead(ps.Readahead)
142         }
143         assertReadAllGreeting(t, r)
144         assert.NotEmpty(t, seederTorrent.PeerConns())
145         leecherPeerConns := leecherTorrent.PeerConns()
146         assert.NotEmpty(t, leecherPeerConns)
147         foundSeeder := false
148         for _, pc := range leecherPeerConns {
149                 completed := pc.PeerPieces().Len()
150                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
151                 if completed == leecherTorrent.Info().NumPieces() {
152                         foundSeeder = true
153                 }
154         }
155         if !foundSeeder {
156                 t.Errorf("didn't find seeder amongst leecher peer conns")
157         }
158
159         seederStats := seederTorrent.Stats()
160         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
161         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
162
163         leecherStats := leecherTorrent.Stats()
164         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
165         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
166
167         // Try reading through again for the cases where the torrent data size
168         // exceeds the size of the cache.
169         assertReadAllGreeting(t, r)
170 }
171
172 type fileCacheClientStorageFactoryParams struct {
173         Capacity    int64
174         SetCapacity bool
175         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
176 }
177
178 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
179         return func(dataDir string) storage.ClientImplCloser {
180                 fc, err := filecache.NewCache(dataDir)
181                 if err != nil {
182                         panic(err)
183                 }
184                 if ps.SetCapacity {
185                         fc.SetCapacity(ps.Capacity)
186                 }
187                 return ps.Wrapper(fc)
188         }
189 }
190
191 type storageFactory func(string) storage.ClientImplCloser
192
193 func TestClientTransferDefault(t *testing.T) {
194         testClientTransfer(t, testClientTransferParams{
195                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
196                         Wrapper: fileCachePieceResourceStorage,
197                 }),
198         })
199 }
200
201 func TestClientTransferDefaultNoMetadata(t *testing.T) {
202         testClientTransfer(t, testClientTransferParams{
203                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206                 LeecherStartsWithoutMetadata: true,
207         })
208 }
209
210 func TestClientTransferRateLimitedUpload(t *testing.T) {
211         started := time.Now()
212         testClientTransfer(t, testClientTransferParams{
213                 // We are uploading 13 bytes (the length of the greeting torrent). The
214                 // chunks are 2 bytes in length. Then the smallest burst we can run
215                 // with is 2. Time taken is (13-burst)/rate.
216                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
217         })
218         require.True(t, time.Since(started) > time.Second)
219 }
220
221 func TestClientTransferRateLimitedDownload(t *testing.T) {
222         testClientTransfer(t, testClientTransferParams{
223                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
224         })
225 }
226
227 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
228         return struct {
229                 storage.ClientImpl
230                 io.Closer
231         }{
232                 storage.NewResourcePieces(fc.AsResourceProvider()),
233                 ioutil.NopCloser(nil),
234         }
235 }
236
237 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
238         testClientTransfer(t, testClientTransferParams{
239                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
240                         SetCapacity: true,
241                         // Going below the piece length means it can't complete a piece so
242                         // that it can be hashed.
243                         Capacity: 5,
244                         Wrapper:  fileCachePieceResourceStorage,
245                 }),
246                 SetReadahead: setReadahead,
247                 // Can't readahead too far or the cache will thrash and drop data we
248                 // thought we had.
249                 Readahead: readahead,
250
251                 // These tests don't work well with more than 1 connection to the seeder.
252                 ConfigureLeecher: ConfigureClient{
253                         Config: func(cfg *torrent.ClientConfig) {
254                                 cfg.DropDuplicatePeerIds = true
255                                 //cfg.DisableIPv6 = true
256                                 //cfg.DisableUTP = true
257                         },
258                 },
259         })
260 }
261
262 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
263         testClientTransferSmallCache(t, true, 5)
264 }
265
266 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
267         testClientTransferSmallCache(t, true, 15)
268 }
269
270 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
271         testClientTransferSmallCache(t, false, -1)
272 }
273
274 func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory {
275         return func(dataDir string) storage.ClientImplCloser {
276                 path := connPathMaker(dataDir)
277                 log.Printf("opening sqlite db at %q", path)
278                 conn, err := sqlite.OpenConn(path, 0)
279                 if err != nil {
280                         panic(err)
281                 }
282                 prov, err := sqliteStorage.NewProvider(conn)
283                 if err != nil {
284                         panic(err)
285                 }
286                 return struct {
287                         storage.ClientImpl
288                         io.Closer
289                 }{
290                         storage.NewResourcePieces(prov),
291                         conn,
292                 }
293
294         }
295 }
296
297 func TestClientTransferVarious(t *testing.T) {
298         // Leecher storage
299         for _, ls := range []struct {
300                 name string
301                 f    storageFactory
302         }{
303                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
304                         Wrapper: fileCachePieceResourceStorage,
305                 })},
306                 {"Boltdb", storage.NewBoltDB},
307                 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string {
308                         return "file:" + filepath.Join(dataDir, "sqlite.db")
309                 })},
310                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
311                         return "file:memory:?mode=memory"
312                 })},
313         } {
314                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
315                         // Seeder storage
316                         for _, ss := range []struct {
317                                 name string
318                                 f    storageFactory
319                         }{
320                                 {"File", storage.NewFile},
321                                 {"Mmap", storage.NewMMap},
322                         } {
323                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
324                                         for _, responsive := range []bool{false, true} {
325                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
326                                                         t.Run("NoReadahead", func(t *testing.T) {
327                                                                 testClientTransfer(t, testClientTransferParams{
328                                                                         Responsive:     responsive,
329                                                                         SeederStorage:  ss.f,
330                                                                         LeecherStorage: ls.f,
331                                                                 })
332                                                         })
333                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
334                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
335                                                                         testClientTransfer(t, testClientTransferParams{
336                                                                                 SeederStorage:  ss.f,
337                                                                                 Responsive:     responsive,
338                                                                                 SetReadahead:   true,
339                                                                                 Readahead:      readahead,
340                                                                                 LeecherStorage: ls.f,
341                                                                         })
342                                                                 })
343                                                         }
344                                                 })
345                                         }
346                                 })
347                         }
348                 })
349         }
350 }
351
352 // Check that after completing leeching, a leecher transitions to a seeding
353 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
354 func TestSeedAfterDownloading(t *testing.T) {
355         greetingTempDir, mi := testutil.GreetingTestTorrent()
356         defer os.RemoveAll(greetingTempDir)
357
358         cfg := torrent.TestingConfig()
359         cfg.Seed = true
360         cfg.DataDir = greetingTempDir
361         seeder, err := torrent.NewClient(cfg)
362         require.NoError(t, err)
363         defer seeder.Close()
364         defer testutil.ExportStatusWriter(seeder, "s", t)()
365         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
366         require.NoError(t, err)
367         assert.True(t, ok)
368         seederTorrent.VerifyData()
369
370         cfg = torrent.TestingConfig()
371         cfg.Seed = true
372         cfg.DataDir, err = ioutil.TempDir("", "")
373         require.NoError(t, err)
374         defer os.RemoveAll(cfg.DataDir)
375         leecher, err := torrent.NewClient(cfg)
376         require.NoError(t, err)
377         defer leecher.Close()
378         defer testutil.ExportStatusWriter(leecher, "l", t)()
379
380         cfg = torrent.TestingConfig()
381         cfg.Seed = false
382         cfg.DataDir, err = ioutil.TempDir("", "")
383         require.NoError(t, err)
384         defer os.RemoveAll(cfg.DataDir)
385         leecherLeecher, _ := torrent.NewClient(cfg)
386         require.NoError(t, err)
387         defer leecherLeecher.Close()
388         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
389         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
390                 ret = torrent.TorrentSpecFromMetaInfo(mi)
391                 ret.ChunkSize = 2
392                 return
393         }())
394         require.NoError(t, err)
395         assert.True(t, ok)
396         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
397                 ret = torrent.TorrentSpecFromMetaInfo(mi)
398                 ret.ChunkSize = 3
399                 return
400         }())
401         require.NoError(t, err)
402         assert.True(t, ok)
403         // Simultaneously DownloadAll in Leecher, and read the contents
404         // consecutively in LeecherLeecher. This non-deterministically triggered a
405         // case where the leecher wouldn't unchoke the LeecherLeecher.
406         var wg sync.WaitGroup
407         wg.Add(1)
408         go func() {
409                 defer wg.Done()
410                 r := llg.NewReader()
411                 defer r.Close()
412                 b, err := ioutil.ReadAll(r)
413                 require.NoError(t, err)
414                 assert.EqualValues(t, testutil.GreetingFileContents, b)
415         }()
416         done := make(chan struct{})
417         defer close(done)
418         go leecherGreeting.AddClientPeer(seeder)
419         go leecherGreeting.AddClientPeer(leecherLeecher)
420         wg.Add(1)
421         go func() {
422                 defer wg.Done()
423                 leecherGreeting.DownloadAll()
424                 leecher.WaitAll()
425         }()
426         wg.Wait()
427 }
428
429 type ConfigureClient struct {
430         Config func(*torrent.ClientConfig)
431         Client func(*torrent.Client)
432 }