]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Move some code around
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "log"
12         "net"
13         "net/url"
14         "strconv"
15         "strings"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/dht/krpc"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/pproffd"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/anacrolix/sync"
25         "github.com/dustin/go-humanize"
26         "golang.org/x/time/rate"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/mse"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34 )
35
36 // Clients contain zero or more Torrents. A Client manages a blocklist, the
37 // TCP/UDP protocol ports, and DHT as desired.
38 type Client struct {
39         mu     sync.RWMutex
40         event  sync.Cond
41         closed missinggo.Event
42
43         config Config
44
45         halfOpenLimit  int
46         peerID         [20]byte
47         defaultStorage *storage.Client
48         onClose        []func()
49         tcpListener    net.Listener
50         utpSock        utpSocket
51         dHT            *dht.Server
52         ipBlockList    iplist.Ranger
53         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
54         extensionBytes peerExtensionBytes
55         // The net.Addr.String part that should be common to all active listeners.
56         listenAddr    string
57         uploadLimit   *rate.Limiter
58         downloadLimit *rate.Limiter
59
60         // Set of addresses that have our client ID. This intentionally will
61         // include ourselves if we end up trying to connect to our own address
62         // through legitimate channels.
63         dopplegangerAddrs map[string]struct{}
64         badPeerIPs        map[string]struct{}
65         torrents          map[metainfo.Hash]*Torrent
66 }
67
68 func (cl *Client) BadPeerIPs() []string {
69         cl.mu.RLock()
70         defer cl.mu.RUnlock()
71         return cl.badPeerIPsLocked()
72 }
73
74 func (cl *Client) badPeerIPsLocked() []string {
75         return slices.FromMapKeys(cl.badPeerIPs).([]string)
76 }
77
78 func (cl *Client) IPBlockList() iplist.Ranger {
79         cl.mu.Lock()
80         defer cl.mu.Unlock()
81         return cl.ipBlockList
82 }
83
84 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
85         cl.mu.Lock()
86         defer cl.mu.Unlock()
87         cl.ipBlockList = list
88         if cl.dHT != nil {
89                 cl.dHT.SetIPBlockList(list)
90         }
91 }
92
93 func (cl *Client) PeerID() string {
94         return string(cl.peerID[:])
95 }
96
97 type torrentAddr string
98
99 func (torrentAddr) Network() string { return "" }
100
101 func (me torrentAddr) String() string { return string(me) }
102
103 func (cl *Client) ListenAddr() net.Addr {
104         if cl.listenAddr == "" {
105                 return nil
106         }
107         return torrentAddr(cl.listenAddr)
108 }
109
110 // Writes out a human readable status of the client, such as for writing to a
111 // HTTP status page.
112 func (cl *Client) WriteStatus(_w io.Writer) {
113         cl.mu.Lock()
114         defer cl.mu.Unlock()
115         w := bufio.NewWriter(_w)
116         defer w.Flush()
117         if addr := cl.ListenAddr(); addr != nil {
118                 fmt.Fprintf(w, "Listening on %s\n", addr)
119         } else {
120                 fmt.Fprintln(w, "Not listening!")
121         }
122         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
124         if dht := cl.DHT(); dht != nil {
125                 dhtStats := dht.Stats()
126                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
127                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
128                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
129                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
130                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
131         }
132         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
133         fmt.Fprintln(w)
134         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135                 return l.InfoHash().AsString() < r.InfoHash().AsString()
136         }).([]*Torrent) {
137                 if t.name() == "" {
138                         fmt.Fprint(w, "<unknown name>")
139                 } else {
140                         fmt.Fprint(w, t.name())
141                 }
142                 fmt.Fprint(w, "\n")
143                 if t.info != nil {
144                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
145                 } else {
146                         w.WriteString("<missing metainfo>")
147                 }
148                 fmt.Fprint(w, "\n")
149                 t.writeStatus(w)
150                 fmt.Fprintln(w)
151         }
152 }
153
154 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
155         return NewUtpSocket("udp"+networkSuffix, addr)
156 }
157
158 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
159         return net.Listen("tcp"+networkSuffix, addr)
160 }
161
162 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
163         for {
164                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
165                 if err != nil {
166                         return
167                 }
168                 listenedAddr = tcpL.Addr().String()
169                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
170                 if err == nil {
171                         return
172                 }
173                 tcpL.Close()
174                 if !strings.Contains(err.Error(), "address already in use") {
175                         return
176                 }
177         }
178 }
179
180 // Listen to enabled protocols, ensuring ports match.
181 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
182         if addr == "" {
183                 addr = ":50007"
184         }
185         if tcp && utp {
186                 var host string
187                 var port int
188                 host, port, err = missinggo.ParseHostPort(addr)
189                 if err != nil {
190                         return
191                 }
192                 if port == 0 {
193                         // If both protocols are active, they need to have the same port.
194                         return listenBothSameDynamicPort(networkSuffix, host)
195                 }
196         }
197         defer func() {
198                 if err != nil {
199                         listenedAddr = ""
200                 }
201         }()
202         if tcp {
203                 tcpL, err = listenTCP(networkSuffix, addr)
204                 if err != nil {
205                         return
206                 }
207                 defer func() {
208                         if err != nil {
209                                 tcpL.Close()
210                         }
211                 }()
212                 listenedAddr = tcpL.Addr().String()
213         }
214         if utp {
215                 utpSock, err = listenUTP(networkSuffix, addr)
216                 if err != nil {
217                         return
218                 }
219                 listenedAddr = utpSock.Addr().String()
220         }
221         return
222 }
223
224 // Creates a new client.
225 func NewClient(cfg *Config) (cl *Client, err error) {
226         if cfg == nil {
227                 cfg = &Config{
228                         DHTConfig: dht.ServerConfig{
229                                 StartingNodes: dht.GlobalBootstrapAddrs,
230                         },
231                 }
232         }
233
234         defer func() {
235                 if err != nil {
236                         cl = nil
237                 }
238         }()
239         cl = &Client{
240                 halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
241                 config:            *cfg,
242                 dopplegangerAddrs: make(map[string]struct{}),
243                 torrents:          make(map[metainfo.Hash]*Torrent),
244         }
245         defer func() {
246                 if err == nil {
247                         return
248                 }
249                 cl.Close()
250         }()
251         if cfg.UploadRateLimiter == nil {
252                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
253         } else {
254                 cl.uploadLimit = cfg.UploadRateLimiter
255         }
256         if cfg.DownloadRateLimiter == nil {
257                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
258         } else {
259                 cl.downloadLimit = cfg.DownloadRateLimiter
260         }
261         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
262         cl.event.L = &cl.mu
263         storageImpl := cfg.DefaultStorage
264         if storageImpl == nil {
265                 // We'd use mmap but HFS+ doesn't support sparse files.
266                 storageImpl = storage.NewFile(cfg.DataDir)
267                 cl.onClose = append(cl.onClose, func() {
268                         if err := storageImpl.Close(); err != nil {
269                                 log.Printf("error closing default storage: %s", err)
270                         }
271                 })
272         }
273         cl.defaultStorage = storage.NewClient(storageImpl)
274         if cfg.IPBlocklist != nil {
275                 cl.ipBlockList = cfg.IPBlocklist
276         }
277
278         if cfg.PeerID != "" {
279                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
280         } else {
281                 o := copy(cl.peerID[:], bep20)
282                 _, err = rand.Read(cl.peerID[o:])
283                 if err != nil {
284                         panic("error generating peer id")
285                 }
286         }
287
288         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
289                 !cl.config.DisableTCP,
290                 !cl.config.DisableUTP,
291                 func() string {
292                         if cl.config.DisableIPv6 {
293                                 return "4"
294                         } else {
295                                 return ""
296                         }
297                 }(),
298                 cl.config.ListenAddr)
299         if err != nil {
300                 return
301         }
302         if cl.tcpListener != nil {
303                 go cl.acceptConnections(cl.tcpListener, false)
304         }
305         if cl.utpSock != nil {
306                 go cl.acceptConnections(cl.utpSock, true)
307         }
308         if !cfg.NoDHT {
309                 dhtCfg := cfg.DHTConfig
310                 if dhtCfg.IPBlocklist == nil {
311                         dhtCfg.IPBlocklist = cl.ipBlockList
312                 }
313                 if dhtCfg.Conn == nil {
314                         if cl.utpSock != nil {
315                                 dhtCfg.Conn = cl.utpSock
316                         } else {
317                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
318                                 if err != nil {
319                                         return
320                                 }
321                         }
322                 }
323                 if dhtCfg.OnAnnouncePeer == nil {
324                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
325                 }
326                 cl.dHT, err = dht.NewServer(&dhtCfg)
327                 if err != nil {
328                         return
329                 }
330                 go func() {
331                         if _, err := cl.dHT.Bootstrap(); err != nil {
332                                 log.Printf("error bootstrapping dht: %s", err)
333                         }
334                 }()
335         }
336
337         return
338 }
339
340 func firstNonEmptyString(ss ...string) string {
341         for _, s := range ss {
342                 if s != "" {
343                         return s
344                 }
345         }
346         return ""
347 }
348
349 // Stops the client. All connections to peers are closed and all activity will
350 // come to a halt.
351 func (cl *Client) Close() {
352         cl.mu.Lock()
353         defer cl.mu.Unlock()
354         cl.closed.Set()
355         if cl.dHT != nil {
356                 cl.dHT.Close()
357         }
358         if cl.utpSock != nil {
359                 cl.utpSock.Close()
360         }
361         if cl.tcpListener != nil {
362                 cl.tcpListener.Close()
363         }
364         for _, t := range cl.torrents {
365                 t.close()
366         }
367         for _, f := range cl.onClose {
368                 f()
369         }
370         cl.event.Broadcast()
371 }
372
373 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
374
375 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
376         if cl.ipBlockList == nil {
377                 return
378         }
379         ip4 := ip.To4()
380         // If blocklists are enabled, then block non-IPv4 addresses, because
381         // blocklists do not yet support IPv6.
382         if ip4 == nil {
383                 if missinggo.CryHeard() {
384                         log.Printf("blocking non-IPv4 address: %s", ip)
385                 }
386                 r = ipv6BlockRange
387                 blocked = true
388                 return
389         }
390         return cl.ipBlockList.Lookup(ip4)
391 }
392
393 func (cl *Client) waitAccept() {
394         for {
395                 for _, t := range cl.torrents {
396                         if t.wantConns() {
397                                 return
398                         }
399                 }
400                 if cl.closed.IsSet() {
401                         return
402                 }
403                 cl.event.Wait()
404         }
405 }
406
407 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
408         cl.mu.Lock()
409         defer cl.mu.Unlock()
410         for {
411                 cl.waitAccept()
412                 cl.mu.Unlock()
413                 conn, err := l.Accept()
414                 conn = pproffd.WrapNetConn(conn)
415                 cl.mu.Lock()
416                 if cl.closed.IsSet() {
417                         if conn != nil {
418                                 conn.Close()
419                         }
420                         return
421                 }
422                 if err != nil {
423                         log.Print(err)
424                         // I think something harsher should happen here? Our accept
425                         // routine just fucked off.
426                         return
427                 }
428                 if utp {
429                         acceptUTP.Add(1)
430                 } else {
431                         acceptTCP.Add(1)
432                 }
433                 if cl.config.Debug {
434                         log.Printf("accepted connection from %s", conn.RemoteAddr())
435                 }
436                 reject := cl.badPeerIPPort(
437                         missinggo.AddrIP(conn.RemoteAddr()),
438                         missinggo.AddrPort(conn.RemoteAddr()))
439                 if reject {
440                         if cl.config.Debug {
441                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
442                         }
443                         acceptReject.Add(1)
444                         conn.Close()
445                         continue
446                 }
447                 go cl.incomingConnection(conn, utp)
448         }
449 }
450
451 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
452         defer nc.Close()
453         if tc, ok := nc.(*net.TCPConn); ok {
454                 tc.SetLinger(0)
455         }
456         c := cl.newConnection(nc)
457         c.Discovery = peerSourceIncoming
458         c.uTP = utp
459         cl.runReceivedConn(c)
460 }
461
462 // Returns a handle to the given torrent, if it's present in the client.
463 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
464         cl.mu.Lock()
465         defer cl.mu.Unlock()
466         t, ok = cl.torrents[ih]
467         return
468 }
469
470 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
471         return cl.torrents[ih]
472 }
473
474 type dialResult struct {
475         Conn net.Conn
476         UTP  bool
477 }
478
479 func countDialResult(err error) {
480         if err == nil {
481                 successfulDials.Add(1)
482         } else {
483                 unsuccessfulDials.Add(1)
484         }
485 }
486
487 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
488         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
489         if ret < minDialTimeout {
490                 ret = minDialTimeout
491         }
492         return
493 }
494
495 // Returns whether an address is known to connect to a client with our own ID.
496 func (cl *Client) dopplegangerAddr(addr string) bool {
497         _, ok := cl.dopplegangerAddrs[addr]
498         return ok
499 }
500
501 // Start the process of connecting to the given peer for the given torrent if
502 // appropriate.
503 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
504         if peer.Id == cl.peerID {
505                 return
506         }
507         if cl.badPeerIPPort(peer.IP, peer.Port) {
508                 return
509         }
510         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
511         if t.addrActive(addr) {
512                 return
513         }
514         t.halfOpen[addr] = peer
515         go cl.outgoingConnection(t, addr, peer.Source)
516 }
517
518 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
519         d := net.Dialer{
520         // LocalAddr: cl.tcpListener.Addr(),
521         }
522         c, err = d.DialContext(ctx, "tcp", addr)
523         countDialResult(err)
524         if err == nil {
525                 c.(*net.TCPConn).SetLinger(0)
526         }
527         c = pproffd.WrapNetConn(c)
528         return
529 }
530
531 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
532         c, err = cl.utpSock.DialContext(ctx, addr)
533         countDialResult(err)
534         return
535 }
536
537 var (
538         dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
539         dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
540 )
541
542 // Returns a connection over UTP or TCP, whichever is first to connect.
543 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
544         ctx, cancel := context.WithCancel(ctx)
545         // As soon as we return one connection, cancel the others.
546         defer cancel()
547         left := 0
548         resCh := make(chan dialResult, left)
549         if !cl.config.DisableUTP {
550                 left++
551                 go func() {
552                         c, _ := cl.dialUTP(ctx, addr)
553                         resCh <- dialResult{c, true}
554                 }()
555         }
556         if !cl.config.DisableTCP {
557                 left++
558                 go func() {
559                         c, _ := cl.dialTCP(ctx, addr)
560                         resCh <- dialResult{c, false}
561                 }()
562         }
563         var res dialResult
564         // Wait for a successful connection.
565         for ; left > 0 && res.Conn == nil; left-- {
566                 res = <-resCh
567         }
568         if left > 0 {
569                 // There are still incompleted dials.
570                 go func() {
571                         for ; left > 0; left-- {
572                                 conn := (<-resCh).Conn
573                                 if conn != nil {
574                                         conn.Close()
575                                 }
576                         }
577                 }()
578         }
579         conn = res.Conn
580         utp = res.UTP
581         if conn != nil {
582                 if utp {
583                         dialledFirstUtp.Add(1)
584                 } else {
585                         dialledFirstNotUtp.Add(1)
586                 }
587         }
588         return
589 }
590
591 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
592         if _, ok := t.halfOpen[addr]; !ok {
593                 panic("invariant broken")
594         }
595         delete(t.halfOpen, addr)
596         cl.openNewConns(t)
597 }
598
599 // Performs initiator handshakes and returns a connection. Returns nil
600 // *connection if no connection for valid reasons.
601 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
602         c = cl.newConnection(nc)
603         c.headerEncrypted = encryptHeader
604         c.uTP = utp
605         ctx, cancel := context.WithTimeout(ctx, handshakesTimeout)
606         defer cancel()
607         dl, ok := ctx.Deadline()
608         if !ok {
609                 panic(ctx)
610         }
611         err = nc.SetDeadline(dl)
612         if err != nil {
613                 panic(err)
614         }
615         ok, err = cl.initiateHandshakes(c, t)
616         if !ok {
617                 c = nil
618         }
619         return
620 }
621
622 var (
623         initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
624         initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
625 )
626
627 // Returns nil connection and nil error if no connection could be established
628 // for valid reasons.
629 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
630         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
631         defer cancel()
632         nc, utp := cl.dialFirst(ctx, addr)
633         if nc == nil {
634                 return
635         }
636         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
637         c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
638         if err != nil {
639                 // log.Printf("error initiating connection handshakes: %s", err)
640                 nc.Close()
641                 return
642         } else if c != nil {
643                 initiatedConnWithPreferredHeaderEncryption.Add(1)
644                 return
645         }
646         nc.Close()
647         if cl.config.ForceEncryption {
648                 // We should have just tried with an obfuscated header. A plaintext
649                 // header can't result in an encrypted connection, so we're done.
650                 if !obfuscatedHeaderFirst {
651                         panic(cl.config.EncryptionPolicy)
652                 }
653                 return
654         }
655         // Try again with encryption if we didn't earlier, or without if we did,
656         // using whichever protocol type worked last time.
657         if utp {
658                 nc, err = cl.dialUTP(ctx, addr)
659         } else {
660                 nc, err = cl.dialTCP(ctx, addr)
661         }
662         if err != nil {
663                 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
664                 return
665         }
666         c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
667         if err != nil || c == nil {
668                 nc.Close()
669         }
670         if err == nil && c != nil {
671                 initiatedConnWithFallbackHeaderEncryption.Add(1)
672         }
673         return
674 }
675
676 // Called to dial out and run a connection. The addr we're given is already
677 // considered half-open.
678 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
679         c, err := cl.establishOutgoingConn(t, addr)
680         cl.mu.Lock()
681         defer cl.mu.Unlock()
682         // Don't release lock between here and addConnection, unless it's for
683         // failure.
684         cl.noLongerHalfOpen(t, addr)
685         if err != nil {
686                 if cl.config.Debug {
687                         log.Printf("error establishing outgoing connection: %s", err)
688                 }
689                 return
690         }
691         if c == nil {
692                 return
693         }
694         defer c.Close()
695         c.Discovery = ps
696         cl.runInitiatedHandshookConn(c, t)
697 }
698
699 // The port number for incoming peer connections. 0 if the client isn't
700 // listening.
701 func (cl *Client) incomingPeerPort() int {
702         if cl.listenAddr == "" {
703                 return 0
704         }
705         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
706         if err != nil {
707                 panic(err)
708         }
709         return port
710 }
711
712 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
713         if c.headerEncrypted {
714                 var rw io.ReadWriter
715                 rw, err = mse.InitiateHandshake(
716                         struct {
717                                 io.Reader
718                                 io.Writer
719                         }{c.r, c.w},
720                         t.infoHash[:],
721                         nil,
722                         func() uint32 {
723                                 switch {
724                                 case cl.config.ForceEncryption:
725                                         return mse.CryptoMethodRC4
726                                 case cl.config.DisableEncryption:
727                                         return mse.CryptoMethodPlaintext
728                                 default:
729                                         return mse.AllSupportedCrypto
730                                 }
731                         }(),
732                 )
733                 c.setRW(rw)
734                 if err != nil {
735                         return
736                 }
737         }
738         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
739         if ih != t.infoHash {
740                 ok = false
741         }
742         return
743 }
744
745 // Calls f with any secret keys.
746 func (cl *Client) forSkeys(f func([]byte) bool) {
747         cl.mu.Lock()
748         defer cl.mu.Unlock()
749         for ih := range cl.torrents {
750                 if !f(ih[:]) {
751                         break
752                 }
753         }
754 }
755
756 // Do encryption and bittorrent handshakes as receiver.
757 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
758         var rw io.ReadWriter
759         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
760         c.setRW(rw)
761         if err != nil {
762                 if err == mse.ErrNoSecretKeyMatch {
763                         err = nil
764                 }
765                 return
766         }
767         if cl.config.ForceEncryption && !c.headerEncrypted {
768                 err = errors.New("connection not encrypted")
769                 return
770         }
771         ih, ok, err := cl.connBTHandshake(c, nil)
772         if err != nil {
773                 err = fmt.Errorf("error during bt handshake: %s", err)
774                 return
775         }
776         if !ok {
777                 return
778         }
779         cl.mu.Lock()
780         t = cl.torrents[ih]
781         cl.mu.Unlock()
782         return
783 }
784
785 // Returns !ok if handshake failed for valid reasons.
786 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
787         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
788         if err != nil || !ok {
789                 return
790         }
791         ret = res.Hash
792         c.PeerExtensionBytes = res.peerExtensionBytes
793         c.PeerID = res.peerID
794         c.completedHandshake = time.Now()
795         return
796 }
797
798 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
799         if c.PeerID == cl.peerID {
800                 connsToSelf.Add(1)
801                 addr := c.conn.RemoteAddr().String()
802                 cl.dopplegangerAddrs[addr] = struct{}{}
803                 return
804         }
805         cl.runHandshookConn(c, t, true)
806 }
807
808 func (cl *Client) runReceivedConn(c *connection) {
809         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
810         if err != nil {
811                 panic(err)
812         }
813         t, err := cl.receiveHandshakes(c)
814         if err != nil {
815                 if cl.config.Debug {
816                         log.Printf("error receiving handshakes: %s", err)
817                 }
818                 return
819         }
820         if t == nil {
821                 return
822         }
823         cl.mu.Lock()
824         defer cl.mu.Unlock()
825         if c.PeerID == cl.peerID {
826                 // Because the remote address is not necessarily the same as its
827                 // client's torrent listen address, we won't record the remote address
828                 // as a doppleganger. Instead, the initiator can record *us* as the
829                 // doppleganger.
830                 return
831         }
832         cl.runHandshookConn(c, t, false)
833 }
834
835 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
836         c.conn.SetWriteDeadline(time.Time{})
837         c.r = deadlineReader{c.conn, c.r}
838         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
839         if !t.addConnection(c, outgoing) {
840                 return
841         }
842         defer t.dropConnection(c)
843         go c.writer(time.Minute)
844         cl.sendInitialMessages(c, t)
845         err := c.mainReadLoop()
846         if err != nil && cl.config.Debug {
847                 log.Printf("error during connection loop: %s", err)
848         }
849 }
850
851 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
852         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
853                 conn.Post(pp.Message{
854                         Type:       pp.Extended,
855                         ExtendedID: pp.HandshakeExtendedID,
856                         ExtendedPayload: func() []byte {
857                                 d := map[string]interface{}{
858                                         "m": func() (ret map[string]int) {
859                                                 ret = make(map[string]int, 2)
860                                                 ret["ut_metadata"] = metadataExtendedId
861                                                 if !cl.config.DisablePEX {
862                                                         ret["ut_pex"] = pexExtendedId
863                                                 }
864                                                 return
865                                         }(),
866                                         "v": extendedHandshakeClientVersion,
867                                         // No upload queue is implemented yet.
868                                         "reqq": 64,
869                                 }
870                                 if !cl.config.DisableEncryption {
871                                         d["e"] = 1
872                                 }
873                                 if torrent.metadataSizeKnown() {
874                                         d["metadata_size"] = torrent.metadataSize()
875                                 }
876                                 if p := cl.incomingPeerPort(); p != 0 {
877                                         d["p"] = p
878                                 }
879                                 yourip, err := addrCompactIP(conn.remoteAddr())
880                                 if err != nil {
881                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
882                                 } else {
883                                         d["yourip"] = yourip
884                                 }
885                                 // log.Printf("sending %v", d)
886                                 b, err := bencode.Marshal(d)
887                                 if err != nil {
888                                         panic(err)
889                                 }
890                                 return b
891                         }(),
892                 })
893         }
894         if torrent.haveAnyPieces() {
895                 conn.Bitfield(torrent.bitfield())
896         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
897                 conn.Post(pp.Message{
898                         Type: pp.HaveNone,
899                 })
900         }
901         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
902                 conn.Post(pp.Message{
903                         Type: pp.Port,
904                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
905                 })
906         }
907 }
908
909 // Process incoming ut_metadata message.
910 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
911         var d map[string]int
912         err := bencode.Unmarshal(payload, &d)
913         if err != nil {
914                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
915         }
916         msgType, ok := d["msg_type"]
917         if !ok {
918                 return errors.New("missing msg_type field")
919         }
920         piece := d["piece"]
921         switch msgType {
922         case pp.DataMetadataExtensionMsgType:
923                 if !c.requestedMetadataPiece(piece) {
924                         return fmt.Errorf("got unexpected piece %d", piece)
925                 }
926                 c.metadataRequests[piece] = false
927                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
928                 if begin < 0 || begin >= len(payload) {
929                         return fmt.Errorf("data has bad offset in payload: %d", begin)
930                 }
931                 t.saveMetadataPiece(piece, payload[begin:])
932                 c.UsefulChunksReceived++
933                 c.lastUsefulChunkReceived = time.Now()
934                 return t.maybeCompleteMetadata()
935         case pp.RequestMetadataExtensionMsgType:
936                 if !t.haveMetadataPiece(piece) {
937                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
938                         return nil
939                 }
940                 start := (1 << 14) * piece
941                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
942                 return nil
943         case pp.RejectMetadataExtensionMsgType:
944                 return nil
945         default:
946                 return errors.New("unknown msg_type value")
947         }
948 }
949
950 func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
951         // Count the chunk being sent, even if it isn't.
952         b := make([]byte, r.Length)
953         p := t.info.Piece(int(r.Index))
954         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
955         if n != len(b) {
956                 if err == nil {
957                         panic("expected error")
958                 }
959                 return
960         } else if err == io.EOF {
961                 err = nil
962         }
963         more = msg(pp.Message{
964                 Type:  pp.Piece,
965                 Index: r.Index,
966                 Begin: r.Begin,
967                 Piece: b,
968         })
969         c.chunksSent++
970         uploadChunksPosted.Add(1)
971         c.lastChunkSent = time.Now()
972         return
973 }
974
975 func (cl *Client) openNewConns(t *Torrent) {
976         defer t.updateWantPeersEvent()
977         for len(t.peers) != 0 {
978                 if !t.wantConns() {
979                         return
980                 }
981                 if len(t.halfOpen) >= cl.halfOpenLimit {
982                         return
983                 }
984                 var (
985                         k peersKey
986                         p Peer
987                 )
988                 for k, p = range t.peers {
989                         break
990                 }
991                 delete(t.peers, k)
992                 cl.initiateConn(p, t)
993         }
994 }
995
996 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
997         if port == 0 {
998                 return true
999         }
1000         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1001                 return true
1002         }
1003         if _, ok := cl.ipBlockRange(ip); ok {
1004                 return true
1005         }
1006         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1007                 return true
1008         }
1009         return false
1010 }
1011
1012 // Return a Torrent ready for insertion into a Client.
1013 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1014         // use provided storage, if provided
1015         storageClient := cl.defaultStorage
1016         if specStorage != nil {
1017                 storageClient = storage.NewClient(specStorage)
1018         }
1019
1020         t = &Torrent{
1021                 cl:       cl,
1022                 infoHash: ih,
1023                 peers:    make(map[peersKey]Peer),
1024                 conns:    make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent),
1025
1026                 halfOpen:          make(map[string]Peer),
1027                 pieceStateChanges: pubsub.NewPubSub(),
1028
1029                 storageOpener:       storageClient,
1030                 maxEstablishedConns: defaultEstablishedConnsPerTorrent,
1031
1032                 networkingEnabled: true,
1033                 requestStrategy:   2,
1034         }
1035         t.setChunkSize(defaultChunkSize)
1036         return
1037 }
1038
1039 // A file-like handle to some torrent data resource.
1040 type Handle interface {
1041         io.Reader
1042         io.Seeker
1043         io.Closer
1044         io.ReaderAt
1045 }
1046
1047 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1048         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1049 }
1050
1051 // Adds a torrent by InfoHash with a custom Storage implementation.
1052 // If the torrent already exists then this Storage is ignored and the
1053 // existing torrent returned with `new` set to `false`
1054 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1055         cl.mu.Lock()
1056         defer cl.mu.Unlock()
1057         t, ok := cl.torrents[infoHash]
1058         if ok {
1059                 return
1060         }
1061         new = true
1062         t = cl.newTorrent(infoHash, specStorage)
1063         if cl.dHT != nil {
1064                 go t.dhtAnnouncer()
1065         }
1066         cl.torrents[infoHash] = t
1067         t.updateWantPeersEvent()
1068         // Tickle Client.waitAccept, new torrent may want conns.
1069         cl.event.Broadcast()
1070         return
1071 }
1072
1073 // Add or merge a torrent spec. If the torrent is already present, the
1074 // trackers will be merged with the existing ones. If the Info isn't yet
1075 // known, it will be set. The display name is replaced if the new spec
1076 // provides one. Returns new if the torrent wasn't already in the client.
1077 // Note that any `Storage` defined on the spec will be ignored if the
1078 // torrent is already present (i.e. `new` return value is `true`)
1079 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1080         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1081         if spec.DisplayName != "" {
1082                 t.SetDisplayName(spec.DisplayName)
1083         }
1084         if spec.InfoBytes != nil {
1085                 err = t.SetInfoBytes(spec.InfoBytes)
1086                 if err != nil {
1087                         return
1088                 }
1089         }
1090         cl.mu.Lock()
1091         defer cl.mu.Unlock()
1092         if spec.ChunkSize != 0 {
1093                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1094         }
1095         t.addTrackers(spec.Trackers)
1096         t.maybeNewConns()
1097         return
1098 }
1099
1100 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1101         t, ok := cl.torrents[infoHash]
1102         if !ok {
1103                 err = fmt.Errorf("no such torrent")
1104                 return
1105         }
1106         err = t.close()
1107         if err != nil {
1108                 panic(err)
1109         }
1110         delete(cl.torrents, infoHash)
1111         return
1112 }
1113
1114 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1115         _url, err := url.Parse(announceURL)
1116         if err != nil {
1117                 return
1118         }
1119         hmp := missinggo.SplitHostMaybePort(_url.Host)
1120         if hmp.Err != nil {
1121                 err = hmp.Err
1122                 return
1123         }
1124         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1125         if err != nil {
1126                 return
1127         }
1128         cl.mu.RLock()
1129         _, blocked = cl.ipBlockRange(addr.IP)
1130         cl.mu.RUnlock()
1131         host = _url.Host
1132         hmp.Host = addr.String()
1133         _url.Host = hmp.String()
1134         urlToUse = _url.String()
1135         return
1136 }
1137
1138 func (cl *Client) allTorrentsCompleted() bool {
1139         for _, t := range cl.torrents {
1140                 if !t.haveInfo() {
1141                         return false
1142                 }
1143                 if t.numPiecesCompleted() != t.numPieces() {
1144                         return false
1145                 }
1146         }
1147         return true
1148 }
1149
1150 // Returns true when all torrents are completely downloaded and false if the
1151 // client is stopped before that.
1152 func (cl *Client) WaitAll() bool {
1153         cl.mu.Lock()
1154         defer cl.mu.Unlock()
1155         for !cl.allTorrentsCompleted() {
1156                 if cl.closed.IsSet() {
1157                         return false
1158                 }
1159                 cl.event.Wait()
1160         }
1161         return true
1162 }
1163
1164 // Returns handles to all the torrents loaded in the Client.
1165 func (cl *Client) Torrents() []*Torrent {
1166         cl.mu.Lock()
1167         defer cl.mu.Unlock()
1168         return cl.torrentsAsSlice()
1169 }
1170
1171 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1172         for _, t := range cl.torrents {
1173                 ret = append(ret, t)
1174         }
1175         return
1176 }
1177
1178 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1179         spec, err := TorrentSpecFromMagnetURI(uri)
1180         if err != nil {
1181                 return
1182         }
1183         T, _, err = cl.AddTorrentSpec(spec)
1184         return
1185 }
1186
1187 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1188         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1189         var ss []string
1190         slices.MakeInto(&ss, mi.Nodes)
1191         cl.AddDHTNodes(ss)
1192         return
1193 }
1194
1195 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1196         mi, err := metainfo.LoadFromFile(filename)
1197         if err != nil {
1198                 return
1199         }
1200         return cl.AddTorrent(mi)
1201 }
1202
1203 func (cl *Client) DHT() *dht.Server {
1204         return cl.dHT
1205 }
1206
1207 func (cl *Client) AddDHTNodes(nodes []string) {
1208         if cl.DHT() == nil {
1209                 return
1210         }
1211         for _, n := range nodes {
1212                 hmp := missinggo.SplitHostMaybePort(n)
1213                 ip := net.ParseIP(hmp.Host)
1214                 if ip == nil {
1215                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1216                         continue
1217                 }
1218                 ni := krpc.NodeInfo{
1219                         Addr: &net.UDPAddr{
1220                                 IP:   ip,
1221                                 Port: hmp.Port,
1222                         },
1223                 }
1224                 cl.DHT().AddNode(ni)
1225         }
1226 }
1227
1228 func (cl *Client) banPeerIP(ip net.IP) {
1229         if cl.badPeerIPs == nil {
1230                 cl.badPeerIPs = make(map[string]struct{})
1231         }
1232         cl.badPeerIPs[ip.String()] = struct{}{}
1233 }
1234
1235 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1236         c = &connection{
1237                 conn: nc,
1238
1239                 Choked:          true,
1240                 PeerChoked:      true,
1241                 PeerMaxRequests: 250,
1242         }
1243         c.writerCond.L = &cl.mu
1244         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1245         c.r = rateLimitedReader{cl.downloadLimit, c.r}
1246         return
1247 }
1248
1249 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1250         cl.mu.Lock()
1251         defer cl.mu.Unlock()
1252         t := cl.torrent(ih)
1253         if t == nil {
1254                 return
1255         }
1256         t.addPeers([]Peer{{
1257                 IP:     p.IP,
1258                 Port:   p.Port,
1259                 Source: peerSourceDHTAnnouncePeer,
1260         }})
1261 }