]> Sergey Matveev's repositories - btrtrc.git/blob - test/transfer_test.go
Extract the transfer tests
[btrtrc.git] / test / transfer_test.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "os"
8         "sync"
9         "testing"
10         "time"
11
12         "github.com/anacrolix/missinggo/v2/filecache"
13         "github.com/anacrolix/torrent"
14         "github.com/anacrolix/torrent/internal/testutil"
15         "github.com/anacrolix/torrent/storage"
16         "golang.org/x/time/rate"
17
18         "github.com/stretchr/testify/assert"
19         "github.com/stretchr/testify/require"
20 )
21
22 type testClientTransferParams struct {
23         Responsive                 bool
24         Readahead                  int64
25         SetReadahead               bool
26         ExportClientStatus         bool
27         LeecherStorage             func(string) storage.ClientImpl
28         SeederStorage              func(string) storage.ClientImpl
29         SeederUploadRateLimiter    *rate.Limiter
30         LeecherDownloadRateLimiter *rate.Limiter
31 }
32
33 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
34         pos, err := r.Seek(0, io.SeekStart)
35         assert.NoError(t, err)
36         assert.EqualValues(t, 0, pos)
37         _greeting, err := ioutil.ReadAll(r)
38         assert.NoError(t, err)
39         assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
40 }
41
42 // Creates a seeder and a leecher, and ensures the data transfers when a read
43 // is attempted on the leecher.
44 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
45         greetingTempDir, mi := testutil.GreetingTestTorrent()
46         defer os.RemoveAll(greetingTempDir)
47         // Create seeder and a Torrent.
48         cfg := torrent.TestingConfig()
49         cfg.Seed = true
50         if ps.SeederUploadRateLimiter != nil {
51                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
52         }
53         // cfg.ListenAddr = "localhost:4000"
54         if ps.SeederStorage != nil {
55                 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
56                 defer cfg.DefaultStorage.Close()
57         } else {
58                 cfg.DataDir = greetingTempDir
59         }
60         seeder, err := torrent.NewClient(cfg)
61         require.NoError(t, err)
62         if ps.ExportClientStatus {
63                 defer testutil.ExportStatusWriter(seeder, "s")()
64         }
65         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
66         // Run a Stats right after Closing the Client. This will trigger the Stats
67         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
68         defer seederTorrent.Stats()
69         defer seeder.Close()
70         seederTorrent.VerifyData()
71         // Create leecher and a Torrent.
72         leecherDataDir, err := ioutil.TempDir("", "")
73         require.NoError(t, err)
74         defer os.RemoveAll(leecherDataDir)
75         cfg = torrent.TestingConfig()
76         if ps.LeecherStorage == nil {
77                 cfg.DataDir = leecherDataDir
78         } else {
79                 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
80         }
81         if ps.LeecherDownloadRateLimiter != nil {
82                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
83         }
84         cfg.Seed = false
85         //cfg.Debug = true
86         leecher, err := torrent.NewClient(cfg)
87         require.NoError(t, err)
88         defer leecher.Close()
89         if ps.ExportClientStatus {
90                 defer testutil.ExportStatusWriter(leecher, "l")()
91         }
92         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
93                 ret = torrent.TorrentSpecFromMetaInfo(mi)
94                 ret.ChunkSize = 2
95                 return
96         }())
97         require.NoError(t, err)
98         assert.True(t, new)
99
100         //// This was used when observing coalescing of piece state changes.
101         //logPieceStateChanges(leecherTorrent)
102
103         // Now do some things with leecher and seeder.
104         leecherTorrent.AddClientPeer(seeder)
105         // The Torrent should not be interested in obtaining peers, so the one we
106         // just added should be the only one.
107         assert.False(t, leecherTorrent.Seeding())
108         assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
109         r := leecherTorrent.NewReader()
110         defer r.Close()
111         if ps.Responsive {
112                 r.SetResponsive()
113         }
114         if ps.SetReadahead {
115                 r.SetReadahead(ps.Readahead)
116         }
117         assertReadAllGreeting(t, r)
118
119         seederStats := seederTorrent.Stats()
120         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
121         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
122
123         leecherStats := leecherTorrent.Stats()
124         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
125         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
126
127         // Try reading through again for the cases where the torrent data size
128         // exceeds the size of the cache.
129         assertReadAllGreeting(t, r)
130 }
131
132 type fileCacheClientStorageFactoryParams struct {
133         Capacity    int64
134         SetCapacity bool
135         Wrapper     func(*filecache.Cache) storage.ClientImpl
136 }
137
138 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
139         return func(dataDir string) storage.ClientImpl {
140                 fc, err := filecache.NewCache(dataDir)
141                 if err != nil {
142                         panic(err)
143                 }
144                 if ps.SetCapacity {
145                         fc.SetCapacity(ps.Capacity)
146                 }
147                 return ps.Wrapper(fc)
148         }
149 }
150
151 type storageFactory func(string) storage.ClientImpl
152
153 func TestClientTransferDefault(t *testing.T) {
154         testClientTransfer(t, testClientTransferParams{
155                 ExportClientStatus: true,
156                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
157                         Wrapper: fileCachePieceResourceStorage,
158                 }),
159         })
160 }
161
162 func TestClientTransferRateLimitedUpload(t *testing.T) {
163         started := time.Now()
164         testClientTransfer(t, testClientTransferParams{
165                 // We are uploading 13 bytes (the length of the greeting torrent). The
166                 // chunks are 2 bytes in length. Then the smallest burst we can run
167                 // with is 2. Time taken is (13-burst)/rate.
168                 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
169                 ExportClientStatus:      true,
170         })
171         require.True(t, time.Since(started) > time.Second)
172 }
173
174 func TestClientTransferRateLimitedDownload(t *testing.T) {
175         testClientTransfer(t, testClientTransferParams{
176                 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
177         })
178 }
179
180 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
181         return storage.NewResourcePieces(fc.AsResourceProvider())
182 }
183
184 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
185         testClientTransfer(t, testClientTransferParams{
186                 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
187                         SetCapacity: true,
188                         // Going below the piece length means it can't complete a piece so
189                         // that it can be hashed.
190                         Capacity: 5,
191                         Wrapper:  fileCachePieceResourceStorage,
192                 }),
193                 SetReadahead: setReadahead,
194                 // Can't readahead too far or the cache will thrash and drop data we
195                 // thought we had.
196                 Readahead:          readahead,
197                 ExportClientStatus: true,
198         })
199 }
200
201 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
202         testClientTransferSmallCache(t, true, 5)
203 }
204
205 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
206         testClientTransferSmallCache(t, true, 15)
207 }
208
209 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
210         testClientTransferSmallCache(t, false, -1)
211 }
212
213 func TestClientTransferVarious(t *testing.T) {
214         // Leecher storage
215         for _, ls := range []struct {
216                 name string
217                 f    storageFactory
218         }{
219                 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
220                         Wrapper: fileCachePieceResourceStorage,
221                 })},
222                 {"Boltdb", storage.NewBoltDB},
223         } {
224                 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
225                         // Seeder storage
226                         for _, ss := range []struct {
227                                 name string
228                                 f    func(string) storage.ClientImpl
229                         }{
230                                 {"File", storage.NewFile},
231                                 {"Mmap", storage.NewMMap},
232                         } {
233                                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
234                                         for _, responsive := range []bool{false, true} {
235                                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
236                                                         t.Run("NoReadahead", func(t *testing.T) {
237                                                                 testClientTransfer(t, testClientTransferParams{
238                                                                         Responsive:     responsive,
239                                                                         SeederStorage:  ss.f,
240                                                                         LeecherStorage: ls.f,
241                                                                 })
242                                                         })
243                                                         for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
244                                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
245                                                                         testClientTransfer(t, testClientTransferParams{
246                                                                                 SeederStorage:  ss.f,
247                                                                                 Responsive:     responsive,
248                                                                                 SetReadahead:   true,
249                                                                                 Readahead:      readahead,
250                                                                                 LeecherStorage: ls.f,
251                                                                         })
252                                                                 })
253                                                         }
254                                                 })
255                                         }
256                                 })
257                         }
258                 })
259         }
260 }
261
262 // Check that after completing leeching, a leecher transitions to a seeding
263 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
264 func TestSeedAfterDownloading(t *testing.T) {
265         greetingTempDir, mi := testutil.GreetingTestTorrent()
266         defer os.RemoveAll(greetingTempDir)
267
268         cfg := torrent.TestingConfig()
269         cfg.Seed = true
270         cfg.DataDir = greetingTempDir
271         seeder, err := torrent.NewClient(cfg)
272         require.NoError(t, err)
273         defer seeder.Close()
274         defer testutil.ExportStatusWriter(seeder, "s")()
275         seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
276         require.NoError(t, err)
277         assert.True(t, ok)
278         seederTorrent.VerifyData()
279
280         cfg = torrent.TestingConfig()
281         cfg.Seed = true
282         cfg.DataDir, err = ioutil.TempDir("", "")
283         require.NoError(t, err)
284         defer os.RemoveAll(cfg.DataDir)
285         leecher, err := torrent.NewClient(cfg)
286         require.NoError(t, err)
287         defer leecher.Close()
288         defer testutil.ExportStatusWriter(leecher, "l")()
289
290         cfg = torrent.TestingConfig()
291         cfg.Seed = false
292         cfg.DataDir, err = ioutil.TempDir("", "")
293         require.NoError(t, err)
294         defer os.RemoveAll(cfg.DataDir)
295         leecherLeecher, _ := torrent.NewClient(cfg)
296         require.NoError(t, err)
297         defer leecherLeecher.Close()
298         defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
299         leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
300                 ret = torrent.TorrentSpecFromMetaInfo(mi)
301                 ret.ChunkSize = 2
302                 return
303         }())
304         require.NoError(t, err)
305         assert.True(t, ok)
306         llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
307                 ret = torrent.TorrentSpecFromMetaInfo(mi)
308                 ret.ChunkSize = 3
309                 return
310         }())
311         require.NoError(t, err)
312         assert.True(t, ok)
313         // Simultaneously DownloadAll in Leecher, and read the contents
314         // consecutively in LeecherLeecher. This non-deterministically triggered a
315         // case where the leecher wouldn't unchoke the LeecherLeecher.
316         var wg sync.WaitGroup
317         wg.Add(1)
318         go func() {
319                 defer wg.Done()
320                 r := llg.NewReader()
321                 defer r.Close()
322                 b, err := ioutil.ReadAll(r)
323                 require.NoError(t, err)
324                 assert.EqualValues(t, testutil.GreetingFileContents, b)
325         }()
326         done := make(chan struct{})
327         defer close(done)
328         go leecherGreeting.AddClientPeer(seeder)
329         go leecherGreeting.AddClientPeer(leecherLeecher)
330         wg.Add(1)
331         go func() {
332                 defer wg.Done()
333                 leecherGreeting.DownloadAll()
334                 leecher.WaitAll()
335         }()
336         wg.Wait()
337 }