]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Track metadata chunks read with its own Count
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "strconv"
14         "strings"
15         "time"
16
17         "github.com/anacrolix/dht"
18         "github.com/anacrolix/dht/krpc"
19         "github.com/anacrolix/log"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pproffd"
24         "github.com/anacrolix/missinggo/pubsub"
25         "github.com/anacrolix/missinggo/slices"
26         "github.com/anacrolix/sync"
27         "github.com/davecgh/go-spew/spew"
28         "github.com/dustin/go-humanize"
29         "github.com/google/btree"
30
31         "github.com/anacrolix/torrent/bencode"
32         "github.com/anacrolix/torrent/iplist"
33         "github.com/anacrolix/torrent/metainfo"
34         "github.com/anacrolix/torrent/mse"
35         pp "github.com/anacrolix/torrent/peer_protocol"
36         "github.com/anacrolix/torrent/storage"
37 )
38
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
41 type Client struct {
42         // An aggregate of stats over all connections. First in struct to ensure
43         // 64-bit alignment of fields. See #262.
44         stats  ConnStats
45         mu     sync.RWMutex
46         event  sync.Cond
47         closed missinggo.Event
48
49         config *ClientConfig
50         logger *log.Logger
51
52         peerID         PeerID
53         defaultStorage *storage.Client
54         onClose        []func()
55         conns          []socket
56         dhtServers     []*dht.Server
57         ipBlockList    iplist.Ranger
58         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
59         extensionBytes pp.PeerExtensionBits
60
61         // Set of addresses that have our client ID. This intentionally will
62         // include ourselves if we end up trying to connect to our own address
63         // through legitimate channels.
64         dopplegangerAddrs map[string]struct{}
65         badPeerIPs        map[string]struct{}
66         torrents          map[InfoHash]*Torrent
67
68         acceptLimiter map[ipStr]int
69 }
70
71 type ipStr string
72
73 func (cl *Client) BadPeerIPs() []string {
74         cl.mu.RLock()
75         defer cl.mu.RUnlock()
76         return cl.badPeerIPsLocked()
77 }
78
79 func (cl *Client) badPeerIPsLocked() []string {
80         return slices.FromMapKeys(cl.badPeerIPs).([]string)
81 }
82
83 func (cl *Client) PeerID() PeerID {
84         return cl.peerID
85 }
86
87 type torrentAddr string
88
89 func (torrentAddr) Network() string { return "" }
90
91 func (me torrentAddr) String() string { return string(me) }
92
93 func (cl *Client) LocalPort() (port int) {
94         cl.eachListener(func(l socket) bool {
95                 _port := missinggo.AddrPort(l.Addr())
96                 if _port == 0 {
97                         panic(l)
98                 }
99                 if port == 0 {
100                         port = _port
101                 } else if port != _port {
102                         panic("mismatched ports")
103                 }
104                 return true
105         })
106         return
107 }
108
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110         dhtStats := s.Stats()
111         fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112         fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113         fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114         fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
115 }
116
117 // Writes out a human readable status of the client, such as for writing to a
118 // HTTP status page.
119 func (cl *Client) WriteStatus(_w io.Writer) {
120         cl.mu.RLock()
121         defer cl.mu.RUnlock()
122         w := bufio.NewWriter(_w)
123         defer w.Flush()
124         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128         cl.eachDhtServer(func(s *dht.Server) {
129                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130                 writeDhtServerStatus(w, s)
131         })
132         spew.Fdump(w, cl.stats)
133         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134         fmt.Fprintln(w)
135         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136                 return l.InfoHash().AsString() < r.InfoHash().AsString()
137         }).([]*Torrent) {
138                 if t.name() == "" {
139                         fmt.Fprint(w, "<unknown name>")
140                 } else {
141                         fmt.Fprint(w, t.name())
142                 }
143                 fmt.Fprint(w, "\n")
144                 if t.info != nil {
145                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
146                 } else {
147                         w.WriteString("<missing metainfo>")
148                 }
149                 fmt.Fprint(w, "\n")
150                 t.writeStatus(w)
151                 fmt.Fprintln(w)
152         }
153 }
154
155 const debugLogValue = "debug"
156
157 func (cl *Client) debugLogFilter(m *log.Msg) bool {
158         if !cl.config.Debug {
159                 _, ok := m.Values()[debugLogValue]
160                 return !ok
161         }
162         return true
163 }
164
165 func (cl *Client) initLogger() {
166         cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
167 }
168
169 func (cl *Client) announceKey() int32 {
170         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
171 }
172
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
174         if cfg == nil {
175                 cfg = NewDefaultClientConfig()
176         }
177         defer func() {
178                 if err != nil {
179                         cl = nil
180                 }
181         }()
182         cl = &Client{
183                 config:            cfg,
184                 dopplegangerAddrs: make(map[string]struct{}),
185                 torrents:          make(map[metainfo.Hash]*Torrent),
186         }
187         go cl.acceptLimitClearer()
188         cl.initLogger()
189         defer func() {
190                 if err == nil {
191                         return
192                 }
193                 cl.Close()
194         }()
195         cl.extensionBytes = defaultPeerExtensionBytes()
196         cl.event.L = &cl.mu
197         storageImpl := cfg.DefaultStorage
198         if storageImpl == nil {
199                 // We'd use mmap but HFS+ doesn't support sparse files.
200                 storageImpl = storage.NewFile(cfg.DataDir)
201                 cl.onClose = append(cl.onClose, func() {
202                         if err := storageImpl.Close(); err != nil {
203                                 log.Printf("error closing default storage: %s", err)
204                         }
205                 })
206         }
207         cl.defaultStorage = storage.NewClient(storageImpl)
208         if cfg.IPBlocklist != nil {
209                 cl.ipBlockList = cfg.IPBlocklist
210         }
211
212         if cfg.PeerID != "" {
213                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
214         } else {
215                 o := copy(cl.peerID[:], cfg.Bep20)
216                 _, err = rand.Read(cl.peerID[o:])
217                 if err != nil {
218                         panic("error generating peer id")
219                 }
220         }
221
222         cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
223         if err != nil {
224                 return
225         }
226         // Check for panics.
227         cl.LocalPort()
228
229         for _, s := range cl.conns {
230                 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
231                         go cl.acceptConnections(s)
232                 }
233         }
234
235         go cl.forwardPort()
236         if !cfg.NoDHT {
237                 for _, s := range cl.conns {
238                         if pc, ok := s.(net.PacketConn); ok {
239                                 ds, err := cl.newDhtServer(pc)
240                                 if err != nil {
241                                         panic(err)
242                                 }
243                                 cl.dhtServers = append(cl.dhtServers, ds)
244                         }
245                 }
246         }
247
248         return
249 }
250
251 func (cl *Client) enabledPeerNetworks() (ns []string) {
252         for _, n := range allPeerNetworks {
253                 if peerNetworkEnabled(n, cl.config) {
254                         ns = append(ns, n)
255                 }
256         }
257         return
258 }
259
260 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
261         cfg := dht.ServerConfig{
262                 IPBlocklist:    cl.ipBlockList,
263                 Conn:           conn,
264                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
265                 PublicIP: func() net.IP {
266                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
267                                 return cl.config.PublicIp6
268                         }
269                         return cl.config.PublicIp4
270                 }(),
271                 StartingNodes: cl.config.DhtStartingNodes,
272         }
273         s, err = dht.NewServer(&cfg)
274         if err == nil {
275                 go func() {
276                         if _, err := s.Bootstrap(); err != nil {
277                                 log.Printf("error bootstrapping dht: %s", err)
278                         }
279                 }()
280         }
281         return
282 }
283
284 func firstNonEmptyString(ss ...string) string {
285         for _, s := range ss {
286                 if s != "" {
287                         return s
288                 }
289         }
290         return ""
291 }
292
293 func (cl *Client) Closed() <-chan struct{} {
294         cl.mu.Lock()
295         defer cl.mu.Unlock()
296         return cl.closed.C()
297 }
298
299 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
300         for _, ds := range cl.dhtServers {
301                 f(ds)
302         }
303 }
304
305 func (cl *Client) closeSockets() {
306         cl.eachListener(func(l socket) bool {
307                 l.Close()
308                 return true
309         })
310         cl.conns = nil
311 }
312
313 // Stops the client. All connections to peers are closed and all activity will
314 // come to a halt.
315 func (cl *Client) Close() {
316         cl.mu.Lock()
317         defer cl.mu.Unlock()
318         cl.closed.Set()
319         cl.eachDhtServer(func(s *dht.Server) { s.Close() })
320         cl.closeSockets()
321         for _, t := range cl.torrents {
322                 t.close()
323         }
324         for _, f := range cl.onClose {
325                 f()
326         }
327         cl.event.Broadcast()
328 }
329
330 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
331         if cl.ipBlockList == nil {
332                 return
333         }
334         return cl.ipBlockList.Lookup(ip)
335 }
336
337 func (cl *Client) ipIsBlocked(ip net.IP) bool {
338         _, blocked := cl.ipBlockRange(ip)
339         return blocked
340 }
341
342 func (cl *Client) waitAccept() {
343         for {
344                 for _, t := range cl.torrents {
345                         if t.wantConns() {
346                                 return
347                         }
348                 }
349                 if cl.closed.IsSet() {
350                         return
351                 }
352                 cl.event.Wait()
353         }
354 }
355
356 func (cl *Client) rejectAccepted(conn net.Conn) bool {
357         ra := conn.RemoteAddr()
358         rip := missinggo.AddrIP(ra)
359         if cl.config.DisableIPv4Peers && rip.To4() != nil {
360                 return true
361         }
362         if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
363                 return true
364         }
365         if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
366                 return true
367         }
368         if cl.rateLimitAccept(rip) {
369                 return true
370         }
371         return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
372 }
373
374 func (cl *Client) acceptConnections(l net.Listener) {
375         for {
376                 conn, err := l.Accept()
377                 conn = pproffd.WrapNetConn(conn)
378                 cl.mu.RLock()
379                 closed := cl.closed.IsSet()
380                 reject := false
381                 if conn != nil {
382                         reject = cl.rejectAccepted(conn)
383                 }
384                 cl.mu.RUnlock()
385                 if closed {
386                         if conn != nil {
387                                 conn.Close()
388                         }
389                         return
390                 }
391                 if err != nil {
392                         log.Print(err)
393                         // I think something harsher should happen here? Our accept
394                         // routine just fucked off.
395                         return
396                 }
397                 go func() {
398                         if reject {
399                                 torrent.Add("rejected accepted connections", 1)
400                                 conn.Close()
401                         } else {
402                                 go cl.incomingConnection(conn)
403                         }
404                         log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
405                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
406                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
407                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
408                 }()
409         }
410 }
411
412 func (cl *Client) incomingConnection(nc net.Conn) {
413         defer nc.Close()
414         if tc, ok := nc.(*net.TCPConn); ok {
415                 tc.SetLinger(0)
416         }
417         c := cl.newConnection(nc, false)
418         c.Discovery = peerSourceIncoming
419         cl.runReceivedConn(c)
420 }
421
422 // Returns a handle to the given torrent, if it's present in the client.
423 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
424         cl.mu.Lock()
425         defer cl.mu.Unlock()
426         t, ok = cl.torrents[ih]
427         return
428 }
429
430 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
431         return cl.torrents[ih]
432 }
433
434 type dialResult struct {
435         Conn net.Conn
436 }
437
438 func countDialResult(err error) {
439         if err == nil {
440                 torrent.Add("successful dials", 1)
441         } else {
442                 torrent.Add("unsuccessful dials", 1)
443         }
444 }
445
446 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
447         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
448         if ret < minDialTimeout {
449                 ret = minDialTimeout
450         }
451         return
452 }
453
454 // Returns whether an address is known to connect to a client with our own ID.
455 func (cl *Client) dopplegangerAddr(addr string) bool {
456         _, ok := cl.dopplegangerAddrs[addr]
457         return ok
458 }
459
460 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
461         switch {
462         case allowIpv4 && allowIpv6:
463                 return ""
464         case allowIpv4 && !allowIpv6:
465                 return "4"
466         case !allowIpv4 && allowIpv6:
467                 return "6"
468         default:
469                 panic("unhandled ip network combination")
470         }
471 }
472
473 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
474         return sock.DialContext(ctx, "", addr)
475 }
476
477 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
478
479 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
480         c := func(s string) bool {
481                 return strings.Contains(network, s)
482         }
483         if cfg.DisableUTP {
484                 if c("udp") || c("utp") {
485                         return false
486                 }
487         }
488         if cfg.DisableTCP && c("tcp") {
489                 return false
490         }
491         if cfg.DisableIPv6 && c("6") {
492                 return false
493         }
494         return true
495 }
496
497 // Returns a connection over UTP or TCP, whichever is first to connect.
498 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
499         ctx, cancel := context.WithCancel(ctx)
500         // As soon as we return one connection, cancel the others.
501         defer cancel()
502         left := 0
503         resCh := make(chan dialResult, left)
504         dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
505                 left++
506                 go func() {
507                         c, err := f(ctx, addr)
508                         // This is a bit optimistic, but it looks non-trivial to thread
509                         // this through the proxy code. Set it now in case we close the
510                         // connection forthwith.
511                         if tc, ok := c.(*net.TCPConn); ok {
512                                 tc.SetLinger(0)
513                         }
514                         countDialResult(err)
515                         resCh <- dialResult{c}
516                 }()
517         }
518         func() {
519                 cl.mu.Lock()
520                 defer cl.mu.Unlock()
521                 cl.eachListener(func(s socket) bool {
522                         if peerNetworkEnabled(s.Addr().Network(), cl.config) {
523                                 dial(s.dial)
524                         }
525                         return true
526                 })
527         }()
528         var res dialResult
529         // Wait for a successful connection.
530         func() {
531                 defer perf.ScopeTimer()()
532                 for ; left > 0 && res.Conn == nil; left-- {
533                         res = <-resCh
534                 }
535         }()
536         // There are still incompleted dials.
537         go func() {
538                 for ; left > 0; left-- {
539                         conn := (<-resCh).Conn
540                         if conn != nil {
541                                 conn.Close()
542                         }
543                 }
544         }()
545         if res.Conn != nil {
546                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
547         }
548         return res.Conn
549 }
550
551 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
552         if _, ok := t.halfOpen[addr]; !ok {
553                 panic("invariant broken")
554         }
555         delete(t.halfOpen, addr)
556         t.openNewConns()
557 }
558
559 // Performs initiator handshakes and returns a connection. Returns nil
560 // *connection if no connection for valid reasons.
561 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
562         c = cl.newConnection(nc, true)
563         c.headerEncrypted = encryptHeader
564         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
565         defer cancel()
566         dl, ok := ctx.Deadline()
567         if !ok {
568                 panic(ctx)
569         }
570         err = nc.SetDeadline(dl)
571         if err != nil {
572                 panic(err)
573         }
574         ok, err = cl.initiateHandshakes(c, t)
575         if !ok {
576                 c = nil
577         }
578         return
579 }
580
581 // Returns nil connection and nil error if no connection could be established
582 // for valid reasons.
583 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
584         nc := cl.dialFirst(ctx, addr)
585         if nc == nil {
586                 return
587         }
588         defer func() {
589                 if c == nil || err != nil {
590                         nc.Close()
591                 }
592         }()
593         return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
594 }
595
596 // Returns nil connection and nil error if no connection could be established
597 // for valid reasons.
598 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
599         ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
600                 cl.mu.RLock()
601                 defer cl.mu.RUnlock()
602                 return t.dialTimeout()
603         }())
604         defer cancel()
605         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
606         c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
607         if err != nil {
608                 return
609         }
610         if c != nil {
611                 torrent.Add("initiated conn with preferred header obfuscation", 1)
612                 return
613         }
614         if cl.config.ForceEncryption {
615                 // We should have just tried with an obfuscated header. A plaintext
616                 // header can't result in an encrypted connection, so we're done.
617                 if !obfuscatedHeaderFirst {
618                         panic(cl.config.EncryptionPolicy)
619                 }
620                 return
621         }
622         // Try again with encryption if we didn't earlier, or without if we did.
623         c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
624         if c != nil {
625                 torrent.Add("initiated conn with fallback header obfuscation", 1)
626         }
627         return
628 }
629
630 // Called to dial out and run a connection. The addr we're given is already
631 // considered half-open.
632 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
633         c, err := cl.establishOutgoingConn(t, addr)
634         cl.mu.Lock()
635         defer cl.mu.Unlock()
636         // Don't release lock between here and addConnection, unless it's for
637         // failure.
638         cl.noLongerHalfOpen(t, addr)
639         if err != nil {
640                 if cl.config.Debug {
641                         log.Printf("error establishing outgoing connection: %s", err)
642                 }
643                 return
644         }
645         if c == nil {
646                 return
647         }
648         defer c.Close()
649         c.Discovery = ps
650         cl.runHandshookConn(c, t)
651 }
652
653 // The port number for incoming peer connections. 0 if the client isn't
654 // listening.
655 func (cl *Client) incomingPeerPort() int {
656         return cl.LocalPort()
657 }
658
659 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
660         if c.headerEncrypted {
661                 var rw io.ReadWriter
662                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
663                         struct {
664                                 io.Reader
665                                 io.Writer
666                         }{c.r, c.w},
667                         t.infoHash[:],
668                         nil,
669                         func() mse.CryptoMethod {
670                                 switch {
671                                 case cl.config.ForceEncryption:
672                                         return mse.CryptoMethodRC4
673                                 case cl.config.DisableEncryption:
674                                         return mse.CryptoMethodPlaintext
675                                 default:
676                                         return mse.AllSupportedCrypto
677                                 }
678                         }(),
679                 )
680                 c.setRW(rw)
681                 if err != nil {
682                         return
683                 }
684         }
685         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
686         if ih != t.infoHash {
687                 ok = false
688         }
689         return
690 }
691
692 // Calls f with any secret keys.
693 func (cl *Client) forSkeys(f func([]byte) bool) {
694         cl.mu.Lock()
695         defer cl.mu.Unlock()
696         for ih := range cl.torrents {
697                 if !f(ih[:]) {
698                         break
699                 }
700         }
701 }
702
703 // Do encryption and bittorrent handshakes as receiver.
704 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
705         defer perf.ScopeTimerErr(&err)()
706         var rw io.ReadWriter
707         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
708         c.setRW(rw)
709         if err != nil {
710                 if err == mse.ErrNoSecretKeyMatch {
711                         err = nil
712                 }
713                 return
714         }
715         if cl.config.ForceEncryption && !c.headerEncrypted {
716                 err = errors.New("connection not encrypted")
717                 return
718         }
719         ih, ok, err := cl.connBTHandshake(c, nil)
720         if err != nil {
721                 err = fmt.Errorf("error during bt handshake: %s", err)
722                 return
723         }
724         if !ok {
725                 return
726         }
727         cl.mu.Lock()
728         t = cl.torrents[ih]
729         cl.mu.Unlock()
730         return
731 }
732
733 // Returns !ok if handshake failed for valid reasons.
734 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
735         res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
736         if err != nil || !ok {
737                 return
738         }
739         ret = res.Hash
740         c.PeerExtensionBytes = res.PeerExtensionBits
741         c.PeerID = res.PeerID
742         c.completedHandshake = time.Now()
743         return
744 }
745
746 func (cl *Client) runReceivedConn(c *connection) {
747         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
748         if err != nil {
749                 panic(err)
750         }
751         t, err := cl.receiveHandshakes(c)
752         if err != nil {
753                 log.Fmsg(
754                         "error receiving handshakes: %s", err,
755                 ).AddValue(
756                         debugLogValue,
757                 ).Add(
758                         "network", c.remoteAddr().Network(),
759                 ).Log(cl.logger)
760                 torrent.Add("error receiving handshake", 1)
761                 cl.mu.Lock()
762                 cl.onBadAccept(c.remoteAddr())
763                 cl.mu.Unlock()
764                 return
765         }
766         if t == nil {
767                 torrent.Add("received handshake for unloaded torrent", 1)
768                 cl.mu.Lock()
769                 cl.onBadAccept(c.remoteAddr())
770                 cl.mu.Unlock()
771                 return
772         }
773         torrent.Add("received handshake for loaded torrent", 1)
774         cl.mu.Lock()
775         defer cl.mu.Unlock()
776         cl.runHandshookConn(c, t)
777 }
778
779 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
780         c.setTorrent(t)
781         if c.PeerID == cl.peerID {
782                 if c.outgoing {
783                         connsToSelf.Add(1)
784                         addr := c.conn.RemoteAddr().String()
785                         cl.dopplegangerAddrs[addr] = struct{}{}
786                 } else {
787                         // Because the remote address is not necessarily the same as its
788                         // client's torrent listen address, we won't record the remote address
789                         // as a doppleganger. Instead, the initiator can record *us* as the
790                         // doppleganger.
791                 }
792                 return
793         }
794         c.conn.SetWriteDeadline(time.Time{})
795         c.r = deadlineReader{c.conn, c.r}
796         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
797         if connIsIpv6(c.conn) {
798                 torrent.Add("completed handshake over ipv6", 1)
799         }
800         if err := t.addConnection(c); err != nil {
801                 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
802                 return
803         }
804         defer t.dropConnection(c)
805         go c.writer(time.Minute)
806         cl.sendInitialMessages(c, t)
807         err := c.mainReadLoop()
808         if err != nil && cl.config.Debug {
809                 log.Printf("error during connection main read loop: %s", err)
810         }
811 }
812
813 // See the order given in Transmission's tr_peerMsgsNew.
814 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
815         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
816                 conn.Post(pp.Message{
817                         Type:       pp.Extended,
818                         ExtendedID: pp.HandshakeExtendedID,
819                         ExtendedPayload: func() []byte {
820                                 msg := pp.ExtendedHandshakeMessage{
821                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
822                                                 pp.ExtensionNameMetadata: metadataExtendedId,
823                                         },
824                                         V:            cl.config.ExtendedHandshakeClientVersion,
825                                         Reqq:         64, // TODO: Really?
826                                         YourIp:       pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
827                                         Encryption:   !cl.config.DisableEncryption,
828                                         Port:         cl.incomingPeerPort(),
829                                         MetadataSize: torrent.metadataSize(),
830                                         // TODO: We can figured these out specific to the socket
831                                         // used.
832                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
833                                         Ipv6: cl.config.PublicIp6.To16(),
834                                 }
835                                 if !cl.config.DisablePEX {
836                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
837                                 }
838                                 return bencode.MustMarshal(msg)
839                         }(),
840                 })
841         }
842         func() {
843                 if conn.fastEnabled() {
844                         if torrent.haveAllPieces() {
845                                 conn.Post(pp.Message{Type: pp.HaveAll})
846                                 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
847                                 return
848                         } else if !torrent.haveAnyPieces() {
849                                 conn.Post(pp.Message{Type: pp.HaveNone})
850                                 conn.sentHaves.Clear()
851                                 return
852                         }
853                 }
854                 conn.PostBitfield()
855         }()
856         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
857                 conn.Post(pp.Message{
858                         Type: pp.Port,
859                         Port: cl.dhtPort(),
860                 })
861         }
862 }
863
864 func (cl *Client) dhtPort() (ret uint16) {
865         cl.eachDhtServer(func(s *dht.Server) {
866                 ret = uint16(missinggo.AddrPort(s.Addr()))
867         })
868         return
869 }
870
871 func (cl *Client) haveDhtServer() (ret bool) {
872         cl.eachDhtServer(func(_ *dht.Server) {
873                 ret = true
874         })
875         return
876 }
877
878 // Process incoming ut_metadata message.
879 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
880         var d map[string]int
881         err := bencode.Unmarshal(payload, &d)
882         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
883         } else if err != nil {
884                 return fmt.Errorf("error unmarshalling bencode: %s", err)
885         }
886         msgType, ok := d["msg_type"]
887         if !ok {
888                 return errors.New("missing msg_type field")
889         }
890         piece := d["piece"]
891         switch msgType {
892         case pp.DataMetadataExtensionMsgType:
893                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
894                 if !c.requestedMetadataPiece(piece) {
895                         return fmt.Errorf("got unexpected piece %d", piece)
896                 }
897                 c.metadataRequests[piece] = false
898                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
899                 if begin < 0 || begin >= len(payload) {
900                         return fmt.Errorf("data has bad offset in payload: %d", begin)
901                 }
902                 t.saveMetadataPiece(piece, payload[begin:])
903                 c.lastUsefulChunkReceived = time.Now()
904                 return t.maybeCompleteMetadata()
905         case pp.RequestMetadataExtensionMsgType:
906                 if !t.haveMetadataPiece(piece) {
907                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
908                         return nil
909                 }
910                 start := (1 << 14) * piece
911                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
912                 return nil
913         case pp.RejectMetadataExtensionMsgType:
914                 return nil
915         default:
916                 return errors.New("unknown msg_type value")
917         }
918 }
919
920 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
921         if port == 0 {
922                 return true
923         }
924         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
925                 return true
926         }
927         if _, ok := cl.ipBlockRange(ip); ok {
928                 return true
929         }
930         if _, ok := cl.badPeerIPs[ip.String()]; ok {
931                 return true
932         }
933         return false
934 }
935
936 // Return a Torrent ready for insertion into a Client.
937 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
938         // use provided storage, if provided
939         storageClient := cl.defaultStorage
940         if specStorage != nil {
941                 storageClient = storage.NewClient(specStorage)
942         }
943
944         t = &Torrent{
945                 cl:       cl,
946                 infoHash: ih,
947                 peers: prioritizedPeers{
948                         om: btree.New(32),
949                         getPrio: func(p Peer) peerPriority {
950                                 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
951                         },
952                 },
953                 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
954
955                 halfOpen:          make(map[string]Peer),
956                 pieceStateChanges: pubsub.NewPubSub(),
957
958                 storageOpener:       storageClient,
959                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
960
961                 networkingEnabled: true,
962                 requestStrategy:   3,
963                 metadataChanged: sync.Cond{
964                         L: &cl.mu,
965                 },
966                 duplicateRequestTimeout: 1 * time.Second,
967         }
968         t.logger = cl.logger.Clone().AddValue(t)
969         t.setChunkSize(defaultChunkSize)
970         return
971 }
972
973 // A file-like handle to some torrent data resource.
974 type Handle interface {
975         io.Reader
976         io.Seeker
977         io.Closer
978         io.ReaderAt
979 }
980
981 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
982         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
983 }
984
985 // Adds a torrent by InfoHash with a custom Storage implementation.
986 // If the torrent already exists then this Storage is ignored and the
987 // existing torrent returned with `new` set to `false`
988 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
989         cl.mu.Lock()
990         defer cl.mu.Unlock()
991         t, ok := cl.torrents[infoHash]
992         if ok {
993                 return
994         }
995         new = true
996
997         t = cl.newTorrent(infoHash, specStorage)
998         cl.eachDhtServer(func(s *dht.Server) {
999                 go t.dhtAnnouncer(s)
1000         })
1001         cl.torrents[infoHash] = t
1002         cl.clearAcceptLimits()
1003         t.updateWantPeersEvent()
1004         // Tickle Client.waitAccept, new torrent may want conns.
1005         cl.event.Broadcast()
1006         return
1007 }
1008
1009 // Add or merge a torrent spec. If the torrent is already present, the
1010 // trackers will be merged with the existing ones. If the Info isn't yet
1011 // known, it will be set. The display name is replaced if the new spec
1012 // provides one. Returns new if the torrent wasn't already in the client.
1013 // Note that any `Storage` defined on the spec will be ignored if the
1014 // torrent is already present (i.e. `new` return value is `true`)
1015 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1016         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1017         if spec.DisplayName != "" {
1018                 t.SetDisplayName(spec.DisplayName)
1019         }
1020         if spec.InfoBytes != nil {
1021                 err = t.SetInfoBytes(spec.InfoBytes)
1022                 if err != nil {
1023                         return
1024                 }
1025         }
1026         cl.mu.Lock()
1027         defer cl.mu.Unlock()
1028         if spec.ChunkSize != 0 {
1029                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1030         }
1031         t.addTrackers(spec.Trackers)
1032         t.maybeNewConns()
1033         return
1034 }
1035
1036 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1037         t, ok := cl.torrents[infoHash]
1038         if !ok {
1039                 err = fmt.Errorf("no such torrent")
1040                 return
1041         }
1042         err = t.close()
1043         if err != nil {
1044                 panic(err)
1045         }
1046         delete(cl.torrents, infoHash)
1047         return
1048 }
1049
1050 func (cl *Client) allTorrentsCompleted() bool {
1051         for _, t := range cl.torrents {
1052                 if !t.haveInfo() {
1053                         return false
1054                 }
1055                 if !t.haveAllPieces() {
1056                         return false
1057                 }
1058         }
1059         return true
1060 }
1061
1062 // Returns true when all torrents are completely downloaded and false if the
1063 // client is stopped before that.
1064 func (cl *Client) WaitAll() bool {
1065         cl.mu.Lock()
1066         defer cl.mu.Unlock()
1067         for !cl.allTorrentsCompleted() {
1068                 if cl.closed.IsSet() {
1069                         return false
1070                 }
1071                 cl.event.Wait()
1072         }
1073         return true
1074 }
1075
1076 // Returns handles to all the torrents loaded in the Client.
1077 func (cl *Client) Torrents() []*Torrent {
1078         cl.mu.Lock()
1079         defer cl.mu.Unlock()
1080         return cl.torrentsAsSlice()
1081 }
1082
1083 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1084         for _, t := range cl.torrents {
1085                 ret = append(ret, t)
1086         }
1087         return
1088 }
1089
1090 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1091         spec, err := TorrentSpecFromMagnetURI(uri)
1092         if err != nil {
1093                 return
1094         }
1095         T, _, err = cl.AddTorrentSpec(spec)
1096         return
1097 }
1098
1099 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1100         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1101         var ss []string
1102         slices.MakeInto(&ss, mi.Nodes)
1103         cl.AddDHTNodes(ss)
1104         return
1105 }
1106
1107 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1108         mi, err := metainfo.LoadFromFile(filename)
1109         if err != nil {
1110                 return
1111         }
1112         return cl.AddTorrent(mi)
1113 }
1114
1115 func (cl *Client) DhtServers() []*dht.Server {
1116         return cl.dhtServers
1117 }
1118
1119 func (cl *Client) AddDHTNodes(nodes []string) {
1120         for _, n := range nodes {
1121                 hmp := missinggo.SplitHostMaybePort(n)
1122                 ip := net.ParseIP(hmp.Host)
1123                 if ip == nil {
1124                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1125                         continue
1126                 }
1127                 ni := krpc.NodeInfo{
1128                         Addr: krpc.NodeAddr{
1129                                 IP:   ip,
1130                                 Port: hmp.Port,
1131                         },
1132                 }
1133                 cl.eachDhtServer(func(s *dht.Server) {
1134                         s.AddNode(ni)
1135                 })
1136         }
1137 }
1138
1139 func (cl *Client) banPeerIP(ip net.IP) {
1140         if cl.badPeerIPs == nil {
1141                 cl.badPeerIPs = make(map[string]struct{})
1142         }
1143         cl.badPeerIPs[ip.String()] = struct{}{}
1144 }
1145
1146 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1147         c = &connection{
1148                 conn:            nc,
1149                 outgoing:        outgoing,
1150                 Choked:          true,
1151                 PeerChoked:      true,
1152                 PeerMaxRequests: 250,
1153                 writeBuffer:     new(bytes.Buffer),
1154         }
1155         c.writerCond.L = &cl.mu
1156         c.setRW(connStatsReadWriter{nc, c})
1157         c.r = &rateLimitedReader{
1158                 l: cl.config.DownloadRateLimiter,
1159                 r: c.r,
1160         }
1161         return
1162 }
1163
1164 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1165         cl.mu.Lock()
1166         defer cl.mu.Unlock()
1167         t := cl.torrent(ih)
1168         if t == nil {
1169                 return
1170         }
1171         t.addPeers([]Peer{{
1172                 IP:     p.IP,
1173                 Port:   p.Port,
1174                 Source: peerSourceDHTAnnouncePeer,
1175         }})
1176 }
1177
1178 func firstNotNil(ips ...net.IP) net.IP {
1179         for _, ip := range ips {
1180                 if ip != nil {
1181                         return ip
1182                 }
1183         }
1184         return nil
1185 }
1186
1187 func (cl *Client) eachListener(f func(socket) bool) {
1188         for _, s := range cl.conns {
1189                 if !f(s) {
1190                         break
1191                 }
1192         }
1193 }
1194
1195 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1196         cl.eachListener(func(l socket) bool {
1197                 ret = l
1198                 return !f(l)
1199         })
1200         return
1201 }
1202
1203 func (cl *Client) publicIp(peer net.IP) net.IP {
1204         // TODO: Use BEP 10 to determine how peers are seeing us.
1205         if peer.To4() != nil {
1206                 return firstNotNil(
1207                         cl.config.PublicIp4,
1208                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1209                 )
1210         } else {
1211                 return firstNotNil(
1212                         cl.config.PublicIp6,
1213                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1214                 )
1215         }
1216 }
1217
1218 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1219         return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1220                 return f(missinggo.AddrIP(l.Addr()))
1221         }).Addr())
1222 }
1223
1224 // Our IP as a peer should see it.
1225 func (cl *Client) publicAddr(peer net.IP) ipPort {
1226         return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1227 }
1228
1229 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1230         cl.mu.Lock()
1231         defer cl.mu.Unlock()
1232         cl.eachListener(func(l socket) bool {
1233                 ret = append(ret, l.Addr())
1234                 return true
1235         })
1236         return
1237 }
1238
1239 func (cl *Client) onBadAccept(addr net.Addr) {
1240         ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1241         if cl.acceptLimiter == nil {
1242                 cl.acceptLimiter = make(map[ipStr]int)
1243         }
1244         cl.acceptLimiter[ipStr(ip.String())]++
1245 }
1246
1247 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1248         if ip4 := ip.To4(); ip4 != nil {
1249                 return ip4.Mask(net.CIDRMask(24, 32))
1250         }
1251         return ip
1252 }
1253
1254 func (cl *Client) clearAcceptLimits() {
1255         cl.acceptLimiter = nil
1256 }
1257
1258 func (cl *Client) acceptLimitClearer() {
1259         for {
1260                 select {
1261                 case <-cl.closed.LockedChan(&cl.mu):
1262                         return
1263                 case <-time.After(15 * time.Minute):
1264                         cl.mu.Lock()
1265                         cl.clearAcceptLimits()
1266                         cl.mu.Unlock()
1267                 }
1268         }
1269 }
1270
1271 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1272         if cl.config.DisableAcceptRateLimiting {
1273                 return false
1274         }
1275         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1276 }