Sergey Matveev's repositories - btrtrc.git/blob - client.go
40dc88af5045630c5027e8af514d9234f155430b
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "net"
13         "net/url"
14         "strconv"
15         "strings"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/dht/krpc"
20         "github.com/anacrolix/log"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/pproffd"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/anacrolix/sync"
26         "github.com/dustin/go-humanize"
27         "golang.org/x/time/rate"
28
29         "github.com/anacrolix/torrent/bencode"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/mse"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35 )
36
// Clients contain zero or more Torrents. A Client manages a blocklist, the
// TCP/UDP protocol ports, and DHT as desired.
type Client struct {
	// mu is the client-wide lock. NewClient also installs it as the Locker
	// of event.
	mu     sync.RWMutex
	// event is broadcast by Close; waitAccept blocks on it.
	event  sync.Cond
	// closed is set by Close.
	closed missinggo.Event

	// config is a copy of the Config passed to NewClient (after defaults
	// were applied).
	config Config
	// logger is set up by initLogger.
	logger *log.Logger

	// halfOpenLimit is taken from Config.HalfOpenConnsPerTorrent.
	halfOpenLimit  int
	// peerID is either Config.PeerID or Config.Bep20 followed by random bytes.
	peerID         PeerID
	defaultStorage *storage.Client
	// onClose callbacks are run by Close after listeners and torrents have
	// been closed.
	onClose        []func()
	// tcpListener and utpSock are the listeners returned by listen; either
	// may be nil when the corresponding protocol is disabled.
	tcpListener    net.Listener
	utpSock        utpSocket
	// dHT is nil when Config.NoDHT is set.
	dHT            *dht.Server
	ipBlockList    iplist.Ranger
	// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
	extensionBytes peerExtensionBytes
	// The net.Addr.String part that should be common to all active listeners.
	listenAddr    string
	// uploadLimit and downloadLimit default to unlimited rate limiters when
	// the Config does not supply them.
	uploadLimit   *rate.Limiter
	downloadLimit *rate.Limiter

	// Set of addresses that have our client ID. This intentionally will
	// include ourselves if we end up trying to connect to our own address
	// through legitimate channels.
	dopplegangerAddrs map[string]struct{}
	// badPeerIPs is a set of peer IP strings; its keys are what BadPeerIPs
	// returns.
	badPeerIPs        map[string]struct{}
	// torrents maps info hash to the Torrent held by this client.
	torrents          map[metainfo.Hash]*Torrent
}
69
70 func (cl *Client) BadPeerIPs() []string {
71         cl.mu.RLock()
72         defer cl.mu.RUnlock()
73         return cl.badPeerIPsLocked()
74 }
75
76 func (cl *Client) badPeerIPsLocked() []string {
77         return slices.FromMapKeys(cl.badPeerIPs).([]string)
78 }
79
80 func (cl *Client) IPBlockList() iplist.Ranger {
81         cl.mu.Lock()
82         defer cl.mu.Unlock()
83         return cl.ipBlockList
84 }
85
86 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
87         cl.mu.Lock()
88         defer cl.mu.Unlock()
89         cl.ipBlockList = list
90         if cl.dHT != nil {
91                 cl.dHT.SetIPBlockList(list)
92         }
93 }
94
95 func (cl *Client) PeerID() PeerID {
96         return cl.peerID
97 }
98
// torrentAddr is an address represented purely by its string form. It
// provides the Network and String methods so it can be handed out as a
// net.Addr.
type torrentAddr string

// Network always reports an empty network name.
func (torrentAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a torrentAddr) String() string {
	return string(a)
}
104
105 func (cl *Client) ListenAddr() net.Addr {
106         if cl.listenAddr == "" {
107                 return nil
108         }
109         return torrentAddr(cl.listenAddr)
110 }
111
112 // Writes out a human readable status of the client, such as for writing to a
113 // HTTP status page.
114 func (cl *Client) WriteStatus(_w io.Writer) {
115         cl.mu.Lock()
116         defer cl.mu.Unlock()
117         w := bufio.NewWriter(_w)
118         defer w.Flush()
119         if addr := cl.ListenAddr(); addr != nil {
120                 fmt.Fprintf(w, "Listening on %s\n", addr)
121         } else {
122                 fmt.Fprintln(w, "Not listening!")
123         }
124         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
126         if dht := cl.DHT(); dht != nil {
127                 dhtStats := dht.Stats()
128                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
129                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
130                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
131                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
132                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
133         }
134         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135         fmt.Fprintln(w)
136         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137                 return l.InfoHash().AsString() < r.InfoHash().AsString()
138         }).([]*Torrent) {
139                 if t.name() == "" {
140                         fmt.Fprint(w, "<unknown name>")
141                 } else {
142                         fmt.Fprint(w, t.name())
143                 }
144                 fmt.Fprint(w, "\n")
145                 if t.info != nil {
146                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147                 } else {
148                         w.WriteString("<missing metainfo>")
149                 }
150                 fmt.Fprint(w, "\n")
151                 t.writeStatus(w)
152                 fmt.Fprintln(w)
153         }
154 }
155
156 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
157         return NewUtpSocket("udp"+networkSuffix, addr)
158 }
159
// listenTCP opens a TCP listener on addr using the network "tcp" followed by
// networkSuffix.
func listenTCP(networkSuffix, addr string) (net.Listener, error) {
	network := "tcp" + networkSuffix
	return net.Listen(network, addr)
}
163
164 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
165         for {
166                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
167                 if err != nil {
168                         return
169                 }
170                 listenedAddr = tcpL.Addr().String()
171                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
172                 if err == nil {
173                         return
174                 }
175                 tcpL.Close()
176                 if !strings.Contains(err.Error(), "address already in use") {
177                         return
178                 }
179         }
180 }
181
182 // Listen to enabled protocols, ensuring ports match.
183 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
184         if addr == "" {
185                 addr = ":50007"
186         }
187         if tcp && utp {
188                 var host string
189                 var port int
190                 host, port, err = missinggo.ParseHostPort(addr)
191                 if err != nil {
192                         return
193                 }
194                 if port == 0 {
195                         // If both protocols are active, they need to have the same port.
196                         return listenBothSameDynamicPort(networkSuffix, host)
197                 }
198         }
199         defer func() {
200                 if err != nil {
201                         listenedAddr = ""
202                 }
203         }()
204         if tcp {
205                 tcpL, err = listenTCP(networkSuffix, addr)
206                 if err != nil {
207                         return
208                 }
209                 defer func() {
210                         if err != nil {
211                                 tcpL.Close()
212                         }
213                 }()
214                 listenedAddr = tcpL.Addr().String()
215         }
216         if utp {
217                 utpSock, err = listenUTP(networkSuffix, addr)
218                 if err != nil {
219                         return
220                 }
221                 listenedAddr = utpSock.Addr().String()
222         }
223         return
224 }
225
// debugLogValue is the log message value that debugLogFilter looks for:
// messages carrying it are dropped unless Config.Debug is set.
const debugLogValue = "debug"
227
228 func (cl *Client) debugLogFilter(m *log.Msg) bool {
229         if !cl.config.Debug {
230                 _, ok := m.Values()[debugLogValue]
231                 return !ok
232         }
233         return true
234 }
235
236 func (cl *Client) initLogger() {
237         cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
238 }
239
240 // Creates a new client.
241 func NewClient(cfg *Config) (cl *Client, err error) {
242         if cfg == nil {
243                 cfg = &Config{
244                         DHTConfig: dht.ServerConfig{
245                                 StartingNodes: dht.GlobalBootstrapAddrs,
246                         },
247                 }
248         }
249         if cfg == nil {
250                 cfg = &Config{}
251         }
252         cfg.setDefaults()
253
254         defer func() {
255                 if err != nil {
256                         cl = nil
257                 }
258         }()
259         cl = &Client{
260                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
261                 config:            *cfg,
262                 dopplegangerAddrs: make(map[string]struct{}),
263                 torrents:          make(map[metainfo.Hash]*Torrent),
264         }
265         cl.initLogger()
266         defer func() {
267                 if err == nil {
268                         return
269                 }
270                 cl.Close()
271         }()
272         if cfg.UploadRateLimiter == nil {
273                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
274         } else {
275                 cl.uploadLimit = cfg.UploadRateLimiter
276         }
277         if cfg.DownloadRateLimiter == nil {
278                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
279         } else {
280                 cl.downloadLimit = cfg.DownloadRateLimiter
281         }
282         cl.extensionBytes = defaultPeerExtensionBytes()
283         cl.event.L = &cl.mu
284         storageImpl := cfg.DefaultStorage
285         if storageImpl == nil {
286                 // We'd use mmap but HFS+ doesn't support sparse files.
287                 storageImpl = storage.NewFile(cfg.DataDir)
288                 cl.onClose = append(cl.onClose, func() {
289                         if err := storageImpl.Close(); err != nil {
290                                 log.Printf("error closing default storage: %s", err)
291                         }
292                 })
293         }
294         cl.defaultStorage = storage.NewClient(storageImpl)
295         if cfg.IPBlocklist != nil {
296                 cl.ipBlockList = cfg.IPBlocklist
297         }
298
299         if cfg.PeerID != "" {
300                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
301         } else {
302                 o := copy(cl.peerID[:], cfg.Bep20)
303                 _, err = rand.Read(cl.peerID[o:])
304                 if err != nil {
305                         panic("error generating peer id")
306                 }
307         }
308
309         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
310                 !cl.config.DisableTCP,
311                 !cl.config.DisableUTP,
312                 func() string {
313                         if cl.config.DisableIPv6 {
314                                 return "4"
315                         } else {
316                                 return ""
317                         }
318                 }(),
319                 cl.config.ListenAddr)
320         if err != nil {
321                 return
322         }
323         go cl.forwardPort()
324         if cl.tcpListener != nil {
325                 go cl.acceptConnections(cl.tcpListener, false)
326         }
327         if cl.utpSock != nil {
328                 go cl.acceptConnections(cl.utpSock, true)
329         }
330         if !cfg.NoDHT {
331                 dhtCfg := cfg.DHTConfig
332                 if dhtCfg.IPBlocklist == nil {
333                         dhtCfg.IPBlocklist = cl.ipBlockList
334                 }
335                 if dhtCfg.Conn == nil {
336                         if cl.utpSock != nil {
337                                 dhtCfg.Conn = cl.utpSock
338                         } else {
339                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
340                                 if err != nil {
341                                         return
342                                 }
343                         }
344                 }
345                 if dhtCfg.OnAnnouncePeer == nil {
346                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
347                 }
348                 cl.dHT, err = dht.NewServer(&dhtCfg)
349                 if err != nil {
350                         return
351                 }
352                 go func() {
353                         if _, err := cl.dHT.Bootstrap(); err != nil {
354                                 log.Printf("error bootstrapping dht: %s", err)
355                         }
356                 }()
357         }
358
359         return
360 }
361
// firstNonEmptyString returns the first argument that is not the empty
// string, or "" when every argument is empty or none are given.
func firstNonEmptyString(ss ...string) string {
	for i := 0; i < len(ss); i++ {
		if len(ss[i]) > 0 {
			return ss[i]
		}
	}
	return ""
}
370
371 func (cl *Client) Closed() <-chan struct{} {
372         cl.mu.Lock()
373         defer cl.mu.Unlock()
374         return cl.closed.C()
375 }
376
377 // Stops the client. All connections to peers are closed and all activity will
378 // come to a halt.
379 func (cl *Client) Close() {
380         cl.mu.Lock()
381         defer cl.mu.Unlock()
382         cl.closed.Set()
383         if cl.dHT != nil {
384                 cl.dHT.Close()
385         }
386         if cl.utpSock != nil {
387                 cl.utpSock.Close()
388         }
389         if cl.tcpListener != nil {
390                 cl.tcpListener.Close()
391         }
392         for _, t := range cl.torrents {
393                 t.close()
394         }
395         for _, f := range cl.onClose {
396                 f()
397         }
398         cl.event.Broadcast()
399 }
400
401 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
402         if cl.ipBlockList == nil {
403                 return
404         }
405         return cl.ipBlockList.Lookup(ip)
406 }
407
408 func (cl *Client) waitAccept() {
409         for {
410                 for _, t := range cl.torrents {
411                         if t.wantConns() {
412                                 return
413                         }
414                 }
415                 if cl.closed.IsSet() {
416                         return
417                 }
418                 cl.event.Wait()
419         }
420 }
421
422 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
423         cl.mu.Lock()
424         defer cl.mu.Unlock()
425         for {
426                 cl.waitAccept()
427                 cl.mu.Unlock()
428                 conn, err := l.Accept()
429                 conn = pproffd.WrapNetConn(conn)
430                 cl.mu.Lock()
431                 if cl.closed.IsSet() {
432                         if conn != nil {
433                                 conn.Close()
434                         }
435                         return
436                 }
437                 if err != nil {
438                         log.Print(err)
439                         // I think something harsher should happen here? Our accept
440                         // routine just fucked off.
441                         return
442                 }
443                 if utp {
444                         acceptUTP.Add(1)
445                 } else {
446                         acceptTCP.Add(1)
447                 }
448                 if cl.config.Debug {
449                         log.Printf("accepted connection from %s", conn.RemoteAddr())
450                 }
451                 reject := cl.badPeerIPPort(
452                         missinggo.AddrIP(conn.RemoteAddr()),
453                         missinggo.AddrPort(conn.RemoteAddr()))
454                 if reject {
455                         if cl.config.Debug {
456                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
457                         }
458                         acceptReject.Add(1)
459                         conn.Close()
460                         continue
461                 }
462                 go cl.incomingConnection(conn, utp)
463         }
464 }
465
466 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
467         defer nc.Close()
468         if tc, ok := nc.(*net.TCPConn); ok {
469                 tc.SetLinger(0)
470         }
471         c := cl.newConnection(nc)
472         c.Discovery = peerSourceIncoming
473         c.uTP = utp
474         cl.runReceivedConn(c)
475 }
476
477 // Returns a handle to the given torrent, if it's present in the client.
478 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
479         cl.mu.Lock()
480         defer cl.mu.Unlock()
481         t, ok = cl.torrents[ih]
482         return
483 }
484
485 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
486         return cl.torrents[ih]
487 }
488
// dialResult is what each dial goroutine started by dialFirst sends back.
type dialResult struct {
	// Conn is the dialled connection; dialFirst treats nil as a failed dial.
	Conn net.Conn
	// UTP is true for results produced by the uTP dialer, false for TCP.
	UTP  bool
}
493
494 func countDialResult(err error) {
495         if err == nil {
496                 successfulDials.Add(1)
497         } else {
498                 unsuccessfulDials.Add(1)
499         }
500 }
501
// reducedDialTimeout scales the dial timeout down as the backlog of pending
// peers grows relative to the half-open connection limit.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), but never less than minDialTimeout. When halfOpenLimit
// is zero, or the divisor works out to zero, no reduction is applied and max
// is used as the starting value instead of dividing by zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
			ret = max / time.Duration(divisor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return
}
509
510 // Returns whether an address is known to connect to a client with our own ID.
511 func (cl *Client) dopplegangerAddr(addr string) bool {
512         _, ok := cl.dopplegangerAddrs[addr]
513         return ok
514 }
515
516 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
517         d := net.Dialer{
518         // LocalAddr: cl.tcpListener.Addr(),
519         }
520         c, err = d.DialContext(ctx, "tcp", addr)
521         countDialResult(err)
522         if err == nil {
523                 c.(*net.TCPConn).SetLinger(0)
524         }
525         c = pproffd.WrapNetConn(c)
526         return
527 }
528
529 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
530         c, err = cl.utpSock.DialContext(ctx, addr)
531         countDialResult(err)
532         return
533 }
534
// Counters, published through expvar, of which transport supplied the
// connection returned by dialFirst.
var (
	// dialledFirstUtp is incremented when dialFirst returns a uTP connection.
	dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
	// dialledFirstNotUtp is incremented when dialFirst returns a non-uTP
	// connection.
	dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
)
539
540 // Returns a connection over UTP or TCP, whichever is first to connect.
541 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
542         ctx, cancel := context.WithCancel(ctx)
543         // As soon as we return one connection, cancel the others.
544         defer cancel()
545         left := 0
546         resCh := make(chan dialResult, left)
547         if !cl.config.DisableUTP {
548                 left++
549                 go func() {
550                         c, _ := cl.dialUTP(ctx, addr)
551                         resCh <- dialResult{c, true}
552                 }()
553         }
554         if !cl.config.DisableTCP {
555                 left++
556                 go func() {
557                         c, _ := cl.dialTCP(ctx, addr)
558                         resCh <- dialResult{c, false}
559                 }()
560         }
561         var res dialResult
562         // Wait for a successful connection.
563         for ; left > 0 && res.Conn == nil; left-- {
564                 res = <-resCh
565         }
566         if left > 0 {
567                 // There are still incompleted dials.
568                 go func() {
569                         for ; left > 0; left-- {
570                                 conn := (<-resCh).Conn
571                                 if conn != nil {
572                                         conn.Close()
573                                 }
574                         }
575                 }()
576         }
577         conn = res.Conn
578         utp = res.UTP
579         if conn != nil {
580                 if utp {
581                         dialledFirstUtp.Add(1)
582                 } else {
583                         dialledFirstNotUtp.Add(1)
584                 }
585         }
586         return
587 }
588
589 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
590         if _, ok := t.halfOpen[addr]; !ok {
591                 panic("invariant broken")
592         }
593         delete(t.halfOpen, addr)
594         t.openNewConns()
595 }
596
597 // Performs initiator handshakes and returns a connection. Returns nil
598 // *connection if no connection for valid reasons.
599 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
600         c = cl.newConnection(nc)
601         c.headerEncrypted = encryptHeader
602         c.uTP = utp
603         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
604         defer cancel()
605         dl, ok := ctx.Deadline()
606         if !ok {
607                 panic(ctx)
608         }
609         err = nc.SetDeadline(dl)
610         if err != nil {
611                 panic(err)
612         }
613         ok, err = cl.initiateHandshakes(c, t)
614         if !ok {
615                 c = nil
616         }
617         return
618 }
619
// Counters, published through expvar, of how outgoing connections completed
// their handshakes in establishOutgoingConn.
var (
	// Incremented when the first handshake attempt yields a connection.
	initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
	// Incremented when the retry with the opposite header-encryption setting
	// yields a connection.
	initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
)
624
625 // Returns nil connection and nil error if no connection could be established
626 // for valid reasons.
627 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
628         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
629         defer cancel()
630         nc, utp := cl.dialFirst(ctx, addr)
631         if nc == nil {
632                 return
633         }
634         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
635         c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
636         if err != nil {
637                 // log.Printf("error initiating connection handshakes: %s", err)
638                 nc.Close()
639                 return
640         } else if c != nil {
641                 initiatedConnWithPreferredHeaderEncryption.Add(1)
642                 return
643         }
644         nc.Close()
645         if cl.config.ForceEncryption {
646                 // We should have just tried with an obfuscated header. A plaintext
647                 // header can't result in an encrypted connection, so we're done.
648                 if !obfuscatedHeaderFirst {
649                         panic(cl.config.EncryptionPolicy)
650                 }
651                 return
652         }
653         // Try again with encryption if we didn't earlier, or without if we did,
654         // using whichever protocol type worked last time.
655         if utp {
656                 nc, err = cl.dialUTP(ctx, addr)
657         } else {
658                 nc, err = cl.dialTCP(ctx, addr)
659         }
660         if err != nil {
661                 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
662                 return
663         }
664         c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
665         if err != nil || c == nil {
666                 nc.Close()
667         }
668         if err == nil && c != nil {
669                 initiatedConnWithFallbackHeaderEncryption.Add(1)
670         }
671         return
672 }
673
674 // Called to dial out and run a connection. The addr we're given is already
675 // considered half-open.
676 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
677         c, err := cl.establishOutgoingConn(t, addr)
678         cl.mu.Lock()
679         defer cl.mu.Unlock()
680         // Don't release lock between here and addConnection, unless it's for
681         // failure.
682         cl.noLongerHalfOpen(t, addr)
683         if err != nil {
684                 if cl.config.Debug {
685                         log.Printf("error establishing outgoing connection: %s", err)
686                 }
687                 return
688         }
689         if c == nil {
690                 return
691         }
692         defer c.Close()
693         c.Discovery = ps
694         cl.runHandshookConn(c, t, true)
695 }
696
697 // The port number for incoming peer connections. 0 if the client isn't
698 // listening.
699 func (cl *Client) incomingPeerPort() int {
700         if cl.listenAddr == "" {
701                 return 0
702         }
703         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
704         if err != nil {
705                 panic(err)
706         }
707         return port
708 }
709
710 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
711         if c.headerEncrypted {
712                 var rw io.ReadWriter
713                 rw, err = mse.InitiateHandshake(
714                         struct {
715                                 io.Reader
716                                 io.Writer
717                         }{c.r, c.w},
718                         t.infoHash[:],
719                         nil,
720                         func() uint32 {
721                                 switch {
722                                 case cl.config.ForceEncryption:
723                                         return mse.CryptoMethodRC4
724                                 case cl.config.DisableEncryption:
725                                         return mse.CryptoMethodPlaintext
726                                 default:
727                                         return mse.AllSupportedCrypto
728                                 }
729                         }(),
730                 )
731                 c.setRW(rw)
732                 if err != nil {
733                         return
734                 }
735         }
736         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
737         if ih != t.infoHash {
738                 ok = false
739         }
740         return
741 }
742
743 // Calls f with any secret keys.
744 func (cl *Client) forSkeys(f func([]byte) bool) {
745         cl.mu.Lock()
746         defer cl.mu.Unlock()
747         for ih := range cl.torrents {
748                 if !f(ih[:]) {
749                         break
750                 }
751         }
752 }
753
754 // Do encryption and bittorrent handshakes as receiver.
755 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
756         var rw io.ReadWriter
757         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
758         c.setRW(rw)
759         if err != nil {
760                 if err == mse.ErrNoSecretKeyMatch {
761                         err = nil
762                 }
763                 return
764         }
765         if cl.config.ForceEncryption && !c.headerEncrypted {
766                 err = errors.New("connection not encrypted")
767                 return
768         }
769         ih, ok, err := cl.connBTHandshake(c, nil)
770         if err != nil {
771                 err = fmt.Errorf("error during bt handshake: %s", err)
772                 return
773         }
774         if !ok {
775                 return
776         }
777         cl.mu.Lock()
778         t = cl.torrents[ih]
779         cl.mu.Unlock()
780         return
781 }
782
783 // Returns !ok if handshake failed for valid reasons.
784 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
785         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
786         if err != nil || !ok {
787                 return
788         }
789         ret = res.Hash
790         c.PeerExtensionBytes = res.peerExtensionBytes
791         c.PeerID = res.PeerID
792         c.completedHandshake = time.Now()
793         return
794 }
795
796 func (cl *Client) runReceivedConn(c *connection) {
797         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
798         if err != nil {
799                 panic(err)
800         }
801         t, err := cl.receiveHandshakes(c)
802         if err != nil {
803                 if cl.config.Debug {
804                         log.Printf("error receiving handshakes: %s", err)
805                 }
806                 return
807         }
808         if t == nil {
809                 return
810         }
811         cl.mu.Lock()
812         defer cl.mu.Unlock()
813         cl.runHandshookConn(c, t, false)
814 }
815
816 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
817         t.reconcileHandshakeStats(c)
818         if c.PeerID == cl.peerID {
819                 if outgoing {
820                         connsToSelf.Add(1)
821                         addr := c.conn.RemoteAddr().String()
822                         cl.dopplegangerAddrs[addr] = struct{}{}
823                 } else {
824                         // Because the remote address is not necessarily the same as its
825                         // client's torrent listen address, we won't record the remote address
826                         // as a doppleganger. Instead, the initiator can record *us* as the
827                         // doppleganger.
828                 }
829                 return
830         }
831         c.conn.SetWriteDeadline(time.Time{})
832         c.r = deadlineReader{c.conn, c.r}
833         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
834         if !t.addConnection(c, outgoing) {
835                 return
836         }
837         defer t.dropConnection(c)
838         go c.writer(time.Minute)
839         cl.sendInitialMessages(c, t)
840         err := c.mainReadLoop()
841         if err != nil && cl.config.Debug {
842                 log.Printf("error during connection main read loop: %s", err)
843         }
844 }
845
846 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
847         func() {
848                 if conn.fastEnabled() {
849                         if torrent.haveAllPieces() {
850                                 conn.Post(pp.Message{Type: pp.HaveAll})
851                                 conn.sentHaves.AddRange(0, conn.t.NumPieces())
852                                 return
853                         } else if !torrent.haveAnyPieces() {
854                                 conn.Post(pp.Message{Type: pp.HaveNone})
855                                 conn.sentHaves.Clear()
856                                 return
857                         }
858                 }
859                 conn.PostBitfield()
860         }()
861         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
862                 conn.Post(pp.Message{
863                         Type:       pp.Extended,
864                         ExtendedID: pp.HandshakeExtendedID,
865                         ExtendedPayload: func() []byte {
866                                 d := map[string]interface{}{
867                                         "m": func() (ret map[string]int) {
868                                                 ret = make(map[string]int, 2)
869                                                 ret["ut_metadata"] = metadataExtendedId
870                                                 if !cl.config.DisablePEX {
871                                                         ret["ut_pex"] = pexExtendedId
872                                                 }
873                                                 return
874                                         }(),
875                                         "v": cl.config.ExtendedHandshakeClientVersion,
876                                         // No upload queue is implemented yet.
877                                         "reqq": 64,
878                                 }
879                                 if !cl.config.DisableEncryption {
880                                         d["e"] = 1
881                                 }
882                                 if torrent.metadataSizeKnown() {
883                                         d["metadata_size"] = torrent.metadataSize()
884                                 }
885                                 if p := cl.incomingPeerPort(); p != 0 {
886                                         d["p"] = p
887                                 }
888                                 yourip, err := addrCompactIP(conn.remoteAddr())
889                                 if err != nil {
890                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
891                                 } else {
892                                         d["yourip"] = yourip
893                                 }
894                                 // log.Printf("sending %v", d)
895                                 b, err := bencode.Marshal(d)
896                                 if err != nil {
897                                         panic(err)
898                                 }
899                                 return b
900                         }(),
901                 })
902         }
903         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
904                 conn.Post(pp.Message{
905                         Type: pp.Port,
906                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
907                 })
908         }
909 }
910
911 // Process incoming ut_metadata message.
912 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
913         var d map[string]int
914         err := bencode.Unmarshal(payload, &d)
915         if err != nil {
916                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
917         }
918         msgType, ok := d["msg_type"]
919         if !ok {
920                 return errors.New("missing msg_type field")
921         }
922         piece := d["piece"]
923         switch msgType {
924         case pp.DataMetadataExtensionMsgType:
925                 if !c.requestedMetadataPiece(piece) {
926                         return fmt.Errorf("got unexpected piece %d", piece)
927                 }
928                 c.metadataRequests[piece] = false
929                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
930                 if begin < 0 || begin >= len(payload) {
931                         return fmt.Errorf("data has bad offset in payload: %d", begin)
932                 }
933                 t.saveMetadataPiece(piece, payload[begin:])
934                 c.stats.ChunksReadUseful++
935                 c.t.stats.ChunksReadUseful++
936                 c.lastUsefulChunkReceived = time.Now()
937                 return t.maybeCompleteMetadata()
938         case pp.RequestMetadataExtensionMsgType:
939                 if !t.haveMetadataPiece(piece) {
940                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
941                         return nil
942                 }
943                 start := (1 << 14) * piece
944                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
945                 return nil
946         case pp.RejectMetadataExtensionMsgType:
947                 return nil
948         default:
949                 return errors.New("unknown msg_type value")
950         }
951 }
952
953 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
954         if port == 0 {
955                 return true
956         }
957         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
958                 return true
959         }
960         if _, ok := cl.ipBlockRange(ip); ok {
961                 return true
962         }
963         if _, ok := cl.badPeerIPs[ip.String()]; ok {
964                 return true
965         }
966         return false
967 }
968
969 // Return a Torrent ready for insertion into a Client.
970 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
971         // use provided storage, if provided
972         storageClient := cl.defaultStorage
973         if specStorage != nil {
974                 storageClient = storage.NewClient(specStorage)
975         }
976
977         t = &Torrent{
978                 cl:       cl,
979                 infoHash: ih,
980                 peers:    make(map[peersKey]Peer),
981                 conns:    make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
982
983                 halfOpen:          make(map[string]Peer),
984                 pieceStateChanges: pubsub.NewPubSub(),
985
986                 storageOpener:       storageClient,
987                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
988
989                 networkingEnabled: true,
990                 requestStrategy:   2,
991                 metadataChanged: sync.Cond{
992                         L: &cl.mu,
993                 },
994         }
995         t.logger = cl.logger.Clone().AddValue(t)
996         t.setChunkSize(defaultChunkSize)
997         return
998 }
999
// Handle is a file-like handle to some torrent data resource. It supports
// sequential reads, seeking, positioned reads and closing.
type Handle interface {
	io.ReadSeeker
	io.ReaderAt
	io.Closer
}
1007
1008 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1009         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1010 }
1011
1012 // Adds a torrent by InfoHash with a custom Storage implementation.
1013 // If the torrent already exists then this Storage is ignored and the
1014 // existing torrent returned with `new` set to `false`
1015 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1016         cl.mu.Lock()
1017         defer cl.mu.Unlock()
1018         t, ok := cl.torrents[infoHash]
1019         if ok {
1020                 return
1021         }
1022         new = true
1023         t = cl.newTorrent(infoHash, specStorage)
1024         if cl.dHT != nil {
1025                 go t.dhtAnnouncer()
1026         }
1027         cl.torrents[infoHash] = t
1028         t.updateWantPeersEvent()
1029         // Tickle Client.waitAccept, new torrent may want conns.
1030         cl.event.Broadcast()
1031         return
1032 }
1033
1034 // Add or merge a torrent spec. If the torrent is already present, the
1035 // trackers will be merged with the existing ones. If the Info isn't yet
1036 // known, it will be set. The display name is replaced if the new spec
1037 // provides one. Returns new if the torrent wasn't already in the client.
1038 // Note that any `Storage` defined on the spec will be ignored if the
1039 // torrent is already present (i.e. `new` return value is `true`)
1040 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1041         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1042         if spec.DisplayName != "" {
1043                 t.SetDisplayName(spec.DisplayName)
1044         }
1045         if spec.InfoBytes != nil {
1046                 err = t.SetInfoBytes(spec.InfoBytes)
1047                 if err != nil {
1048                         return
1049                 }
1050         }
1051         cl.mu.Lock()
1052         defer cl.mu.Unlock()
1053         if spec.ChunkSize != 0 {
1054                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1055         }
1056         t.addTrackers(spec.Trackers)
1057         t.maybeNewConns()
1058         return
1059 }
1060
1061 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1062         t, ok := cl.torrents[infoHash]
1063         if !ok {
1064                 err = fmt.Errorf("no such torrent")
1065                 return
1066         }
1067         err = t.close()
1068         if err != nil {
1069                 panic(err)
1070         }
1071         delete(cl.torrents, infoHash)
1072         return
1073 }
1074
1075 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1076         _url, err := url.Parse(announceURL)
1077         if err != nil {
1078                 return
1079         }
1080         hmp := missinggo.SplitHostMaybePort(_url.Host)
1081         if hmp.Err != nil {
1082                 err = hmp.Err
1083                 return
1084         }
1085         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1086         if err != nil {
1087                 return
1088         }
1089         cl.mu.RLock()
1090         _, blocked = cl.ipBlockRange(addr.IP)
1091         cl.mu.RUnlock()
1092         host = _url.Host
1093         hmp.Host = addr.String()
1094         _url.Host = hmp.String()
1095         urlToUse = _url.String()
1096         return
1097 }
1098
1099 func (cl *Client) allTorrentsCompleted() bool {
1100         for _, t := range cl.torrents {
1101                 if !t.haveInfo() {
1102                         return false
1103                 }
1104                 if !t.haveAllPieces() {
1105                         return false
1106                 }
1107         }
1108         return true
1109 }
1110
1111 // Returns true when all torrents are completely downloaded and false if the
1112 // client is stopped before that.
1113 func (cl *Client) WaitAll() bool {
1114         cl.mu.Lock()
1115         defer cl.mu.Unlock()
1116         for !cl.allTorrentsCompleted() {
1117                 if cl.closed.IsSet() {
1118                         return false
1119                 }
1120                 cl.event.Wait()
1121         }
1122         return true
1123 }
1124
1125 // Returns handles to all the torrents loaded in the Client.
1126 func (cl *Client) Torrents() []*Torrent {
1127         cl.mu.Lock()
1128         defer cl.mu.Unlock()
1129         return cl.torrentsAsSlice()
1130 }
1131
1132 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1133         for _, t := range cl.torrents {
1134                 ret = append(ret, t)
1135         }
1136         return
1137 }
1138
1139 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1140         spec, err := TorrentSpecFromMagnetURI(uri)
1141         if err != nil {
1142                 return
1143         }
1144         T, _, err = cl.AddTorrentSpec(spec)
1145         return
1146 }
1147
1148 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1149         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1150         var ss []string
1151         slices.MakeInto(&ss, mi.Nodes)
1152         cl.AddDHTNodes(ss)
1153         return
1154 }
1155
1156 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1157         mi, err := metainfo.LoadFromFile(filename)
1158         if err != nil {
1159                 return
1160         }
1161         return cl.AddTorrent(mi)
1162 }
1163
1164 func (cl *Client) DHT() *dht.Server {
1165         return cl.dHT
1166 }
1167
1168 func (cl *Client) AddDHTNodes(nodes []string) {
1169         if cl.DHT() == nil {
1170                 return
1171         }
1172         for _, n := range nodes {
1173                 hmp := missinggo.SplitHostMaybePort(n)
1174                 ip := net.ParseIP(hmp.Host)
1175                 if ip == nil {
1176                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1177                         continue
1178                 }
1179                 ni := krpc.NodeInfo{
1180                         Addr: &net.UDPAddr{
1181                                 IP:   ip,
1182                                 Port: hmp.Port,
1183                         },
1184                 }
1185                 cl.DHT().AddNode(ni)
1186         }
1187 }
1188
1189 func (cl *Client) banPeerIP(ip net.IP) {
1190         if cl.badPeerIPs == nil {
1191                 cl.badPeerIPs = make(map[string]struct{})
1192         }
1193         cl.badPeerIPs[ip.String()] = struct{}{}
1194 }
1195
1196 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1197         c = &connection{
1198                 conn:            nc,
1199                 Choked:          true,
1200                 PeerChoked:      true,
1201                 PeerMaxRequests: 250,
1202                 writeBuffer:     new(bytes.Buffer),
1203         }
1204         c.writerCond.L = &cl.mu
1205         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1206         c.r = &rateLimitedReader{
1207                 l: cl.downloadLimit,
1208                 r: c.r,
1209         }
1210         return
1211 }
1212
1213 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1214         cl.mu.Lock()
1215         defer cl.mu.Unlock()
1216         t := cl.torrent(ih)
1217         if t == nil {
1218                 return
1219         }
1220         t.addPeers([]Peer{{
1221                 IP:     p.IP,
1222                 Port:   p.Port,
1223                 Source: peerSourceDHTAnnouncePeer,
1224         }})
1225 }