]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
sqlite storage: Provide helpers and reasonable defaults
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "log"
8         "os"
9         "path/filepath"
10         "sync"
11         "testing"
12         "time"
13
14         "github.com/anacrolix/missinggo/v2/filecache"
15         "github.com/anacrolix/torrent"
16         "github.com/anacrolix/torrent/internal/testutil"
17         "github.com/anacrolix/torrent/storage"
18         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
19         "golang.org/x/time/rate"
20
21         "github.com/stretchr/testify/assert"
22         "github.com/stretchr/testify/require"
23 )
24
25 type testClientTransferParams struct {
26         Responsive                 bool
27         Readahead                  int64
28         SetReadahead               bool
29         LeecherStorage             func(string) storage.ClientImplCloser
30         SeederStorage              func(string) storage.ClientImplCloser
31         SeederUploadRateLimiter    *rate.Limiter
32         LeecherDownloadRateLimiter *rate.Limiter
33         ConfigureSeeder            ConfigureClient
34         ConfigureLeecher           ConfigureClient
35
36         LeecherStartsWithoutMetadata bool
37 }
38
39 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
40         pos, err := r.Seek(0, io.SeekStart)
41         assert.NoError(t, err)
42         assert.EqualValues(t, 0, pos)
43         _greeting, err := ioutil.ReadAll(r)
44         assert.NoError(t, err)
45         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
46 }
47
48 // Creates a seeder and a leecher, and ensures the data transfers when a read
49 // is attempted on the leecher.
50 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
51         greetingTempDir, mi := testutil.GreetingTestTorrent()
52         defer os.RemoveAll(greetingTempDir)
53         // Create seeder and a Torrent.
54         cfg := torrent.TestingConfig()
55         cfg.Seed = true
56         if ps.SeederUploadRateLimiter != nil {
57                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
58         }
59         // cfg.ListenAddr = "localhost:4000"
60         if ps.SeederStorage != nil {
61                 storage := ps.SeederStorage(greetingTempDir)
62                 defer storage.Close()
63                 cfg.DefaultStorage = storage
64         } else {
65                 cfg.DataDir = greetingTempDir
66         }
67         if ps.ConfigureSeeder.Config != nil {
68                 ps.ConfigureSeeder.Config(cfg)
69         }
70         seeder, err := torrent.NewClient(cfg)
71         require.NoError(t, err)
72         if ps.ConfigureSeeder.Client != nil {
73                 ps.ConfigureSeeder.Client(seeder)
74         }
75         defer testutil.ExportStatusWriter(seeder, "s", t)()
76         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
77         // Run a Stats right after Closing the Client. This will trigger the Stats
78         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
79         defer seederTorrent.Stats()
80         defer seeder.Close()
81         seederTorrent.VerifyData()
82         // Create leecher and a Torrent.
83         leecherDataDir, err := ioutil.TempDir("", "")
84         require.NoError(t, err)
85         defer os.RemoveAll(leecherDataDir)
86         cfg = torrent.TestingConfig()
87         if ps.LeecherStorage == nil {
88                 cfg.DataDir = leecherDataDir
89         } else {
90                 storage := ps.LeecherStorage(leecherDataDir)
91                 defer storage.Close()
92                 cfg.DefaultStorage = storage
93         }
94         if ps.LeecherDownloadRateLimiter != nil {
95                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
96         }
97         cfg.Seed = false
98         if ps.ConfigureLeecher.Config != nil {
99                 ps.ConfigureLeecher.Config(cfg)
100         }
101         leecher, err := torrent.NewClient(cfg)
102         require.NoError(t, err)
103         defer leecher.Close()
104         if ps.ConfigureLeecher.Client != nil {
105                 ps.ConfigureLeecher.Client(leecher)
106         }
107         defer testutil.ExportStatusWriter(leecher, "l", t)()
108         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
109                 ret = torrent.TorrentSpecFromMetaInfo(mi)
110                 ret.ChunkSize = 2
111                 if ps.LeecherStartsWithoutMetadata {
112                         ret.InfoBytes = nil
113                 }
114                 return
115         }())
116         require.NoError(t, err)
117         assert.True(t, new)
118
119         //// This was used when observing coalescing of piece state changes.
120         //logPieceStateChanges(leecherTorrent)
121
122         // Now do some things with leecher and seeder.
123         added := leecherTorrent.AddClientPeer(seeder)
124         assert.False(t, leecherTorrent.Seeding())
125         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
126         // should be sitting idle until we demand data.
127         if !ps.LeecherStartsWithoutMetadata {
128                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
129         }
130         if ps.LeecherStartsWithoutMetadata {
131                 <-leecherTorrent.GotInfo()
132         }
133         r := leecherTorrent.NewReader()
134         defer r.Close()
135         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
136         if ps.Responsive {
137                 r.SetResponsive()
138         }
139         if ps.SetReadahead {
140                 r.SetReadahead(ps.Readahead)
141         }
142         assertReadAllGreeting(t, r)
143         assert.NotEmpty(t, seederTorrent.PeerConns())
144         leecherPeerConns := leecherTorrent.PeerConns()
145         assert.NotEmpty(t, leecherPeerConns)
146         foundSeeder := false
147         for _, pc := range leecherPeerConns {
148                 completed := pc.PeerPieces().Len()
149                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
150                 if completed == leecherTorrent.Info().NumPieces() {
151                         foundSeeder = true
152                 }
153         }
154         if !foundSeeder {
155                 t.Errorf("didn't find seeder amongst leecher peer conns")
156         }
157
158         seederStats := seederTorrent.Stats()
159         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
160         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
161
162         leecherStats := leecherTorrent.Stats()
163         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
164         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
165
166         // Try reading through again for the cases where the torrent data size
167         // exceeds the size of the cache.
168         assertReadAllGreeting(t, r)
169 }
170
171 type fileCacheClientStorageFactoryParams struct {
172         Capacity    int64
173         SetCapacity bool
174         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
175 }
176
177 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
178         return func(dataDir string) storage.ClientImplCloser {
179                 fc, err := filecache.NewCache(dataDir)
180                 if err != nil {
181                         panic(err)
182                 }
183                 if ps.SetCapacity {
184                         fc.SetCapacity(ps.Capacity)
185                 }
186                 return ps.Wrapper(fc)
187         }
188 }
189
190 type storageFactory func(string) storage.ClientImplCloser
191
192 func TestClientTransferDefault(t *testing.T) {
193         testClientTransfer(t, testClientTransferParams{
194                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
195                         Wrapper: fileCachePieceResourceStorage,
196                 }),
197         })
198 }
199
200 func TestClientTransferDefaultNoMetadata(t *testing.T) {
201         testClientTransfer(t, testClientTransferParams{
202                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
203                         Wrapper: fileCachePieceResourceStorage,
204                 }),
205                 LeecherStartsWithoutMetadata: true,
206         })
207 }
208
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210         started := time.Now()
211         testClientTransfer(t, testClientTransferParams{
212                 // We are uploading 13 bytes (the length of the greeting torrent). The
213                 // chunks are 2 bytes in length. Then the smallest burst we can run
214                 // with is 2. Time taken is (13-burst)/rate.
215                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
216         })
217         require.True(t, time.Since(started) > time.Second)
218 }
219
220 func TestClientTransferRateLimitedDownload(t *testing.T) {
221         testClientTransfer(t, testClientTransferParams{
222                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
223         })
224 }
225
226 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
227         return struct {
228                 storage.ClientImpl
229                 io.Closer
230         }{
231                 storage.NewResourcePieces(fc.AsResourceProvider()),
232                 ioutil.NopCloser(nil),
233         }
234 }
235
236 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
237         testClientTransfer(t, testClientTransferParams{
238                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
239                         SetCapacity: true,
240                         // Going below the piece length means it can't complete a piece so
241                         // that it can be hashed.
242                         Capacity: 5,
243                         Wrapper:  fileCachePieceResourceStorage,
244                 }),
245                 SetReadahead: setReadahead,
246                 // Can't readahead too far or the cache will thrash and drop data we
247                 // thought we had.
248                 Readahead: readahead,
249
250                 // These tests don't work well with more than 1 connection to the seeder.
251                 ConfigureLeecher: ConfigureClient{
252                         Config: func(cfg *torrent.ClientConfig) {
253                                 cfg.DropDuplicatePeerIds = true
254                                 //cfg.DisableIPv6 = true
255                                 //cfg.DisableUTP = true
256                         },
257                 },
258         })
259 }
260
261 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
262         testClientTransferSmallCache(t, true, 5)
263 }
264
265 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
266         testClientTransferSmallCache(t, true, 15)
267 }
268
269 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
270         testClientTransferSmallCache(t, false, -1)
271 }
272
273 func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
274         return func(dataDir string) storage.ClientImplCloser {
275                 connOpts := connOptsMaker(dataDir)
276                 log.Printf("opening sqlite db: %#v", connOpts)
277                 ret, err := sqliteStorage.NewPiecesStorage(connOpts)
278                 if err != nil {
279                         panic(err)
280                 }
281                 return ret
282         }
283 }
284
285 func TestClientTransferVarious(t *testing.T) {
286         // Leecher storage
287         for _, ls := range []struct {
288                 name string
289                 f    storageFactory
290         }{
291                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
292                         Wrapper: fileCachePieceResourceStorage,
293                 })},
294                 {"Boltdb", storage.NewBoltDB},
295                 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
296                         return sqliteStorage.NewPoolOpts{
297                                 Path: filepath.Join(dataDir, "sqlite.db"),
298                         }
299                 })},
300                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
301                         return sqliteStorage.NewPoolOpts{
302                                 Memory: true,
303                         }
304                 })},
305         } {
306                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
307                         // Seeder storage
308                         for _, ss := range []struct {
309                                 name string
310                                 f    storageFactory
311                         }{
312                                 {"File", storage.NewFile},
313                                 {"Mmap", storage.NewMMap},
314                         } {
315                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
316                                         for _, responsive := range []bool{false, true} {
317                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
318                                                         t.Run("NoReadahead", func(t *testing.T) {
319                                                                 testClientTransfer(t, testClientTransferParams{
320                                                                         Responsive:     responsive,
321                                                                         SeederStorage:  ss.f,
322                                                                         LeecherStorage: ls.f,
323                                                                 })
324                                                         })
325                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
326                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
327                                                                         testClientTransfer(t, testClientTransferParams{
328                                                                                 SeederStorage:  ss.f,
329                                                                                 Responsive:     responsive,
330                                                                                 SetReadahead:   true,
331                                                                                 Readahead:      readahead,
332                                                                                 LeecherStorage: ls.f,
333                                                                         })
334                                                                 })
335                                                         }
336                                                 })
337                                         }
338                                 })
339                         }
340                 })
341         }
342 }
343
344 // Check that after completing leeching, a leecher transitions to a seeding
345 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
346 func TestSeedAfterDownloading(t *testing.T) {
347         greetingTempDir, mi := testutil.GreetingTestTorrent()
348         defer os.RemoveAll(greetingTempDir)
349
350         cfg := torrent.TestingConfig()
351         cfg.Seed = true
352         cfg.DataDir = greetingTempDir
353         seeder, err := torrent.NewClient(cfg)
354         require.NoError(t, err)
355         defer seeder.Close()
356         defer testutil.ExportStatusWriter(seeder, "s", t)()
357         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
358         require.NoError(t, err)
359         assert.True(t, ok)
360         seederTorrent.VerifyData()
361
362         cfg = torrent.TestingConfig()
363         cfg.Seed = true
364         cfg.DataDir, err = ioutil.TempDir("", "")
365         require.NoError(t, err)
366         defer os.RemoveAll(cfg.DataDir)
367         leecher, err := torrent.NewClient(cfg)
368         require.NoError(t, err)
369         defer leecher.Close()
370         defer testutil.ExportStatusWriter(leecher, "l", t)()
371
372         cfg = torrent.TestingConfig()
373         cfg.Seed = false
374         cfg.DataDir, err = ioutil.TempDir("", "")
375         require.NoError(t, err)
376         defer os.RemoveAll(cfg.DataDir)
377         leecherLeecher, _ := torrent.NewClient(cfg)
378         require.NoError(t, err)
379         defer leecherLeecher.Close()
380         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
381         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
382                 ret = torrent.TorrentSpecFromMetaInfo(mi)
383                 ret.ChunkSize = 2
384                 return
385         }())
386         require.NoError(t, err)
387         assert.True(t, ok)
388         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
389                 ret = torrent.TorrentSpecFromMetaInfo(mi)
390                 ret.ChunkSize = 3
391                 return
392         }())
393         require.NoError(t, err)
394         assert.True(t, ok)
395         // Simultaneously DownloadAll in Leecher, and read the contents
396         // consecutively in LeecherLeecher. This non-deterministically triggered a
397         // case where the leecher wouldn't unchoke the LeecherLeecher.
398         var wg sync.WaitGroup
399         wg.Add(1)
400         go func() {
401                 defer wg.Done()
402                 r := llg.NewReader()
403                 defer r.Close()
404                 b, err := ioutil.ReadAll(r)
405                 require.NoError(t, err)
406                 assert.EqualValues(t, testutil.GreetingFileContents, b)
407         }()
408         done := make(chan struct{})
409         defer close(done)
410         go leecherGreeting.AddClientPeer(seeder)
411         go leecherGreeting.AddClientPeer(leecherLeecher)
412         wg.Add(1)
413         go func() {
414                 defer wg.Done()
415                 leecherGreeting.DownloadAll()
416                 leecher.WaitAll()
417         }()
418         wg.Wait()
419 }
420
421 type ConfigureClient struct {
422         Config func(*torrent.ClientConfig)
423         Client func(*torrent.Client)
424 }