]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Add support for non-IP-based networks
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "sync"
9         "testing"
10         "time"
11
12         "github.com/anacrolix/missinggo/v2/filecache"
13         "github.com/anacrolix/torrent"
14         "github.com/anacrolix/torrent/internal/testutil"
15         "github.com/anacrolix/torrent/storage"
16         "golang.org/x/time/rate"
17
18         "github.com/stretchr/testify/assert"
19         "github.com/stretchr/testify/require"
20 )
21
// testClientTransferParams bundles the options accepted by testClientTransfer.
//
// Responsive makes the leecher's reader call SetResponsive. Readahead is only
// applied to the reader (via SetReadahead) when SetReadahead is true.
// ExportClientStatus registers a testutil.ExportStatusWriter for both clients.
// LeecherStorage and SeederStorage, when non-nil, are called with the
// respective data directory to build the client's DefaultStorage; when nil the
// client's DataDir is set instead. The rate limiters, when non-nil, replace
// the seeder's UploadRateLimiter and the leecher's DownloadRateLimiter.
// ConfigureSeeder and ConfigureLeecher are optional hooks applied to the
// respective client's config and to the created client.
type testClientTransferParams struct {
        Responsive                 bool
        Readahead                  int64
        SetReadahead               bool
        ExportClientStatus         bool
        LeecherStorage             func(string) storage.ClientImpl
        SeederStorage              func(string) storage.ClientImpl
        SeederUploadRateLimiter    *rate.Limiter
        LeecherDownloadRateLimiter *rate.Limiter
        ConfigureSeeder            ConfigureClient
        ConfigureLeecher           ConfigureClient
}
34
35 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
36         pos, err := r.Seek(0, io.SeekStart)
37         assert.NoError(t, err)
38         assert.EqualValues(t, 0, pos)
39         _greeting, err := ioutil.ReadAll(r)
40         assert.NoError(t, err)
41         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
42 }
43
44 // Creates a seeder and a leecher, and ensures the data transfers when a read
45 // is attempted on the leecher.
46 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
47         greetingTempDir, mi := testutil.GreetingTestTorrent()
48         defer os.RemoveAll(greetingTempDir)
49         // Create seeder and a Torrent.
50         cfg := torrent.TestingConfig()
51         cfg.Seed = true
52         if ps.SeederUploadRateLimiter != nil {
53                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
54         }
55         // cfg.ListenAddr = "localhost:4000"
56         if ps.SeederStorage != nil {
57                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
58                 defer cfg.DefaultStorage.Close()
59         } else {
60                 cfg.DataDir = greetingTempDir
61         }
62         if ps.ConfigureSeeder.Config != nil {
63                 ps.ConfigureSeeder.Config(cfg)
64         }
65         seeder, err := torrent.NewClient(cfg)
66         require.NoError(t, err)
67         if ps.ConfigureSeeder.Client != nil {
68                 ps.ConfigureSeeder.Client(seeder)
69         }
70         if ps.ExportClientStatus {
71                 defer testutil.ExportStatusWriter(seeder, "s")()
72         }
73         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
74         // Run a Stats right after Closing the Client. This will trigger the Stats
75         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
76         defer seederTorrent.Stats()
77         defer seeder.Close()
78         seederTorrent.VerifyData()
79         // Create leecher and a Torrent.
80         leecherDataDir, err := ioutil.TempDir("", "")
81         require.NoError(t, err)
82         defer os.RemoveAll(leecherDataDir)
83         cfg = torrent.TestingConfig()
84         if ps.LeecherStorage == nil {
85                 cfg.DataDir = leecherDataDir
86         } else {
87                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
88         }
89         if ps.LeecherDownloadRateLimiter != nil {
90                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
91         }
92         cfg.Seed = false
93         //cfg.Debug = true
94         if ps.ConfigureLeecher.Config != nil {
95                 ps.ConfigureLeecher.Config(cfg)
96         }
97         leecher, err := torrent.NewClient(cfg)
98         require.NoError(t, err)
99         defer leecher.Close()
100         if ps.ConfigureLeecher.Client != nil {
101                 ps.ConfigureLeecher.Client(leecher)
102         }
103         if ps.ExportClientStatus {
104                 defer testutil.ExportStatusWriter(leecher, "l")()
105         }
106         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
107                 ret = torrent.TorrentSpecFromMetaInfo(mi)
108                 ret.ChunkSize = 2
109                 return
110         }())
111         require.NoError(t, err)
112         assert.True(t, new)
113
114         //// This was used when observing coalescing of piece state changes.
115         //logPieceStateChanges(leecherTorrent)
116
117         // Now do some things with leecher and seeder.
118         leecherTorrent.AddClientPeer(seeder)
119         // The Torrent should not be interested in obtaining peers, so the one we
120         // just added should be the only one.
121         assert.False(t, leecherTorrent.Seeding())
122         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
123         r := leecherTorrent.NewReader()
124         defer r.Close()
125         if ps.Responsive {
126                 r.SetResponsive()
127         }
128         if ps.SetReadahead {
129                 r.SetReadahead(ps.Readahead)
130         }
131         assertReadAllGreeting(t, r)
132
133         seederStats := seederTorrent.Stats()
134         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
135         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
136
137         leecherStats := leecherTorrent.Stats()
138         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
139         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
140
141         // Try reading through again for the cases where the torrent data size
142         // exceeds the size of the cache.
143         assertReadAllGreeting(t, r)
144 }
145
// fileCacheClientStorageFactoryParams configures the factories built by
// newFileCacheClientStorageFactory. Capacity is applied to the cache only when
// SetCapacity is true. Wrapper turns the cache into a storage.ClientImpl; it
// is called unconditionally, so it must not be nil.
type fileCacheClientStorageFactoryParams struct {
        Capacity    int64
        SetCapacity bool
        Wrapper     func(*filecache.Cache) storage.ClientImpl
}
151
152 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
153         return func(dataDir string) storage.ClientImpl {
154                 fc, err := filecache.NewCache(dataDir)
155                 if err != nil {
156                         panic(err)
157                 }
158                 if ps.SetCapacity {
159                         fc.SetCapacity(ps.Capacity)
160                 }
161                 return ps.Wrapper(fc)
162         }
163 }
164
// storageFactory builds a storage.ClientImpl from a single string argument;
// the factories in this file are invoked with a data directory path.
type storageFactory func(string) storage.ClientImpl
166
167 func TestClientTransferDefault(t *testing.T) {
168         testClientTransfer(t, testClientTransferParams{
169                 ExportClientStatus: true,
170                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
171                         Wrapper: fileCachePieceResourceStorage,
172                 }),
173         })
174 }
175
176 func TestClientTransferRateLimitedUpload(t *testing.T) {
177         started := time.Now()
178         testClientTransfer(t, testClientTransferParams{
179                 // We are uploading 13 bytes (the length of the greeting torrent). The
180                 // chunks are 2 bytes in length. Then the smallest burst we can run
181                 // with is 2. Time taken is (13-burst)/rate.
182                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
183                 ExportClientStatus:      true,
184         })
185         require.True(t, time.Since(started) > time.Second)
186 }
187
188 func TestClientTransferRateLimitedDownload(t *testing.T) {
189         testClientTransfer(t, testClientTransferParams{
190                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
191         })
192 }
193
194 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
195         return storage.NewResourcePieces(fc.AsResourceProvider())
196 }
197
198 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
199         testClientTransfer(t, testClientTransferParams{
200                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
201                         SetCapacity: true,
202                         // Going below the piece length means it can't complete a piece so
203                         // that it can be hashed.
204                         Capacity: 5,
205                         Wrapper:  fileCachePieceResourceStorage,
206                 }),
207                 SetReadahead: setReadahead,
208                 // Can't readahead too far or the cache will thrash and drop data we
209                 // thought we had.
210                 Readahead:          readahead,
211                 ExportClientStatus: true,
212         })
213 }
214
215 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
216         testClientTransferSmallCache(t, true, 5)
217 }
218
219 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
220         testClientTransferSmallCache(t, true, 15)
221 }
222
223 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
224         testClientTransferSmallCache(t, false, -1)
225 }
226
227 func TestClientTransferVarious(t *testing.T) {
228         // Leecher storage
229         for _, ls := range []struct {
230                 name string
231                 f    storageFactory
232         }{
233                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
234                         Wrapper: fileCachePieceResourceStorage,
235                 })},
236                 {"Boltdb", storage.NewBoltDB},
237         } {
238                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
239                         // Seeder storage
240                         for _, ss := range []struct {
241                                 name string
242                                 f    func(string) storage.ClientImpl
243                         }{
244                                 {"File", storage.NewFile},
245                                 {"Mmap", storage.NewMMap},
246                         } {
247                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
248                                         for _, responsive := range []bool{false, true} {
249                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
250                                                         t.Run("NoReadahead", func(t *testing.T) {
251                                                                 testClientTransfer(t, testClientTransferParams{
252                                                                         Responsive:     responsive,
253                                                                         SeederStorage:  ss.f,
254                                                                         LeecherStorage: ls.f,
255                                                                 })
256                                                         })
257                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
258                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
259                                                                         testClientTransfer(t, testClientTransferParams{
260                                                                                 SeederStorage:  ss.f,
261                                                                                 Responsive:     responsive,
262                                                                                 SetReadahead:   true,
263                                                                                 Readahead:      readahead,
264                                                                                 LeecherStorage: ls.f,
265                                                                         })
266                                                                 })
267                                                         }
268                                                 })
269                                         }
270                                 })
271                         }
272                 })
273         }
274 }
275
276 // Check that after completing leeching, a leecher transitions to a seeding
277 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
278 func TestSeedAfterDownloading(t *testing.T) {
279         greetingTempDir, mi := testutil.GreetingTestTorrent()
280         defer os.RemoveAll(greetingTempDir)
281
282         cfg := torrent.TestingConfig()
283         cfg.Seed = true
284         cfg.DataDir = greetingTempDir
285         seeder, err := torrent.NewClient(cfg)
286         require.NoError(t, err)
287         defer seeder.Close()
288         defer testutil.ExportStatusWriter(seeder, "s")()
289         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
290         require.NoError(t, err)
291         assert.True(t, ok)
292         seederTorrent.VerifyData()
293
294         cfg = torrent.TestingConfig()
295         cfg.Seed = true
296         cfg.DataDir, err = ioutil.TempDir("", "")
297         require.NoError(t, err)
298         defer os.RemoveAll(cfg.DataDir)
299         leecher, err := torrent.NewClient(cfg)
300         require.NoError(t, err)
301         defer leecher.Close()
302         defer testutil.ExportStatusWriter(leecher, "l")()
303
304         cfg = torrent.TestingConfig()
305         cfg.Seed = false
306         cfg.DataDir, err = ioutil.TempDir("", "")
307         require.NoError(t, err)
308         defer os.RemoveAll(cfg.DataDir)
309         leecherLeecher, _ := torrent.NewClient(cfg)
310         require.NoError(t, err)
311         defer leecherLeecher.Close()
312         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
313         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
314                 ret = torrent.TorrentSpecFromMetaInfo(mi)
315                 ret.ChunkSize = 2
316                 return
317         }())
318         require.NoError(t, err)
319         assert.True(t, ok)
320         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
321                 ret = torrent.TorrentSpecFromMetaInfo(mi)
322                 ret.ChunkSize = 3
323                 return
324         }())
325         require.NoError(t, err)
326         assert.True(t, ok)
327         // Simultaneously DownloadAll in Leecher, and read the contents
328         // consecutively in LeecherLeecher. This non-deterministically triggered a
329         // case where the leecher wouldn't unchoke the LeecherLeecher.
330         var wg sync.WaitGroup
331         wg.Add(1)
332         go func() {
333                 defer wg.Done()
334                 r := llg.NewReader()
335                 defer r.Close()
336                 b, err := ioutil.ReadAll(r)
337                 require.NoError(t, err)
338                 assert.EqualValues(t, testutil.GreetingFileContents, b)
339         }()
340         done := make(chan struct{})
341         defer close(done)
342         go leecherGreeting.AddClientPeer(seeder)
343         go leecherGreeting.AddClientPeer(leecherLeecher)
344         wg.Add(1)
345         go func() {
346                 defer wg.Done()
347                 leecherGreeting.DownloadAll()
348                 leecher.WaitAll()
349         }()
350         wg.Wait()
351 }
352
// ConfigureClient holds two optional hooks for customising a client in
// testClientTransfer: Config is called with the client's config before the
// client is created, and Client is called with the client right after
// creation. A nil hook is skipped.
type ConfigureClient struct {
        Config func(*torrent.ClientConfig)
        Client func(*torrent.Client)
}