]> Sergey Matveev's repositories - btrtrc.git/blob - cmd/torrent/main.go
cmd/torrent: Add bencode {json,spew} commands
[btrtrc.git] / cmd / torrent / main.go
1 // Downloads torrents from the command-line.
2 package main
3
4 import (
5         "encoding/json"
6         "errors"
7         "expvar"
8         "fmt"
9         "io"
10         stdLog "log"
11         "net"
12         "net/http"
13         "os"
14         "os/signal"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19
20         "github.com/anacrolix/args"
21         "github.com/anacrolix/envpprof"
22         "github.com/anacrolix/log"
23         "github.com/anacrolix/missinggo/v2"
24         "github.com/anacrolix/tagflag"
25         "github.com/anacrolix/torrent/bencode"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27         "github.com/anacrolix/torrent/version"
28         "github.com/davecgh/go-spew/spew"
29         "github.com/dustin/go-humanize"
30         "golang.org/x/time/rate"
31
32         "github.com/anacrolix/torrent"
33         "github.com/anacrolix/torrent/iplist"
34         "github.com/anacrolix/torrent/metainfo"
35         "github.com/anacrolix/torrent/storage"
36 )
37
38 func torrentBar(t *torrent.Torrent, pieceStates bool) {
39         go func() {
40                 start := time.Now()
41                 if t.Info() == nil {
42                         fmt.Printf("%v: getting torrent info for %q\n", time.Since(start), t.Name())
43                         <-t.GotInfo()
44                 }
45                 lastStats := t.Stats()
46                 var lastLine string
47                 interval := 3 * time.Second
48                 for range time.Tick(interval) {
49                         var completedPieces, partialPieces int
50                         psrs := t.PieceStateRuns()
51                         for _, r := range psrs {
52                                 if r.Complete {
53                                         completedPieces += r.Length
54                                 }
55                                 if r.Partial {
56                                         partialPieces += r.Length
57                                 }
58                         }
59                         stats := t.Stats()
60                         byteRate := int64(time.Second)
61                         byteRate *= stats.BytesReadUsefulData.Int64() - lastStats.BytesReadUsefulData.Int64()
62                         byteRate /= int64(interval)
63                         line := fmt.Sprintf(
64                                 "%v: downloading %q: %s/%s, %d/%d pieces completed (%d partial): %v/s\n",
65                                 time.Since(start),
66                                 t.Name(),
67                                 humanize.Bytes(uint64(t.BytesCompleted())),
68                                 humanize.Bytes(uint64(t.Length())),
69                                 completedPieces,
70                                 t.NumPieces(),
71                                 partialPieces,
72                                 humanize.Bytes(uint64(byteRate)),
73                         )
74                         if line != lastLine {
75                                 lastLine = line
76                                 os.Stdout.WriteString(line)
77                         }
78                         if pieceStates {
79                                 fmt.Println(psrs)
80                         }
81                         lastStats = stats
82                 }
83         }()
84 }
85
86 type stringAddr string
87
88 func (stringAddr) Network() string   { return "" }
89 func (me stringAddr) String() string { return string(me) }
90
91 func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) {
92         for _, ta := range addrs {
93                 ret = append(ret, torrent.PeerInfo{
94                         Addr: stringAddr(ta),
95                 })
96         }
97         return
98 }
99
100 func addTorrents(client *torrent.Client, flags downloadFlags) error {
101         testPeers := resolveTestPeers(flags.TestPeer)
102         for _, arg := range flags.Torrent {
103                 t, err := func() (*torrent.Torrent, error) {
104                         if strings.HasPrefix(arg, "magnet:") {
105                                 t, err := client.AddMagnet(arg)
106                                 if err != nil {
107                                         return nil, fmt.Errorf("error adding magnet: %w", err)
108                                 }
109                                 return t, nil
110                         } else if strings.HasPrefix(arg, "http://") || strings.HasPrefix(arg, "https://") {
111                                 response, err := http.Get(arg)
112                                 if err != nil {
113                                         return nil, fmt.Errorf("Error downloading torrent file: %s", err)
114                                 }
115
116                                 metaInfo, err := metainfo.Load(response.Body)
117                                 defer response.Body.Close()
118                                 if err != nil {
119                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
120                                 }
121                                 t, err := client.AddTorrent(metaInfo)
122                                 if err != nil {
123                                         return nil, fmt.Errorf("adding torrent: %w", err)
124                                 }
125                                 return t, nil
126                         } else if strings.HasPrefix(arg, "infohash:") {
127                                 t, _ := client.AddTorrentInfoHash(metainfo.NewHashFromHex(strings.TrimPrefix(arg, "infohash:")))
128                                 return t, nil
129                         } else {
130                                 metaInfo, err := metainfo.LoadFromFile(arg)
131                                 if err != nil {
132                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
133                                 }
134                                 t, err := client.AddTorrent(metaInfo)
135                                 if err != nil {
136                                         return nil, fmt.Errorf("adding torrent: %w", err)
137                                 }
138                                 return t, nil
139                         }
140                 }()
141                 if err != nil {
142                         return fmt.Errorf("adding torrent for %q: %w", arg, err)
143                 }
144                 if flags.Progress {
145                         torrentBar(t, flags.PieceStates)
146                 }
147                 t.AddPeers(testPeers)
148                 go func() {
149                         <-t.GotInfo()
150                         if len(flags.File) == 0 {
151                                 t.DownloadAll()
152                         } else {
153                                 for _, f := range t.Files() {
154                                         for _, fileArg := range flags.File {
155                                                 if f.DisplayPath() == fileArg {
156                                                         f.Download()
157                                                 }
158                                         }
159                                 }
160                         }
161                 }()
162         }
163         return nil
164 }
165
166 type downloadFlags struct {
167         Debug bool
168         DownloadCmd
169 }
170
171 type DownloadCmd struct {
172         Mmap               bool           `help:"memory-map torrent data"`
173         TestPeer           []string       `help:"addresses of some starting peers"`
174         Seed               bool           `help:"seed after download is complete"`
175         Addr               string         `help:"network listen addr"`
176         MaxUnverifiedBytes tagflag.Bytes  `help:"maximum number bytes to have pending verification"`
177         UploadRate         *tagflag.Bytes `help:"max piece bytes to send per second"`
178         DownloadRate       *tagflag.Bytes `help:"max bytes per second down from peers"`
179         PackedBlocklist    string
180         PublicIP           net.IP
181         Progress           bool `default:"true"`
182         PieceStates        bool
183         Quiet              bool `help:"discard client logging"`
184         Stats              bool `help:"print stats at termination"`
185         Dht                bool `default:"true"`
186
187         TcpPeers        bool `default:"true"`
188         UtpPeers        bool `default:"true"`
189         Webtorrent      bool `default:"true"`
190         DisableWebseeds bool
191         // Don't progress past handshake for peer connections where the peer doesn't offer the fast
192         // extension.
193         RequireFastExtension bool
194
195         Ipv4 bool `default:"true"`
196         Ipv6 bool `default:"true"`
197         Pex  bool `default:"true"`
198
199         File    []string
200         Torrent []string `arity:"+" help:"torrent file path or magnet uri" arg:"positional"`
201 }
202
203 func statsEnabled(flags downloadFlags) bool {
204         return flags.Stats
205 }
206
207 func exitSignalHandlers(notify *missinggo.SynchronizedEvent) {
208         c := make(chan os.Signal, 1)
209         signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
210         for {
211                 log.Printf("close signal received: %+v", <-c)
212                 notify.Set()
213         }
214 }
215
216 func main() {
217         if err := mainErr(); err != nil {
218                 log.Printf("error in main: %v", err)
219                 os.Exit(1)
220         }
221 }
222
223 func mainErr() error {
224         defer envpprof.Stop()
225         stdLog.SetFlags(stdLog.Flags() | stdLog.Lshortfile)
226         debug := args.Flag(args.FlagOpt{Long: "debug"})
227         args.ParseMain(
228                 debug,
229                 args.Subcommand("metainfo", metainfoCmd),
230                 args.Subcommand("announce", func(p args.SubCmdCtx) error {
231                         var cmd AnnounceCmd
232                         err := p.NewParser().AddParams(
233                                 args.Pos("tracker", &cmd.Tracker),
234                                 args.Pos("infohash", &cmd.InfoHash)).Parse()
235                         if err != nil {
236                                 return err
237                         }
238                         return announceErr(cmd)
239                 }),
240                 args.Subcommand("download", func(p args.SubCmdCtx) error {
241                         var dlc DownloadCmd
242                         err := p.NewParser().AddParams(
243                                 append(args.FromStruct(&dlc), debug)...,
244                         ).Parse()
245                         if err != nil {
246                                 return err
247                         }
248                         dlf := downloadFlags{
249                                 Debug:       debug.Bool(),
250                                 DownloadCmd: dlc,
251                         }
252                         p.Defer(func() error {
253                                 return downloadErr(dlf)
254                         })
255                         return nil
256                 }),
257                 args.Subcommand(
258                         "bencode",
259                         func(p args.SubCmdCtx) error {
260                                 var print func(interface{}) error
261                                 if !p.Parse(
262                                         args.Subcommand("json", func(ctx args.SubCmdCtx) (err error) {
263                                                 ctx.Parse()
264                                                 je := json.NewEncoder(os.Stdout)
265                                                 je.SetIndent("", "  ")
266                                                 print = je.Encode
267                                                 return nil
268                                         }),
269                                         args.Subcommand("spew", func(ctx args.SubCmdCtx) (err error) {
270                                                 ctx.Parse()
271                                                 config := spew.NewDefaultConfig()
272                                                 config.DisableCapacities = true
273                                                 config.Indent = "  "
274                                                 print = func(v interface{}) error {
275                                                         config.Dump(v)
276                                                         return nil
277                                                 }
278                                                 return nil
279                                         }),
280                                 ).RanSubCmd {
281                                         return errors.New("an output type is required")
282                                 }
283                                 d := bencode.NewDecoder(os.Stdin)
284                                 p.Defer(func() error {
285                                         for i := 0; ; i++ {
286                                                 var v interface{}
287                                                 err := d.Decode(&v)
288                                                 if err == io.EOF {
289                                                         break
290                                                 }
291                                                 if err != nil {
292                                                         return fmt.Errorf("decoding message index %d: %w", i, err)
293                                                 }
294                                                 print(v)
295                                         }
296                                         return nil
297                                 })
298                                 return nil
299                         },
300                         args.Help("reads bencoding from stdin into Go native types and spews the result"),
301                 ),
302                 args.Subcommand("version", func(p args.SubCmdCtx) error {
303                         fmt.Printf("HTTP User-Agent: %q\n", version.DefaultHttpUserAgent)
304                         fmt.Printf("Torrent client version: %q\n", version.DefaultExtendedHandshakeClientVersion)
305                         fmt.Printf("Torrent version prefix: %q\n", version.DefaultBep20Prefix)
306                         return nil
307                 }),
308         )
309         return nil
310 }
311
312 func downloadErr(flags downloadFlags) error {
313         clientConfig := torrent.NewDefaultClientConfig()
314         clientConfig.DisableWebseeds = flags.DisableWebseeds
315         clientConfig.DisableTCP = !flags.TcpPeers
316         clientConfig.DisableUTP = !flags.UtpPeers
317         clientConfig.DisableIPv4 = !flags.Ipv4
318         clientConfig.DisableIPv6 = !flags.Ipv6
319         clientConfig.DisableAcceptRateLimiting = true
320         clientConfig.NoDHT = !flags.Dht
321         clientConfig.Debug = flags.Debug
322         clientConfig.Seed = flags.Seed
323         clientConfig.PublicIp4 = flags.PublicIP
324         clientConfig.PublicIp6 = flags.PublicIP
325         clientConfig.DisablePEX = !flags.Pex
326         clientConfig.DisableWebtorrent = !flags.Webtorrent
327         if flags.PackedBlocklist != "" {
328                 blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
329                 if err != nil {
330                         return fmt.Errorf("loading blocklist: %v", err)
331                 }
332                 defer blocklist.Close()
333                 clientConfig.IPBlocklist = blocklist
334         }
335         if flags.Mmap {
336                 clientConfig.DefaultStorage = storage.NewMMap("")
337         }
338         if flags.Addr != "" {
339                 clientConfig.SetListenAddr(flags.Addr)
340         }
341         if flags.UploadRate != nil {
342                 clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(*flags.UploadRate), 256<<10)
343         }
344         if flags.DownloadRate != nil {
345                 clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(*flags.DownloadRate), 1<<20)
346         }
347         if flags.Quiet {
348                 clientConfig.Logger = log.Discard
349         }
350         if flags.RequireFastExtension {
351                 clientConfig.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true)
352         }
353         clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
354
355         var stop missinggo.SynchronizedEvent
356         defer func() {
357                 stop.Set()
358         }()
359
360         client, err := torrent.NewClient(clientConfig)
361         if err != nil {
362                 return fmt.Errorf("creating client: %w", err)
363         }
364         var clientClose sync.Once //In certain situations, close was being called more than once.
365         defer clientClose.Do(func() { client.Close() })
366         go exitSignalHandlers(&stop)
367         go func() {
368                 <-stop.C()
369                 clientClose.Do(func() { client.Close() })
370         }()
371
372         // Write status on the root path on the default HTTP muxer. This will be bound to localhost
373         // somewhere if GOPPROF is set, thanks to the envpprof import.
374         http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
375                 client.WriteStatus(w)
376         })
377         err = addTorrents(client, flags)
378         started := time.Now()
379         if err != nil {
380                 return fmt.Errorf("adding torrents: %w", err)
381         }
382         defer outputStats(client, flags)
383         if client.WaitAll() {
384                 log.Print("downloaded ALL the torrents")
385         } else {
386                 err = errors.New("y u no complete torrents?!")
387         }
388         clientConnStats := client.ConnStats()
389         log.Printf("average download rate: %v",
390                 humanize.Bytes(uint64(
391                         time.Duration(
392                                 clientConnStats.BytesReadUsefulData.Int64(),
393                         )*time.Second/time.Since(started),
394                 )))
395         if flags.Seed {
396                 if len(client.Torrents()) == 0 {
397                         log.Print("no torrents to seed")
398                 } else {
399                         outputStats(client, flags)
400                         <-stop.C()
401                 }
402         }
403         spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
404         spew.Dump(client.ConnStats())
405         clStats := client.ConnStats()
406         sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64()
407         log.Printf(
408                 "client read %v, %.1f%% was useful data. sent %v non-data bytes",
409                 humanize.Bytes(uint64(clStats.BytesRead.Int64())),
410                 100*float64(clStats.BytesReadUsefulData.Int64())/float64(clStats.BytesRead.Int64()),
411                 humanize.Bytes(uint64(sentOverhead)))
412         return err
413 }
414
415 func outputStats(cl *torrent.Client, args downloadFlags) {
416         if !statsEnabled(args) {
417                 return
418         }
419         expvar.Do(func(kv expvar.KeyValue) {
420                 fmt.Printf("%s: %s\n", kv.Key, kv.Value)
421         })
422         cl.WriteStatus(os.Stdout)
423 }