]> Sergey Matveev's repositories - btrtrc.git/blob - cmd/torrent/download.go
a8b5bac4686977815d803eed2f9155d23d6f7e19
[btrtrc.git] / cmd / torrent / download.go
1 package main
2
3 import (
4         "errors"
5         "expvar"
6         "fmt"
7         "net"
8         "net/http"
9         "os"
10         "os/signal"
11         "strings"
12         "sync"
13         "syscall"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo/v2"
18         "github.com/anacrolix/tagflag"
19         "github.com/anacrolix/torrent"
20         "github.com/anacrolix/torrent/iplist"
21         "github.com/anacrolix/torrent/metainfo"
22         pp "github.com/anacrolix/torrent/peer_protocol"
23         "github.com/anacrolix/torrent/storage"
24         "github.com/davecgh/go-spew/spew"
25         "github.com/dustin/go-humanize"
26         "golang.org/x/time/rate"
27 )
28
29 func torrentBar(t *torrent.Torrent, pieceStates bool) {
30         go func() {
31                 start := time.Now()
32                 if t.Info() == nil {
33                         fmt.Printf("%v: getting torrent info for %q\n", time.Since(start), t.Name())
34                         <-t.GotInfo()
35                 }
36                 lastStats := t.Stats()
37                 var lastLine string
38                 interval := 3 * time.Second
39                 for range time.Tick(interval) {
40                         var completedPieces, partialPieces int
41                         psrs := t.PieceStateRuns()
42                         for _, r := range psrs {
43                                 if r.Complete {
44                                         completedPieces += r.Length
45                                 }
46                                 if r.Partial {
47                                         partialPieces += r.Length
48                                 }
49                         }
50                         stats := t.Stats()
51                         byteRate := int64(time.Second)
52                         byteRate *= stats.BytesReadUsefulData.Int64() - lastStats.BytesReadUsefulData.Int64()
53                         byteRate /= int64(interval)
54                         line := fmt.Sprintf(
55                                 "%v: downloading %q: %s/%s, %d/%d pieces completed (%d partial): %v/s\n",
56                                 time.Since(start),
57                                 t.Name(),
58                                 humanize.Bytes(uint64(t.BytesCompleted())),
59                                 humanize.Bytes(uint64(t.Length())),
60                                 completedPieces,
61                                 t.NumPieces(),
62                                 partialPieces,
63                                 humanize.Bytes(uint64(byteRate)),
64                         )
65                         if line != lastLine {
66                                 lastLine = line
67                                 os.Stdout.WriteString(line)
68                         }
69                         if pieceStates {
70                                 fmt.Println(psrs)
71                         }
72                         lastStats = stats
73                 }
74         }()
75 }
76
// stringAddr is an address represented by nothing more than its textual form. It provides the
// Network and String methods so a raw address string can be used as a peer address.
type stringAddr string

// Network always reports an empty network name.
func (a stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
81
82 func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) {
83         for _, ta := range addrs {
84                 ret = append(ret, torrent.PeerInfo{
85                         Addr: stringAddr(ta),
86                 })
87         }
88         return
89 }
90
91 func addTorrents(client *torrent.Client, flags downloadFlags) error {
92         testPeers := resolveTestPeers(flags.TestPeer)
93         for _, arg := range flags.Torrent {
94                 t, err := func() (*torrent.Torrent, error) {
95                         if strings.HasPrefix(arg, "magnet:") {
96                                 t, err := client.AddMagnet(arg)
97                                 if err != nil {
98                                         return nil, fmt.Errorf("error adding magnet: %w", err)
99                                 }
100                                 return t, nil
101                         } else if strings.HasPrefix(arg, "http://") || strings.HasPrefix(arg, "https://") {
102                                 response, err := http.Get(arg)
103                                 if err != nil {
104                                         return nil, fmt.Errorf("Error downloading torrent file: %s", err)
105                                 }
106
107                                 metaInfo, err := metainfo.Load(response.Body)
108                                 defer response.Body.Close()
109                                 if err != nil {
110                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
111                                 }
112                                 t, err := client.AddTorrent(metaInfo)
113                                 if err != nil {
114                                         return nil, fmt.Errorf("adding torrent: %w", err)
115                                 }
116                                 return t, nil
117                         } else if strings.HasPrefix(arg, "infohash:") {
118                                 t, _ := client.AddTorrentInfoHash(metainfo.NewHashFromHex(strings.TrimPrefix(arg, "infohash:")))
119                                 return t, nil
120                         } else {
121                                 metaInfo, err := metainfo.LoadFromFile(arg)
122                                 if err != nil {
123                                         return nil, fmt.Errorf("error loading torrent file %q: %s\n", arg, err)
124                                 }
125                                 t, err := client.AddTorrent(metaInfo)
126                                 if err != nil {
127                                         return nil, fmt.Errorf("adding torrent: %w", err)
128                                 }
129                                 return t, nil
130                         }
131                 }()
132                 if err != nil {
133                         return fmt.Errorf("adding torrent for %q: %w", arg, err)
134                 }
135                 if flags.Progress {
136                         torrentBar(t, flags.PieceStates)
137                 }
138                 t.AddPeers(testPeers)
139                 go func() {
140                         <-t.GotInfo()
141                         if len(flags.File) == 0 {
142                                 t.DownloadAll()
143                         } else {
144                                 for _, f := range t.Files() {
145                                         for _, fileArg := range flags.File {
146                                                 if f.DisplayPath() == fileArg {
147                                                         f.Download()
148                                                 }
149                                         }
150                                 }
151                         }
152                 }()
153         }
154         return nil
155 }
156
// downloadFlags is the complete option set handed to the download code in this file: a Debug
// switch plus every option of the embedded DownloadCmd.
type downloadFlags struct {
	// Debug is copied into the client configuration's Debug field by downloadErr.
	Debug bool
	DownloadCmd
}
161
// DownloadCmd holds the command-line options of the download command. The struct tags (help,
// default, arity, arg) are read by the flag parser; the comments below describe how this file
// uses each field.
type DownloadCmd struct {
	Mmap               bool           `help:"memory-map torrent data"`
	TestPeer           []string       `help:"addresses of some starting peers"`
	Seed               bool           `help:"seed after download is complete"`
	Addr               string         `help:"network listen addr"`
	MaxUnverifiedBytes tagflag.Bytes  `help:"maximum number bytes to have pending verification"`
	UploadRate         *tagflag.Bytes `help:"max piece bytes to send per second"`
	DownloadRate       *tagflag.Bytes `help:"max bytes per second down from peers"`
	// PackedBlocklist is the path of a packed IP blocklist file; when non-empty it is memory-mapped
	// and installed as the client's IP blocklist.
	PackedBlocklist    string
	// PublicIP is used as both the IPv4 and the IPv6 public address in the client configuration.
	PublicIP           net.IP
	// Progress enables the periodic per-torrent progress line on stdout.
	Progress           bool `default:"true"`
	// PieceStates additionally prints the piece state runs with each progress tick.
	PieceStates        bool
	Quiet              bool `help:"discard client logging"`
	Stats              bool `help:"print stats at termination"`
	// Dht enables the DHT; when false the client is configured with NoDHT.
	Dht                bool `default:"true"`

	// TcpPeers, UtpPeers and Webtorrent each enable a transport; a false value sets the matching
	// Disable* option on the client configuration.
	TcpPeers        bool `default:"true"`
	UtpPeers        bool `default:"true"`
	Webtorrent      bool `default:"true"`
	DisableWebseeds bool
	// Don't progress past handshake for peer connections where the peer doesn't offer the fast
	// extension.
	RequireFastExtension bool

	// Ipv4, Ipv6 and Pex are inverted into the client configuration's DisableIPv4, DisableIPv6 and
	// DisablePEX options.
	Ipv4 bool `default:"true"`
	Ipv6 bool `default:"true"`
	Pex  bool `default:"true"`

	// File lists display paths of the files to download from each torrent; when empty, everything
	// is downloaded.
	File    []string
	// Torrent lists what to download. Besides file paths and magnet URIs, addTorrents also accepts
	// http:// and https:// URLs and "infohash:<hex>" arguments.
	Torrent []string `arity:"+" help:"torrent file path or magnet uri" arg:"positional"`
}
193
194 func statsEnabled(flags downloadFlags) bool {
195         return flags.Stats
196 }
197
198 func exitSignalHandlers(notify *missinggo.SynchronizedEvent) {
199         c := make(chan os.Signal, 1)
200         signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
201         for {
202                 log.Printf("close signal received: %+v", <-c)
203                 notify.Set()
204         }
205 }
206
207 func downloadErr(flags downloadFlags) error {
208         clientConfig := torrent.NewDefaultClientConfig()
209         clientConfig.DisableWebseeds = flags.DisableWebseeds
210         clientConfig.DisableTCP = !flags.TcpPeers
211         clientConfig.DisableUTP = !flags.UtpPeers
212         clientConfig.DisableIPv4 = !flags.Ipv4
213         clientConfig.DisableIPv6 = !flags.Ipv6
214         clientConfig.DisableAcceptRateLimiting = true
215         clientConfig.NoDHT = !flags.Dht
216         clientConfig.Debug = flags.Debug
217         clientConfig.Seed = flags.Seed
218         clientConfig.PublicIp4 = flags.PublicIP
219         clientConfig.PublicIp6 = flags.PublicIP
220         clientConfig.DisablePEX = !flags.Pex
221         clientConfig.DisableWebtorrent = !flags.Webtorrent
222         if flags.PackedBlocklist != "" {
223                 blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
224                 if err != nil {
225                         return fmt.Errorf("loading blocklist: %v", err)
226                 }
227                 defer blocklist.Close()
228                 clientConfig.IPBlocklist = blocklist
229         }
230         if flags.Mmap {
231                 clientConfig.DefaultStorage = storage.NewMMap("")
232         }
233         if flags.Addr != "" {
234                 clientConfig.SetListenAddr(flags.Addr)
235         }
236         if flags.UploadRate != nil {
237                 clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(*flags.UploadRate), 256<<10)
238         }
239         if flags.DownloadRate != nil {
240                 clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(*flags.DownloadRate), 1<<20)
241         }
242         if flags.Quiet {
243                 clientConfig.Logger = log.Discard
244         }
245         if flags.RequireFastExtension {
246                 clientConfig.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true)
247         }
248         clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
249
250         var stop missinggo.SynchronizedEvent
251         defer func() {
252                 stop.Set()
253         }()
254
255         client, err := torrent.NewClient(clientConfig)
256         if err != nil {
257                 return fmt.Errorf("creating client: %w", err)
258         }
259         var clientClose sync.Once // In certain situations, close was being called more than once.
260         defer clientClose.Do(func() { client.Close() })
261         go exitSignalHandlers(&stop)
262         go func() {
263                 <-stop.C()
264                 clientClose.Do(func() { client.Close() })
265         }()
266
267         // Write status on the root path on the default HTTP muxer. This will be bound to localhost
268         // somewhere if GOPPROF is set, thanks to the envpprof import.
269         http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
270                 client.WriteStatus(w)
271         })
272         err = addTorrents(client, flags)
273         started := time.Now()
274         if err != nil {
275                 return fmt.Errorf("adding torrents: %w", err)
276         }
277         defer outputStats(client, flags)
278         if client.WaitAll() {
279                 log.Print("downloaded ALL the torrents")
280         } else {
281                 err = errors.New("y u no complete torrents?!")
282         }
283         clientConnStats := client.ConnStats()
284         log.Printf("average download rate: %v",
285                 humanize.Bytes(uint64(
286                         time.Duration(
287                                 clientConnStats.BytesReadUsefulData.Int64(),
288                         )*time.Second/time.Since(started),
289                 )))
290         if flags.Seed {
291                 if len(client.Torrents()) == 0 {
292                         log.Print("no torrents to seed")
293                 } else {
294                         outputStats(client, flags)
295                         <-stop.C()
296                 }
297         }
298         spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
299         spew.Dump(client.ConnStats())
300         clStats := client.ConnStats()
301         sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64()
302         log.Printf(
303                 "client read %v, %.1f%% was useful data. sent %v non-data bytes",
304                 humanize.Bytes(uint64(clStats.BytesRead.Int64())),
305                 100*float64(clStats.BytesReadUsefulData.Int64())/float64(clStats.BytesRead.Int64()),
306                 humanize.Bytes(uint64(sentOverhead)))
307         return err
308 }
309
310 func outputStats(cl *torrent.Client, args downloadFlags) {
311         if !statsEnabled(args) {
312                 return
313         }
314         expvar.Do(func(kv expvar.KeyValue) {
315                 fmt.Printf("%s: %s\n", kv.Key, kv.Value)
316         })
317         cl.WriteStatus(os.Stdout)
318 }