12 "github.com/anacrolix/missinggo/v2/filecache"
13 "github.com/anacrolix/torrent"
14 "github.com/anacrolix/torrent/internal/testutil"
15 "github.com/anacrolix/torrent/storage"
16 "golang.org/x/time/rate"
18 "github.com/stretchr/testify/assert"
19 "github.com/stretchr/testify/require"
// testClientTransferParams bundles the options that testClientTransfer reads
// when it configures the seeder and leecher clients.
//
// NOTE(review): other code in this file also sets Responsive, Readahead and
// SetReadahead on this struct; their declarations are not visible in this
// excerpt.
22 type testClientTransferParams struct {
// ExportClientStatus, when true, makes testClientTransfer register a status
// writer for each client through testutil.ExportStatusWriter.
26 ExportClientStatus bool
// LeecherStorage, when non-nil, is called with the leecher's data directory
// and its result is used as the leecher's DefaultStorage. When nil, the
// leecher config's DataDir is set to that directory instead.
27 LeecherStorage func(string) storage.ClientImpl
// SeederStorage, when non-nil, is called with the greeting torrent's
// directory and its result is used as the seeder's DefaultStorage.
28 SeederStorage func(string) storage.ClientImpl
// SeederUploadRateLimiter, when non-nil, is installed as the seeder config's
// UploadRateLimiter.
29 SeederUploadRateLimiter *rate.Limiter
// LeecherDownloadRateLimiter, when non-nil, is installed as the leecher
// config's DownloadRateLimiter.
30 LeecherDownloadRateLimiter *rate.Limiter
33 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
34 pos, err := r.Seek(0, io.SeekStart)
35 assert.NoError(t, err)
36 assert.EqualValues(t, 0, pos)
37 _greeting, err := ioutil.ReadAll(r)
38 assert.NoError(t, err)
39 assert.EqualValues(t, testutil.GreetingFileContents, _greeting)
42 // Creates a seeder and a leecher, and ensures the data transfers when a read
43 // is attempted on the leecher.
//
// ps supplies the optional storage backends, rate limiters, readahead and
// status-export settings applied to the two clients. After the first full
// read, transfer statistics on both torrents are checked and the data is read
// a second time.
44 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
// Build the greeting test torrent; its directory is removed when the test
// function returns.
45 greetingTempDir, mi := testutil.GreetingTestTorrent()
46 defer os.RemoveAll(greetingTempDir)
47 // Create seeder and a Torrent.
48 cfg := torrent.TestingConfig()
50 if ps.SeederUploadRateLimiter != nil {
51 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
53 // cfg.ListenAddr = "localhost:4000"
// A caller-supplied seeder storage is built over the greeting directory and
// closed when the test returns.
// NOTE(review): the DataDir assignment below presumably belongs to an else
// branch that is not visible in this excerpt — confirm.
54 if ps.SeederStorage != nil {
55 cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
56 defer cfg.DefaultStorage.Close()
58 cfg.DataDir = greetingTempDir
60 seeder, err := torrent.NewClient(cfg)
61 require.NoError(t, err)
62 if ps.ExportClientStatus {
63 defer testutil.ExportStatusWriter(seeder, "s")()
// NOTE(review): both the "new" flag and the error returned by AddTorrentSpec
// are discarded here; consider checking the error with require.NoError so a
// failure is reported at this point rather than on first use of seederTorrent.
65 seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
66 // Run a Stats right after Closing the Client. This will trigger the Stats
67 // panic in #214 caused by RemoteAddr on Closed uTP sockets.
68 defer seederTorrent.Stats()
70 seederTorrent.VerifyData()
71 // Create leecher and a Torrent.
72 leecherDataDir, err := ioutil.TempDir("", "")
73 require.NoError(t, err)
74 defer os.RemoveAll(leecherDataDir)
// Start from a fresh testing config for the leecher.
75 cfg = torrent.TestingConfig()
// Without a custom leecher storage the client uses leecherDataDir directly;
// otherwise the storage factory is given that directory.
76 if ps.LeecherStorage == nil {
77 cfg.DataDir = leecherDataDir
79 cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
81 if ps.LeecherDownloadRateLimiter != nil {
82 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
86 leecher, err := torrent.NewClient(cfg)
87 require.NoError(t, err)
89 if ps.ExportClientStatus {
90 defer testutil.ExportStatusWriter(leecher, "l")()
// NOTE(review): the local named "new" shadows the builtin new for the rest of
// this function.
92 leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
93 ret = torrent.TorrentSpecFromMetaInfo(mi)
97 require.NoError(t, err)
100 //// This was used when observing coalescing of piece state changes.
101 //logPieceStateChanges(leecherTorrent)
103 // Now do some things with leecher and seeder.
104 leecherTorrent.AddClientPeer(seeder)
105 // The Torrent should not be interested in obtaining peers, so the one we
106 // just added should be the only one.
107 assert.False(t, leecherTorrent.Seeding())
108 assert.EqualValues(t, 1, leecherTorrent.Stats().PendingPeers)
109 r := leecherTorrent.NewReader()
// Apply the caller's readahead to the reader.
// NOTE(review): whether this call is guarded by ps.SetReadahead is not
// visible in this excerpt — confirm.
115 r.SetReadahead(ps.Readahead)
// First full read through the leecher's reader.
117 assertReadAllGreeting(t, r)
// The seeder must have written at least 13 data bytes in at least 8 chunks.
// NOTE(review): 13 presumably is the greeting length in bytes — confirm
// against testutil.GreetingFileContents.
119 seederStats := seederTorrent.Stats()
120 assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
121 assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
// The leecher must have read at least the same amounts.
123 leecherStats := leecherTorrent.Stats()
124 assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
125 assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
127 // Try reading through again for the cases where the torrent data size
128 // exceeds the size of the cache.
129 assertReadAllGreeting(t, r)
// fileCacheClientStorageFactoryParams configures
// newFileCacheClientStorageFactory.
//
// NOTE(review): the factory also reads ps.Capacity; that field's declaration
// is not visible in this excerpt.
132 type fileCacheClientStorageFactoryParams struct {
// Wrapper converts the file cache created by the factory into the
// storage.ClientImpl that the factory returns.
135 Wrapper func(*filecache.Cache) storage.ClientImpl
// newFileCacheClientStorageFactory returns a storageFactory that creates a
// filecache.Cache for the given data directory and passes it to ps.Wrapper to
// obtain the storage.ClientImpl.
138 func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
139 return func(dataDir string) storage.ClientImpl {
140 fc, err := filecache.NewCache(dataDir)
// NOTE(review): the handling of err is not visible in this excerpt — confirm
// it is checked before fc is used.
// Apply the configured capacity to the cache.
// NOTE(review): any condition guarding this call is not visible here.
145 fc.SetCapacity(ps.Capacity)
147 return ps.Wrapper(fc)
151 type storageFactory func(string) storage.ClientImpl
153 func TestClientTransferDefault(t *testing.T) {
154 testClientTransfer(t, testClientTransferParams{
155 ExportClientStatus: true,
156 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
157 Wrapper: fileCachePieceResourceStorage,
162 func TestClientTransferRateLimitedUpload(t *testing.T) {
163 started := time.Now()
164 testClientTransfer(t, testClientTransferParams{
165 // We are uploading 13 bytes (the length of the greeting torrent). The
166 // chunks are 2 bytes in length. Then the smallest burst we can run
167 // with is 2. Time taken is (13-burst)/rate.
168 SeederUploadRateLimiter: rate.NewLimiter(11, 2),
169 ExportClientStatus: true,
171 require.True(t, time.Since(started) > time.Second)
174 func TestClientTransferRateLimitedDownload(t *testing.T) {
175 testClientTransfer(t, testClientTransferParams{
176 LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
180 func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
181 return storage.NewResourcePieces(fc.AsResourceProvider())
// testClientTransferSmallCache runs testClientTransfer with a file-cache
// backed leecher storage, forwarding setReadahead and readahead into the
// transfer parameters and enabling client status export.
//
// NOTE(review): the cache capacity settings are not visible in this excerpt;
// the comment below suggests the capacity is tied to the piece length —
// confirm.
184 func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
185 testClientTransfer(t, testClientTransferParams{
186 LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
188 // Going below the piece length means it can't complete a piece so
189 // that it can be hashed.
191 Wrapper: fileCachePieceResourceStorage,
193 SetReadahead: setReadahead,
194 // Can't readahead too far or the cache will thrash and drop data we
// (The sentence above is cut off in this view; presumably it ends with
// "want" — confirm against the full file.)
196 Readahead: readahead,
197 ExportClientStatus: true,
201 func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
202 testClientTransferSmallCache(t, true, 5)
205 func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
206 testClientTransferSmallCache(t, true, 15)
209 func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
210 testClientTransferSmallCache(t, false, -1)
// TestClientTransferVarious runs testClientTransfer as nested subtests over
// combinations of leecher storage backend, seeder storage backend, the
// Responsive flag, and a list of readahead values (plus a "NoReadahead" case).
//
// NOTE(review): parts of the table struct definitions and of the parameter
// literals are not visible in this excerpt; in particular, confirm that ss.f
// is passed as SeederStorage in both testClientTransfer calls.
213 func TestClientTransferVarious(t *testing.T) {
// Leecher storage backends, each labelled by name for the subtest title.
215 for _, ls := range []struct {
219 {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
220 Wrapper: fileCachePieceResourceStorage,
222 {"Boltdb", storage.NewBoltDB},
224 t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
// Seeder storage backends.
226 for _, ss := range []struct {
228 f func(string) storage.ClientImpl
230 {"File", storage.NewFile},
231 {"Mmap", storage.NewMMap},
233 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
234 for _, responsive := range []bool{false, true} {
235 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
// Case that does not pass a Readahead value.
236 t.Run("NoReadahead", func(t *testing.T) {
237 testClientTransfer(t, testClientTransferParams{
238 Responsive: responsive,
240 LeecherStorage: ls.f,
// One subtest per explicit readahead value.
243 for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
244 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
245 testClientTransfer(t, testClientTransferParams{
247 Responsive: responsive,
249 Readahead: readahead,
250 LeecherStorage: ls.f,
262 // Check that after completing leeching, a leecher transitions to seeding
263 // correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
//
// NOTE(review): this excerpt omits some lines of the function and may end
// before the function does; the comments below describe only what is visible.
264 func TestSeedAfterDownloading(t *testing.T) {
265 greetingTempDir, mi := testutil.GreetingTestTorrent()
266 defer os.RemoveAll(greetingTempDir)
// Seeder: serves the greeting torrent from greetingTempDir.
268 cfg := torrent.TestingConfig()
270 cfg.DataDir = greetingTempDir
271 seeder, err := torrent.NewClient(cfg)
272 require.NoError(t, err)
274 defer testutil.ExportStatusWriter(seeder, "s")()
275 seederTorrent, ok, err := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
276 require.NoError(t, err)
278 seederTorrent.VerifyData()
// Leecher: a second client with its own temporary data directory.
280 cfg = torrent.TestingConfig()
282 cfg.DataDir, err = ioutil.TempDir("", "")
283 require.NoError(t, err)
284 defer os.RemoveAll(cfg.DataDir)
285 leecher, err := torrent.NewClient(cfg)
286 require.NoError(t, err)
287 defer leecher.Close()
288 defer testutil.ExportStatusWriter(leecher, "l")()
// LeecherLeecher: a third client with its own temporary data directory.
290 cfg = torrent.TestingConfig()
292 cfg.DataDir, err = ioutil.TempDir("", "")
293 require.NoError(t, err)
294 defer os.RemoveAll(cfg.DataDir)
// NOTE(review): the error returned by NewClient is discarded here, so the
// require.NoError on the next line re-checks the stale err from ioutil.TempDir.
// A failed client creation would not be reported at this point. This should
// read `leecherLeecher, err := torrent.NewClient(cfg)`.
295 leecherLeecher, _ := torrent.NewClient(cfg)
296 require.NoError(t, err)
297 defer leecherLeecher.Close()
298 defer testutil.ExportStatusWriter(leecherLeecher, "ll")()
// Add the same torrent to both downstream clients.
299 leecherGreeting, ok, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
300 ret = torrent.TorrentSpecFromMetaInfo(mi)
304 require.NoError(t, err)
306 llg, ok, err := leecherLeecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
307 ret = torrent.TorrentSpecFromMetaInfo(mi)
311 require.NoError(t, err)
313 // Simultaneously DownloadAll in Leecher, and read the contents
314 // consecutively in LeecherLeecher. This non-deterministically triggered a
315 // case where the leecher wouldn't unchoke the LeecherLeecher.
316 var wg sync.WaitGroup
// NOTE(review): the declaration of r is not visible in this excerpt. If this
// read runs in a separate goroutine, require.NoError would call t.FailNow off
// the test goroutine, which the testing package does not support — confirm
// and prefer assert there.
322 b, err := ioutil.ReadAll(r)
323 require.NoError(t, err)
324 assert.EqualValues(t, testutil.GreetingFileContents, b)
326 done := make(chan struct{})
// Connect the leecher to both the seeder and the LeecherLeecher, each call
// running in its own goroutine.
328 go leecherGreeting.AddClientPeer(seeder)
329 go leecherGreeting.AddClientPeer(leecherLeecher)
333 leecherGreeting.DownloadAll()