]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Add very tentative UPnP NAT traversal
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "log"
12         "net"
13         "net/url"
14         "strconv"
15         "strings"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/dht/krpc"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/pproffd"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/anacrolix/sync"
25         "github.com/dustin/go-humanize"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/mse"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 // Clients contain zero or more Torrents. A Client manages a blocklist, the
37 // TCP/UDP protocol ports, and DHT as desired.
38 type Client struct {
39         mu     sync.RWMutex
40         event  sync.Cond
41         closed missinggo.Event
42
43         config Config
44
45         halfOpenLimit  int
46         peerID         PeerID
47         defaultStorage *storage.Client
48         onClose        []func()
49         tcpListener    net.Listener
50         utpSock        utpSocket
51         dHT            *dht.Server
52         ipBlockList    iplist.Ranger
53         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
54         extensionBytes peerExtensionBytes
55         // The net.Addr.String part that should be common to all active listeners.
56         listenAddr    string
57         uploadLimit   *rate.Limiter
58         downloadLimit *rate.Limiter
59
60         // Set of addresses that have our client ID. This intentionally will
61         // include ourselves if we end up trying to connect to our own address
62         // through legitimate channels.
63         dopplegangerAddrs map[string]struct{}
64         badPeerIPs        map[string]struct{}
65         torrents          map[metainfo.Hash]*Torrent
66 }
67
68 func (cl *Client) BadPeerIPs() []string {
69         cl.mu.RLock()
70         defer cl.mu.RUnlock()
71         return cl.badPeerIPsLocked()
72 }
73
74 func (cl *Client) badPeerIPsLocked() []string {
75         return slices.FromMapKeys(cl.badPeerIPs).([]string)
76 }
77
78 func (cl *Client) IPBlockList() iplist.Ranger {
79         cl.mu.Lock()
80         defer cl.mu.Unlock()
81         return cl.ipBlockList
82 }
83
84 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
85         cl.mu.Lock()
86         defer cl.mu.Unlock()
87         cl.ipBlockList = list
88         if cl.dHT != nil {
89                 cl.dHT.SetIPBlockList(list)
90         }
91 }
92
93 func (cl *Client) PeerID() PeerID {
94         return cl.peerID
95 }
96
97 type torrentAddr string
98
99 func (torrentAddr) Network() string { return "" }
100
101 func (me torrentAddr) String() string { return string(me) }
102
103 func (cl *Client) ListenAddr() net.Addr {
104         if cl.listenAddr == "" {
105                 return nil
106         }
107         return torrentAddr(cl.listenAddr)
108 }
109
110 // Writes out a human readable status of the client, such as for writing to a
111 // HTTP status page.
112 func (cl *Client) WriteStatus(_w io.Writer) {
113         cl.mu.Lock()
114         defer cl.mu.Unlock()
115         w := bufio.NewWriter(_w)
116         defer w.Flush()
117         if addr := cl.ListenAddr(); addr != nil {
118                 fmt.Fprintf(w, "Listening on %s\n", addr)
119         } else {
120                 fmt.Fprintln(w, "Not listening!")
121         }
122         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
124         if dht := cl.DHT(); dht != nil {
125                 dhtStats := dht.Stats()
126                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
127                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
128                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
129                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
130                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
131         }
132         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
133         fmt.Fprintln(w)
134         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135                 return l.InfoHash().AsString() < r.InfoHash().AsString()
136         }).([]*Torrent) {
137                 if t.name() == "" {
138                         fmt.Fprint(w, "<unknown name>")
139                 } else {
140                         fmt.Fprint(w, t.name())
141                 }
142                 fmt.Fprint(w, "\n")
143                 if t.info != nil {
144                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
145                 } else {
146                         w.WriteString("<missing metainfo>")
147                 }
148                 fmt.Fprint(w, "\n")
149                 t.writeStatus(w)
150                 fmt.Fprintln(w)
151         }
152 }
153
154 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
155         return NewUtpSocket("udp"+networkSuffix, addr)
156 }
157
158 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
159         return net.Listen("tcp"+networkSuffix, addr)
160 }
161
162 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
163         for {
164                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
165                 if err != nil {
166                         return
167                 }
168                 listenedAddr = tcpL.Addr().String()
169                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
170                 if err == nil {
171                         return
172                 }
173                 tcpL.Close()
174                 if !strings.Contains(err.Error(), "address already in use") {
175                         return
176                 }
177         }
178 }
179
180 // Listen to enabled protocols, ensuring ports match.
181 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
182         if addr == "" {
183                 addr = ":50007"
184         }
185         if tcp && utp {
186                 var host string
187                 var port int
188                 host, port, err = missinggo.ParseHostPort(addr)
189                 if err != nil {
190                         return
191                 }
192                 if port == 0 {
193                         // If both protocols are active, they need to have the same port.
194                         return listenBothSameDynamicPort(networkSuffix, host)
195                 }
196         }
197         defer func() {
198                 if err != nil {
199                         listenedAddr = ""
200                 }
201         }()
202         if tcp {
203                 tcpL, err = listenTCP(networkSuffix, addr)
204                 if err != nil {
205                         return
206                 }
207                 defer func() {
208                         if err != nil {
209                                 tcpL.Close()
210                         }
211                 }()
212                 listenedAddr = tcpL.Addr().String()
213         }
214         if utp {
215                 utpSock, err = listenUTP(networkSuffix, addr)
216                 if err != nil {
217                         return
218                 }
219                 listenedAddr = utpSock.Addr().String()
220         }
221         return
222 }
223
224 // Creates a new client.
225 func NewClient(cfg *Config) (cl *Client, err error) {
226         if cfg == nil {
227                 cfg = &Config{
228                         DHTConfig: dht.ServerConfig{
229                                 StartingNodes: dht.GlobalBootstrapAddrs,
230                         },
231                 }
232         }
233         if cfg == nil {
234                 cfg = &Config{}
235         }
236         cfg.setDefaults()
237
238         defer func() {
239                 if err != nil {
240                         cl = nil
241                 }
242         }()
243         cl = &Client{
244                 halfOpenLimit:     cfg.HalfOpenConnsPerTorrent,
245                 config:            *cfg,
246                 dopplegangerAddrs: make(map[string]struct{}),
247                 torrents:          make(map[metainfo.Hash]*Torrent),
248         }
249         defer func() {
250                 if err == nil {
251                         return
252                 }
253                 cl.Close()
254         }()
255         if cfg.UploadRateLimiter == nil {
256                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
257         } else {
258                 cl.uploadLimit = cfg.UploadRateLimiter
259         }
260         if cfg.DownloadRateLimiter == nil {
261                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
262         } else {
263                 cl.downloadLimit = cfg.DownloadRateLimiter
264         }
265         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
266         cl.event.L = &cl.mu
267         storageImpl := cfg.DefaultStorage
268         if storageImpl == nil {
269                 // We'd use mmap but HFS+ doesn't support sparse files.
270                 storageImpl = storage.NewFile(cfg.DataDir)
271                 cl.onClose = append(cl.onClose, func() {
272                         if err := storageImpl.Close(); err != nil {
273                                 log.Printf("error closing default storage: %s", err)
274                         }
275                 })
276         }
277         cl.defaultStorage = storage.NewClient(storageImpl)
278         if cfg.IPBlocklist != nil {
279                 cl.ipBlockList = cfg.IPBlocklist
280         }
281
282         if cfg.PeerID != "" {
283                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
284         } else {
285                 o := copy(cl.peerID[:], cfg.Bep20)
286                 _, err = rand.Read(cl.peerID[o:])
287                 if err != nil {
288                         panic("error generating peer id")
289                 }
290         }
291
292         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
293                 !cl.config.DisableTCP,
294                 !cl.config.DisableUTP,
295                 func() string {
296                         if cl.config.DisableIPv6 {
297                                 return "4"
298                         } else {
299                                 return ""
300                         }
301                 }(),
302                 cl.config.ListenAddr)
303         if err != nil {
304                 return
305         }
306         go cl.forwardPort()
307         if cl.tcpListener != nil {
308                 go cl.acceptConnections(cl.tcpListener, false)
309         }
310         if cl.utpSock != nil {
311                 go cl.acceptConnections(cl.utpSock, true)
312         }
313         if !cfg.NoDHT {
314                 dhtCfg := cfg.DHTConfig
315                 if dhtCfg.IPBlocklist == nil {
316                         dhtCfg.IPBlocklist = cl.ipBlockList
317                 }
318                 if dhtCfg.Conn == nil {
319                         if cl.utpSock != nil {
320                                 dhtCfg.Conn = cl.utpSock
321                         } else {
322                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
323                                 if err != nil {
324                                         return
325                                 }
326                         }
327                 }
328                 if dhtCfg.OnAnnouncePeer == nil {
329                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
330                 }
331                 cl.dHT, err = dht.NewServer(&dhtCfg)
332                 if err != nil {
333                         return
334                 }
335                 go func() {
336                         if _, err := cl.dHT.Bootstrap(); err != nil {
337                                 log.Printf("error bootstrapping dht: %s", err)
338                         }
339                 }()
340         }
341
342         return
343 }
344
345 func firstNonEmptyString(ss ...string) string {
346         for _, s := range ss {
347                 if s != "" {
348                         return s
349                 }
350         }
351         return ""
352 }
353
354 // Stops the client. All connections to peers are closed and all activity will
355 // come to a halt.
356 func (cl *Client) Close() {
357         cl.mu.Lock()
358         defer cl.mu.Unlock()
359         cl.closed.Set()
360         if cl.dHT != nil {
361                 cl.dHT.Close()
362         }
363         if cl.utpSock != nil {
364                 cl.utpSock.Close()
365         }
366         if cl.tcpListener != nil {
367                 cl.tcpListener.Close()
368         }
369         for _, t := range cl.torrents {
370                 t.close()
371         }
372         for _, f := range cl.onClose {
373                 f()
374         }
375         cl.event.Broadcast()
376 }
377
378 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
379
380 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
381         if cl.ipBlockList == nil {
382                 return
383         }
384         ip4 := ip.To4()
385         // If blocklists are enabled, then block non-IPv4 addresses, because
386         // blocklists do not yet support IPv6.
387         if ip4 == nil {
388                 if missinggo.CryHeard() {
389                         log.Printf("blocking non-IPv4 address: %s", ip)
390                 }
391                 r = ipv6BlockRange
392                 blocked = true
393                 return
394         }
395         return cl.ipBlockList.Lookup(ip4)
396 }
397
398 func (cl *Client) waitAccept() {
399         for {
400                 for _, t := range cl.torrents {
401                         if t.wantConns() {
402                                 return
403                         }
404                 }
405                 if cl.closed.IsSet() {
406                         return
407                 }
408                 cl.event.Wait()
409         }
410 }
411
412 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
413         cl.mu.Lock()
414         defer cl.mu.Unlock()
415         for {
416                 cl.waitAccept()
417                 cl.mu.Unlock()
418                 conn, err := l.Accept()
419                 conn = pproffd.WrapNetConn(conn)
420                 cl.mu.Lock()
421                 if cl.closed.IsSet() {
422                         if conn != nil {
423                                 conn.Close()
424                         }
425                         return
426                 }
427                 if err != nil {
428                         log.Print(err)
429                         // I think something harsher should happen here? Our accept
430                         // routine just fucked off.
431                         return
432                 }
433                 if utp {
434                         acceptUTP.Add(1)
435                 } else {
436                         acceptTCP.Add(1)
437                 }
438                 if cl.config.Debug {
439                         log.Printf("accepted connection from %s", conn.RemoteAddr())
440                 }
441                 reject := cl.badPeerIPPort(
442                         missinggo.AddrIP(conn.RemoteAddr()),
443                         missinggo.AddrPort(conn.RemoteAddr()))
444                 if reject {
445                         if cl.config.Debug {
446                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
447                         }
448                         acceptReject.Add(1)
449                         conn.Close()
450                         continue
451                 }
452                 go cl.incomingConnection(conn, utp)
453         }
454 }
455
456 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
457         defer nc.Close()
458         if tc, ok := nc.(*net.TCPConn); ok {
459                 tc.SetLinger(0)
460         }
461         c := cl.newConnection(nc)
462         c.Discovery = peerSourceIncoming
463         c.uTP = utp
464         cl.runReceivedConn(c)
465 }
466
467 // Returns a handle to the given torrent, if it's present in the client.
468 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
469         cl.mu.Lock()
470         defer cl.mu.Unlock()
471         t, ok = cl.torrents[ih]
472         return
473 }
474
475 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
476         return cl.torrents[ih]
477 }
478
479 type dialResult struct {
480         Conn net.Conn
481         UTP  bool
482 }
483
484 func countDialResult(err error) {
485         if err == nil {
486                 successfulDials.Add(1)
487         } else {
488                 unsuccessfulDials.Add(1)
489         }
490 }
491
492 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
493         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
494         if ret < minDialTimeout {
495                 ret = minDialTimeout
496         }
497         return
498 }
499
500 // Returns whether an address is known to connect to a client with our own ID.
501 func (cl *Client) dopplegangerAddr(addr string) bool {
502         _, ok := cl.dopplegangerAddrs[addr]
503         return ok
504 }
505
506 // Start the process of connecting to the given peer for the given torrent if
507 // appropriate.
508 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
509         if peer.Id == cl.peerID {
510                 return
511         }
512         if cl.badPeerIPPort(peer.IP, peer.Port) {
513                 return
514         }
515         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
516         if t.addrActive(addr) {
517                 return
518         }
519         t.halfOpen[addr] = peer
520         go cl.outgoingConnection(t, addr, peer.Source)
521 }
522
523 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
524         d := net.Dialer{
525                 // LocalAddr: cl.tcpListener.Addr(),
526         }
527         c, err = d.DialContext(ctx, "tcp", addr)
528         countDialResult(err)
529         if err == nil {
530                 c.(*net.TCPConn).SetLinger(0)
531         }
532         c = pproffd.WrapNetConn(c)
533         return
534 }
535
536 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
537         c, err = cl.utpSock.DialContext(ctx, addr)
538         countDialResult(err)
539         return
540 }
541
542 var (
543         dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
544         dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
545 )
546
547 // Returns a connection over UTP or TCP, whichever is first to connect.
548 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
549         ctx, cancel := context.WithCancel(ctx)
550         // As soon as we return one connection, cancel the others.
551         defer cancel()
552         left := 0
553         resCh := make(chan dialResult, left)
554         if !cl.config.DisableUTP {
555                 left++
556                 go func() {
557                         c, _ := cl.dialUTP(ctx, addr)
558                         resCh <- dialResult{c, true}
559                 }()
560         }
561         if !cl.config.DisableTCP {
562                 left++
563                 go func() {
564                         c, _ := cl.dialTCP(ctx, addr)
565                         resCh <- dialResult{c, false}
566                 }()
567         }
568         var res dialResult
569         // Wait for a successful connection.
570         for ; left > 0 && res.Conn == nil; left-- {
571                 res = <-resCh
572         }
573         if left > 0 {
574                 // There are still incompleted dials.
575                 go func() {
576                         for ; left > 0; left-- {
577                                 conn := (<-resCh).Conn
578                                 if conn != nil {
579                                         conn.Close()
580                                 }
581                         }
582                 }()
583         }
584         conn = res.Conn
585         utp = res.UTP
586         if conn != nil {
587                 if utp {
588                         dialledFirstUtp.Add(1)
589                 } else {
590                         dialledFirstNotUtp.Add(1)
591                 }
592         }
593         return
594 }
595
596 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
597         if _, ok := t.halfOpen[addr]; !ok {
598                 panic("invariant broken")
599         }
600         delete(t.halfOpen, addr)
601         cl.openNewConns(t)
602 }
603
604 // Performs initiator handshakes and returns a connection. Returns nil
605 // *connection if no connection for valid reasons.
606 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
607         c = cl.newConnection(nc)
608         c.headerEncrypted = encryptHeader
609         c.uTP = utp
610         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
611         defer cancel()
612         dl, ok := ctx.Deadline()
613         if !ok {
614                 panic(ctx)
615         }
616         err = nc.SetDeadline(dl)
617         if err != nil {
618                 panic(err)
619         }
620         ok, err = cl.initiateHandshakes(c, t)
621         if !ok {
622                 c = nil
623         }
624         return
625 }
626
627 var (
628         initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
629         initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
630 )
631
632 // Returns nil connection and nil error if no connection could be established
633 // for valid reasons.
634 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
635         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
636         defer cancel()
637         nc, utp := cl.dialFirst(ctx, addr)
638         if nc == nil {
639                 return
640         }
641         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
642         c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
643         if err != nil {
644                 // log.Printf("error initiating connection handshakes: %s", err)
645                 nc.Close()
646                 return
647         } else if c != nil {
648                 initiatedConnWithPreferredHeaderEncryption.Add(1)
649                 return
650         }
651         nc.Close()
652         if cl.config.ForceEncryption {
653                 // We should have just tried with an obfuscated header. A plaintext
654                 // header can't result in an encrypted connection, so we're done.
655                 if !obfuscatedHeaderFirst {
656                         panic(cl.config.EncryptionPolicy)
657                 }
658                 return
659         }
660         // Try again with encryption if we didn't earlier, or without if we did,
661         // using whichever protocol type worked last time.
662         if utp {
663                 nc, err = cl.dialUTP(ctx, addr)
664         } else {
665                 nc, err = cl.dialTCP(ctx, addr)
666         }
667         if err != nil {
668                 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
669                 return
670         }
671         c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
672         if err != nil || c == nil {
673                 nc.Close()
674         }
675         if err == nil && c != nil {
676                 initiatedConnWithFallbackHeaderEncryption.Add(1)
677         }
678         return
679 }
680
681 // Called to dial out and run a connection. The addr we're given is already
682 // considered half-open.
683 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
684         c, err := cl.establishOutgoingConn(t, addr)
685         cl.mu.Lock()
686         defer cl.mu.Unlock()
687         // Don't release lock between here and addConnection, unless it's for
688         // failure.
689         cl.noLongerHalfOpen(t, addr)
690         if err != nil {
691                 if cl.config.Debug {
692                         log.Printf("error establishing outgoing connection: %s", err)
693                 }
694                 return
695         }
696         if c == nil {
697                 return
698         }
699         defer c.Close()
700         c.Discovery = ps
701         cl.runInitiatedHandshookConn(c, t)
702 }
703
704 // The port number for incoming peer connections. 0 if the client isn't
705 // listening.
706 func (cl *Client) incomingPeerPort() int {
707         if cl.listenAddr == "" {
708                 return 0
709         }
710         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
711         if err != nil {
712                 panic(err)
713         }
714         return port
715 }
716
717 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
718         if c.headerEncrypted {
719                 var rw io.ReadWriter
720                 rw, err = mse.InitiateHandshake(
721                         struct {
722                                 io.Reader
723                                 io.Writer
724                         }{c.r, c.w},
725                         t.infoHash[:],
726                         nil,
727                         func() uint32 {
728                                 switch {
729                                 case cl.config.ForceEncryption:
730                                         return mse.CryptoMethodRC4
731                                 case cl.config.DisableEncryption:
732                                         return mse.CryptoMethodPlaintext
733                                 default:
734                                         return mse.AllSupportedCrypto
735                                 }
736                         }(),
737                 )
738                 c.setRW(rw)
739                 if err != nil {
740                         return
741                 }
742         }
743         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
744         if ih != t.infoHash {
745                 ok = false
746         }
747         return
748 }
749
750 // Calls f with any secret keys.
751 func (cl *Client) forSkeys(f func([]byte) bool) {
752         cl.mu.Lock()
753         defer cl.mu.Unlock()
754         for ih := range cl.torrents {
755                 if !f(ih[:]) {
756                         break
757                 }
758         }
759 }
760
761 // Do encryption and bittorrent handshakes as receiver.
762 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
763         var rw io.ReadWriter
764         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
765         c.setRW(rw)
766         if err != nil {
767                 if err == mse.ErrNoSecretKeyMatch {
768                         err = nil
769                 }
770                 return
771         }
772         if cl.config.ForceEncryption && !c.headerEncrypted {
773                 err = errors.New("connection not encrypted")
774                 return
775         }
776         ih, ok, err := cl.connBTHandshake(c, nil)
777         if err != nil {
778                 err = fmt.Errorf("error during bt handshake: %s", err)
779                 return
780         }
781         if !ok {
782                 return
783         }
784         cl.mu.Lock()
785         t = cl.torrents[ih]
786         cl.mu.Unlock()
787         return
788 }
789
790 // Returns !ok if handshake failed for valid reasons.
791 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
792         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
793         if err != nil || !ok {
794                 return
795         }
796         ret = res.Hash
797         c.PeerExtensionBytes = res.peerExtensionBytes
798         c.PeerID = res.PeerID
799         c.completedHandshake = time.Now()
800         return
801 }
802
803 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
804         if c.PeerID == cl.peerID {
805                 connsToSelf.Add(1)
806                 addr := c.conn.RemoteAddr().String()
807                 cl.dopplegangerAddrs[addr] = struct{}{}
808                 return
809         }
810         cl.runHandshookConn(c, t, true)
811 }
812
813 func (cl *Client) runReceivedConn(c *connection) {
814         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
815         if err != nil {
816                 panic(err)
817         }
818         t, err := cl.receiveHandshakes(c)
819         if err != nil {
820                 if cl.config.Debug {
821                         log.Printf("error receiving handshakes: %s", err)
822                 }
823                 return
824         }
825         if t == nil {
826                 return
827         }
828         cl.mu.Lock()
829         defer cl.mu.Unlock()
830         if c.PeerID == cl.peerID {
831                 // Because the remote address is not necessarily the same as its
832                 // client's torrent listen address, we won't record the remote address
833                 // as a doppleganger. Instead, the initiator can record *us* as the
834                 // doppleganger.
835                 return
836         }
837         cl.runHandshookConn(c, t, false)
838 }
839
840 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
841         c.conn.SetWriteDeadline(time.Time{})
842         c.r = deadlineReader{c.conn, c.r}
843         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
844         if !t.addConnection(c, outgoing) {
845                 return
846         }
847         defer t.dropConnection(c)
848         go c.writer(time.Minute)
849         cl.sendInitialMessages(c, t)
850         err := c.mainReadLoop()
851         if err != nil && cl.config.Debug {
852                 log.Printf("error during connection main read loop: %s", err)
853         }
854 }
855
856 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
857         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
858                 conn.Post(pp.Message{
859                         Type:       pp.Extended,
860                         ExtendedID: pp.HandshakeExtendedID,
861                         ExtendedPayload: func() []byte {
862                                 d := map[string]interface{}{
863                                         "m": func() (ret map[string]int) {
864                                                 ret = make(map[string]int, 2)
865                                                 ret["ut_metadata"] = metadataExtendedId
866                                                 if !cl.config.DisablePEX {
867                                                         ret["ut_pex"] = pexExtendedId
868                                                 }
869                                                 return
870                                         }(),
871                                         "v": cl.config.ExtendedHandshakeClientVersion,
872                                         // No upload queue is implemented yet.
873                                         "reqq": 64,
874                                 }
875                                 if !cl.config.DisableEncryption {
876                                         d["e"] = 1
877                                 }
878                                 if torrent.metadataSizeKnown() {
879                                         d["metadata_size"] = torrent.metadataSize()
880                                 }
881                                 if p := cl.incomingPeerPort(); p != 0 {
882                                         d["p"] = p
883                                 }
884                                 yourip, err := addrCompactIP(conn.remoteAddr())
885                                 if err != nil {
886                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
887                                 } else {
888                                         d["yourip"] = yourip
889                                 }
890                                 // log.Printf("sending %v", d)
891                                 b, err := bencode.Marshal(d)
892                                 if err != nil {
893                                         panic(err)
894                                 }
895                                 return b
896                         }(),
897                 })
898         }
899         if torrent.haveAnyPieces() {
900                 conn.Bitfield(torrent.bitfield())
901         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
902                 conn.Post(pp.Message{
903                         Type: pp.HaveNone,
904                 })
905         }
906         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
907                 conn.Post(pp.Message{
908                         Type: pp.Port,
909                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
910                 })
911         }
912 }
913
914 // Process incoming ut_metadata message.
915 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
916         var d map[string]int
917         err := bencode.Unmarshal(payload, &d)
918         if err != nil {
919                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
920         }
921         msgType, ok := d["msg_type"]
922         if !ok {
923                 return errors.New("missing msg_type field")
924         }
925         piece := d["piece"]
926         switch msgType {
927         case pp.DataMetadataExtensionMsgType:
928                 if !c.requestedMetadataPiece(piece) {
929                         return fmt.Errorf("got unexpected piece %d", piece)
930                 }
931                 c.metadataRequests[piece] = false
932                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
933                 if begin < 0 || begin >= len(payload) {
934                         return fmt.Errorf("data has bad offset in payload: %d", begin)
935                 }
936                 t.saveMetadataPiece(piece, payload[begin:])
937                 c.UsefulChunksReceived++
938                 c.lastUsefulChunkReceived = time.Now()
939                 return t.maybeCompleteMetadata()
940         case pp.RequestMetadataExtensionMsgType:
941                 if !t.haveMetadataPiece(piece) {
942                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
943                         return nil
944                 }
945                 start := (1 << 14) * piece
946                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
947                 return nil
948         case pp.RejectMetadataExtensionMsgType:
949                 return nil
950         default:
951                 return errors.New("unknown msg_type value")
952         }
953 }
954
955 func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
956         // Count the chunk being sent, even if it isn't.
957         b := make([]byte, r.Length)
958         p := t.info.Piece(int(r.Index))
959         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
960         if n != len(b) {
961                 if err == nil {
962                         panic("expected error")
963                 }
964                 return
965         } else if err == io.EOF {
966                 err = nil
967         }
968         more = msg(pp.Message{
969                 Type:  pp.Piece,
970                 Index: r.Index,
971                 Begin: r.Begin,
972                 Piece: b,
973         })
974         c.chunksSent++
975         uploadChunksPosted.Add(1)
976         c.lastChunkSent = time.Now()
977         return
978 }
979
980 func (cl *Client) openNewConns(t *Torrent) {
981         defer t.updateWantPeersEvent()
982         for len(t.peers) != 0 {
983                 if !t.wantConns() {
984                         return
985                 }
986                 if len(t.halfOpen) >= cl.halfOpenLimit {
987                         return
988                 }
989                 var (
990                         k peersKey
991                         p Peer
992                 )
993                 for k, p = range t.peers {
994                         break
995                 }
996                 delete(t.peers, k)
997                 cl.initiateConn(p, t)
998         }
999 }
1000
1001 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1002         if port == 0 {
1003                 return true
1004         }
1005         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1006                 return true
1007         }
1008         if _, ok := cl.ipBlockRange(ip); ok {
1009                 return true
1010         }
1011         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1012                 return true
1013         }
1014         return false
1015 }
1016
1017 // Return a Torrent ready for insertion into a Client.
1018 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1019         // use provided storage, if provided
1020         storageClient := cl.defaultStorage
1021         if specStorage != nil {
1022                 storageClient = storage.NewClient(specStorage)
1023         }
1024
1025         t = &Torrent{
1026                 cl:       cl,
1027                 infoHash: ih,
1028                 peers:    make(map[peersKey]Peer),
1029                 conns:    make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1030
1031                 halfOpen:          make(map[string]Peer),
1032                 pieceStateChanges: pubsub.NewPubSub(),
1033
1034                 storageOpener:       storageClient,
1035                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1036
1037                 networkingEnabled: true,
1038                 requestStrategy:   2,
1039                 metadataChanged: sync.Cond{
1040                         L: &cl.mu,
1041                 },
1042         }
1043         t.setChunkSize(defaultChunkSize)
1044         return
1045 }
1046
1047 // A file-like handle to some torrent data resource.
1048 type Handle interface {
1049         io.Reader
1050         io.Seeker
1051         io.Closer
1052         io.ReaderAt
1053 }
1054
1055 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1056         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1057 }
1058
1059 // Adds a torrent by InfoHash with a custom Storage implementation.
1060 // If the torrent already exists then this Storage is ignored and the
1061 // existing torrent returned with `new` set to `false`
1062 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1063         cl.mu.Lock()
1064         defer cl.mu.Unlock()
1065         t, ok := cl.torrents[infoHash]
1066         if ok {
1067                 return
1068         }
1069         new = true
1070         t = cl.newTorrent(infoHash, specStorage)
1071         if cl.dHT != nil {
1072                 go t.dhtAnnouncer()
1073         }
1074         cl.torrents[infoHash] = t
1075         t.updateWantPeersEvent()
1076         // Tickle Client.waitAccept, new torrent may want conns.
1077         cl.event.Broadcast()
1078         return
1079 }
1080
1081 // Add or merge a torrent spec. If the torrent is already present, the
1082 // trackers will be merged with the existing ones. If the Info isn't yet
1083 // known, it will be set. The display name is replaced if the new spec
1084 // provides one. Returns new if the torrent wasn't already in the client.
1085 // Note that any `Storage` defined on the spec will be ignored if the
1086 // torrent is already present (i.e. `new` return value is `true`)
1087 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1088         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1089         if spec.DisplayName != "" {
1090                 t.SetDisplayName(spec.DisplayName)
1091         }
1092         if spec.InfoBytes != nil {
1093                 err = t.SetInfoBytes(spec.InfoBytes)
1094                 if err != nil {
1095                         return
1096                 }
1097         }
1098         cl.mu.Lock()
1099         defer cl.mu.Unlock()
1100         if spec.ChunkSize != 0 {
1101                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1102         }
1103         t.addTrackers(spec.Trackers)
1104         t.maybeNewConns()
1105         return
1106 }
1107
1108 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1109         t, ok := cl.torrents[infoHash]
1110         if !ok {
1111                 err = fmt.Errorf("no such torrent")
1112                 return
1113         }
1114         err = t.close()
1115         if err != nil {
1116                 panic(err)
1117         }
1118         delete(cl.torrents, infoHash)
1119         return
1120 }
1121
1122 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1123         _url, err := url.Parse(announceURL)
1124         if err != nil {
1125                 return
1126         }
1127         hmp := missinggo.SplitHostMaybePort(_url.Host)
1128         if hmp.Err != nil {
1129                 err = hmp.Err
1130                 return
1131         }
1132         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1133         if err != nil {
1134                 return
1135         }
1136         cl.mu.RLock()
1137         _, blocked = cl.ipBlockRange(addr.IP)
1138         cl.mu.RUnlock()
1139         host = _url.Host
1140         hmp.Host = addr.String()
1141         _url.Host = hmp.String()
1142         urlToUse = _url.String()
1143         return
1144 }
1145
1146 func (cl *Client) allTorrentsCompleted() bool {
1147         for _, t := range cl.torrents {
1148                 if !t.haveInfo() {
1149                         return false
1150                 }
1151                 if t.numPiecesCompleted() != t.numPieces() {
1152                         return false
1153                 }
1154         }
1155         return true
1156 }
1157
1158 // Returns true when all torrents are completely downloaded and false if the
1159 // client is stopped before that.
1160 func (cl *Client) WaitAll() bool {
1161         cl.mu.Lock()
1162         defer cl.mu.Unlock()
1163         for !cl.allTorrentsCompleted() {
1164                 if cl.closed.IsSet() {
1165                         return false
1166                 }
1167                 cl.event.Wait()
1168         }
1169         return true
1170 }
1171
1172 // Returns handles to all the torrents loaded in the Client.
1173 func (cl *Client) Torrents() []*Torrent {
1174         cl.mu.Lock()
1175         defer cl.mu.Unlock()
1176         return cl.torrentsAsSlice()
1177 }
1178
1179 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1180         for _, t := range cl.torrents {
1181                 ret = append(ret, t)
1182         }
1183         return
1184 }
1185
1186 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1187         spec, err := TorrentSpecFromMagnetURI(uri)
1188         if err != nil {
1189                 return
1190         }
1191         T, _, err = cl.AddTorrentSpec(spec)
1192         return
1193 }
1194
1195 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1196         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1197         var ss []string
1198         slices.MakeInto(&ss, mi.Nodes)
1199         cl.AddDHTNodes(ss)
1200         return
1201 }
1202
1203 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1204         mi, err := metainfo.LoadFromFile(filename)
1205         if err != nil {
1206                 return
1207         }
1208         return cl.AddTorrent(mi)
1209 }
1210
1211 func (cl *Client) DHT() *dht.Server {
1212         return cl.dHT
1213 }
1214
1215 func (cl *Client) AddDHTNodes(nodes []string) {
1216         if cl.DHT() == nil {
1217                 return
1218         }
1219         for _, n := range nodes {
1220                 hmp := missinggo.SplitHostMaybePort(n)
1221                 ip := net.ParseIP(hmp.Host)
1222                 if ip == nil {
1223                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1224                         continue
1225                 }
1226                 ni := krpc.NodeInfo{
1227                         Addr: &net.UDPAddr{
1228                                 IP:   ip,
1229                                 Port: hmp.Port,
1230                         },
1231                 }
1232                 cl.DHT().AddNode(ni)
1233         }
1234 }
1235
1236 func (cl *Client) banPeerIP(ip net.IP) {
1237         if cl.badPeerIPs == nil {
1238                 cl.badPeerIPs = make(map[string]struct{})
1239         }
1240         cl.badPeerIPs[ip.String()] = struct{}{}
1241 }
1242
1243 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1244         c = &connection{
1245                 conn: nc,
1246
1247                 Choked:          true,
1248                 PeerChoked:      true,
1249                 PeerMaxRequests: 250,
1250         }
1251         c.writerCond.L = &cl.mu
1252         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1253         c.r = rateLimitedReader{cl.downloadLimit, c.r}
1254         return
1255 }
1256
1257 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1258         cl.mu.Lock()
1259         defer cl.mu.Unlock()
1260         t := cl.torrent(ih)
1261         if t == nil {
1262                 return
1263         }
1264         t.addPeers([]Peer{{
1265                 IP:     p.IP,
1266                 Port:   p.Port,
1267                 Source: peerSourceDHTAnnouncePeer,
1268         }})
1269 }