Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Fix transfer test check for seeder piece counts
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "sync"
9         "testing"
10         "time"
11
12         "github.com/anacrolix/missinggo/v2/filecache"
13         "github.com/anacrolix/torrent"
14         "github.com/anacrolix/torrent/internal/testutil"
15         "github.com/anacrolix/torrent/storage"
16         "golang.org/x/time/rate"
17
18         "github.com/stretchr/testify/assert"
19         "github.com/stretchr/testify/require"
20 )
21
22 type testClientTransferParams struct {
23         Responsive                 bool
24         Readahead                  int64
25         SetReadahead               bool
26         ExportClientStatus         bool
27         LeecherStorage             func(string) storage.ClientImplCloser
28         SeederStorage              func(string) storage.ClientImplCloser
29         SeederUploadRateLimiter    *rate.Limiter
30         LeecherDownloadRateLimiter *rate.Limiter
31         ConfigureSeeder            ConfigureClient
32         ConfigureLeecher           ConfigureClient
33 }
34
35 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
36         pos, err := r.Seek(0, io.SeekStart)
37         assert.NoError(t, err)
38         assert.EqualValues(t, 0, pos)
39         _greeting, err := ioutil.ReadAll(r)
40         assert.NoError(t, err)
41         assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
42 }
43
44 // Creates a seeder and a leecher, and ensures the data transfers when a read
45 // is attempted on the leecher.
46 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
47         greetingTempDir, mi := testutil.GreetingTestTorrent()
48         defer os.RemoveAll(greetingTempDir)
49         // Create seeder and a Torrent.
50         cfg := torrent.TestingConfig()
51         cfg.Seed = true
52         if ps.SeederUploadRateLimiter != nil {
53                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
54         }
55         // cfg.ListenAddr = "localhost:4000"
56         if ps.SeederStorage != nil {
57                 storage := ps.SeederStorage(greetingTempDir)
58                 defer storage.Close()
59                 cfg.DefaultStorage = storage
60         } else {
61                 cfg.DataDir = greetingTempDir
62         }
63         if ps.ConfigureSeeder.Config != nil {
64                 ps.ConfigureSeeder.Config(cfg)
65         }
66         seeder, err := torrent.NewClient(cfg)
67         require.NoError(t, err)
68         if ps.ConfigureSeeder.Client != nil {
69                 ps.ConfigureSeeder.Client(seeder)
70         }
71         if ps.ExportClientStatus {
72                 defer testutil.ExportStatusWriter(seeder, "s")()
73         }
74         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
75         // Run a Stats right after Closing the Client. This will trigger the Stats
76         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
77         defer seederTorrent.Stats()
78         defer seeder.Close()
79         seederTorrent.VerifyData()
80         // Create leecher and a Torrent.
81         leecherDataDir, err := ioutil.TempDir("", "")
82         require.NoError(t, err)
83         defer os.RemoveAll(leecherDataDir)
84         cfg = torrent.TestingConfig()
85         if ps.LeecherStorage == nil {
86                 cfg.DataDir = leecherDataDir
87         } else {
88                 storage := ps.LeecherStorage(leecherDataDir)
89                 defer storage.Close()
90                 cfg.DefaultStorage = storage
91         }
92         if ps.LeecherDownloadRateLimiter != nil {
93                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
94         }
95         cfg.Seed = false
96         if ps.ConfigureLeecher.Config != nil {
97                 ps.ConfigureLeecher.Config(cfg)
98         }
99         leecher, err := torrent.NewClient(cfg)
100         require.NoError(t, err)
101         defer leecher.Close()
102         if ps.ConfigureLeecher.Client != nil {
103                 ps.ConfigureLeecher.Client(leecher)
104         }
105         if ps.ExportClientStatus {
106                 defer testutil.ExportStatusWriter(leecher, "l")()
107         }
108         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
109                 ret = torrent.TorrentSpecFromMetaInfo(mi)
110                 ret.ChunkSize = 2
111                 return
112         }())
113         require.NoError(t, err)
114         assert.True(t, new)
115
116         //// This was used when observing coalescing of piece state changes.
117         //logPieceStateChanges(leecherTorrent)
118
119         // Now do some things with leecher and seeder.
120         added := leecherTorrent.AddClientPeer(seeder)
121         // The Torrent should not be interested in obtaining peers, so the one we
122         // just added should be the only one.
123         assert.False(t, leecherTorrent.Seeding())
124         assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
125         r := leecherTorrent.NewReader()
126         defer r.Close()
127         if ps.Responsive {
128                 r.SetResponsive()
129         }
130         if ps.SetReadahead {
131                 r.SetReadahead(ps.Readahead)
132         }
133         assertReadAllGreeting(t, r)
134         assert.NotEmpty(t, seederTorrent.PeerConns())
135         leecherPeerConns := leecherTorrent.PeerConns()
136         assert.NotEmpty(t, leecherPeerConns)
137         foundSeeder := false
138         for _, pc := range leecherPeerConns {
139                 completed := pc.PeerPieces().Len()
140                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
141                 if completed == leecherTorrent.Info().NumPieces() {
142                         foundSeeder = true
143                 }
144         }
145         if !foundSeeder {
146                 t.Errorf("didn't find seeder amongst leecher peer conns")
147         }
148
149         seederStats := seederTorrent.Stats()
150         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
151         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
152
153         leecherStats := leecherTorrent.Stats()
154         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
155         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
156
157         // Try reading through again for the cases where the torrent data size
158         // exceeds the size of the cache.
159         assertReadAllGreeting(t, r)
160 }
161
162 type fileCacheClientStorageFactoryParams struct {
163         Capacity    int64
164         SetCapacity bool
165         Wrapper     func(*filecache.Cache) storage.ClientImplCloser
166 }
167
168 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
169         return func(dataDir string) storage.ClientImplCloser {
170                 fc, err := filecache.NewCache(dataDir)
171                 if err != nil {
172                         panic(err)
173                 }
174                 if ps.SetCapacity {
175                         fc.SetCapacity(ps.Capacity)
176                 }
177                 return ps.Wrapper(fc)
178         }
179 }
180
181 type storageFactory func(string) storage.ClientImplCloser
182
183 func TestClientTransferDefault(t *testing.T) {
184         testClientTransfer(t, testClientTransferParams{
185                 ExportClientStatus: true,
186                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
187                         Wrapper: fileCachePieceResourceStorage,
188                 }),
189         })
190 }
191
192 func TestClientTransferRateLimitedUpload(t *testing.T) {
193         started := time.Now()
194         testClientTransfer(t, testClientTransferParams{
195                 // We are uploading 13 bytes (the length of the greeting torrent). The
196                 // chunks are 2 bytes in length. Then the smallest burst we can run
197                 // with is 2. Time taken is (13-burst)/rate.
198                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
199                 ExportClientStatus:      true,
200         })
201         require.True(t, time.Since(started) > time.Second)
202 }
203
204 func TestClientTransferRateLimitedDownload(t *testing.T) {
205         testClientTransfer(t, testClientTransferParams{
206                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
207         })
208 }
209
210 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
211         return storage.NewResourcePieces(fc.AsResourceProvider())
212 }
213
214 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
215         testClientTransfer(t, testClientTransferParams{
216                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
217                         SetCapacity: true,
218                         // Going below the piece length means it can't complete a piece so
219                         // that it can be hashed.
220                         Capacity: 5,
221                         Wrapper:  fileCachePieceResourceStorage,
222                 }),
223                 SetReadahead: setReadahead,
224                 // Can't readahead too far or the cache will thrash and drop data we
225                 // thought we had.
226                 Readahead:          readahead,
227                 ExportClientStatus: true,
228
229                 // These tests don't work well with more than 1 connection to the seeder.
230                 ConfigureLeecher: ConfigureClient{
231                         Config: func(cfg *torrent.ClientConfig) {
232                                 cfg.DropDuplicatePeerIds = true
233                                 //cfg.DisableIPv6 = true
234                                 //cfg.DisableUTP = true
235                         },
236                 },
237         })
238 }
239
240 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
241         testClientTransferSmallCache(t, true, 5)
242 }
243
244 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
245         testClientTransferSmallCache(t, true, 15)
246 }
247
248 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
249         testClientTransferSmallCache(t, false, -1)
250 }
251
252 func TestClientTransferVarious(t *testing.T) {
253         // Leecher storage
254         for _, ls := range []struct {
255                 name string
256                 f    storageFactory
257         }{
258                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
259                         Wrapper: fileCachePieceResourceStorage,
260                 })},
261                 {"Boltdb", storage.NewBoltDB},
262         } {
263                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
264                         // Seeder storage
265                         for _, ss := range []struct {
266                                 name string
267                                 f    func(string) storage.ClientImplCloser
268                         }{
269                                 {"File", storage.NewFile},
270                                 {"Mmap", storage.NewMMap},
271                         } {
272                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
273                                         for _, responsive := range []bool{false, true} {
274                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
275                                                         t.Run("NoReadahead", func(t *testing.T) {
276                                                                 testClientTransfer(t, testClientTransferParams{
277                                                                         Responsive:     responsive,
278                                                                         SeederStorage:  ss.f,
279                                                                         LeecherStorage: ls.f,
280                                                                 })
281                                                         })
282                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
283                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
284                                                                         testClientTransfer(t, testClientTransferParams{
285                                                                                 SeederStorage:  ss.f,
286                                                                                 Responsive:     responsive,
287                                                                                 SetReadahead:   true,
288                                                                                 Readahead:      readahead,
289                                                                                 LeecherStorage: ls.f,
290                                                                         })
291                                                                 })
292                                                         }
293                                                 })
294                                         }
295                                 })
296                         }
297                 })
298         }
299 }
300
301 // Check that after completing leeching, a leecher transitions to a seeding
302 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
303 func TestSeedAfterDownloading(t *testing.T) {
304         greetingTempDir, mi := testutil.GreetingTestTorrent()
305         defer os.RemoveAll(greetingTempDir)
306
307         cfg := torrent.TestingConfig()
308         cfg.Seed = true
309         cfg.DataDir = greetingTempDir
310         seeder, err := torrent.NewClient(cfg)
311         require.NoError(t, err)
312         defer seeder.Close()
313         defer testutil.ExportStatusWriter(seeder, "s")()
314         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
315         require.NoError(t, err)
316         assert.True(t, ok)
317         seederTorrent.VerifyData()
318
319         cfg = torrent.TestingConfig()
320         cfg.Seed = true
321         cfg.DataDir, err = ioutil.TempDir("", "")
322         require.NoError(t, err)
323         defer os.RemoveAll(cfg.DataDir)
324         leecher, err := torrent.NewClient(cfg)
325         require.NoError(t, err)
326         defer leecher.Close()
327         defer testutil.ExportStatusWriter(leecher, "l")()
328
329         cfg = torrent.TestingConfig()
330         cfg.Seed = false
331         cfg.DataDir, err = ioutil.TempDir("", "")
332         require.NoError(t, err)
333         defer os.RemoveAll(cfg.DataDir)
334         leecherLeecher, _ := torrent.NewClient(cfg)
335         require.NoError(t, err)
336         defer leecherLeecher.Close()
337         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
338         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
339                 ret = torrent.TorrentSpecFromMetaInfo(mi)
340                 ret.ChunkSize = 2
341                 return
342         }())
343         require.NoError(t, err)
344         assert.True(t, ok)
345         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
346                 ret = torrent.TorrentSpecFromMetaInfo(mi)
347                 ret.ChunkSize = 3
348                 return
349         }())
350         require.NoError(t, err)
351         assert.True(t, ok)
352         // Simultaneously DownloadAll in Leecher, and read the contents
353         // consecutively in LeecherLeecher. This non-deterministically triggered a
354         // case where the leecher wouldn't unchoke the LeecherLeecher.
355         var wg sync.WaitGroup
356         wg.Add(1)
357         go func() {
358                 defer wg.Done()
359                 r := llg.NewReader()
360                 defer r.Close()
361                 b, err := ioutil.ReadAll(r)
362                 require.NoError(t, err)
363                 assert.EqualValues(t, testutil.GreetingFileContents, b)
364         }()
365         done := make(chan struct{})
366         defer close(done)
367         go leecherGreeting.AddClientPeer(seeder)
368         go leecherGreeting.AddClientPeer(leecherLeecher)
369         wg.Add(1)
370         go func() {
371                 defer wg.Done()
372                 leecherGreeting.DownloadAll()
373                 leecher.WaitAll()
374         }()
375         wg.Wait()
376 }
377
378 type ConfigureClient struct {
379         Config func(*torrent.ClientConfig)
380         Client func(*torrent.Client)
381 }