]> Sergey Matveev's repositories - btrtrc.git/blob - cmd/torrent/download.go
0ec4b99088250bc37cae52daf60c9791155519bc
[btrtrc.git] / cmd / torrent / download.go
1 package main
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "io"
8         "net"
9         "net/http"
10         "os"
11         "os/signal"
12         "strings"
13         "sync"
14         "syscall"
15         "time"
16
17         "github.com/anacrolix/log"
18         "github.com/anacrolix/tagflag"
19         "github.com/davecgh/go-spew/spew"
20         "github.com/dustin/go-humanize"
21         "golang.org/x/time/rate"
22
23         "github.com/anacrolix/torrent"
24         "github.com/anacrolix/torrent/iplist"
25         "github.com/anacrolix/torrent/metainfo"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27         "github.com/anacrolix/torrent/storage"
28 )
29
30 func torrentBar(t *torrent.Torrent, pieceStates bool) {
31         go func() {
32                 start := time.Now()
33                 if t.Info() == nil {
34                         fmt.Printf("%v: getting torrent info for %q\n", time.Since(start), t.Name())
35                         <-t.GotInfo()
36                 }
37                 lastStats := t.Stats()
38                 var lastLine string
39                 interval := 3 * time.Second
40                 for range time.Tick(interval) {
41                         var completedPieces, partialPieces int
42                         psrs := t.PieceStateRuns()
43                         for _, r := range psrs {
44                                 if r.Complete {
45                                         completedPieces += r.Length
46                                 }
47                                 if r.Partial {
48                                         partialPieces += r.Length
49                                 }
50                         }
51                         stats := t.Stats()
52                         byteRate := int64(time.Second)
53                         byteRate *= stats.BytesReadUsefulData.Int64() - lastStats.BytesReadUsefulData.Int64()
54                         byteRate /= int64(interval)
55                         line := fmt.Sprintf(
56                                 "%v: downloading %q: %s/%s, %d/%d pieces completed (%d partial): %v/s\n",
57                                 time.Since(start),
58                                 t.Name(),
59                                 humanize.Bytes(uint64(t.BytesCompleted())),
60                                 humanize.Bytes(uint64(t.Length())),
61                                 completedPieces,
62                                 t.NumPieces(),
63                                 partialPieces,
64                                 humanize.Bytes(uint64(byteRate)),
65                         )
66                         if line != lastLine {
67                                 lastLine = line
68                                 os.Stdout.WriteString(line)
69                         }
70                         if pieceStates {
71                                 fmt.Println(psrs)
72                         }
73                         lastStats = stats
74                 }
75         }()
76 }
77
78 type stringAddr string
79
80 func (stringAddr) Network() string   { return "" }
81 func (me stringAddr) String() string { return string(me) }
82
83 func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) {
84         for _, ta := range addrs {
85                 ret = append(ret, torrent.PeerInfo{
86                         Addr: stringAddr(ta),
87                 })
88         }
89         return
90 }
91
92 func addTorrents(ctx context.Context, client *torrent.Client, flags downloadFlags, wg *sync.WaitGroup) error {
93         testPeers := resolveTestPeers(flags.TestPeer)
94         for _, arg := range flags.Torrent {
95                 t, err := func() (*torrent.Torrent, error) {
96                         if strings.HasPrefix(arg, "magnet:") {
97                                 t, err := client.AddMagnet(arg)
98                                 if err != nil {
99                                         return nil, fmt.Errorf("error adding magnet: %w", err)
100                                 }
101                                 return t, nil
102                         } else if strings.HasPrefix(arg, "http://") || strings.HasPrefix(arg, "https://") {
103                                 response, err := http.Get(arg)
104                                 if err != nil {
105                                         return nil, fmt.Errorf("Error downloading torrent file: %s", err)
106                                 }
107
108                                 metaInfo, err := metainfo.Load(response.Body)
109                                 defer response.Body.Close()
110                                 if err != nil {
111                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
112                                 }
113                                 t, err := client.AddTorrent(metaInfo)
114                                 if err != nil {
115                                         return nil, fmt.Errorf("adding torrent: %w", err)
116                                 }
117                                 return t, nil
118                         } else if strings.HasPrefix(arg, "infohash:") {
119                                 t, _ := client.AddTorrentInfoHash(metainfo.NewHashFromHex(strings.TrimPrefix(arg, "infohash:")))
120                                 return t, nil
121                         } else {
122                                 metaInfo, err := metainfo.LoadFromFile(arg)
123                                 if err != nil {
124                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
125                                 }
126                                 t, err := client.AddTorrent(metaInfo)
127                                 if err != nil {
128                                         return nil, fmt.Errorf("adding torrent: %w", err)
129                                 }
130                                 return t, nil
131                         }
132                 }()
133                 if err != nil {
134                         return fmt.Errorf("adding torrent for %q: %w", arg, err)
135                 }
136                 if flags.Progress {
137                         torrentBar(t, flags.PieceStates)
138                 }
139                 t.AddPeers(testPeers)
140                 wg.Add(1)
141                 go func() {
142                         defer wg.Done()
143                         select {
144                         case <-ctx.Done():
145                                 return
146                         case <-t.GotInfo():
147                         }
148                         if flags.SaveMetainfos {
149                                 path := fmt.Sprintf("%v.torrent", t.InfoHash().HexString())
150                                 err := writeMetainfoToFile(t.Metainfo(), path)
151                                 if err == nil {
152                                         log.Printf("wrote %q", path)
153                                 } else {
154                                         log.Printf("error writing %q: %v", path, err)
155                                 }
156                         }
157                         if len(flags.File) == 0 {
158                                 t.DownloadAll()
159                                 wg.Add(1)
160                                 go func() {
161                                         defer wg.Done()
162                                         waitForPieces(ctx, t, 0, t.NumPieces())
163                                 }()
164                                 if flags.LinearDiscard {
165                                         r := t.NewReader()
166                                         io.Copy(io.Discard, r)
167                                         r.Close()
168                                 }
169                         } else {
170                                 for _, f := range t.Files() {
171                                         for _, fileArg := range flags.File {
172                                                 if f.DisplayPath() == fileArg {
173                                                         wg.Add(1)
174                                                         go func() {
175                                                                 defer wg.Done()
176                                                                 waitForPieces(ctx, t, f.BeginPieceIndex(), f.EndPieceIndex())
177                                                         }()
178                                                         f.Download()
179                                                         if flags.LinearDiscard {
180                                                                 r := f.NewReader()
181                                                                 go func() {
182                                                                         defer r.Close()
183                                                                         io.Copy(io.Discard, r)
184                                                                 }()
185                                                         }
186                                                 }
187                                         }
188                                 }
189                         }
190                 }()
191         }
192         return nil
193 }
194
195 func waitForPieces(ctx context.Context, t *torrent.Torrent, beginIndex, endIndex int) {
196         sub := t.SubscribePieceStateChanges()
197         defer sub.Close()
198         expected := storage.Completion{
199                 Complete: true,
200                 Ok:       true,
201         }
202         pending := make(map[int]struct{})
203         for i := beginIndex; i < endIndex; i++ {
204                 if t.Piece(i).State().Completion != expected {
205                         pending[i] = struct{}{}
206                 }
207         }
208         for {
209                 if len(pending) == 0 {
210                         return
211                 }
212                 select {
213                 case ev := <-sub.Values:
214                         if ev.Completion == expected {
215                                 delete(pending, ev.Index)
216                         }
217                 case <-ctx.Done():
218                         return
219                 }
220         }
221 }
222
223 func writeMetainfoToFile(mi metainfo.MetaInfo, path string) error {
224         f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o640)
225         if err != nil {
226                 return err
227         }
228         defer f.Close()
229         err = mi.Write(f)
230         if err != nil {
231                 return err
232         }
233         return f.Close()
234 }
235
236 type downloadFlags struct {
237         Debug bool
238         DownloadCmd
239 }
240
241 type DownloadCmd struct {
242         SaveMetainfos      bool
243         Mmap               bool           `help:"memory-map torrent data"`
244         Seed               bool           `help:"seed after download is complete"`
245         Addr               string         `help:"network listen addr"`
246         MaxUnverifiedBytes *tagflag.Bytes `help:"maximum number bytes to have pending verification"`
247         UploadRate         *tagflag.Bytes `help:"max piece bytes to send per second"`
248         DownloadRate       *tagflag.Bytes `help:"max bytes per second down from peers"`
249         PackedBlocklist    string
250         PublicIP           net.IP
251         Progress           bool `default:"true"`
252         PieceStates        bool `help:"Output piece state runs at progress intervals."`
253         Quiet              bool `help:"discard client logging"`
254         Stats              bool `help:"print stats at termination"`
255         Dht                bool `default:"true"`
256         PortForward        bool `default:"true"`
257
258         TcpPeers        bool `default:"true"`
259         UtpPeers        bool `default:"true"`
260         Webtorrent      bool `default:"true"`
261         DisableWebseeds bool
262         // Don't progress past handshake for peer connections where the peer doesn't offer the fast
263         // extension.
264         RequireFastExtension bool
265
266         Ipv4 bool `default:"true"`
267         Ipv6 bool `default:"true"`
268         Pex  bool `default:"true"`
269
270         LinearDiscard bool     `help:"Read and discard selected regions from start to finish. Useful for testing simultaneous Reader and static file prioritization."`
271         TestPeer      []string `help:"addresses of some starting peers"`
272
273         File    []string
274         Torrent []string `arity:"+" help:"torrent file path or magnet uri" arg:"positional"`
275 }
276
277 func statsEnabled(flags downloadFlags) bool {
278         return flags.Stats
279 }
280
281 func downloadErr(flags downloadFlags) error {
282         clientConfig := torrent.NewDefaultClientConfig()
283         clientConfig.DisableWebseeds = flags.DisableWebseeds
284         clientConfig.DisableTCP = !flags.TcpPeers
285         clientConfig.DisableUTP = !flags.UtpPeers
286         clientConfig.DisableIPv4 = !flags.Ipv4
287         clientConfig.DisableIPv6 = !flags.Ipv6
288         clientConfig.DisableAcceptRateLimiting = true
289         clientConfig.NoDHT = !flags.Dht
290         clientConfig.Debug = flags.Debug
291         clientConfig.Seed = flags.Seed
292         clientConfig.PublicIp4 = flags.PublicIP.To4()
293         clientConfig.PublicIp6 = flags.PublicIP
294         clientConfig.DisablePEX = !flags.Pex
295         clientConfig.DisableWebtorrent = !flags.Webtorrent
296         clientConfig.NoDefaultPortForwarding = !flags.PortForward
297         if flags.PackedBlocklist != "" {
298                 blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
299                 if err != nil {
300                         return fmt.Errorf("loading blocklist: %v", err)
301                 }
302                 defer blocklist.Close()
303                 clientConfig.IPBlocklist = blocklist
304         }
305         if flags.Mmap {
306                 clientConfig.DefaultStorage = storage.NewMMap("")
307         }
308         if flags.Addr != "" {
309                 clientConfig.SetListenAddr(flags.Addr)
310         }
311         if flags.UploadRate != nil {
312                 // TODO: I think the upload rate limit could be much lower.
313                 clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(*flags.UploadRate), 256<<10)
314         }
315         if flags.DownloadRate != nil {
316                 clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(*flags.DownloadRate), 1<<16)
317         }
318         {
319                 logger := log.Default.WithNames("main", "client")
320                 if flags.Quiet {
321                         logger = logger.FilterLevel(log.Critical)
322                 }
323                 clientConfig.Logger = logger
324         }
325         if flags.RequireFastExtension {
326                 clientConfig.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true)
327         }
328         if flags.MaxUnverifiedBytes != nil {
329                 clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
330         }
331
332         ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
333         defer cancel()
334
335         client, err := torrent.NewClient(clientConfig)
336         if err != nil {
337                 return fmt.Errorf("creating client: %w", err)
338         }
339         defer client.Close()
340
341         // Write status on the root path on the default HTTP muxer. This will be bound to localhost
342         // somewhere if GOPPROF is set, thanks to the envpprof import.
343         http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
344                 client.WriteStatus(w)
345         })
346         var wg sync.WaitGroup
347         err = addTorrents(ctx, client, flags, &wg)
348         if err != nil {
349                 return fmt.Errorf("adding torrents: %w", err)
350         }
351         started := time.Now()
352         defer outputStats(client, flags)
353         wg.Wait()
354         if ctx.Err() == nil {
355                 log.Print("downloaded ALL the torrents")
356         } else {
357                 err = ctx.Err()
358         }
359         clientConnStats := client.ConnStats()
360         log.Printf("average download rate: %v",
361                 humanize.Bytes(uint64(
362                         time.Duration(
363                                 clientConnStats.BytesReadUsefulData.Int64(),
364                         )*time.Second/time.Since(started),
365                 )))
366         if flags.Seed {
367                 if len(client.Torrents()) == 0 {
368                         log.Print("no torrents to seed")
369                 } else {
370                         outputStats(client, flags)
371                         <-ctx.Done()
372                 }
373         }
374         spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
375         spew.Dump(client.ConnStats())
376         clStats := client.ConnStats()
377         sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64()
378         log.Printf(
379                 "client read %v, %.1f%% was useful data. sent %v non-data bytes",
380                 humanize.Bytes(uint64(clStats.BytesRead.Int64())),
381                 100*float64(clStats.BytesReadUsefulData.Int64())/float64(clStats.BytesRead.Int64()),
382                 humanize.Bytes(uint64(sentOverhead)))
383         return err
384 }
385
386 func outputStats(cl *torrent.Client, args downloadFlags) {
387         if !statsEnabled(args) {
388                 return
389         }
390         expvar.Do(func(kv expvar.KeyValue) {
391                 fmt.Printf("%s: %s\n", kv.Key, kv.Value)
392         })
393         cl.WriteStatus(os.Stdout)
394 }