]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
4815d53a8d40917057742d6c1bc3243cc45f8022
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "context"
6         "crypto/rand"
7         "encoding/binary"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "math"
13         "net"
14         "net/http"
15         "sort"
16         "strconv"
17         "strings"
18         "time"
19
20         "github.com/anacrolix/chansync/events"
21         "github.com/anacrolix/dht/v2"
22         "github.com/anacrolix/dht/v2/krpc"
23         "github.com/anacrolix/log"
24         "github.com/anacrolix/missinggo/perf"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/v2"
27         "github.com/anacrolix/missinggo/v2/bitmap"
28         "github.com/anacrolix/missinggo/v2/pproffd"
29         "github.com/anacrolix/sync"
30         request_strategy "github.com/anacrolix/torrent/request-strategy"
31         "github.com/davecgh/go-spew/spew"
32         "github.com/dustin/go-humanize"
33         "github.com/google/btree"
34         "github.com/pion/datachannel"
35         "golang.org/x/time/rate"
36
37         "github.com/anacrolix/chansync"
38
39         "github.com/anacrolix/torrent/bencode"
40         "github.com/anacrolix/torrent/internal/limiter"
41         "github.com/anacrolix/torrent/iplist"
42         "github.com/anacrolix/torrent/metainfo"
43         "github.com/anacrolix/torrent/mse"
44         pp "github.com/anacrolix/torrent/peer_protocol"
45         "github.com/anacrolix/torrent/storage"
46         "github.com/anacrolix/torrent/tracker"
47         "github.com/anacrolix/torrent/webtorrent"
48 )
49
50 // Clients contain zero or more Torrents. A Client manages a blocklist, the
51 // TCP/UDP protocol ports, and DHT as desired.
52 type Client struct {
53         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
54         // fields. See #262.
55         stats ConnStats
56
57         _mu    lockWithDeferreds
58         event  sync.Cond
59         closed chansync.SetOnce
60
61         config *ClientConfig
62         logger log.Logger
63
64         peerID         PeerID
65         defaultStorage *storage.Client
66         onClose        []func()
67         dialers        []Dialer
68         listeners      []Listener
69         dhtServers     []DhtServer
70         ipBlockList    iplist.Ranger
71
72         // Set of addresses that have our client ID. This intentionally will
73         // include ourselves if we end up trying to connect to our own address
74         // through legitimate channels.
75         dopplegangerAddrs map[string]struct{}
76         badPeerIPs        map[string]struct{}
77         torrents          map[InfoHash]*Torrent
78         pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
79
80         acceptLimiter   map[ipStr]int
81         dialRateLimiter *rate.Limiter
82         numHalfOpen     int
83
84         websocketTrackers websocketTrackers
85
86         activeAnnounceLimiter limiter.Instance
87         webseedHttpClient     *http.Client
88 }
89
90 type ipStr string
91
92 func (cl *Client) BadPeerIPs() (ips []string) {
93         cl.rLock()
94         ips = cl.badPeerIPsLocked()
95         cl.rUnlock()
96         return
97 }
98
99 func (cl *Client) badPeerIPsLocked() (ips []string) {
100         ips = make([]string, len(cl.badPeerIPs))
101         i := 0
102         for k := range cl.badPeerIPs {
103                 ips[i] = k
104                 i += 1
105         }
106         return
107 }
108
109 func (cl *Client) PeerID() PeerID {
110         return cl.peerID
111 }
112
113 // Returns the port number for the first listener that has one. No longer assumes that all port
114 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 // found.
116 func (cl *Client) LocalPort() (port int) {
117         for i := 0; i < len(cl.listeners); i += 1 {
118                 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
119                         return
120                 }
121         }
122         return
123 }
124
125 func writeDhtServerStatus(w io.Writer, s DhtServer) {
126         dhtStats := s.Stats()
127         fmt.Fprintf(w, " ID: %x\n", s.ID())
128         spew.Fdump(w, dhtStats)
129 }
130
131 // Writes out a human readable status of the client, such as for writing to a
132 // HTTP status page.
133 func (cl *Client) WriteStatus(_w io.Writer) {
134         cl.rLock()
135         defer cl.rUnlock()
136         w := bufio.NewWriter(_w)
137         defer w.Flush()
138         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
139         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
140         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
141         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
142         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
143         cl.eachDhtServer(func(s DhtServer) {
144                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
145                 writeDhtServerStatus(w, s)
146         })
147         spew.Fdump(w, &cl.stats)
148         torrentsSlice := cl.torrentsAsSlice()
149         fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150         fmt.Fprintln(w)
151         sort.Slice(torrentsSlice, func(l, r int) bool {
152                 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153         })
154         for _, t := range torrentsSlice {
155                 if t.name() == "" {
156                         fmt.Fprint(w, "<unknown name>")
157                 } else {
158                         fmt.Fprint(w, t.name())
159                 }
160                 fmt.Fprint(w, "\n")
161                 if t.info != nil {
162                         fmt.Fprintf(
163                                 w,
164                                 "%f%% of %d bytes (%s)",
165                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166                                 *t.length,
167                                 humanize.Bytes(uint64(*t.length)))
168                 } else {
169                         w.WriteString("<missing metainfo>")
170                 }
171                 fmt.Fprint(w, "\n")
172                 t.writeStatus(w)
173                 fmt.Fprintln(w)
174         }
175 }
176
177 func (cl *Client) initLogger() {
178         logger := cl.config.Logger
179         if logger.IsZero() {
180                 logger = log.Default
181         }
182         if cl.config.Debug {
183                 logger = logger.FilterLevel(log.Debug)
184         }
185         cl.logger = logger.WithValues(cl)
186 }
187
188 func (cl *Client) announceKey() int32 {
189         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
190 }
191
192 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
193 func (cl *Client) init(cfg *ClientConfig) {
194         cl.config = cfg
195         cl.dopplegangerAddrs = make(map[string]struct{})
196         cl.torrents = make(map[metainfo.Hash]*Torrent)
197         cl.dialRateLimiter = rate.NewLimiter(10, 10)
198         cl.activeAnnounceLimiter.SlotsPerKey = 2
199         cl.event.L = cl.locker()
200         cl.ipBlockList = cfg.IPBlocklist
201         cl.webseedHttpClient = &http.Client{
202                 Transport: &http.Transport{
203                         Proxy:           cfg.HTTPProxy,
204                         MaxConnsPerHost: 10,
205                 },
206         }
207 }
208
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
210         if cfg == nil {
211                 cfg = NewDefaultClientConfig()
212                 cfg.ListenPort = 0
213         }
214         var client Client
215         client.init(cfg)
216         cl = &client
217         go cl.acceptLimitClearer()
218         cl.initLogger()
219         defer func() {
220                 if err != nil {
221                         cl.Close()
222                         cl = nil
223                 }
224         }()
225
226         storageImpl := cfg.DefaultStorage
227         if storageImpl == nil {
228                 // We'd use mmap by default but HFS+ doesn't support sparse files.
229                 storageImplCloser := storage.NewFile(cfg.DataDir)
230                 cl.onClose = append(cl.onClose, func() {
231                         if err := storageImplCloser.Close(); err != nil {
232                                 cl.logger.Printf("error closing default storage: %s", err)
233                         }
234                 })
235                 storageImpl = storageImplCloser
236         }
237         cl.defaultStorage = storage.NewClient(storageImpl)
238
239         if cfg.PeerID != "" {
240                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
241         } else {
242                 o := copy(cl.peerID[:], cfg.Bep20)
243                 _, err = rand.Read(cl.peerID[o:])
244                 if err != nil {
245                         panic("error generating peer id")
246                 }
247         }
248
249         sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback, cl.logger)
250         if err != nil {
251                 return
252         }
253
254         // Check for panics.
255         cl.LocalPort()
256
257         for _, _s := range sockets {
258                 s := _s // Go is fucking retarded.
259                 cl.onClose = append(cl.onClose, func() { go s.Close() })
260                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261                         cl.dialers = append(cl.dialers, s)
262                         cl.listeners = append(cl.listeners, s)
263                         if cl.config.AcceptPeerConnections {
264                                 go cl.acceptConnections(s)
265                         }
266                 }
267         }
268
269         go cl.forwardPort()
270         if !cfg.NoDHT {
271                 for _, s := range sockets {
272                         if pc, ok := s.(net.PacketConn); ok {
273                                 ds, err := cl.NewAnacrolixDhtServer(pc)
274                                 if err != nil {
275                                         panic(err)
276                                 }
277                                 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
279                         }
280                 }
281         }
282
283         cl.websocketTrackers = websocketTrackers{
284                 PeerId: cl.peerID,
285                 Logger: cl.logger,
286                 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
287                         cl.lock()
288                         defer cl.unlock()
289                         t, ok := cl.torrents[infoHash]
290                         if !ok {
291                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
292                         }
293                         return t.announceRequest(event), nil
294                 },
295                 Proxy: cl.config.HTTPProxy,
296                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
297                         cl.lock()
298                         defer cl.unlock()
299                         t, ok := cl.torrents[dcc.InfoHash]
300                         if !ok {
301                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
302                                         "got webrtc conn for unloaded torrent with infohash %x",
303                                         dcc.InfoHash,
304                                 )
305                                 dc.Close()
306                                 return
307                         }
308                         go t.onWebRtcConn(dc, dcc)
309                 },
310         }
311
312         return
313 }
314
315 func (cl *Client) AddDhtServer(d DhtServer) {
316         cl.dhtServers = append(cl.dhtServers, d)
317 }
318
319 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
320 // given address for any Torrent.
321 func (cl *Client) AddDialer(d Dialer) {
322         cl.lock()
323         defer cl.unlock()
324         cl.dialers = append(cl.dialers, d)
325         for _, t := range cl.torrents {
326                 t.openNewConns()
327         }
328 }
329
330 func (cl *Client) Listeners() []Listener {
331         return cl.listeners
332 }
333
334 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
335 // yourself.
336 func (cl *Client) AddListener(l Listener) {
337         cl.listeners = append(cl.listeners, l)
338         if cl.config.AcceptPeerConnections {
339                 go cl.acceptConnections(l)
340         }
341 }
342
343 func (cl *Client) firewallCallback(net.Addr) bool {
344         cl.rLock()
345         block := !cl.wantConns() || !cl.config.AcceptPeerConnections
346         cl.rUnlock()
347         if block {
348                 torrent.Add("connections firewalled", 1)
349         } else {
350                 torrent.Add("connections not firewalled", 1)
351         }
352         return block
353 }
354
355 func (cl *Client) listenOnNetwork(n network) bool {
356         if n.Ipv4 && cl.config.DisableIPv4 {
357                 return false
358         }
359         if n.Ipv6 && cl.config.DisableIPv6 {
360                 return false
361         }
362         if n.Tcp && cl.config.DisableTCP {
363                 return false
364         }
365         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
366                 return false
367         }
368         return true
369 }
370
371 func (cl *Client) listenNetworks() (ns []network) {
372         for _, n := range allPeerNetworks {
373                 if cl.listenOnNetwork(n) {
374                         ns = append(ns, n)
375                 }
376         }
377         return
378 }
379
380 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
381 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
382         cfg := dht.ServerConfig{
383                 IPBlocklist:    cl.ipBlockList,
384                 Conn:           conn,
385                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
386                 PublicIP: func() net.IP {
387                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
388                                 return cl.config.PublicIp6
389                         }
390                         return cl.config.PublicIp4
391                 }(),
392                 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
393                 OnQuery:       cl.config.DHTOnQuery,
394                 Logger:        cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395         }
396         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
397                 f(&cfg)
398         }
399         s, err = dht.NewServer(&cfg)
400         if err == nil {
401                 go func() {
402                         ts, err := s.Bootstrap()
403                         if err != nil {
404                                 cl.logger.Printf("error bootstrapping dht: %s", err)
405                         }
406                         log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
407                 }()
408         }
409         return
410 }
411
412 func (cl *Client) Closed() events.Done {
413         return cl.closed.Done()
414 }
415
416 func (cl *Client) eachDhtServer(f func(DhtServer)) {
417         for _, ds := range cl.dhtServers {
418                 f(ds)
419         }
420 }
421
422 // Stops the client. All connections to peers are closed and all activity will come to a halt.
423 func (cl *Client) Close() (errs []error) {
424         var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
425         cl.lock()
426         for _, t := range cl.torrents {
427                 err := t.close(&closeGroup)
428                 if err != nil {
429                         errs = append(errs, err)
430                 }
431         }
432         for i := range cl.onClose {
433                 cl.onClose[len(cl.onClose)-1-i]()
434         }
435         cl.closed.Set()
436         cl.unlock()
437         cl.event.Broadcast()
438         closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
439         return
440 }
441
442 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
443         if cl.ipBlockList == nil {
444                 return
445         }
446         return cl.ipBlockList.Lookup(ip)
447 }
448
449 func (cl *Client) ipIsBlocked(ip net.IP) bool {
450         _, blocked := cl.ipBlockRange(ip)
451         return blocked
452 }
453
454 func (cl *Client) wantConns() bool {
455         if cl.config.AlwaysWantConns {
456                 return true
457         }
458         for _, t := range cl.torrents {
459                 if t.wantConns() {
460                         return true
461                 }
462         }
463         return false
464 }
465
466 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
467 func (cl *Client) rejectAccepted(conn net.Conn) error {
468         if !cl.wantConns() {
469                 return errors.New("don't want conns right now")
470         }
471         ra := conn.RemoteAddr()
472         if rip := addrIpOrNil(ra); rip != nil {
473                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
474                         return errors.New("ipv4 peers disabled")
475                 }
476                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
477                         return errors.New("ipv4 disabled")
478                 }
479                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
480                         return errors.New("ipv6 disabled")
481                 }
482                 if cl.rateLimitAccept(rip) {
483                         return errors.New("source IP accepted rate limited")
484                 }
485                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
486                         return errors.New("bad source addr")
487                 }
488         }
489         return nil
490 }
491
492 func (cl *Client) acceptConnections(l Listener) {
493         for {
494                 conn, err := l.Accept()
495                 torrent.Add("client listener accepts", 1)
496                 conn = pproffd.WrapNetConn(conn)
497                 cl.rLock()
498                 closed := cl.closed.IsSet()
499                 var reject error
500                 if !closed && conn != nil {
501                         reject = cl.rejectAccepted(conn)
502                 }
503                 cl.rUnlock()
504                 if closed {
505                         if conn != nil {
506                                 conn.Close()
507                         }
508                         return
509                 }
510                 if err != nil {
511                         log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
512                         continue
513                 }
514                 go func() {
515                         if reject != nil {
516                                 torrent.Add("rejected accepted connections", 1)
517                                 cl.logger.LazyLog(log.Debug, func() log.Msg {
518                                         return log.Fmsg("rejecting accepted conn: %v", reject)
519                                 })
520                                 conn.Close()
521                         } else {
522                                 go cl.incomingConnection(conn)
523                         }
524                         cl.logger.LazyLog(log.Debug, func() log.Msg {
525                                 return log.Fmsg("accepted %q connection at %q from %q",
526                                         l.Addr().Network(),
527                                         conn.LocalAddr(),
528                                         conn.RemoteAddr(),
529                                 )
530                         })
531                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
532                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
533                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
534                 }()
535         }
536 }
537
538 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
539 func regularNetConnPeerConnConnString(nc net.Conn) string {
540         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
541 }
542
543 func (cl *Client) incomingConnection(nc net.Conn) {
544         defer nc.Close()
545         if tc, ok := nc.(*net.TCPConn); ok {
546                 tc.SetLinger(0)
547         }
548         c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
549                 regularNetConnPeerConnConnString(nc))
550         defer func() {
551                 cl.lock()
552                 defer cl.unlock()
553                 c.close()
554         }()
555         c.Discovery = PeerSourceIncoming
556         cl.runReceivedConn(c)
557 }
558
559 // Returns a handle to the given torrent, if it's present in the client.
560 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
561         cl.lock()
562         defer cl.unlock()
563         t, ok = cl.torrents[ih]
564         return
565 }
566
567 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
568         return cl.torrents[ih]
569 }
570
571 type DialResult struct {
572         Conn   net.Conn
573         Dialer Dialer
574 }
575
576 func countDialResult(err error) {
577         if err == nil {
578                 torrent.Add("successful dials", 1)
579         } else {
580                 torrent.Add("unsuccessful dials", 1)
581         }
582 }
583
584 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
585         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
586         if ret < minDialTimeout {
587                 ret = minDialTimeout
588         }
589         return
590 }
591
592 // Returns whether an address is known to connect to a client with our own ID.
593 func (cl *Client) dopplegangerAddr(addr string) bool {
594         _, ok := cl.dopplegangerAddrs[addr]
595         return ok
596 }
597
598 // Returns a connection over UTP or TCP, whichever is first to connect.
599 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
600         return DialFirst(ctx, addr, cl.dialers)
601 }
602
603 // Returns a connection over UTP or TCP, whichever is first to connect.
604 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
605         {
606                 t := perf.NewTimer(perf.CallerName(0))
607                 defer func() {
608                         if res.Conn == nil {
609                                 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610                         } else {
611                                 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
612                         }
613                 }()
614         }
615         ctx, cancel := context.WithCancel(ctx)
616         // As soon as we return one connection, cancel the others.
617         defer cancel()
618         left := 0
619         resCh := make(chan DialResult, left)
620         for _, _s := range dialers {
621                 left++
622                 s := _s
623                 go func() {
624                         resCh <- DialResult{
625                                 dialFromSocket(ctx, s, addr),
626                                 s,
627                         }
628                 }()
629         }
630         // Wait for a successful connection.
631         func() {
632                 defer perf.ScopeTimer()()
633                 for ; left > 0 && res.Conn == nil; left-- {
634                         res = <-resCh
635                 }
636         }()
637         // There are still incompleted dials.
638         go func() {
639                 for ; left > 0; left-- {
640                         conn := (<-resCh).Conn
641                         if conn != nil {
642                                 conn.Close()
643                         }
644                 }
645         }()
646         if res.Conn != nil {
647                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
648         }
649         return res
650 }
651
652 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
653         c, err := s.Dial(ctx, addr)
654         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
655         // it now in case we close the connection forthwith.
656         if tc, ok := c.(*net.TCPConn); ok {
657                 tc.SetLinger(0)
658         }
659         countDialResult(err)
660         return c
661 }
662
663 func forgettableDialError(err error) bool {
664         return strings.Contains(err.Error(), "no suitable address found")
665 }
666
667 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
668         if _, ok := t.halfOpen[addr]; !ok {
669                 panic("invariant broken")
670         }
671         delete(t.halfOpen, addr)
672         cl.numHalfOpen--
673         for _, t := range cl.torrents {
674                 t.openNewConns()
675         }
676 }
677
678 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
679 // for valid reasons.
680 func (cl *Client) initiateProtocolHandshakes(
681         ctx context.Context,
682         nc net.Conn,
683         t *Torrent,
684         outgoing, encryptHeader bool,
685         remoteAddr PeerRemoteAddr,
686         network, connString string,
687 ) (
688         c *PeerConn, err error,
689 ) {
690         c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
691         c.headerEncrypted = encryptHeader
692         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693         defer cancel()
694         dl, ok := ctx.Deadline()
695         if !ok {
696                 panic(ctx)
697         }
698         err = nc.SetDeadline(dl)
699         if err != nil {
700                 panic(err)
701         }
702         err = cl.initiateHandshakes(c, t)
703         return
704 }
705
706 // Returns nil connection and nil error if no connection could be established for valid reasons.
707 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
708         dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
709                 cl.rLock()
710                 defer cl.rUnlock()
711                 return t.dialTimeout()
712         }())
713         defer cancel()
714         dr := cl.dialFirst(dialCtx, addr.String())
715         nc := dr.Conn
716         if nc == nil {
717                 if dialCtx.Err() != nil {
718                         return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719                 }
720                 return nil, errors.New("dial failed")
721         }
722         c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
723         if err != nil {
724                 nc.Close()
725         }
726         return c, err
727 }
728
729 // Returns nil connection and nil error if no connection could be established
730 // for valid reasons.
731 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
732         torrent.Add("establish outgoing connection", 1)
733         obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
734         c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735         if err == nil {
736                 torrent.Add("initiated conn with preferred header obfuscation", 1)
737                 return
738         }
739         // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
740         if cl.config.HeaderObfuscationPolicy.RequirePreferred {
741                 // We should have just tried with the preferred header obfuscation. If it was required,
742                 // there's nothing else to try.
743                 return
744         }
745         // Try again with encryption if we didn't earlier, or without if we did.
746         c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747         if err == nil {
748                 torrent.Add("initiated conn with fallback header obfuscation", 1)
749         }
750         // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
751         return
752 }
753
754 // Called to dial out and run a connection. The addr we're given is already
755 // considered half-open.
756 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
757         cl.dialRateLimiter.Wait(context.Background())
758         c, err := cl.establishOutgoingConn(t, addr)
759         if err == nil {
760                 c.conn.SetWriteDeadline(time.Time{})
761         }
762         cl.lock()
763         defer cl.unlock()
764         // Don't release lock between here and addPeerConn, unless it's for
765         // failure.
766         cl.noLongerHalfOpen(t, addr.String())
767         if err != nil {
768                 if cl.config.Debug {
769                         cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
770                 }
771                 return
772         }
773         defer c.close()
774         c.Discovery = ps
775         c.trusted = trusted
776         t.runHandshookConnLoggingErr(c)
777 }
778
779 // The port number for incoming peer connections. 0 if the client isn't listening.
780 func (cl *Client) incomingPeerPort() int {
781         return cl.LocalPort()
782 }
783
784 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
785         if c.headerEncrypted {
786                 var rw io.ReadWriter
787                 var err error
788                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
789                         struct {
790                                 io.Reader
791                                 io.Writer
792                         }{c.r, c.w},
793                         t.infoHash[:],
794                         nil,
795                         cl.config.CryptoProvides,
796                 )
797                 c.setRW(rw)
798                 if err != nil {
799                         return fmt.Errorf("header obfuscation handshake: %w", err)
800                 }
801         }
802         ih, err := cl.connBtHandshake(c, &t.infoHash)
803         if err != nil {
804                 return fmt.Errorf("bittorrent protocol handshake: %w", err)
805         }
806         if ih != t.infoHash {
807                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
808         }
809         return nil
810 }
811
812 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
813 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
814 func (cl *Client) forSkeys(f func([]byte) bool) {
815         cl.rLock()
816         defer cl.rUnlock()
817         if false { // Emulate the bug from #114
818                 var firstIh InfoHash
819                 for ih := range cl.torrents {
820                         firstIh = ih
821                         break
822                 }
823                 for range cl.torrents {
824                         if !f(firstIh[:]) {
825                                 break
826                         }
827                 }
828                 return
829         }
830         for ih := range cl.torrents {
831                 if !f(ih[:]) {
832                         break
833                 }
834         }
835 }
836
837 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
838         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
839                 return ret
840         }
841         return cl.forSkeys
842 }
843
844 // Do encryption and bittorrent handshakes as receiver.
845 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
846         defer perf.ScopeTimerErr(&err)()
847         var rw io.ReadWriter
848         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
849         c.setRW(rw)
850         if err == nil || err == mse.ErrNoSecretKeyMatch {
851                 if c.headerEncrypted {
852                         torrent.Add("handshakes received encrypted", 1)
853                 } else {
854                         torrent.Add("handshakes received unencrypted", 1)
855                 }
856         } else {
857                 torrent.Add("handshakes received with error while handling encryption", 1)
858         }
859         if err != nil {
860                 if err == mse.ErrNoSecretKeyMatch {
861                         err = nil
862                 }
863                 return
864         }
865         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
866                 err = errors.New("connection does not have required header obfuscation")
867                 return
868         }
869         ih, err := cl.connBtHandshake(c, nil)
870         if err != nil {
871                 return nil, fmt.Errorf("during bt handshake: %w", err)
872         }
873         cl.lock()
874         t = cl.torrents[ih]
875         cl.unlock()
876         return
877 }
878
879 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
880
881 func init() {
882         torrent.Set(
883                 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
884                 &successfulPeerWireProtocolHandshakePeerReservedBytes)
885 }
886
887 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
888         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
889         if err != nil {
890                 return
891         }
892         successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
893         ret = res.Hash
894         c.PeerExtensionBytes = res.PeerExtensionBits
895         c.PeerID = res.PeerID
896         c.completedHandshake = time.Now()
897         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
898                 cb(c, res.Hash)
899         }
900         return
901 }
902
903 func (cl *Client) runReceivedConn(c *PeerConn) {
904         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
905         if err != nil {
906                 panic(err)
907         }
908         t, err := cl.receiveHandshakes(c)
909         if err != nil {
910                 cl.logger.LazyLog(log.Debug, func() log.Msg {
911                         return log.Fmsg(
912                                 "error receiving handshakes on %v: %s", c, err,
913                         ).Add(
914                                 "network", c.Network,
915                         )
916                 })
917                 torrent.Add("error receiving handshake", 1)
918                 cl.lock()
919                 cl.onBadAccept(c.RemoteAddr)
920                 cl.unlock()
921                 return
922         }
923         if t == nil {
924                 torrent.Add("received handshake for unloaded torrent", 1)
925                 cl.logger.LazyLog(log.Debug, func() log.Msg {
926                         return log.Fmsg("received handshake for unloaded torrent")
927                 })
928                 cl.lock()
929                 cl.onBadAccept(c.RemoteAddr)
930                 cl.unlock()
931                 return
932         }
933         torrent.Add("received handshake for loaded torrent", 1)
934         c.conn.SetWriteDeadline(time.Time{})
935         cl.lock()
936         defer cl.unlock()
937         t.runHandshookConnLoggingErr(c)
938 }
939
940 // Client lock must be held before entering this.
941 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
942         c.setTorrent(t)
943         for i, b := range cl.config.MinPeerExtensions {
944                 if c.PeerExtensionBytes[i]&b != b {
945                         return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
946                 }
947         }
948         if c.PeerID == cl.peerID {
949                 if c.outgoing {
950                         connsToSelf.Add(1)
951                         addr := c.RemoteAddr.String()
952                         cl.dopplegangerAddrs[addr] = struct{}{}
953                 } /* else {
954                         // Because the remote address is not necessarily the same as its client's torrent listen
955                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
956                         // can record *us* as the doppleganger.
957                 } */
958                 t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
959                 return nil
960         }
961         c.r = deadlineReader{c.conn, c.r}
962         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
963         if connIsIpv6(c.conn) {
964                 torrent.Add("completed handshake over ipv6", 1)
965         }
966         if err := t.addPeerConn(c); err != nil {
967                 return fmt.Errorf("adding connection: %w", err)
968         }
969         defer t.dropConnection(c)
970         c.startWriter()
971         cl.sendInitialMessages(c, t)
972         c.initUpdateRequestsTimer()
973         err := c.mainReadLoop()
974         if err != nil {
975                 return fmt.Errorf("main read loop: %w", err)
976         }
977         return nil
978 }
979
980 const check = false
981
982 func (p *Peer) initUpdateRequestsTimer() {
983         if check {
984                 if p.updateRequestsTimer != nil {
985                         panic(p.updateRequestsTimer)
986                 }
987         }
988         p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
989 }
990
991 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
992
993 func (c *Peer) updateRequestsTimerFunc() {
994         c.locker().Lock()
995         defer c.locker().Unlock()
996         if c.closed.IsSet() {
997                 return
998         }
999         if c.isLowOnRequests() {
1000                 // If there are no outstanding requests, then a request update should have already run.
1001                 return
1002         }
1003         if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1004                 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1005                 // already been fired.
1006                 torrent.Add("spurious timer requests updates", 1)
1007                 return
1008         }
1009         c.updateRequests(peerUpdateRequestsTimerReason)
1010 }
1011
1012 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1013 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1014 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1015 const localClientReqq = 1 << 5
1016
1017 // See the order given in Transmission's tr_peerMsgsNew.
1018 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1019         if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1020                 conn.write(pp.Message{
1021                         Type:       pp.Extended,
1022                         ExtendedID: pp.HandshakeExtendedID,
1023                         ExtendedPayload: func() []byte {
1024                                 msg := pp.ExtendedHandshakeMessage{
1025                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
1026                                                 pp.ExtensionNameMetadata: metadataExtendedId,
1027                                         },
1028                                         V:            cl.config.ExtendedHandshakeClientVersion,
1029                                         Reqq:         localClientReqq,
1030                                         YourIp:       pp.CompactIp(conn.remoteIp()),
1031                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1032                                         Port:         cl.incomingPeerPort(),
1033                                         MetadataSize: torrent.metadataSize(),
1034                                         // TODO: We can figured these out specific to the socket
1035                                         // used.
1036                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1037                                         Ipv6: cl.config.PublicIp6.To16(),
1038                                 }
1039                                 if !cl.config.DisablePEX {
1040                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
1041                                 }
1042                                 return bencode.MustMarshal(msg)
1043                         }(),
1044                 })
1045         }
1046         func() {
1047                 if conn.fastEnabled() {
1048                         if torrent.haveAllPieces() {
1049                                 conn.write(pp.Message{Type: pp.HaveAll})
1050                                 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1051                                 return
1052                         } else if !torrent.haveAnyPieces() {
1053                                 conn.write(pp.Message{Type: pp.HaveNone})
1054                                 conn.sentHaves.Clear()
1055                                 return
1056                         }
1057                 }
1058                 conn.postBitfield()
1059         }()
1060         if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1061                 conn.write(pp.Message{
1062                         Type: pp.Port,
1063                         Port: cl.dhtPort(),
1064                 })
1065         }
1066 }
1067
1068 func (cl *Client) dhtPort() (ret uint16) {
1069         if len(cl.dhtServers) == 0 {
1070                 return
1071         }
1072         return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1073 }
1074
1075 func (cl *Client) haveDhtServer() bool {
1076         return len(cl.dhtServers) > 0
1077 }
1078
1079 // Process incoming ut_metadata message.
1080 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1081         var d pp.ExtendedMetadataRequestMsg
1082         err := bencode.Unmarshal(payload, &d)
1083         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1084         } else if err != nil {
1085                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1086         }
1087         piece := d.Piece
1088         switch d.Type {
1089         case pp.DataMetadataExtensionMsgType:
1090                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1091                 if !c.requestedMetadataPiece(piece) {
1092                         return fmt.Errorf("got unexpected piece %d", piece)
1093                 }
1094                 c.metadataRequests[piece] = false
1095                 begin := len(payload) - d.PieceSize()
1096                 if begin < 0 || begin >= len(payload) {
1097                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1098                 }
1099                 t.saveMetadataPiece(piece, payload[begin:])
1100                 c.lastUsefulChunkReceived = time.Now()
1101                 err = t.maybeCompleteMetadata()
1102                 if err != nil {
1103                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1104                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1105                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1106                         // log consumers can filter for this message.
1107                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1108                 }
1109                 return err
1110         case pp.RequestMetadataExtensionMsgType:
1111                 if !t.haveMetadataPiece(piece) {
1112                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1113                         return nil
1114                 }
1115                 start := (1 << 14) * piece
1116                 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1117                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1118                 return nil
1119         case pp.RejectMetadataExtensionMsgType:
1120                 return nil
1121         default:
1122                 return errors.New("unknown msg_type value")
1123         }
1124 }
1125
1126 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1127         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1128                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1129         }
1130         return false
1131 }
1132
1133 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1134         if port == 0 {
1135                 return true
1136         }
1137         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1138                 return true
1139         }
1140         if _, ok := cl.ipBlockRange(ip); ok {
1141                 return true
1142         }
1143         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1144                 return true
1145         }
1146         return false
1147 }
1148
1149 // Return a Torrent ready for insertion into a Client.
1150 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1151         return cl.newTorrentOpt(AddTorrentOpts{
1152                 InfoHash: ih,
1153                 Storage:  specStorage,
1154         })
1155 }
1156
1157 // Return a Torrent ready for insertion into a Client.
1158 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1159         // use provided storage, if provided
1160         storageClient := cl.defaultStorage
1161         if opts.Storage != nil {
1162                 storageClient = storage.NewClient(opts.Storage)
1163         }
1164
1165         t = &Torrent{
1166                 cl:       cl,
1167                 infoHash: opts.InfoHash,
1168                 peers: prioritizedPeers{
1169                         om: btree.New(32),
1170                         getPrio: func(p PeerInfo) peerPriority {
1171                                 ipPort := p.addr()
1172                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1173                         },
1174                 },
1175                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1176
1177                 halfOpen:          make(map[string]PeerInfo),
1178                 pieceStateChanges: pubsub.NewPubSub(),
1179
1180                 storageOpener:       storageClient,
1181                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1182
1183                 metadataChanged: sync.Cond{
1184                         L: cl.locker(),
1185                 },
1186                 webSeeds:     make(map[string]*Peer),
1187                 gotMetainfoC: make(chan struct{}),
1188         }
1189         t.networkingEnabled.Set()
1190         t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
1191         if opts.ChunkSize == 0 {
1192                 opts.ChunkSize = defaultChunkSize
1193         }
1194         t.setChunkSize(opts.ChunkSize)
1195         return
1196 }
1197
1198 // A file-like handle to some torrent data resource.
1199 type Handle interface {
1200         io.Reader
1201         io.Seeker
1202         io.Closer
1203         io.ReaderAt
1204 }
1205
1206 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1207         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1208 }
1209
1210 // Adds a torrent by InfoHash with a custom Storage implementation.
1211 // If the torrent already exists then this Storage is ignored and the
1212 // existing torrent returned with `new` set to `false`
1213 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1214         cl.lock()
1215         defer cl.unlock()
1216         t, ok := cl.torrents[infoHash]
1217         if ok {
1218                 return
1219         }
1220         new = true
1221
1222         t = cl.newTorrent(infoHash, specStorage)
1223         cl.eachDhtServer(func(s DhtServer) {
1224                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1225                         go t.dhtAnnouncer(s)
1226                 }
1227         })
1228         cl.torrents[infoHash] = t
1229         cl.clearAcceptLimits()
1230         t.updateWantPeersEvent()
1231         // Tickle Client.waitAccept, new torrent may want conns.
1232         cl.event.Broadcast()
1233         return
1234 }
1235
1236 // Adds a torrent by InfoHash with a custom Storage implementation.
1237 // If the torrent already exists then this Storage is ignored and the
1238 // existing torrent returned with `new` set to `false`
1239 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1240         infoHash := opts.InfoHash
1241         cl.lock()
1242         defer cl.unlock()
1243         t, ok := cl.torrents[infoHash]
1244         if ok {
1245                 return
1246         }
1247         new = true
1248
1249         t = cl.newTorrentOpt(opts)
1250         cl.eachDhtServer(func(s DhtServer) {
1251                 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1252                         go t.dhtAnnouncer(s)
1253                 }
1254         })
1255         cl.torrents[infoHash] = t
1256         cl.clearAcceptLimits()
1257         t.updateWantPeersEvent()
1258         // Tickle Client.waitAccept, new torrent may want conns.
1259         cl.event.Broadcast()
1260         return
1261 }
1262
1263 type AddTorrentOpts struct {
1264         InfoHash  InfoHash
1265         Storage   storage.ClientImpl
1266         ChunkSize pp.Integer
1267 }
1268
1269 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1270 // Torrent.MergeSpec.
1271 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1272         t, new = cl.AddTorrentOpt(AddTorrentOpts{
1273                 InfoHash:  spec.InfoHash,
1274                 Storage:   spec.Storage,
1275                 ChunkSize: spec.ChunkSize,
1276         })
1277         modSpec := *spec
1278         if new {
1279                 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1280                 // it.
1281                 modSpec.ChunkSize = 0
1282         }
1283         err = t.MergeSpec(&modSpec)
1284         if err != nil && new {
1285                 t.Drop()
1286         }
1287         return
1288 }
1289
1290 type stringAddr string
1291
1292 var _ net.Addr = stringAddr("")
1293
1294 func (stringAddr) Network() string   { return "" }
1295 func (me stringAddr) String() string { return string(me) }
1296
1297 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1298 // spec.DisallowDataDownload/Upload will be read and applied
1299 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1300 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1301         if spec.DisplayName != "" {
1302                 t.SetDisplayName(spec.DisplayName)
1303         }
1304         if spec.InfoBytes != nil {
1305                 err := t.SetInfoBytes(spec.InfoBytes)
1306                 if err != nil {
1307                         return err
1308                 }
1309         }
1310         cl := t.cl
1311         cl.AddDhtNodes(spec.DhtNodes)
1312         cl.lock()
1313         defer cl.unlock()
1314         t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1315         useTorrentSources(spec.Sources, t)
1316         for _, url := range spec.Webseeds {
1317                 t.addWebSeed(url)
1318         }
1319         for _, peerAddr := range spec.PeerAddrs {
1320                 t.addPeer(PeerInfo{
1321                         Addr:    stringAddr(peerAddr),
1322                         Source:  PeerSourceDirect,
1323                         Trusted: true,
1324                 })
1325         }
1326         if spec.ChunkSize != 0 {
1327                 panic("chunk size cannot be changed for existing Torrent")
1328         }
1329         t.addTrackers(spec.Trackers)
1330         t.maybeNewConns()
1331         t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1332         t.dataUploadDisallowed = spec.DisallowDataUpload
1333         return nil
1334 }
1335
1336 func useTorrentSources(sources []string, t *Torrent) {
1337         // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1338         ctx := context.Background()
1339         for i := 0; i < len(sources); i += 1 {
1340                 s := sources[i]
1341                 go func() {
1342                         if err := useTorrentSource(ctx, s, t); err != nil {
1343                                 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1344                         } else {
1345                                 t.logger.Printf("successfully used source %q", s)
1346                         }
1347                 }()
1348         }
1349 }
1350
1351 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1352         ctx, cancel := context.WithCancel(ctx)
1353         defer cancel()
1354         go func() {
1355                 select {
1356                 case <-t.GotInfo():
1357                 case <-t.Closed():
1358                 case <-ctx.Done():
1359                 }
1360                 cancel()
1361         }()
1362         var req *http.Request
1363         if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1364                 panic(err)
1365         }
1366         var resp *http.Response
1367         if resp, err = http.DefaultClient.Do(req); err != nil {
1368                 return
1369         }
1370         var mi metainfo.MetaInfo
1371         err = bencode.NewDecoder(resp.Body).Decode(&mi)
1372         resp.Body.Close()
1373         if err != nil {
1374                 if ctx.Err() != nil {
1375                         return nil
1376                 }
1377                 return
1378         }
1379         return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1380 }
1381
1382 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1383         t, ok := cl.torrents[infoHash]
1384         if !ok {
1385                 err = fmt.Errorf("no such torrent")
1386                 return
1387         }
1388         err = t.close(wg)
1389         if err != nil {
1390                 panic(err)
1391         }
1392         delete(cl.torrents, infoHash)
1393         return
1394 }
1395
1396 func (cl *Client) allTorrentsCompleted() bool {
1397         for _, t := range cl.torrents {
1398                 if !t.haveInfo() {
1399                         return false
1400                 }
1401                 if !t.haveAllPieces() {
1402                         return false
1403                 }
1404         }
1405         return true
1406 }
1407
1408 // Returns true when all torrents are completely downloaded and false if the
1409 // client is stopped before that.
1410 func (cl *Client) WaitAll() bool {
1411         cl.lock()
1412         defer cl.unlock()
1413         for !cl.allTorrentsCompleted() {
1414                 if cl.closed.IsSet() {
1415                         return false
1416                 }
1417                 cl.event.Wait()
1418         }
1419         return true
1420 }
1421
1422 // Returns handles to all the torrents loaded in the Client.
1423 func (cl *Client) Torrents() []*Torrent {
1424         cl.lock()
1425         defer cl.unlock()
1426         return cl.torrentsAsSlice()
1427 }
1428
1429 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1430         for _, t := range cl.torrents {
1431                 ret = append(ret, t)
1432         }
1433         return
1434 }
1435
1436 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1437         spec, err := TorrentSpecFromMagnetUri(uri)
1438         if err != nil {
1439                 return
1440         }
1441         T, _, err = cl.AddTorrentSpec(spec)
1442         return
1443 }
1444
1445 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1446         ts, err := TorrentSpecFromMetaInfoErr(mi)
1447         if err != nil {
1448                 return
1449         }
1450         T, _, err = cl.AddTorrentSpec(ts)
1451         return
1452 }
1453
1454 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1455         mi, err := metainfo.LoadFromFile(filename)
1456         if err != nil {
1457                 return
1458         }
1459         return cl.AddTorrent(mi)
1460 }
1461
1462 func (cl *Client) DhtServers() []DhtServer {
1463         return cl.dhtServers
1464 }
1465
1466 func (cl *Client) AddDhtNodes(nodes []string) {
1467         for _, n := range nodes {
1468                 hmp := missinggo.SplitHostMaybePort(n)
1469                 ip := net.ParseIP(hmp.Host)
1470                 if ip == nil {
1471                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1472                         continue
1473                 }
1474                 ni := krpc.NodeInfo{
1475                         Addr: krpc.NodeAddr{
1476                                 IP:   ip,
1477                                 Port: hmp.Port,
1478                         },
1479                 }
1480                 cl.eachDhtServer(func(s DhtServer) {
1481                         s.AddNode(ni)
1482                 })
1483         }
1484 }
1485
1486 func (cl *Client) banPeerIP(ip net.IP) {
1487         cl.logger.Printf("banning ip %v", ip)
1488         if cl.badPeerIPs == nil {
1489                 cl.badPeerIPs = make(map[string]struct{})
1490         }
1491         cl.badPeerIPs[ip.String()] = struct{}{}
1492 }
1493
1494 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1495         if network == "" {
1496                 panic(remoteAddr)
1497         }
1498         c = &PeerConn{
1499                 Peer: Peer{
1500                         outgoing:        outgoing,
1501                         choking:         true,
1502                         peerChoking:     true,
1503                         PeerMaxRequests: 250,
1504
1505                         RemoteAddr: remoteAddr,
1506                         Network:    network,
1507                         callbacks:  &cl.config.Callbacks,
1508                 },
1509                 connString: connString,
1510                 conn:       nc,
1511         }
1512         c.peerImpl = c
1513         c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1514         c.setRW(connStatsReadWriter{nc, c})
1515         c.r = &rateLimitedReader{
1516                 l: cl.config.DownloadRateLimiter,
1517                 r: c.r,
1518         }
1519         c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1520         for _, f := range cl.config.Callbacks.NewPeer {
1521                 f(&c.Peer)
1522         }
1523         return
1524 }
1525
1526 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1527         cl.lock()
1528         defer cl.unlock()
1529         t := cl.torrent(ih)
1530         if t == nil {
1531                 return
1532         }
1533         t.addPeers([]PeerInfo{{
1534                 Addr:   ipPortAddr{ip, port},
1535                 Source: PeerSourceDhtAnnouncePeer,
1536         }})
1537 }
1538
1539 func firstNotNil(ips ...net.IP) net.IP {
1540         for _, ip := range ips {
1541                 if ip != nil {
1542                         return ip
1543                 }
1544         }
1545         return nil
1546 }
1547
1548 func (cl *Client) eachListener(f func(Listener) bool) {
1549         for _, s := range cl.listeners {
1550                 if !f(s) {
1551                         break
1552                 }
1553         }
1554 }
1555
1556 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1557         for i := 0; i < len(cl.listeners); i += 1 {
1558                 if ret = cl.listeners[i]; f(ret) {
1559                         return
1560                 }
1561         }
1562         return nil
1563 }
1564
1565 func (cl *Client) publicIp(peer net.IP) net.IP {
1566         // TODO: Use BEP 10 to determine how peers are seeing us.
1567         if peer.To4() != nil {
1568                 return firstNotNil(
1569                         cl.config.PublicIp4,
1570                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1571                 )
1572         }
1573
1574         return firstNotNil(
1575                 cl.config.PublicIp6,
1576                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1577         )
1578 }
1579
1580 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1581         l := cl.findListener(
1582                 func(l Listener) bool {
1583                         return f(addrIpOrNil(l.Addr()))
1584                 },
1585         )
1586         if l == nil {
1587                 return nil
1588         }
1589         return addrIpOrNil(l.Addr())
1590 }
1591
1592 // Our IP as a peer should see it.
1593 func (cl *Client) publicAddr(peer net.IP) IpPort {
1594         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1595 }
1596
1597 // ListenAddrs addresses currently being listened to.
1598 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1599         cl.lock()
1600         ret = make([]net.Addr, len(cl.listeners))
1601         for i := 0; i < len(cl.listeners); i += 1 {
1602                 ret[i] = cl.listeners[i].Addr()
1603         }
1604         cl.unlock()
1605         return
1606 }
1607
1608 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1609         ipa, ok := tryIpPortFromNetAddr(addr)
1610         if !ok {
1611                 return
1612         }
1613         ip := maskIpForAcceptLimiting(ipa.IP)
1614         if cl.acceptLimiter == nil {
1615                 cl.acceptLimiter = make(map[ipStr]int)
1616         }
1617         cl.acceptLimiter[ipStr(ip.String())]++
1618 }
1619
1620 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1621         if ip4 := ip.To4(); ip4 != nil {
1622                 return ip4.Mask(net.CIDRMask(24, 32))
1623         }
1624         return ip
1625 }
1626
1627 func (cl *Client) clearAcceptLimits() {
1628         cl.acceptLimiter = nil
1629 }
1630
1631 func (cl *Client) acceptLimitClearer() {
1632         for {
1633                 select {
1634                 case <-cl.closed.Done():
1635                         return
1636                 case <-time.After(15 * time.Minute):
1637                         cl.lock()
1638                         cl.clearAcceptLimits()
1639                         cl.unlock()
1640                 }
1641         }
1642 }
1643
1644 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1645         if cl.config.DisableAcceptRateLimiting {
1646                 return false
1647         }
1648         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1649 }
1650
1651 func (cl *Client) rLock() {
1652         cl._mu.RLock()
1653 }
1654
1655 func (cl *Client) rUnlock() {
1656         cl._mu.RUnlock()
1657 }
1658
1659 func (cl *Client) lock() {
1660         cl._mu.Lock()
1661 }
1662
1663 func (cl *Client) unlock() {
1664         cl._mu.Unlock()
1665 }
1666
1667 func (cl *Client) locker() *lockWithDeferreds {
1668         return &cl._mu
1669 }
1670
1671 func (cl *Client) String() string {
1672         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1673 }
1674
1675 // Returns connection-level aggregate stats at the Client level. See the comment on
1676 // TorrentStats.ConnStats.
1677 func (cl *Client) ConnStats() ConnStats {
1678         return cl.stats.Copy()
1679 }