14 "github.com/anacrolix/missinggo/v2/filecache"
15 "github.com/anacrolix/torrent"
16 "github.com/anacrolix/torrent/internal/testutil"
17 "github.com/anacrolix/torrent/storage"
18 sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
19 "golang.org/x/time/rate"
21 "github.com/stretchr/testify/assert"
22 "github.com/stretchr/testify/require"
// testClientTransferParams carries the per-test options read by
// testClientTransfer when it builds its seeder and leecher clients.
// NOTE(review): this listing elides some lines of the struct; fields used
// elsewhere in the file (e.g. ps.Readahead, Responsive) are not visible here.
25 type testClientTransferParams struct {
// Optional storage factories. testClientTransfer calls SeederStorage with the
// greeting torrent's temp dir and LeecherStorage with the leecher's temp dir.
29 LeecherStorage func(string) storage.ClientImplCloser
30 SeederStorage func(string) storage.ClientImplCloser
// Optional rate limiters, copied into the matching client config when non-nil.
31 SeederUploadRateLimiter *rate.Limiter
32 LeecherDownloadRateLimiter *rate.Limiter
// Optional hooks run against each client's config and constructed client.
33 ConfigureSeeder ConfigureClient
34 ConfigureLeecher ConfigureClient
// When true, testClientTransfer waits on the leecher torrent's GotInfo
// channel; when false it asserts on the pending-peer count instead.
36 LeecherStartsWithoutMetadata bool
// assertReadAllGreeting rewinds r to its start, reads everything from it, and
// asserts that the content equals testutil.GreetingFileContents. It also
// asserts that the seek succeeded and landed on offset 0.
39 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
40 pos, err := r.Seek(0, io.SeekStart)
41 assert.NoError(t, err)
42 assert.EqualValues(t, 0, pos)
43 _greeting, err := ioutil.ReadAll(r)
44 assert.NoError(t, err)
45 assert.EqualValues(t, testutil.GreetingFileContents, string(_greeting))
48 // Creates a seeder and a leecher, and ensures the data transfers when a read
49 // is attempted on the leecher.
// NOTE(review): this listing elides lines (closing braces and some
// statements), so the comments added below describe only what is visible.
50 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
51 greetingTempDir, mi := testutil.GreetingTestTorrent()
52 defer os.RemoveAll(greetingTempDir)
53 // Create seeder and a Torrent.
54 cfg := torrent.TestingConfig()
// Apply the caller's optional seeder settings: upload limiter, storage
// factory, and config hook.
56 if ps.SeederUploadRateLimiter != nil {
57 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
59 // cfg.ListenAddr = "localhost:4000"
60 if ps.SeederStorage != nil {
61 storage := ps.SeederStorage(greetingTempDir)
63 cfg.DefaultStorage = storage
65 cfg.DataDir = greetingTempDir
67 if ps.ConfigureSeeder.Config != nil {
68 ps.ConfigureSeeder.Config(cfg)
70 seeder, err := torrent.NewClient(cfg)
71 require.NoError(t, err)
72 if ps.ConfigureSeeder.Client != nil {
73 ps.ConfigureSeeder.Client(seeder)
// ExportStatusWriter is called here; the function it returns is what gets
// deferred.
75 defer testutil.ExportStatusWriter(seeder, "s", t)()
// NOTE(review): both the second result and the error from AddTorrentSpec are
// discarded on this line.
76 seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
77 // Run a Stats right after Closing the Client. This will trigger the Stats
78 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
79 defer seederTorrent.Stats()
81 seederTorrent.VerifyData()
82 // Create leecher and a Torrent.
83 leecherDataDir, err := ioutil.TempDir("", "")
84 require.NoError(t, err)
85 defer os.RemoveAll(leecherDataDir)
// Start from a fresh config for the leecher; cfg is reused as a variable.
86 cfg = torrent.TestingConfig()
87 if ps.LeecherStorage == nil {
88 cfg.DataDir = leecherDataDir
90 storage := ps.LeecherStorage(leecherDataDir)
92 cfg.DefaultStorage = storage
94 if ps.LeecherDownloadRateLimiter != nil {
95 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
98 if ps.ConfigureLeecher.Config != nil {
99 ps.ConfigureLeecher.Config(cfg)
101 leecher, err := torrent.NewClient(cfg)
102 require.NoError(t, err)
103 defer leecher.Close()
104 if ps.ConfigureLeecher.Client != nil {
105 ps.ConfigureLeecher.Client(leecher)
107 defer testutil.ExportStatusWriter(leecher, "l", t)()
// The torrent spec is built by an inline function from the same metainfo the
// seeder uses; part of that function is elided in this listing.
108 leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
109 ret = torrent.TorrentSpecFromMetaInfo(mi)
111 if ps.LeecherStartsWithoutMetadata {
116 require.NoError(t, err)
119 //// This was used when observing coalescing of piece state changes.
120 //logPieceStateChanges(leecherTorrent)
122 // Now do some things with leecher and seeder.
// added is compared against the pending-peer count further down.
123 added := leecherTorrent.AddClientPeer(seeder)
124 assert.False(t, leecherTorrent.Seeding())
125 // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
126 // should be sitting idle until we demand data.
127 if !ps.LeecherStartsWithoutMetadata {
128 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
// Without metadata up front, block until the leecher torrent reports its info.
130 if ps.LeecherStartsWithoutMetadata {
131 <-leecherTorrent.GotInfo()
133 r := leecherTorrent.NewReader()
135 go leecherTorrent.SetInfoBytes(mi.InfoBytes)
140 r.SetReadahead(ps.Readahead)
// Reading through r is what demands the data from the seeder.
142 assertReadAllGreeting(t, r)
143 assert.NotEmpty(t, seederTorrent.PeerConns())
144 leecherPeerConns := leecherTorrent.PeerConns()
145 assert.NotEmpty(t, leecherPeerConns)
// Look for a leecher peer connection whose completed-piece count equals the
// torrent's piece count; the error below reports that none was found.
// NOTE(review): the loop's exit and the guard around t.Errorf are elided.
147 for _, pc := range leecherPeerConns {
148 completed := pc.PeerPieces().Len()
149 t.Logf("peer conn %v has %v completed pieces", pc, completed)
150 if completed == leecherTorrent.Info().NumPieces() {
155 t.Errorf("didn't find seeder amongst leecher peer conns")
// Lower bounds on transferred data: at least 13 bytes and 8 chunks written by
// the seeder and read by the leecher.
158 seederStats := seederTorrent.Stats()
159 assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
160 assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
162 leecherStats := leecherTorrent.Stats()
163 assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
164 assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
166 // Try reading through again for the cases where the torrent data size
167 // exceeds the size of the cache.
168 assertReadAllGreeting(t, r)
// fileCacheClientStorageFactoryParams configures newFileCacheClientStorageFactory.
// NOTE(review): some fields are elided in this listing (ps.Capacity is read by
// the factory but not visible here).
171 type fileCacheClientStorageFactoryParams struct {
// Wrapper converts the created file cache into the storage the factory returns.
174 Wrapper func(*filecache.Cache) storage.ClientImplCloser
// newFileCacheClientStorageFactory returns a storageFactory that creates a
// filecache.Cache rooted at the given data directory and hands it to
// ps.Wrapper to produce the storage.
// NOTE(review): the handling of err and the lines around SetCapacity are
// elided in this listing, so the capacity call may be conditional.
177 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
178 return func(dataDir string) storage.ClientImplCloser {
179 fc, err := filecache.NewCache(dataDir)
184 fc.SetCapacity(ps.Capacity)
186 return ps.Wrapper(fc)
// storageFactory builds a client storage implementation from a string
// argument; the factories in this file name that argument dataDir.
190 type storageFactory func(string) storage.ClientImplCloser
// TestClientTransferDefault runs testClientTransfer with the leecher's storage
// supplied by the file-cache factory, using fileCachePieceResourceStorage as
// the wrapper.
192 func TestClientTransferDefault(t *testing.T) {
193 testClientTransfer(t, testClientTransferParams{
194 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
195 Wrapper: fileCachePieceResourceStorage,
// TestClientTransferDefaultNoMetadata is the same setup as the default
// transfer test, but with LeecherStartsWithoutMetadata set so the leecher
// takes the no-metadata path in testClientTransfer.
200 func TestClientTransferDefaultNoMetadata(t *testing.T) {
201 testClientTransfer(t, testClientTransferParams{
202 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
203 Wrapper: fileCachePieceResourceStorage,
205 LeecherStartsWithoutMetadata: true,
// TestClientTransferRateLimitedUpload runs the transfer with a seeder upload
// rate limiter and requires that the whole run took longer than one second.
209 func TestClientTransferRateLimitedUpload(t *testing.T) {
210 started := time.Now()
211 testClientTransfer(t, testClientTransferParams{
212 // We are uploading 13 bytes (the length of the greeting torrent). The
213 // chunks are 2 bytes in length. Then the smallest burst we can run
214 // with is 2. Time taken is (13-burst)/rate.
// With rate 11 and burst 2 that is (13-2)/11 = 1 second, which is the bound
// checked below.
215 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
217 require.True(t, time.Since(started) > time.Second)
// TestClientTransferRateLimitedDownload runs the transfer with a leecher
// download rate limiter created with rate 512 and burst 512.
220 func TestClientTransferRateLimitedDownload(t *testing.T) {
221 testClientTransfer(t, testClientTransferParams{
222 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
// fileCachePieceResourceStorage adapts a file cache into client storage by
// passing its resource provider to storage.NewResourcePieces.
// NOTE(review): the expression enclosing the two values below is elided in
// this listing; ioutil.NopCloser(nil) presumably supplies the Close half of
// the returned ClientImplCloser — confirm against the full file.
226 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
231 storage.NewResourcePieces(fc.AsResourceProvider()),
232 ioutil.NopCloser(nil),
// testClientTransferSmallCache runs testClientTransfer with file-cache backed
// leecher storage and the given readahead settings, and enables
// DropDuplicatePeerIds in the leecher's config.
// NOTE(review): some lines are elided in this listing, including the setting
// the first comment below refers to and the tail of the readahead comment.
236 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
237 testClientTransfer(t, testClientTransferParams{
238 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
240 // Going below the piece length means it can't complete a piece so
241 // that it can be hashed.
243 Wrapper: fileCachePieceResourceStorage,
245 SetReadahead: setReadahead,
246 // Can't readahead too far or the cache will thrash and drop data we
248 Readahead: readahead,
250 // These tests don't work well with more than 1 connection to the seeder.
251 ConfigureLeecher: ConfigureClient{
252 Config: func(cfg *torrent.ClientConfig) {
253 cfg.DropDuplicatePeerIds = true
254 //cfg.DisableIPv6 = true
255 //cfg.DisableUTP = true
// TestClientTransferSmallCachePieceSizedReadahead runs the small-cache
// transfer with readahead explicitly set to 5.
261 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
262 testClientTransferSmallCache(t, true, 5)
// TestClientTransferSmallCacheLargeReadahead runs the small-cache transfer
// with readahead explicitly set to 15.
265 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
266 testClientTransferSmallCache(t, true, 15)
// TestClientTransferSmallCacheDefaultReadahead runs the small-cache transfer
// with setReadahead false; the -1 readahead value is passed through alongside
// it.
269 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
270 testClientTransferSmallCache(t, false, -1)
// sqliteClientStorageFactory returns a storageFactory that asks connOptsMaker
// for pool options for the given data directory, logs them, and opens sqlite
// piece storage with them.
// NOTE(review): what happens to ret and err after NewPiecesStorage is elided
// in this listing.
273 func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
274 return func(dataDir string) storage.ClientImplCloser {
275 connOpts := connOptsMaker(dataDir)
276 log.Printf("opening sqlite db: %#v", connOpts)
277 ret, err := sqliteStorage.NewPiecesStorage(connOpts)
// TestClientTransferVarious runs testClientTransfer as nested subtests over
// combinations of leecher storage, seeder storage, the Responsive flag, and
// readahead values.
// NOTE(review): struct field declarations, some parameters, and closing lines
// are elided in this listing.
285 func TestClientTransferVarious(t *testing.T) {
// Leecher storage backends under test, each paired with a display name.
287 for _, ls := range []struct {
291 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
292 Wrapper: fileCachePieceResourceStorage,
294 {"Boltdb", storage.NewBoltDB},
295 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
296 return sqliteStorage.NewPoolOpts{
297 Path: filepath.Join(dataDir, "sqlite.db"),
300 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
301 return sqliteStorage.NewPoolOpts{
306 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
// Seeder storage backends under test.
308 for _, ss := range []struct {
312 {"File", storage.NewFile},
313 {"Mmap", storage.NewMMap},
315 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
316 for _, responsive := range []bool{false, true} {
317 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
// One run that does not pass a Readahead value in the visible lines, then one
// run per readahead value in the list below.
318 t.Run("NoReadahead", func(t *testing.T) {
319 testClientTransfer(t, testClientTransferParams{
320 Responsive: responsive,
322 LeecherStorage: ls.f,
325 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
326 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
327 testClientTransfer(t, testClientTransferParams{
329 Responsive: responsive,
331 Readahead: readahead,
332 LeecherStorage: ls.f,
344 // Check that after completing leeching, a leecher transitions to seeding
345 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
// NOTE(review): this listing elides lines (config tweaks, goroutine bodies,
// closing braces), so the comments added below cover only what is visible.
346 func TestSeedAfterDownloading(t *testing.T) {
347 greetingTempDir, mi := testutil.GreetingTestTorrent()
348 defer os.RemoveAll(greetingTempDir)
// Seeder: serves the greeting torrent out of greetingTempDir.
350 cfg := torrent.TestingConfig()
352 cfg.DataDir = greetingTempDir
353 seeder, err := torrent.NewClient(cfg)
354 require.NoError(t, err)
356 defer testutil.ExportStatusWriter(seeder, "s", t)()
357 seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
358 require.NoError(t, err)
360 seederTorrent.VerifyData()
// Leecher: a second client with its own fresh temp data directory.
362 cfg = torrent.TestingConfig()
364 cfg.DataDir, err = ioutil.TempDir("", "")
365 require.NoError(t, err)
366 defer os.RemoveAll(cfg.DataDir)
367 leecher, err := torrent.NewClient(cfg)
368 require.NoError(t, err)
369 defer leecher.Close()
370 defer testutil.ExportStatusWriter(leecher, "l", t)()
// LeecherLeecher: a third client, also with its own temp data directory.
372 cfg = torrent.TestingConfig()
374 cfg.DataDir, err = ioutil.TempDir("", "")
375 require.NoError(t, err)
376 defer os.RemoveAll(cfg.DataDir)
// NOTE(review): the error from NewClient is discarded on the next line, so the
// require.NoError after it re-checks the err left over from TempDir above and
// cannot catch a failed client creation.
377 leecherLeecher, _ := torrent.NewClient(cfg)
378 require.NoError(t, err)
379 defer leecherLeecher.Close()
380 defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
// Both downstream clients add the torrent from the same metainfo via inline
// spec-building functions (partly elided).
381 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
382 ret = torrent.TorrentSpecFromMetaInfo(mi)
386 require.NoError(t, err)
388 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
389 ret = torrent.TorrentSpecFromMetaInfo(mi)
393 require.NoError(t, err)
395 // Simultaneously DownloadAll in Leecher, and read the contents
396 // consecutively in LeecherLeecher. This non-deterministically triggered a
397 // case where the leecher wouldn't unchoke the LeecherLeecher.
398 var wg sync.WaitGroup
// Read the whole torrent through a reader (its construction is elided) and
// compare it with the greeting contents.
404 b, err := ioutil.ReadAll(r)
405 require.NoError(t, err)
406 assert.EqualValues(t, testutil.GreetingFileContents, b)
408 done := make(chan struct{})
// Attach the leecher to both neighbours in the chain from separate goroutines.
410 go leecherGreeting.AddClientPeer(seeder)
411 go leecherGreeting.AddClientPeer(leecherLeecher)
415 leecherGreeting.DownloadAll()
// ConfigureClient holds two optional hooks for customising a test client.
// testClientTransfer nil-checks each one before calling it.
421 type ConfigureClient struct {
// Config is called with the client config before torrent.NewClient.
422 Config func(*torrent.ClientConfig)
// Client is called with the client after it has been created.
423 Client func(*torrent.Client)