Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Add the DropMutuallyCompletePeers ClientConfig field
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "log"
8         "os"
9         "path/filepath"
10         "sync"
11         "testing"
12         "time"
13
14         "github.com/anacrolix/missinggo/v2/filecache"
15         "github.com/anacrolix/torrent"
16         "github.com/anacrolix/torrent/internal/testutil"
17         "github.com/anacrolix/torrent/storage"
18         sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
19         "golang.org/x/time/rate"
20
21         "github.com/stretchr/testify/assert"
22         "github.com/stretchr/testify/require"
23 )
24
25 type testClientTransferParams struct {
26         Responsive                 bool
27         Readahead                  int64
28         SetReadahead               bool
29         LeecherStorage             func(string) storage.ClientImplCloser
30         SeederStorage              func(string) storage.ClientImplCloser
31         SeederUploadRateLimiter    *rate.Limiter
32         LeecherDownloadRateLimiter *rate.Limiter
33         ConfigureSeeder            ConfigureClient
34         ConfigureLeecher           ConfigureClient
35
36         LeecherStartsWithoutMetadata bool
37 }
38
39 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
40         pos, err := r.Seek(0, io.SeekStart)
41         assert.NoError(t, err)
42         assert.EqualValues(t, 0, pos)
43         _greeting, err := ioutil.ReadAll(r)
44         assert.NoError(t, err)
45         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
46 }
47
48 // Creates a seeder and a leecher, and ensures the data transfers when a read
49 // is attempted on the leecher.
50 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
51         greetingTempDir, mi := testutil.GreetingTestTorrent()
52         defer os.RemoveAll(greetingTempDir)
53         // Create seeder and a Torrent.
54         cfg := torrent.TestingConfig()
55         cfg.Seed = true
56         // Some test instances don't like this being on, even when there's no cache involved.
57         cfg.DropMutuallyCompletePeers = false
58         if ps.SeederUploadRateLimiter != nil {
59                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
60         }
61         // cfg.ListenAddr = "localhost:4000"
62         if ps.SeederStorage != nil {
63                 storage := ps.SeederStorage(greetingTempDir)
64                 defer storage.Close()
65                 cfg.DefaultStorage = storage
66         } else {
67                 cfg.DataDir = greetingTempDir
68         }
69         if ps.ConfigureSeeder.Config != nil {
70                 ps.ConfigureSeeder.Config(cfg)
71         }
72         seeder, err := torrent.NewClient(cfg)
73         require.NoError(t, err)
74         if ps.ConfigureSeeder.Client != nil {
75                 ps.ConfigureSeeder.Client(seeder)
76         }
77         defer testutil.ExportStatusWriter(seeder, "s", t)()
78         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
79         // Run a Stats right after Closing the Client. This will trigger the Stats
80         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
81         defer seederTorrent.Stats()
82         defer seeder.Close()
83         seederTorrent.VerifyData()
84         // Create leecher and a Torrent.
85         leecherDataDir, err := ioutil.TempDir("", "")
86         require.NoError(t, err)
87         defer os.RemoveAll(leecherDataDir)
88         cfg = torrent.TestingConfig()
89         // See the seeder client config comment.
90         cfg.DropMutuallyCompletePeers = false
91         if ps.LeecherStorage == nil {
92                 cfg.DataDir = leecherDataDir
93         } else {
94                 storage := ps.LeecherStorage(leecherDataDir)
95                 defer storage.Close()
96                 cfg.DefaultStorage = storage
97         }
98         if ps.LeecherDownloadRateLimiter != nil {
99                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
100         }
101         cfg.Seed = false
102         if ps.ConfigureLeecher.Config != nil {
103                 ps.ConfigureLeecher.Config(cfg)
104         }
105         leecher, err := torrent.NewClient(cfg)
106         require.NoError(t, err)
107         defer leecher.Close()
108         if ps.ConfigureLeecher.Client != nil {
109                 ps.ConfigureLeecher.Client(leecher)
110         }
111         defer testutil.ExportStatusWriter(leecher, "l", t)()
112         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
113                 ret = torrent.TorrentSpecFromMetaInfo(mi)
114                 ret.ChunkSize = 2
115                 if ps.LeecherStartsWithoutMetadata {
116                         ret.InfoBytes = nil
117                 }
118                 return
119         }())
120         require.NoError(t, err)
121         assert.True(t, new)
122
123         //// This was used when observing coalescing of piece state changes.
124         //logPieceStateChanges(leecherTorrent)
125
126         // Now do some things with leecher and seeder.
127         added := leecherTorrent.AddClientPeer(seeder)
128         assert.False(t, leecherTorrent.Seeding())
129         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
130         // should be sitting idle until we demand data.
131         if !ps.LeecherStartsWithoutMetadata {
132                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
133         }
134         if ps.LeecherStartsWithoutMetadata {
135                 <-leecherTorrent.GotInfo()
136         }
137         r := leecherTorrent.NewReader()
138         defer r.Close()
139         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
140         if ps.Responsive {
141                 r.SetResponsive()
142         }
143         if ps.SetReadahead {
144                 r.SetReadahead(ps.Readahead)
145         }
146         assertReadAllGreeting(t, r)
147         assert.NotEmpty(t, seederTorrent.PeerConns())
148         leecherPeerConns := leecherTorrent.PeerConns()
149         if cfg.DropMutuallyCompletePeers {
150                 // I don't think we can assume it will be empty already, due to timing.
151                 //assert.Empty(t, leecherPeerConns)
152         } else {
153                 assert.NotEmpty(t, leecherPeerConns)
154         }
155         foundSeeder := false
156         for _, pc := range leecherPeerConns {
157                 completed := pc.PeerPieces().Len()
158                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
159                 if completed == leecherTorrent.Info().NumPieces() {
160                         foundSeeder = true
161                 }
162         }
163         if !foundSeeder {
164                 t.Errorf("didn't find seeder amongst leecher peer conns")
165         }
166
167         seederStats := seederTorrent.Stats()
168         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
169         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
170
171         leecherStats := leecherTorrent.Stats()
172         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
173         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
174
175         // Try reading through again for the cases where the torrent data size
176         // exceeds the size of the cache.
177         assertReadAllGreeting(t, r)
178 }
179
180 type fileCacheClientStorageFactoryParams struct {
181         Capacity    int64
182         SetCapacity bool
183         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
184 }
185
186 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
187         return func(dataDir string) storage.ClientImplCloser {
188                 fc, err := filecache.NewCache(dataDir)
189                 if err != nil {
190                         panic(err)
191                 }
192                 if ps.SetCapacity {
193                         fc.SetCapacity(ps.Capacity)
194                 }
195                 return ps.Wrapper(fc)
196         }
197 }
198
199 type storageFactory func(string) storage.ClientImplCloser
200
201 func TestClientTransferDefault(t *testing.T) {
202         testClientTransfer(t, testClientTransferParams{
203                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
204                         Wrapper: fileCachePieceResourceStorage,
205                 }),
206         })
207 }
208
209 func TestClientTransferDefaultNoMetadata(t *testing.T) {
210         testClientTransfer(t, testClientTransferParams{
211                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
212                         Wrapper: fileCachePieceResourceStorage,
213                 }),
214                 LeecherStartsWithoutMetadata: true,
215         })
216 }
217
218 func TestClientTransferRateLimitedUpload(t *testing.T) {
219         started := time.Now()
220         testClientTransfer(t, testClientTransferParams{
221                 // We are uploading 13 bytes (the length of the greeting torrent). The
222                 // chunks are 2 bytes in length. Then the smallest burst we can run
223                 // with is 2. Time taken is (13-burst)/rate.
224                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
225         })
226         require.True(t, time.Since(started) > time.Second)
227 }
228
229 func TestClientTransferRateLimitedDownload(t *testing.T) {
230         testClientTransfer(t, testClientTransferParams{
231                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
232         })
233 }
234
235 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
236         return struct {
237                 storage.ClientImpl
238                 io.Closer
239         }{
240                 storage.NewResourcePieces(fc.AsResourceProvider()),
241                 ioutil.NopCloser(nil),
242         }
243 }
244
245 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
246         testClientTransfer(t, testClientTransferParams{
247                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
248                         SetCapacity: true,
249                         // Going below the piece length means it can't complete a piece so
250                         // that it can be hashed.
251                         Capacity: 5,
252                         Wrapper:  fileCachePieceResourceStorage,
253                 }),
254                 SetReadahead: setReadahead,
255                 // Can't readahead too far or the cache will thrash and drop data we
256                 // thought we had.
257                 Readahead: readahead,
258
259                 // These tests don't work well with more than 1 connection to the seeder.
260                 ConfigureLeecher: ConfigureClient{
261                         Config: func(cfg *torrent.ClientConfig) {
262                                 cfg.DropDuplicatePeerIds = true
263                                 //cfg.DisableIPv6 = true
264                                 //cfg.DisableUTP = true
265                         },
266                 },
267         })
268 }
269
270 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
271         testClientTransferSmallCache(t, true, 5)
272 }
273
274 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
275         testClientTransferSmallCache(t, true, 15)
276 }
277
278 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
279         testClientTransferSmallCache(t, false, -1)
280 }
281
282 func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
283         return func(dataDir string) storage.ClientImplCloser {
284                 connOpts := connOptsMaker(dataDir)
285                 log.Printf("opening sqlite db: %#v", connOpts)
286                 ret, err := sqliteStorage.NewPiecesStorage(connOpts)
287                 if err != nil {
288                         panic(err)
289                 }
290                 return ret
291         }
292 }
293
294 func TestClientTransferVarious(t *testing.T) {
295         // Leecher storage
296         for _, ls := range []struct {
297                 name string
298                 f    storageFactory
299         }{
300                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
301                         Wrapper: fileCachePieceResourceStorage,
302                 })},
303                 {"Boltdb", storage.NewBoltDB},
304                 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
305                         return sqliteStorage.NewPoolOpts{
306                                 Path: filepath.Join(dataDir, "sqlite.db"),
307                         }
308                 })},
309                 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
310                         return sqliteStorage.NewPoolOpts{
311                                 Memory: true,
312                         }
313                 })},
314         } {
315                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
316                         // Seeder storage
317                         for _, ss := range []struct {
318                                 name string
319                                 f    storageFactory
320                         }{
321                                 {"File", storage.NewFile},
322                                 {"Mmap", storage.NewMMap},
323                         } {
324                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
325                                         for _, responsive := range []bool{false, true} {
326                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
327                                                         t.Run("NoReadahead", func(t *testing.T) {
328                                                                 testClientTransfer(t, testClientTransferParams{
329                                                                         Responsive:     responsive,
330                                                                         SeederStorage:  ss.f,
331                                                                         LeecherStorage: ls.f,
332                                                                 })
333                                                         })
334                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
335                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
336                                                                         testClientTransfer(t, testClientTransferParams{
337                                                                                 SeederStorage:  ss.f,
338                                                                                 Responsive:     responsive,
339                                                                                 SetReadahead:   true,
340                                                                                 Readahead:      readahead,
341                                                                                 LeecherStorage: ls.f,
342                                                                         })
343                                                                 })
344                                                         }
345                                                 })
346                                         }
347                                 })
348                         }
349                 })
350         }
351 }
352
353 // Check that after completing leeching, a leecher transitions to a seeding
354 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
355 func TestSeedAfterDownloading(t *testing.T) {
356         greetingTempDir, mi := testutil.GreetingTestTorrent()
357         defer os.RemoveAll(greetingTempDir)
358
359         cfg := torrent.TestingConfig()
360         cfg.Seed = true
361         cfg.DataDir = greetingTempDir
362         seeder, err := torrent.NewClient(cfg)
363         require.NoError(t, err)
364         defer seeder.Close()
365         defer testutil.ExportStatusWriter(seeder, "s", t)()
366         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
367         require.NoError(t, err)
368         assert.True(t, ok)
369         seederTorrent.VerifyData()
370
371         cfg = torrent.TestingConfig()
372         cfg.Seed = true
373         cfg.DataDir, err = ioutil.TempDir("", "")
374         require.NoError(t, err)
375         defer os.RemoveAll(cfg.DataDir)
376         leecher, err := torrent.NewClient(cfg)
377         require.NoError(t, err)
378         defer leecher.Close()
379         defer testutil.ExportStatusWriter(leecher, "l", t)()
380
381         cfg = torrent.TestingConfig()
382         cfg.Seed = false
383         cfg.DataDir, err = ioutil.TempDir("", "")
384         require.NoError(t, err)
385         defer os.RemoveAll(cfg.DataDir)
386         leecherLeecher, _ := torrent.NewClient(cfg)
387         require.NoError(t, err)
388         defer leecherLeecher.Close()
389         defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
390         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
391                 ret = torrent.TorrentSpecFromMetaInfo(mi)
392                 ret.ChunkSize = 2
393                 return
394         }())
395         require.NoError(t, err)
396         assert.True(t, ok)
397         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
398                 ret = torrent.TorrentSpecFromMetaInfo(mi)
399                 ret.ChunkSize = 3
400                 return
401         }())
402         require.NoError(t, err)
403         assert.True(t, ok)
404         // Simultaneously DownloadAll in Leecher, and read the contents
405         // consecutively in LeecherLeecher. This non-deterministically triggered a
406         // case where the leecher wouldn't unchoke the LeecherLeecher.
407         var wg sync.WaitGroup
408         wg.Add(1)
409         go func() {
410                 defer wg.Done()
411                 r := llg.NewReader()
412                 defer r.Close()
413                 b, err := ioutil.ReadAll(r)
414                 require.NoError(t, err)
415                 assert.EqualValues(t, testutil.GreetingFileContents, b)
416         }()
417         done := make(chan struct{})
418         defer close(done)
419         go leecherGreeting.AddClientPeer(seeder)
420         go leecherGreeting.AddClientPeer(leecherLeecher)
421         wg.Add(1)
422         go func() {
423                 defer wg.Done()
424                 leecherGreeting.DownloadAll()
425                 leecher.WaitAll()
426         }()
427         wg.Wait()
428 }
429
430 type ConfigureClient struct {
431         Config func(*torrent.ClientConfig)
432         Client func(*torrent.Client)
433 }