]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Fix small cache transfer tests
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "sync"
9         "testing"
10         "time"
11
12         "github.com/anacrolix/missinggo/v2/filecache"
13         "github.com/anacrolix/torrent"
14         "github.com/anacrolix/torrent/internal/testutil"
15         "github.com/anacrolix/torrent/storage"
16         "golang.org/x/time/rate"
17
18         "github.com/stretchr/testify/assert"
19         "github.com/stretchr/testify/require"
20 )
21
22 type testClientTransferParams struct {
23         Responsive                 bool
24         Readahead                  int64
25         SetReadahead               bool
26         ExportClientStatus         bool
27         LeecherStorage             func(string) storage.ClientImplCloser
28         SeederStorage              func(string) storage.ClientImplCloser
29         SeederUploadRateLimiter    *rate.Limiter
30         LeecherDownloadRateLimiter *rate.Limiter
31         ConfigureSeeder            ConfigureClient
32         ConfigureLeecher           ConfigureClient
33 }
34
35 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
36         pos, err := r.Seek(0, io.SeekStart)
37         assert.NoError(t, err)
38         assert.EqualValues(t, 0, pos)
39         _greeting, err := ioutil.ReadAll(r)
40         assert.NoError(t, err)
41         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
42 }
43
44 // Creates a seeder and a leecher, and ensures the data transfers when a read
45 // is attempted on the leecher.
46 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
47         greetingTempDir, mi := testutil.GreetingTestTorrent()
48         defer os.RemoveAll(greetingTempDir)
49         // Create seeder and a Torrent.
50         cfg := torrent.TestingConfig()
51         cfg.Seed = true
52         if ps.SeederUploadRateLimiter != nil {
53                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
54         }
55         // cfg.ListenAddr = "localhost:4000"
56         if ps.SeederStorage != nil {
57                 storage := ps.SeederStorage(greetingTempDir)
58                 defer storage.Close()
59                 cfg.DefaultStorage = storage
60         } else {
61                 cfg.DataDir = greetingTempDir
62         }
63         if ps.ConfigureSeeder.Config != nil {
64                 ps.ConfigureSeeder.Config(cfg)
65         }
66         seeder, err := torrent.NewClient(cfg)
67         require.NoError(t, err)
68         if ps.ConfigureSeeder.Client != nil {
69                 ps.ConfigureSeeder.Client(seeder)
70         }
71         if ps.ExportClientStatus {
72                 defer testutil.ExportStatusWriter(seeder, "s")()
73         }
74         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
75         // Run a Stats right after Closing the Client. This will trigger the Stats
76         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
77         defer seederTorrent.Stats()
78         defer seeder.Close()
79         seederTorrent.VerifyData()
80         // Create leecher and a Torrent.
81         leecherDataDir, err := ioutil.TempDir("", "")
82         require.NoError(t, err)
83         defer os.RemoveAll(leecherDataDir)
84         cfg = torrent.TestingConfig()
85         if ps.LeecherStorage == nil {
86                 cfg.DataDir = leecherDataDir
87         } else {
88                 storage := ps.LeecherStorage(leecherDataDir)
89                 defer storage.Close()
90                 cfg.DefaultStorage = storage
91         }
92         if ps.LeecherDownloadRateLimiter != nil {
93                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
94         }
95         cfg.Seed = false
96         if ps.ConfigureLeecher.Config != nil {
97                 ps.ConfigureLeecher.Config(cfg)
98         }
99         leecher, err := torrent.NewClient(cfg)
100         require.NoError(t, err)
101         defer leecher.Close()
102         if ps.ConfigureLeecher.Client != nil {
103                 ps.ConfigureLeecher.Client(leecher)
104         }
105         if ps.ExportClientStatus {
106                 defer testutil.ExportStatusWriter(leecher, "l")()
107         }
108         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
109                 ret = torrent.TorrentSpecFromMetaInfo(mi)
110                 ret.ChunkSize = 2
111                 return
112         }())
113         require.NoError(t, err)
114         assert.True(t, new)
115
116         //// This was used when observing coalescing of piece state changes.
117         //logPieceStateChanges(leecherTorrent)
118
119         // Now do some things with leecher and seeder.
120         added := leecherTorrent.AddClientPeer(seeder)
121         // The Torrent should not be interested in obtaining peers, so the one we
122         // just added should be the only one.
123         assert.False(t, leecherTorrent.Seeding())
124         assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
125         r := leecherTorrent.NewReader()
126         defer r.Close()
127         if ps.Responsive {
128                 r.SetResponsive()
129         }
130         if ps.SetReadahead {
131                 r.SetReadahead(ps.Readahead)
132         }
133         assertReadAllGreeting(t, r)
134         assert.NotEmpty(t, seederTorrent.PeerConns())
135         leecherPeerConns := leecherTorrent.PeerConns()
136         assert.NotEmpty(t, leecherPeerConns)
137         for _, pc := range leecherPeerConns {
138                 assert.EqualValues(t, leecherTorrent.Info().NumPieces(), pc.PeerPieces().Len())
139         }
140
141         seederStats := seederTorrent.Stats()
142         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
143         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
144
145         leecherStats := leecherTorrent.Stats()
146         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
147         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
148
149         // Try reading through again for the cases where the torrent data size
150         // exceeds the size of the cache.
151         assertReadAllGreeting(t, r)
152 }
153
154 type fileCacheClientStorageFactoryParams struct {
155         Capacity    int64
156         SetCapacity bool
157         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
158 }
159
160 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
161         return func(dataDir string) storage.ClientImplCloser {
162                 fc, err := filecache.NewCache(dataDir)
163                 if err != nil {
164                         panic(err)
165                 }
166                 if ps.SetCapacity {
167                         fc.SetCapacity(ps.Capacity)
168                 }
169                 return ps.Wrapper(fc)
170         }
171 }
172
173 type storageFactory func(string) storage.ClientImplCloser
174
175 func TestClientTransferDefault(t *testing.T) {
176         testClientTransfer(t, testClientTransferParams{
177                 ExportClientStatus: true,
178                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
179                         Wrapper: fileCachePieceResourceStorage,
180                 }),
181         })
182 }
183
184 func TestClientTransferRateLimitedUpload(t *testing.T) {
185         started := time.Now()
186         testClientTransfer(t, testClientTransferParams{
187                 // We are uploading 13 bytes (the length of the greeting torrent). The
188                 // chunks are 2 bytes in length. Then the smallest burst we can run
189                 // with is 2. Time taken is (13-burst)/rate.
190                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
191                 ExportClientStatus:      true,
192         })
193         require.True(t, time.Since(started) > time.Second)
194 }
195
196 func TestClientTransferRateLimitedDownload(t *testing.T) {
197         testClientTransfer(t, testClientTransferParams{
198                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
199         })
200 }
201
202 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
203         return storage.NewResourcePieces(fc.AsResourceProvider())
204 }
205
206 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
207         testClientTransfer(t, testClientTransferParams{
208                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
209                         SetCapacity: true,
210                         // Going below the piece length means it can't complete a piece so
211                         // that it can be hashed.
212                         Capacity: 5,
213                         Wrapper:  fileCachePieceResourceStorage,
214                 }),
215                 SetReadahead: setReadahead,
216                 // Can't readahead too far or the cache will thrash and drop data we
217                 // thought we had.
218                 Readahead:          readahead,
219                 ExportClientStatus: true,
220
221                 // These tests don't work well with more than 1 connection to the seeder.
222                 ConfigureLeecher: ConfigureClient{
223                         Config: func(cfg *torrent.ClientConfig) {
224                                 cfg.DropDuplicatePeerIds = true
225                                 //cfg.DisableIPv6 = true
226                                 //cfg.DisableUTP = true
227                         },
228                 },
229         })
230 }
231
232 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
233         testClientTransferSmallCache(t, true, 5)
234 }
235
236 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
237         testClientTransferSmallCache(t, true, 15)
238 }
239
240 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
241         testClientTransferSmallCache(t, false, -1)
242 }
243
244 func TestClientTransferVarious(t *testing.T) {
245         // Leecher storage
246         for _, ls := range []struct {
247                 name string
248                 f    storageFactory
249         }{
250                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
251                         Wrapper: fileCachePieceResourceStorage,
252                 })},
253                 {"Boltdb", storage.NewBoltDB},
254         } {
255                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
256                         // Seeder storage
257                         for _, ss := range []struct {
258                                 name string
259                                 f    func(string) storage.ClientImplCloser
260                         }{
261                                 {"File", storage.NewFile},
262                                 {"Mmap", storage.NewMMap},
263                         } {
264                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
265                                         for _, responsive := range []bool{false, true} {
266                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
267                                                         t.Run("NoReadahead", func(t *testing.T) {
268                                                                 testClientTransfer(t, testClientTransferParams{
269                                                                         Responsive:     responsive,
270                                                                         SeederStorage:  ss.f,
271                                                                         LeecherStorage: ls.f,
272                                                                 })
273                                                         })
274                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
275                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
276                                                                         testClientTransfer(t, testClientTransferParams{
277                                                                                 SeederStorage:  ss.f,
278                                                                                 Responsive:     responsive,
279                                                                                 SetReadahead:   true,
280                                                                                 Readahead:      readahead,
281                                                                                 LeecherStorage: ls.f,
282                                                                         })
283                                                                 })
284                                                         }
285                                                 })
286                                         }
287                                 })
288                         }
289                 })
290         }
291 }
292
293 // Check that after completing leeching, a leecher transitions to a seeding
294 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
295 func TestSeedAfterDownloading(t *testing.T) {
296         greetingTempDir, mi := testutil.GreetingTestTorrent()
297         defer os.RemoveAll(greetingTempDir)
298
299         cfg := torrent.TestingConfig()
300         cfg.Seed = true
301         cfg.DataDir = greetingTempDir
302         seeder, err := torrent.NewClient(cfg)
303         require.NoError(t, err)
304         defer seeder.Close()
305         defer testutil.ExportStatusWriter(seeder, "s")()
306         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
307         require.NoError(t, err)
308         assert.True(t, ok)
309         seederTorrent.VerifyData()
310
311         cfg = torrent.TestingConfig()
312         cfg.Seed = true
313         cfg.DataDir, err = ioutil.TempDir("", "")
314         require.NoError(t, err)
315         defer os.RemoveAll(cfg.DataDir)
316         leecher, err := torrent.NewClient(cfg)
317         require.NoError(t, err)
318         defer leecher.Close()
319         defer testutil.ExportStatusWriter(leecher, "l")()
320
321         cfg = torrent.TestingConfig()
322         cfg.Seed = false
323         cfg.DataDir, err = ioutil.TempDir("", "")
324         require.NoError(t, err)
325         defer os.RemoveAll(cfg.DataDir)
326         leecherLeecher, _ := torrent.NewClient(cfg)
327         require.NoError(t, err)
328         defer leecherLeecher.Close()
329         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
330         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
331                 ret = torrent.TorrentSpecFromMetaInfo(mi)
332                 ret.ChunkSize = 2
333                 return
334         }())
335         require.NoError(t, err)
336         assert.True(t, ok)
337         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
338                 ret = torrent.TorrentSpecFromMetaInfo(mi)
339                 ret.ChunkSize = 3
340                 return
341         }())
342         require.NoError(t, err)
343         assert.True(t, ok)
344         // Simultaneously DownloadAll in Leecher, and read the contents
345         // consecutively in LeecherLeecher. This non-deterministically triggered a
346         // case where the leecher wouldn't unchoke the LeecherLeecher.
347         var wg sync.WaitGroup
348         wg.Add(1)
349         go func() {
350                 defer wg.Done()
351                 r := llg.NewReader()
352                 defer r.Close()
353                 b, err := ioutil.ReadAll(r)
354                 require.NoError(t, err)
355                 assert.EqualValues(t, testutil.GreetingFileContents, b)
356         }()
357         done := make(chan struct{})
358         defer close(done)
359         go leecherGreeting.AddClientPeer(seeder)
360         go leecherGreeting.AddClientPeer(leecherLeecher)
361         wg.Add(1)
362         go func() {
363                 defer wg.Done()
364                 leecherGreeting.DownloadAll()
365                 leecher.WaitAll()
366         }()
367         wg.Wait()
368 }
369
370 type ConfigureClient struct {
371         Config func(*torrent.ClientConfig)
372         Client func(*torrent.Client)
373 }