]> Sergey Matveev's repositories - btrtrc.git/blob - cmd/torrent/main.go
cmd/torrent: Refactor and set progress interval to 3s
[btrtrc.git] / cmd / torrent / main.go
1 // Downloads torrents from the command-line.
2 package main
3
4 import (
5         "errors"
6         "expvar"
7         "fmt"
8         "io"
9         stdLog "log"
10         "net"
11         "net/http"
12         "os"
13         "os/signal"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18
19         "github.com/anacrolix/args"
20         "github.com/anacrolix/envpprof"
21         "github.com/anacrolix/log"
22         "github.com/anacrolix/missinggo/v2"
23         "github.com/anacrolix/tagflag"
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/version"
26         "github.com/davecgh/go-spew/spew"
27         "github.com/dustin/go-humanize"
28         "golang.org/x/time/rate"
29
30         "github.com/anacrolix/torrent"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 func torrentBar(t *torrent.Torrent, pieceStates bool) {
37         go func() {
38                 start := time.Now()
39                 if t.Info() == nil {
40                         fmt.Printf("%v: getting torrent info for %q\n", time.Since(start), t.Name())
41                         <-t.GotInfo()
42                 }
43                 lastStats := t.Stats()
44                 var lastLine string
45                 interval := 3 * time.Second
46                 for range time.Tick(interval) {
47                         var completedPieces, partialPieces int
48                         psrs := t.PieceStateRuns()
49                         for _, r := range psrs {
50                                 if r.Complete {
51                                         completedPieces += r.Length
52                                 }
53                                 if r.Partial {
54                                         partialPieces += r.Length
55                                 }
56                         }
57                         stats := t.Stats()
58                         byteRate := int64(time.Second)
59                         byteRate *= stats.BytesReadUsefulData.Int64() - lastStats.BytesReadUsefulData.Int64()
60                         byteRate /= int64(interval)
61                         line := fmt.Sprintf(
62                                 "%v: downloading %q: %s/%s, %d/%d pieces completed (%d partial): %v/s\n",
63                                 time.Since(start),
64                                 t.Name(),
65                                 humanize.Bytes(uint64(t.BytesCompleted())),
66                                 humanize.Bytes(uint64(t.Length())),
67                                 completedPieces,
68                                 t.NumPieces(),
69                                 partialPieces,
70                                 humanize.Bytes(uint64(byteRate)),
71                         )
72                         if line != lastLine {
73                                 lastLine = line
74                                 os.Stdout.WriteString(line)
75                         }
76                         if pieceStates {
77                                 fmt.Println(psrs)
78                         }
79                         lastStats = stats
80                 }
81         }()
82 }
83
84 type stringAddr string
85
86 func (stringAddr) Network() string   { return "" }
87 func (me stringAddr) String() string { return string(me) }
88
89 func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) {
90         for _, ta := range addrs {
91                 ret = append(ret, torrent.PeerInfo{
92                         Addr: stringAddr(ta),
93                 })
94         }
95         return
96 }
97
98 func addTorrents(client *torrent.Client, flags downloadFlags) error {
99         testPeers := resolveTestPeers(flags.TestPeer)
100         for _, arg := range flags.Torrent {
101                 t, err := func() (*torrent.Torrent, error) {
102                         if strings.HasPrefix(arg, "magnet:") {
103                                 t, err := client.AddMagnet(arg)
104                                 if err != nil {
105                                         return nil, fmt.Errorf("error adding magnet: %w", err)
106                                 }
107                                 return t, nil
108                         } else if strings.HasPrefix(arg, "http://") || strings.HasPrefix(arg, "https://") {
109                                 response, err := http.Get(arg)
110                                 if err != nil {
111                                         return nil, fmt.Errorf("Error downloading torrent file: %s", err)
112                                 }
113
114                                 metaInfo, err := metainfo.Load(response.Body)
115                                 defer response.Body.Close()
116                                 if err != nil {
117                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
118                                 }
119                                 t, err := client.AddTorrent(metaInfo)
120                                 if err != nil {
121                                         return nil, fmt.Errorf("adding torrent: %w", err)
122                                 }
123                                 return t, nil
124                         } else if strings.HasPrefix(arg, "infohash:") {
125                                 t, _ := client.AddTorrentInfoHash(metainfo.NewHashFromHex(strings.TrimPrefix(arg, "infohash:")))
126                                 return t, nil
127                         } else {
128                                 metaInfo, err := metainfo.LoadFromFile(arg)
129                                 if err != nil {
130                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
131                                 }
132                                 t, err := client.AddTorrent(metaInfo)
133                                 if err != nil {
134                                         return nil, fmt.Errorf("adding torrent: %w", err)
135                                 }
136                                 return t, nil
137                         }
138                 }()
139                 if err != nil {
140                         return fmt.Errorf("adding torrent for %q: %w", arg, err)
141                 }
142                 if flags.Progress {
143                         torrentBar(t, flags.PieceStates)
144                 }
145                 t.AddPeers(testPeers)
146                 go func() {
147                         <-t.GotInfo()
148                         if len(flags.File) == 0 {
149                                 t.DownloadAll()
150                         } else {
151                                 for _, f := range t.Files() {
152                                         for _, fileArg := range flags.File {
153                                                 if f.DisplayPath() == fileArg {
154                                                         f.Download()
155                                                 }
156                                         }
157                                 }
158                         }
159                 }()
160         }
161         return nil
162 }
163
164 type downloadFlags struct {
165         Debug bool
166         DownloadCmd
167 }
168
169 type DownloadCmd struct {
170         Mmap               bool           `help:"memory-map torrent data"`
171         TestPeer           []string       `help:"addresses of some starting peers"`
172         Seed               bool           `help:"seed after download is complete"`
173         Addr               string         `help:"network listen addr"`
174         MaxUnverifiedBytes tagflag.Bytes  `help:"maximum number bytes to have pending verification"`
175         UploadRate         *tagflag.Bytes `help:"max piece bytes to send per second"`
176         DownloadRate       *tagflag.Bytes `help:"max bytes per second down from peers"`
177         PackedBlocklist    string
178         PublicIP           net.IP
179         Progress           bool `default:"true"`
180         PieceStates        bool
181         Quiet              bool  `help:"discard client logging"`
182         Stats              *bool `help:"print stats at termination"`
183         Dht                bool  `default:"true"`
184
185         TcpPeers        bool `default:"true"`
186         UtpPeers        bool `default:"true"`
187         Webtorrent      bool `default:"true"`
188         DisableWebseeds bool
189
190         Ipv4 bool `default:"true"`
191         Ipv6 bool `default:"true"`
192         Pex  bool `default:"true"`
193
194         File    []string
195         Torrent []string `arity:"+" help:"torrent file path or magnet uri" arg:"positional"`
196 }
197
198 func stdoutAndStderrAreSameFile() bool {
199         fi1, _ := os.Stdout.Stat()
200         fi2, _ := os.Stderr.Stat()
201         return os.SameFile(fi1, fi2)
202 }
203
204 func statsEnabled(flags downloadFlags) bool {
205         if flags.Stats == nil {
206                 return flags.Debug
207         }
208         return *flags.Stats
209 }
210
211 func exitSignalHandlers(notify *missinggo.SynchronizedEvent) {
212         c := make(chan os.Signal, 1)
213         signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
214         for {
215                 log.Printf("close signal received: %+v", <-c)
216                 notify.Set()
217         }
218 }
219
220 func main() {
221         defer envpprof.Stop()
222         if err := mainErr(); err != nil {
223                 log.Printf("error in main: %v", err)
224                 os.Exit(1)
225         }
226 }
227
228 func mainErr() error {
229         stdLog.SetFlags(stdLog.Flags() | stdLog.Lshortfile)
230         debug := args.Flag(args.FlagOpt{Long: "debug"})
231         p := args.ParseMain(
232                 debug,
233                 args.Subcommand("metainfo", metainfoCmd),
234                 args.Subcommand("announce", func(p args.SubCmdCtx) error {
235                         var cmd AnnounceCmd
236                         err := p.NewParser().AddParams(
237                                 args.Pos("tracker", &cmd.Tracker),
238                                 args.Pos("infohash", &cmd.InfoHash)).Parse()
239                         if err != nil {
240                                 return err
241                         }
242                         return announceErr(cmd)
243                 }),
244                 args.Subcommand("download", func(p args.SubCmdCtx) error {
245                         var dlf DownloadCmd
246                         err := p.NewParser().AddParams(
247                                 append(args.FromStruct(&dlf), debug)...,
248                         ).Parse()
249                         if err != nil {
250                                 return err
251                         }
252                         return downloadErr(downloadFlags{
253                                 Debug:       debug.Bool(),
254                                 DownloadCmd: dlf,
255                         })
256                 }),
257                 args.Subcommand(
258                         "spew-bencoding",
259                         func(p args.SubCmdCtx) error {
260                                 d := bencode.NewDecoder(os.Stdin)
261                                 for i := 0; ; i++ {
262                                         var v interface{}
263                                         err := d.Decode(&v)
264                                         if err == io.EOF {
265                                                 break
266                                         }
267                                         if err != nil {
268                                                 return fmt.Errorf("decoding message index %d: %w", i, err)
269                                         }
270                                         spew.Dump(v)
271                                 }
272                                 return nil
273                         },
274                         args.Help("reads bencoding from stdin into Go native types and spews the result"),
275                 ),
276                 args.Subcommand("version", func(p args.SubCmdCtx) error {
277                         fmt.Printf("HTTP User-Agent: %q\n", version.DefaultHttpUserAgent)
278                         fmt.Printf("Torrent client version: %q\n", version.DefaultExtendedHandshakeClientVersion)
279                         fmt.Printf("Torrent version prefix: %q\n", version.DefaultBep20Prefix)
280                         return nil
281                 }),
282         )
283         if p.Err != nil {
284                 if errors.Is(p.Err, args.ErrHelped) {
285                         return nil
286                 }
287                 return p.Err
288         }
289         if !p.RanSubCmd {
290                 p.PrintChoices(os.Stderr)
291                 args.FatalUsage()
292         }
293         return nil
294 }
295
296 func downloadErr(flags downloadFlags) error {
297         clientConfig := torrent.NewDefaultClientConfig()
298         clientConfig.DisableWebseeds = flags.DisableWebseeds
299         clientConfig.DisableTCP = !flags.TcpPeers
300         clientConfig.DisableUTP = !flags.UtpPeers
301         clientConfig.DisableIPv4 = !flags.Ipv4
302         clientConfig.DisableIPv6 = !flags.Ipv6
303         clientConfig.DisableAcceptRateLimiting = true
304         clientConfig.NoDHT = !flags.Dht
305         clientConfig.Debug = flags.Debug
306         clientConfig.Seed = flags.Seed
307         clientConfig.PublicIp4 = flags.PublicIP
308         clientConfig.PublicIp6 = flags.PublicIP
309         clientConfig.DisablePEX = !flags.Pex
310         clientConfig.DisableWebtorrent = !flags.Webtorrent
311         if flags.PackedBlocklist != "" {
312                 blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
313                 if err != nil {
314                         return fmt.Errorf("loading blocklist: %v", err)
315                 }
316                 defer blocklist.Close()
317                 clientConfig.IPBlocklist = blocklist
318         }
319         if flags.Mmap {
320                 clientConfig.DefaultStorage = storage.NewMMap("")
321         }
322         if flags.Addr != "" {
323                 clientConfig.SetListenAddr(flags.Addr)
324         }
325         if flags.UploadRate != nil {
326                 clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(*flags.UploadRate), 256<<10)
327         }
328         if flags.DownloadRate != nil {
329                 clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(*flags.DownloadRate), 1<<20)
330         }
331         if flags.Quiet {
332                 clientConfig.Logger = log.Discard
333         }
334         clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
335
336         var stop missinggo.SynchronizedEvent
337         defer func() {
338                 stop.Set()
339         }()
340
341         client, err := torrent.NewClient(clientConfig)
342         if err != nil {
343                 return fmt.Errorf("creating client: %w", err)
344         }
345         var clientClose sync.Once //In certain situations, close was being called more than once.
346         defer clientClose.Do(client.Close)
347         go exitSignalHandlers(&stop)
348         go func() {
349                 <-stop.C()
350                 clientClose.Do(client.Close)
351         }()
352
353         // Write status on the root path on the default HTTP muxer. This will be bound to localhost
354         // somewhere if GOPPROF is set, thanks to the envpprof import.
355         http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
356                 client.WriteStatus(w)
357         })
358         err = addTorrents(client, flags)
359         if err != nil {
360                 return fmt.Errorf("adding torrents: %w", err)
361         }
362         defer outputStats(client, flags)
363         if client.WaitAll() {
364                 log.Print("downloaded ALL the torrents")
365         } else {
366                 err = errors.New("y u no complete torrents?!")
367         }
368         if flags.Seed {
369                 if len(client.Torrents()) == 0 {
370                         log.Print("no torrents to seed")
371                 } else {
372                         outputStats(client, flags)
373                         <-stop.C()
374                 }
375         }
376         spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
377         spew.Dump(client.ConnStats())
378         clStats := client.ConnStats()
379         sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64()
380         log.Printf(
381                 "client read %v, %v was useful data. sent %v non-data bytes",
382                 humanize.Bytes(uint64(clStats.BytesRead.Int64())),
383                 100*float64(clStats.BytesReadUsefulData.Int64())/float64(clStats.BytesRead.Int64()),
384                 humanize.Bytes(uint64(sentOverhead)))
385         return err
386 }
387
388 func outputStats(cl *torrent.Client, args downloadFlags) {
389         if !statsEnabled(args) {
390                 return
391         }
392         expvar.Do(func(kv expvar.KeyValue) {
393                 fmt.Printf("%s: %s\n", kv.Key, kv.Value)
394         })
395         cl.WriteStatus(os.Stdout)
396 }