14 "github.com/anacrolix/missinggo/v2/filecache"
15 "github.com/anacrolix/torrent"
16 "github.com/anacrolix/torrent/internal/testutil"
17 "github.com/anacrolix/torrent/storage"
18 sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
19 "github.com/frankban/quicktest"
20 "golang.org/x/time/rate"
22 "github.com/stretchr/testify/assert"
23 "github.com/stretchr/testify/require"
26 type testClientTransferParams struct {
30 LeecherStorage func(string) storage.ClientImplCloser
31 SeederStorage func(string) storage.ClientImplCloser
32 SeederUploadRateLimiter *rate.Limiter
33 LeecherDownloadRateLimiter *rate.Limiter
34 ConfigureSeeder ConfigureClient
35 ConfigureLeecher ConfigureClient
37 LeecherStartsWithoutMetadata bool
40 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
41 pos, err := r.Seek(0, io.SeekStart)
42 assert.NoError(t, err)
43 assert.EqualValues(t, 0, pos)
44 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
47 // Creates a seeder and a leecher, and ensures the data transfers when a read
48 // is attempted on the leecher.
49 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
50 greetingTempDir, mi := testutil.GreetingTestTorrent()
51 defer os.RemoveAll(greetingTempDir)
52 // Create seeder and a Torrent.
53 cfg := torrent.TestingConfig()
55 // Some test instances don't like this being on, even when there's no cache involved.
56 cfg.DropMutuallyCompletePeers = false
57 if ps.SeederUploadRateLimiter != nil {
58 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
60 // cfg.ListenAddr = "localhost:4000"
61 if ps.SeederStorage != nil {
62 storage := ps.SeederStorage(greetingTempDir)
64 cfg.DefaultStorage = storage
66 cfg.DataDir = greetingTempDir
68 if ps.ConfigureSeeder.Config != nil {
69 ps.ConfigureSeeder.Config(cfg)
71 seeder, err := torrent.NewClient(cfg)
72 require.NoError(t, err)
73 if ps.ConfigureSeeder.Client != nil {
74 ps.ConfigureSeeder.Client(seeder)
76 defer testutil.ExportStatusWriter(seeder, "s", t)()
77 seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
78 // Run a Stats right after Closing the Client. This will trigger the Stats
79 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
80 defer seederTorrent.Stats()
82 seederTorrent.VerifyData()
83 // Create leecher and a Torrent.
84 leecherDataDir, err := ioutil.TempDir("", "")
85 require.NoError(t, err)
86 defer os.RemoveAll(leecherDataDir)
87 cfg = torrent.TestingConfig()
88 // See the seeder client config comment.
89 cfg.DropMutuallyCompletePeers = false
90 if ps.LeecherStorage == nil {
91 cfg.DataDir = leecherDataDir
93 storage := ps.LeecherStorage(leecherDataDir)
95 cfg.DefaultStorage = storage
97 if ps.LeecherDownloadRateLimiter != nil {
98 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
101 if ps.ConfigureLeecher.Config != nil {
102 ps.ConfigureLeecher.Config(cfg)
104 leecher, err := torrent.NewClient(cfg)
105 require.NoError(t, err)
106 defer leecher.Close()
107 if ps.ConfigureLeecher.Client != nil {
108 ps.ConfigureLeecher.Client(leecher)
110 defer testutil.ExportStatusWriter(leecher, "l", t)()
111 leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
112 ret = torrent.TorrentSpecFromMetaInfo(mi)
114 if ps.LeecherStartsWithoutMetadata {
119 require.NoError(t, err)
122 //// This was used when observing coalescing of piece state changes.
123 //logPieceStateChanges(leecherTorrent)
125 // Now do some things with leecher and seeder.
126 added := leecherTorrent.AddClientPeer(seeder)
127 assert.False(t, leecherTorrent.Seeding())
128 // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
129 // should be sitting idle until we demand data.
130 if !ps.LeecherStartsWithoutMetadata {
131 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
133 if ps.LeecherStartsWithoutMetadata {
134 <-leecherTorrent.GotInfo()
136 r := leecherTorrent.NewReader()
138 go leecherTorrent.SetInfoBytes(mi.InfoBytes)
143 r.SetReadahead(ps.Readahead)
145 assertReadAllGreeting(t, r)
146 assert.NotEmpty(t, seederTorrent.PeerConns())
147 leecherPeerConns := leecherTorrent.PeerConns()
148 if cfg.DropMutuallyCompletePeers {
149 // I don't think we can assume it will be empty already, due to timing.
150 //assert.Empty(t, leecherPeerConns)
152 assert.NotEmpty(t, leecherPeerConns)
155 for _, pc := range leecherPeerConns {
156 completed := pc.PeerPieces().Len()
157 t.Logf("peer conn %v has %v completed pieces", pc, completed)
158 if completed == leecherTorrent.Info().NumPieces() {
163 t.Errorf("didn't find seeder amongst leecher peer conns")
166 seederStats := seederTorrent.Stats()
167 assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
168 assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
170 leecherStats := leecherTorrent.Stats()
171 assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
172 assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
174 // Try reading through again for the cases where the torrent data size
175 // exceeds the size of the cache.
176 assertReadAllGreeting(t, r)
179 type fileCacheClientStorageFactoryParams struct {
182 Wrapper func(*filecache.Cache) storage.ClientImplCloser
185 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
186 return func(dataDir string) storage.ClientImplCloser {
187 fc, err := filecache.NewCache(dataDir)
192 fc.SetCapacity(ps.Capacity)
194 return ps.Wrapper(fc)
198 type storageFactory func(string) storage.ClientImplCloser
200 func TestClientTransferDefault(t *testing.T) {
201 testClientTransfer(t, testClientTransferParams{
202 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
203 Wrapper: fileCachePieceResourceStorage,
208 func TestClientTransferDefaultNoMetadata(t *testing.T) {
209 testClientTransfer(t, testClientTransferParams{
210 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
211 Wrapper: fileCachePieceResourceStorage,
213 LeecherStartsWithoutMetadata: true,
217 func TestClientTransferRateLimitedUpload(t *testing.T) {
218 started := time.Now()
219 testClientTransfer(t, testClientTransferParams{
220 // We are uploading 13 bytes (the length of the greeting torrent). The
221 // chunks are 2 bytes in length. Then the smallest burst we can run
222 // with is 2. Time taken is (13-burst)/rate.
223 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
225 require.True(t, time.Since(started) > time.Second)
228 func TestClientTransferRateLimitedDownload(t *testing.T) {
229 testClientTransfer(t, testClientTransferParams{
230 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
234 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
239 storage.NewResourcePieces(fc.AsResourceProvider()),
240 ioutil.NopCloser(nil),
244 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
245 testClientTransfer(t, testClientTransferParams{
246 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
248 // Going below the piece length means it can't complete a piece so
249 // that it can be hashed.
251 Wrapper: fileCachePieceResourceStorage,
253 SetReadahead: setReadahead,
254 // Can't readahead too far or the cache will thrash and drop data we
256 Readahead: readahead,
258 // These tests don't work well with more than 1 connection to the seeder.
259 ConfigureLeecher: ConfigureClient{
260 Config: func(cfg *torrent.ClientConfig) {
261 cfg.DropDuplicatePeerIds = true
262 //cfg.DisableIPv6 = true
263 //cfg.DisableUTP = true
269 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
270 testClientTransferSmallCache(t, true, 5)
273 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
274 testClientTransferSmallCache(t, true, 15)
277 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
278 testClientTransferSmallCache(t, false, -1)
281 func sqliteClientStorageFactory(optsMaker func(dataDir string) sqliteStorage.NewPiecesStorageOpts) storageFactory {
282 return func(dataDir string) storage.ClientImplCloser {
283 opts := optsMaker(dataDir)
284 //log.Printf("opening sqlite db: %#v", opts)
285 ret, err := sqliteStorage.NewPiecesStorage(opts)
293 func TestClientTransferVarious(t *testing.T) {
295 for _, ls := range []struct {
299 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
300 Wrapper: fileCachePieceResourceStorage,
302 {"Boltdb", storage.NewBoltDB},
303 {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
304 return sqliteStorage.NewPiecesStorageOpts{
305 NewPoolOpts: sqliteStorage.NewPoolOpts{
306 Path: filepath.Join(dataDir, "sqlite.db"),
310 {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
311 return sqliteStorage.NewPiecesStorageOpts{
312 NewPoolOpts: sqliteStorage.NewPoolOpts{
318 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
320 for _, ss := range []struct {
324 {"File", storage.NewFile},
325 {"Mmap", storage.NewMMap},
327 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
328 for _, responsive := range []bool{false, true} {
329 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
330 t.Run("NoReadahead", func(t *testing.T) {
331 testClientTransfer(t, testClientTransferParams{
332 Responsive: responsive,
334 LeecherStorage: ls.f,
337 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
338 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
339 testClientTransfer(t, testClientTransferParams{
341 Responsive: responsive,
343 Readahead: readahead,
344 LeecherStorage: ls.f,
356 // Check that after completing leeching, a leecher transitions to a seeding
357 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
358 func TestSeedAfterDownloading(t *testing.T) {
359 greetingTempDir, mi := testutil.GreetingTestTorrent()
360 defer os.RemoveAll(greetingTempDir)
362 cfg := torrent.TestingConfig()
364 cfg.DataDir = greetingTempDir
365 seeder, err := torrent.NewClient(cfg)
366 require.NoError(t, err)
368 defer testutil.ExportStatusWriter(seeder, "s", t)()
369 seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
370 require.NoError(t, err)
372 seederTorrent.VerifyData()
374 cfg = torrent.TestingConfig()
376 cfg.DataDir, err = ioutil.TempDir("", "")
377 require.NoError(t, err)
378 defer os.RemoveAll(cfg.DataDir)
379 leecher, err := torrent.NewClient(cfg)
380 require.NoError(t, err)
381 defer leecher.Close()
382 defer testutil.ExportStatusWriter(leecher, "l", t)()
384 cfg = torrent.TestingConfig()
386 cfg.DataDir, err = ioutil.TempDir("", "")
387 require.NoError(t, err)
388 defer os.RemoveAll(cfg.DataDir)
389 leecherLeecher, _ := torrent.NewClient(cfg)
390 require.NoError(t, err)
391 defer leecherLeecher.Close()
392 defer testutil.ExportStatusWriter(leecherLeecher, "ll", t)()
393 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
394 ret = torrent.TorrentSpecFromMetaInfo(mi)
398 require.NoError(t, err)
400 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
401 ret = torrent.TorrentSpecFromMetaInfo(mi)
405 require.NoError(t, err)
407 // Simultaneously DownloadAll in Leecher, and read the contents
408 // consecutively in LeecherLeecher. This non-deterministically triggered a
409 // case where the leecher wouldn't unchoke the LeecherLeecher.
410 var wg sync.WaitGroup
416 quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
418 done := make(chan struct{})
420 go leecherGreeting.AddClientPeer(seeder)
421 go leecherGreeting.AddClientPeer(leecherLeecher)
425 leecherGreeting.DownloadAll()
431 type ConfigureClient struct {
432 Config func(*torrent.ClientConfig)
433 Client func(*torrent.Client)