Sergey Matveev's repositories - btrtrc.git/blob - test/leecher-storage.go
Rearrange transfer tests so build directives are applied by the right packages
[btrtrc.git] / test / leecher-storage.go
1 package test
2
3 import (
4         "fmt"
5         "io"
6         "os"
7         "runtime"
8         "testing"
9         "testing/iotest"
10
11         "github.com/anacrolix/missinggo/v2/bitmap"
12         "github.com/anacrolix/torrent"
13         "github.com/anacrolix/torrent/internal/testutil"
14         "github.com/anacrolix/torrent/storage"
15         "github.com/frankban/quicktest"
16         "github.com/stretchr/testify/assert"
17         "github.com/stretchr/testify/require"
18         "golang.org/x/time/rate"
19 )
20
21 type LeecherStorageTestCase struct {
22         Name       string
23         Factory    StorageFactory
24         GoMaxProcs int
25 }
26
27 type StorageFactory func(string) storage.ClientImplCloser
28
29 func TestLeecherStorage(t *testing.T, ls LeecherStorageTestCase) {
30         // Seeder storage
31         for _, ss := range []struct {
32                 name string
33                 f    StorageFactory
34         }{
35                 {"File", storage.NewFile},
36                 {"Mmap", storage.NewMMap},
37         } {
38                 t.Run(fmt.Sprintf("%sSeederStorage", ss.name), func(t *testing.T) {
39                         for _, responsive := range []bool{false, true} {
40                                 t.Run(fmt.Sprintf("Responsive=%v", responsive), func(t *testing.T) {
41                                         t.Run("NoReadahead", func(t *testing.T) {
42                                                 testClientTransfer(t, testClientTransferParams{
43                                                         Responsive:     responsive,
44                                                         SeederStorage:  ss.f,
45                                                         LeecherStorage: ls.Factory,
46                                                         GOMAXPROCS:     ls.GoMaxProcs,
47                                                 })
48                                         })
49                                         for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
50                                                 t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
51                                                         testClientTransfer(t, testClientTransferParams{
52                                                                 SeederStorage:  ss.f,
53                                                                 Responsive:     responsive,
54                                                                 SetReadahead:   true,
55                                                                 Readahead:      readahead,
56                                                                 LeecherStorage: ls.Factory,
57                                                                 GOMAXPROCS:     ls.GoMaxProcs,
58                                                         })
59                                                 })
60                                         }
61                                 })
62                         }
63                 })
64         }
65 }
66
67 type ConfigureClient struct {
68         Config func(cfg *torrent.ClientConfig)
69         Client func(cl *torrent.Client)
70 }
71
72 type testClientTransferParams struct {
73         Responsive     bool
74         Readahead      int64
75         SetReadahead   bool
76         LeecherStorage func(string) storage.ClientImplCloser
77         // TODO: Use a generic option type. This is the capacity of the leecher storage for determining
78         // whether it's possible for the leecher to be Complete. 0 currently means no limit.
79         LeecherStorageCapacity     int64
80         SeederStorage              func(string) storage.ClientImplCloser
81         SeederUploadRateLimiter    *rate.Limiter
82         LeecherDownloadRateLimiter *rate.Limiter
83         ConfigureSeeder            ConfigureClient
84         ConfigureLeecher           ConfigureClient
85         GOMAXPROCS                 int
86
87         LeecherStartsWithoutMetadata bool
88 }
89
90 // Creates a seeder and a leecher, and ensures the data transfers when a read
91 // is attempted on the leecher.
92 func testClientTransfer(t *testing.T, ps testClientTransferParams) {
93         t.Parallel()
94
95         prevGOMAXPROCS := runtime.GOMAXPROCS(ps.GOMAXPROCS)
96         newGOMAXPROCS := prevGOMAXPROCS
97         if ps.GOMAXPROCS > 0 {
98                 newGOMAXPROCS = ps.GOMAXPROCS
99         }
100         defer func() {
101                 quicktest.Check(t, runtime.GOMAXPROCS(prevGOMAXPROCS), quicktest.ContentEquals, newGOMAXPROCS)
102         }()
103
104         greetingTempDir, mi := testutil.GreetingTestTorrent()
105         defer os.RemoveAll(greetingTempDir)
106         // Create seeder and a Torrent.
107         cfg := torrent.TestingConfig(t)
108         // cfg.Debug = true
109         cfg.Seed = true
110         // Some test instances don't like this being on, even when there's no cache involved.
111         cfg.DropMutuallyCompletePeers = false
112         if ps.SeederUploadRateLimiter != nil {
113                 cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
114         }
115         // cfg.ListenAddr = "localhost:4000"
116         if ps.SeederStorage != nil {
117                 storage := ps.SeederStorage(greetingTempDir)
118                 defer storage.Close()
119                 cfg.DefaultStorage = storage
120         } else {
121                 cfg.DataDir = greetingTempDir
122         }
123         if ps.ConfigureSeeder.Config != nil {
124                 ps.ConfigureSeeder.Config(cfg)
125         }
126         seeder, err := torrent.NewClient(cfg)
127         require.NoError(t, err)
128         if ps.ConfigureSeeder.Client != nil {
129                 ps.ConfigureSeeder.Client(seeder)
130         }
131         defer testutil.ExportStatusWriter(seeder, "s", t)()
132         seederTorrent, _, _ := seeder.AddTorrentSpec(torrent.TorrentSpecFromMetaInfo(mi))
133         // Run a Stats right after Closing the Client. This will trigger the Stats
134         // panic in #214 caused by RemoteAddr on Closed uTP sockets.
135         defer seederTorrent.Stats()
136         defer seeder.Close()
137         // Adding a torrent and setting the info should trigger piece checks for everything
138         // automatically. Wait until the seed Torrent agrees that everything is available.
139         <-seederTorrent.Complete.On()
140         // Create leecher and a Torrent.
141         leecherDataDir := t.TempDir()
142         cfg = torrent.TestingConfig(t)
143         // See the seeder client config comment.
144         cfg.DropMutuallyCompletePeers = false
145         if ps.LeecherStorage == nil {
146                 cfg.DataDir = leecherDataDir
147         } else {
148                 storage := ps.LeecherStorage(leecherDataDir)
149                 defer storage.Close()
150                 cfg.DefaultStorage = storage
151         }
152         if ps.LeecherDownloadRateLimiter != nil {
153                 cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
154         }
155         cfg.Seed = false
156         // cfg.Debug = true
157         if ps.ConfigureLeecher.Config != nil {
158                 ps.ConfigureLeecher.Config(cfg)
159         }
160         leecher, err := torrent.NewClient(cfg)
161         require.NoError(t, err)
162         defer leecher.Close()
163         if ps.ConfigureLeecher.Client != nil {
164                 ps.ConfigureLeecher.Client(leecher)
165         }
166         defer testutil.ExportStatusWriter(leecher, "l", t)()
167         leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *torrent.TorrentSpec) {
168                 ret = torrent.TorrentSpecFromMetaInfo(mi)
169                 ret.ChunkSize = 2
170                 if ps.LeecherStartsWithoutMetadata {
171                         ret.InfoBytes = nil
172                 }
173                 return
174         }())
175         require.NoError(t, err)
176         assert.False(t, leecherTorrent.Complete.Bool())
177         assert.True(t, new)
178
179         //// This was used when observing coalescing of piece state changes.
180         //logPieceStateChanges(leecherTorrent)
181
182         // Now do some things with leecher and seeder.
183         added := leecherTorrent.AddClientPeer(seeder)
184         assert.False(t, leecherTorrent.Seeding())
185         // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
186         // should be sitting idle until we demand data.
187         if !ps.LeecherStartsWithoutMetadata {
188                 assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
189         }
190         if ps.LeecherStartsWithoutMetadata {
191                 <-leecherTorrent.GotInfo()
192         }
193         r := leecherTorrent.NewReader()
194         defer r.Close()
195         go leecherTorrent.SetInfoBytes(mi.InfoBytes)
196         if ps.Responsive {
197                 r.SetResponsive()
198         }
199         if ps.SetReadahead {
200                 r.SetReadahead(ps.Readahead)
201         }
202         assertReadAllGreeting(t, r)
203         info, err := mi.UnmarshalInfo()
204         require.NoError(t, err)
205         canComplete := ps.LeecherStorageCapacity == 0 || ps.LeecherStorageCapacity >= info.TotalLength()
206         if !canComplete {
207                 // Reading from a cache doesn't refresh older pieces until we fail to read those, so we need
208                 // to force a refresh since we just read the contents from start to finish.
209                 go leecherTorrent.VerifyData()
210         }
211         if canComplete {
212                 <-leecherTorrent.Complete.On()
213         } else {
214                 <-leecherTorrent.Complete.Off()
215         }
216         assert.NotEmpty(t, seederTorrent.PeerConns())
217         leecherPeerConns := leecherTorrent.PeerConns()
218         if cfg.DropMutuallyCompletePeers {
219                 // I don't think we can assume it will be empty already, due to timing.
220                 // assert.Empty(t, leecherPeerConns)
221         } else {
222                 assert.NotEmpty(t, leecherPeerConns)
223         }
224         foundSeeder := false
225         for _, pc := range leecherPeerConns {
226                 completed := pc.PeerPieces().GetCardinality()
227                 t.Logf("peer conn %v has %v completed pieces", pc, completed)
228                 if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
229                         foundSeeder = true
230                 }
231         }
232         if !foundSeeder {
233                 t.Errorf("didn't find seeder amongst leecher peer conns")
234         }
235
236         seederStats := seederTorrent.Stats()
237         assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
238         assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
239
240         leecherStats := leecherTorrent.Stats()
241         assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
242         assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
243
244         // Try reading through again for the cases where the torrent data size
245         // exceeds the size of the cache.
246         assertReadAllGreeting(t, r)
247 }
248
249 func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
250         pos, err := r.Seek(0, io.SeekStart)
251         assert.NoError(t, err)
252         assert.EqualValues(t, 0, pos)
253         quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
254 }