]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Extract request strategy stuff into a separate module
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/binary"
9         "errors"
10         "fmt"
11         "io"
12         "net"
13         "net/http"
14         "strconv"
15         "strings"
16         "time"
17
18         "github.com/anacrolix/dht/v2"
19         "github.com/anacrolix/dht/v2/krpc"
20         "github.com/anacrolix/log"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/anacrolix/missinggo/v2/pproffd"
26         "github.com/anacrolix/sync"
27         "github.com/anacrolix/torrent/internal/limiter"
28         request_strategy "github.com/anacrolix/torrent/request-strategy"
29         "github.com/anacrolix/torrent/tracker"
30         "github.com/anacrolix/torrent/webtorrent"
31         "github.com/davecgh/go-spew/spew"
32         "github.com/dustin/go-humanize"
33         "github.com/google/btree"
34         "github.com/pion/datachannel"
35         "golang.org/x/time/rate"
36         "golang.org/x/xerrors"
37
38         "github.com/anacrolix/missinggo/v2"
39         "github.com/anacrolix/missinggo/v2/conntrack"
40
41         "github.com/anacrolix/torrent/bencode"
42         "github.com/anacrolix/torrent/iplist"
43         "github.com/anacrolix/torrent/metainfo"
44         "github.com/anacrolix/torrent/mse"
45         pp "github.com/anacrolix/torrent/peer_protocol"
46         "github.com/anacrolix/torrent/storage"
47 )
48
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
51 type Client struct {
52         // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
53         // fields. See #262.
54         stats ConnStats
55
56         _mu    lockWithDeferreds
57         event  sync.Cond
58         closed missinggo.Event
59
60         config *ClientConfig
61         logger log.Logger
62
63         peerID         PeerID
64         defaultStorage *storage.Client
65         onClose        []func()
66         dialers        []Dialer
67         listeners      []Listener
68         dhtServers     []DhtServer
69         ipBlockList    iplist.Ranger
70
71         // Set of addresses that have our client ID. This intentionally will
72         // include ourselves if we end up trying to connect to our own address
73         // through legitimate channels.
74         dopplegangerAddrs map[string]struct{}
75         badPeerIPs        map[string]struct{}
76         torrents          map[InfoHash]*Torrent
77
78         acceptLimiter   map[ipStr]int
79         dialRateLimiter *rate.Limiter
80         numHalfOpen     int
81
82         websocketTrackers websocketTrackers
83
84         activeAnnounceLimiter limiter.Instance
85
86         pieceRequestOrder request_strategy.ClientPieceOrder
87 }
88
89 type ipStr string
90
91 func (cl *Client) BadPeerIPs() []string {
92         cl.rLock()
93         defer cl.rUnlock()
94         return cl.badPeerIPsLocked()
95 }
96
97 func (cl *Client) badPeerIPsLocked() []string {
98         return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 }
100
101 func (cl *Client) PeerID() PeerID {
102         return cl.peerID
103 }
104
105 // Returns the port number for the first listener that has one. No longer assumes that all port
106 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
107 // found.
108 func (cl *Client) LocalPort() (port int) {
109         cl.eachListener(func(l Listener) bool {
110                 port = addrPortOrZero(l.Addr())
111                 return port == 0
112         })
113         return
114 }
115
116 func writeDhtServerStatus(w io.Writer, s DhtServer) {
117         dhtStats := s.Stats()
118         fmt.Fprintf(w, " ID: %x\n", s.ID())
119         spew.Fdump(w, dhtStats)
120 }
121
122 // Writes out a human readable status of the client, such as for writing to a
123 // HTTP status page.
124 func (cl *Client) WriteStatus(_w io.Writer) {
125         cl.rLock()
126         defer cl.rUnlock()
127         w := bufio.NewWriter(_w)
128         defer w.Flush()
129         fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
130         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
131         fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
132         fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
133         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
134         cl.eachDhtServer(func(s DhtServer) {
135                 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
136                 writeDhtServerStatus(w, s)
137         })
138         spew.Fdump(w, &cl.stats)
139         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
140         fmt.Fprintln(w)
141         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
142                 return l.InfoHash().AsString() < r.InfoHash().AsString()
143         }).([]*Torrent) {
144                 if t.name() == "" {
145                         fmt.Fprint(w, "<unknown name>")
146                 } else {
147                         fmt.Fprint(w, t.name())
148                 }
149                 fmt.Fprint(w, "\n")
150                 if t.info != nil {
151                         fmt.Fprintf(
152                                 w,
153                                 "%f%% of %d bytes (%s)",
154                                 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
155                                 *t.length,
156                                 humanize.Bytes(uint64(*t.length)))
157                 } else {
158                         w.WriteString("<missing metainfo>")
159                 }
160                 fmt.Fprint(w, "\n")
161                 t.writeStatus(w)
162                 fmt.Fprintln(w)
163         }
164 }
165
166 // Filters things that are less than warning from UPnP discovery.
167 func upnpDiscoverLogFilter(m log.Msg) bool {
168         level, ok := m.GetLevel()
169         return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 }
171
172 func (cl *Client) initLogger() {
173         logger := cl.config.Logger
174         if logger.IsZero() {
175                 logger = log.Default
176                 if !cl.config.Debug {
177                         logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178                 }
179         }
180         cl.logger = logger.WithValues(cl)
181 }
182
183 func (cl *Client) announceKey() int32 {
184         return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 }
186
187 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
188         if cfg == nil {
189                 cfg = NewDefaultClientConfig()
190                 cfg.ListenPort = 0
191         }
192         defer func() {
193                 if err != nil {
194                         cl = nil
195                 }
196         }()
197         cl = &Client{
198                 config:            cfg,
199                 dopplegangerAddrs: make(map[string]struct{}),
200                 torrents:          make(map[metainfo.Hash]*Torrent),
201                 dialRateLimiter:   rate.NewLimiter(10, 10),
202         }
203         cl.activeAnnounceLimiter.SlotsPerKey = 2
204         go cl.acceptLimitClearer()
205         cl.initLogger()
206         defer func() {
207                 if err == nil {
208                         return
209                 }
210                 cl.Close()
211         }()
212         cl.event.L = cl.locker()
213         storageImpl := cfg.DefaultStorage
214         if storageImpl == nil {
215                 // We'd use mmap by default but HFS+ doesn't support sparse files.
216                 storageImplCloser := storage.NewFile(cfg.DataDir)
217                 cl.onClose = append(cl.onClose, func() {
218                         if err := storageImplCloser.Close(); err != nil {
219                                 cl.logger.Printf("error closing default storage: %s", err)
220                         }
221                 })
222                 storageImpl = storageImplCloser
223         }
224         cl.defaultStorage = storage.NewClient(storageImpl)
225         if cfg.IPBlocklist != nil {
226                 cl.ipBlockList = cfg.IPBlocklist
227         }
228
229         if cfg.PeerID != "" {
230                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
231         } else {
232                 o := copy(cl.peerID[:], cfg.Bep20)
233                 _, err = rand.Read(cl.peerID[o:])
234                 if err != nil {
235                         panic("error generating peer id")
236                 }
237         }
238
239         sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
240         if err != nil {
241                 return
242         }
243
244         // Check for panics.
245         cl.LocalPort()
246
247         for _, _s := range sockets {
248                 s := _s // Go is fucking retarded.
249                 cl.onClose = append(cl.onClose, func() { s.Close() })
250                 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
251                         cl.dialers = append(cl.dialers, s)
252                         cl.listeners = append(cl.listeners, s)
253                         go cl.acceptConnections(s)
254                 }
255         }
256
257         go cl.forwardPort()
258         if !cfg.NoDHT {
259                 for _, s := range sockets {
260                         if pc, ok := s.(net.PacketConn); ok {
261                                 ds, err := cl.newAnacrolixDhtServer(pc)
262                                 if err != nil {
263                                         panic(err)
264                                 }
265                                 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
266                                 cl.onClose = append(cl.onClose, func() { ds.Close() })
267                         }
268                 }
269         }
270
271         cl.websocketTrackers = websocketTrackers{
272                 PeerId: cl.peerID,
273                 Logger: cl.logger,
274                 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
275                         cl.lock()
276                         defer cl.unlock()
277                         t, ok := cl.torrents[infoHash]
278                         if !ok {
279                                 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
280                         }
281                         return t.announceRequest(event), nil
282                 },
283                 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
284                         cl.lock()
285                         defer cl.unlock()
286                         t, ok := cl.torrents[dcc.InfoHash]
287                         if !ok {
288                                 cl.logger.WithDefaultLevel(log.Warning).Printf(
289                                         "got webrtc conn for unloaded torrent with infohash %x",
290                                         dcc.InfoHash,
291                                 )
292                                 dc.Close()
293                                 return
294                         }
295                         go t.onWebRtcConn(dc, dcc)
296                 },
297         }
298
299         go cl.requester()
300
301         return
302 }
303
304 func (cl *Client) AddDhtServer(d DhtServer) {
305         cl.dhtServers = append(cl.dhtServers, d)
306 }
307
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
311         cl.lock()
312         defer cl.unlock()
313         cl.dialers = append(cl.dialers, d)
314         for _, t := range cl.torrents {
315                 t.openNewConns()
316         }
317 }
318
319 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
320 // yourself.
321 func (cl *Client) AddListener(l Listener) {
322         cl.listeners = append(cl.listeners, l)
323         go cl.acceptConnections(l)
324 }
325
326 func (cl *Client) firewallCallback(net.Addr) bool {
327         cl.rLock()
328         block := !cl.wantConns()
329         cl.rUnlock()
330         if block {
331                 torrent.Add("connections firewalled", 1)
332         } else {
333                 torrent.Add("connections not firewalled", 1)
334         }
335         return block
336 }
337
338 func (cl *Client) listenOnNetwork(n network) bool {
339         if n.Ipv4 && cl.config.DisableIPv4 {
340                 return false
341         }
342         if n.Ipv6 && cl.config.DisableIPv6 {
343                 return false
344         }
345         if n.Tcp && cl.config.DisableTCP {
346                 return false
347         }
348         if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
349                 return false
350         }
351         return true
352 }
353
354 func (cl *Client) listenNetworks() (ns []network) {
355         for _, n := range allPeerNetworks {
356                 if cl.listenOnNetwork(n) {
357                         ns = append(ns, n)
358                 }
359         }
360         return
361 }
362
363 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
364         cfg := dht.ServerConfig{
365                 IPBlocklist:    cl.ipBlockList,
366                 Conn:           conn,
367                 OnAnnouncePeer: cl.onDHTAnnouncePeer,
368                 PublicIP: func() net.IP {
369                         if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
370                                 return cl.config.PublicIp6
371                         }
372                         return cl.config.PublicIp4
373                 }(),
374                 StartingNodes:      cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
375                 ConnectionTracking: cl.config.ConnTracker,
376                 OnQuery:            cl.config.DHTOnQuery,
377                 Logger:             cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
378         }
379         if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
380                 f(&cfg)
381         }
382         s, err = dht.NewServer(&cfg)
383         if err == nil {
384                 go func() {
385                         ts, err := s.Bootstrap()
386                         if err != nil {
387                                 cl.logger.Printf("error bootstrapping dht: %s", err)
388                         }
389                         log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
390                 }()
391         }
392         return
393 }
394
395 func (cl *Client) Closed() <-chan struct{} {
396         cl.lock()
397         defer cl.unlock()
398         return cl.closed.C()
399 }
400
401 func (cl *Client) eachDhtServer(f func(DhtServer)) {
402         for _, ds := range cl.dhtServers {
403                 f(ds)
404         }
405 }
406
407 // Stops the client. All connections to peers are closed and all activity will
408 // come to a halt.
409 func (cl *Client) Close() {
410         cl.lock()
411         defer cl.unlock()
412         cl.closed.Set()
413         for _, t := range cl.torrents {
414                 t.close()
415         }
416         for i := range cl.onClose {
417                 cl.onClose[len(cl.onClose)-1-i]()
418         }
419         cl.event.Broadcast()
420 }
421
422 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
423         if cl.ipBlockList == nil {
424                 return
425         }
426         return cl.ipBlockList.Lookup(ip)
427 }
428
429 func (cl *Client) ipIsBlocked(ip net.IP) bool {
430         _, blocked := cl.ipBlockRange(ip)
431         return blocked
432 }
433
434 func (cl *Client) wantConns() bool {
435         for _, t := range cl.torrents {
436                 if t.wantConns() {
437                         return true
438                 }
439         }
440         return false
441 }
442
443 func (cl *Client) waitAccept() {
444         for {
445                 if cl.closed.IsSet() {
446                         return
447                 }
448                 if cl.wantConns() {
449                         return
450                 }
451                 cl.event.Wait()
452         }
453 }
454
455 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
456 func (cl *Client) rejectAccepted(conn net.Conn) error {
457         ra := conn.RemoteAddr()
458         if rip := addrIpOrNil(ra); rip != nil {
459                 if cl.config.DisableIPv4Peers && rip.To4() != nil {
460                         return errors.New("ipv4 peers disabled")
461                 }
462                 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
463                         return errors.New("ipv4 disabled")
464
465                 }
466                 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
467                         return errors.New("ipv6 disabled")
468                 }
469                 if cl.rateLimitAccept(rip) {
470                         return errors.New("source IP accepted rate limited")
471                 }
472                 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
473                         return errors.New("bad source addr")
474                 }
475         }
476         return nil
477 }
478
479 func (cl *Client) acceptConnections(l Listener) {
480         for {
481                 conn, err := l.Accept()
482                 torrent.Add("client listener accepts", 1)
483                 conn = pproffd.WrapNetConn(conn)
484                 cl.rLock()
485                 closed := cl.closed.IsSet()
486                 var reject error
487                 if conn != nil {
488                         reject = cl.rejectAccepted(conn)
489                 }
490                 cl.rUnlock()
491                 if closed {
492                         if conn != nil {
493                                 conn.Close()
494                         }
495                         return
496                 }
497                 if err != nil {
498                         log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
499                         continue
500                 }
501                 go func() {
502                         if reject != nil {
503                                 torrent.Add("rejected accepted connections", 1)
504                                 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
505                                 conn.Close()
506                         } else {
507                                 go cl.incomingConnection(conn)
508                         }
509                         log.Fmsg("accepted %q connection at %q from %q",
510                                 l.Addr().Network(),
511                                 conn.LocalAddr(),
512                                 conn.RemoteAddr(),
513                         ).SetLevel(log.Debug).Log(cl.logger)
514                         torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
515                         torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
516                         torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
517                 }()
518         }
519 }
520
521 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
522 func regularNetConnPeerConnConnString(nc net.Conn) string {
523         return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
524 }
525
526 func (cl *Client) incomingConnection(nc net.Conn) {
527         defer nc.Close()
528         if tc, ok := nc.(*net.TCPConn); ok {
529                 tc.SetLinger(0)
530         }
531         c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
532                 regularNetConnPeerConnConnString(nc))
533         defer c.close()
534         c.Discovery = PeerSourceIncoming
535         cl.runReceivedConn(c)
536 }
537
538 // Returns a handle to the given torrent, if it's present in the client.
539 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
540         cl.lock()
541         defer cl.unlock()
542         t, ok = cl.torrents[ih]
543         return
544 }
545
546 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
547         return cl.torrents[ih]
548 }
549
550 type dialResult struct {
551         Conn    net.Conn
552         Network string
553 }
554
555 func countDialResult(err error) {
556         if err == nil {
557                 torrent.Add("successful dials", 1)
558         } else {
559                 torrent.Add("unsuccessful dials", 1)
560         }
561 }
562
563 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
564         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
565         if ret < minDialTimeout {
566                 ret = minDialTimeout
567         }
568         return
569 }
570
571 // Returns whether an address is known to connect to a client with our own ID.
572 func (cl *Client) dopplegangerAddr(addr string) bool {
573         _, ok := cl.dopplegangerAddrs[addr]
574         return ok
575 }
576
577 // Returns a connection over UTP or TCP, whichever is first to connect.
578 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
579         {
580                 t := perf.NewTimer(perf.CallerName(0))
581                 defer func() {
582                         if res.Conn == nil {
583                                 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
584                         } else {
585                                 t.Mark("returned conn over " + res.Network)
586                         }
587                 }()
588         }
589         ctx, cancel := context.WithCancel(ctx)
590         // As soon as we return one connection, cancel the others.
591         defer cancel()
592         left := 0
593         resCh := make(chan dialResult, left)
594         func() {
595                 cl.lock()
596                 defer cl.unlock()
597                 cl.eachDialer(func(s Dialer) bool {
598                         func() {
599                                 left++
600                                 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
601                                 go func() {
602                                         resCh <- dialResult{
603                                                 cl.dialFromSocket(ctx, s, addr),
604                                                 s.LocalAddr().Network(),
605                                         }
606                                 }()
607                         }()
608                         return true
609                 })
610         }()
611         // Wait for a successful connection.
612         func() {
613                 defer perf.ScopeTimer()()
614                 for ; left > 0 && res.Conn == nil; left-- {
615                         res = <-resCh
616                 }
617         }()
618         // There are still incompleted dials.
619         go func() {
620                 for ; left > 0; left-- {
621                         conn := (<-resCh).Conn
622                         if conn != nil {
623                                 conn.Close()
624                         }
625                 }
626         }()
627         if res.Conn != nil {
628                 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
629         }
630         //if res.Conn != nil {
631         //      cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
632         //} else {
633         //      cl.logger.Printf("failed to dial %s", addr)
634         //}
635         return res
636 }
637
638 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
639         network := s.LocalAddr().Network()
640         cte := cl.config.ConnTracker.Wait(
641                 ctx,
642                 conntrack.Entry{network, s.LocalAddr().String(), addr},
643                 "dial torrent client",
644                 0,
645         )
646         // Try to avoid committing to a dial if the context is complete as it's difficult to determine
647         // which dial errors allow us to forget the connection tracking entry handle.
648         if ctx.Err() != nil {
649                 if cte != nil {
650                         cte.Forget()
651                 }
652                 return nil
653         }
654         c, err := s.Dial(ctx, addr)
655         // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
656         // it now in case we close the connection forthwith.
657         if tc, ok := c.(*net.TCPConn); ok {
658                 tc.SetLinger(0)
659         }
660         countDialResult(err)
661         if c == nil {
662                 if err != nil && forgettableDialError(err) {
663                         cte.Forget()
664                 } else {
665                         cte.Done()
666                 }
667                 return nil
668         }
669         return closeWrapper{c, func() error {
670                 err := c.Close()
671                 cte.Done()
672                 return err
673         }}
674 }
675
676 func forgettableDialError(err error) bool {
677         return strings.Contains(err.Error(), "no suitable address found")
678 }
679
680 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
681         if _, ok := t.halfOpen[addr]; !ok {
682                 panic("invariant broken")
683         }
684         delete(t.halfOpen, addr)
685         cl.numHalfOpen--
686         for _, t := range cl.torrents {
687                 t.openNewConns()
688         }
689 }
690
691 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
692 // for valid reasons.
693 func (cl *Client) initiateProtocolHandshakes(
694         ctx context.Context,
695         nc net.Conn,
696         t *Torrent,
697         outgoing, encryptHeader bool,
698         remoteAddr PeerRemoteAddr,
699         network, connString string,
700 ) (
701         c *PeerConn, err error,
702 ) {
703         c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
704         c.headerEncrypted = encryptHeader
705         ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
706         defer cancel()
707         dl, ok := ctx.Deadline()
708         if !ok {
709                 panic(ctx)
710         }
711         err = nc.SetDeadline(dl)
712         if err != nil {
713                 panic(err)
714         }
715         err = cl.initiateHandshakes(c, t)
716         return
717 }
718
719 // Returns nil connection and nil error if no connection could be established for valid reasons.
720 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
721         dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
722                 cl.rLock()
723                 defer cl.rUnlock()
724                 return t.dialTimeout()
725         }())
726         defer cancel()
727         dr := cl.dialFirst(dialCtx, addr.String())
728         nc := dr.Conn
729         if nc == nil {
730                 if dialCtx.Err() != nil {
731                         return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
732                 }
733                 return nil, errors.New("dial failed")
734         }
735         c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
736         if err != nil {
737                 nc.Close()
738         }
739         return c, err
740 }
741
742 // Returns nil connection and nil error if no connection could be established
743 // for valid reasons.
744 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
745         torrent.Add("establish outgoing connection", 1)
746         obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
747         c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
748         if err == nil {
749                 torrent.Add("initiated conn with preferred header obfuscation", 1)
750                 return
751         }
752         //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
753         if cl.config.HeaderObfuscationPolicy.RequirePreferred {
754                 // We should have just tried with the preferred header obfuscation. If it was required,
755                 // there's nothing else to try.
756                 return
757         }
758         // Try again with encryption if we didn't earlier, or without if we did.
759         c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
760         if err == nil {
761                 torrent.Add("initiated conn with fallback header obfuscation", 1)
762         }
763         //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
764         return
765 }
766
767 // Called to dial out and run a connection. The addr we're given is already
768 // considered half-open.
769 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
770         cl.dialRateLimiter.Wait(context.Background())
771         c, err := cl.establishOutgoingConn(t, addr)
772         cl.lock()
773         defer cl.unlock()
774         // Don't release lock between here and addPeerConn, unless it's for
775         // failure.
776         cl.noLongerHalfOpen(t, addr.String())
777         if err != nil {
778                 if cl.config.Debug {
779                         cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
780                 }
781                 return
782         }
783         defer c.close()
784         c.Discovery = ps
785         c.trusted = trusted
786         t.runHandshookConnLoggingErr(c)
787 }
788
789 // The port number for incoming peer connections. 0 if the client isn't listening.
790 func (cl *Client) incomingPeerPort() int {
791         return cl.LocalPort()
792 }
793
794 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
795         if c.headerEncrypted {
796                 var rw io.ReadWriter
797                 var err error
798                 rw, c.cryptoMethod, err = mse.InitiateHandshake(
799                         struct {
800                                 io.Reader
801                                 io.Writer
802                         }{c.r, c.w},
803                         t.infoHash[:],
804                         nil,
805                         cl.config.CryptoProvides,
806                 )
807                 c.setRW(rw)
808                 if err != nil {
809                         return xerrors.Errorf("header obfuscation handshake: %w", err)
810                 }
811         }
812         ih, err := cl.connBtHandshake(c, &t.infoHash)
813         if err != nil {
814                 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
815         }
816         if ih != t.infoHash {
817                 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
818         }
819         return nil
820 }
821
822 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
823 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
824 func (cl *Client) forSkeys(f func([]byte) bool) {
825         cl.rLock()
826         defer cl.rUnlock()
827         if false { // Emulate the bug from #114
828                 var firstIh InfoHash
829                 for ih := range cl.torrents {
830                         firstIh = ih
831                         break
832                 }
833                 for range cl.torrents {
834                         if !f(firstIh[:]) {
835                                 break
836                         }
837                 }
838                 return
839         }
840         for ih := range cl.torrents {
841                 if !f(ih[:]) {
842                         break
843                 }
844         }
845 }
846
847 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
848         if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
849                 return ret
850         }
851         return cl.forSkeys
852 }
853
854 // Do encryption and bittorrent handshakes as receiver.
855 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
856         defer perf.ScopeTimerErr(&err)()
857         var rw io.ReadWriter
858         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
859         c.setRW(rw)
860         if err == nil || err == mse.ErrNoSecretKeyMatch {
861                 if c.headerEncrypted {
862                         torrent.Add("handshakes received encrypted", 1)
863                 } else {
864                         torrent.Add("handshakes received unencrypted", 1)
865                 }
866         } else {
867                 torrent.Add("handshakes received with error while handling encryption", 1)
868         }
869         if err != nil {
870                 if err == mse.ErrNoSecretKeyMatch {
871                         err = nil
872                 }
873                 return
874         }
875         if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
876                 err = errors.New("connection does not have required header obfuscation")
877                 return
878         }
879         ih, err := cl.connBtHandshake(c, nil)
880         if err != nil {
881                 err = xerrors.Errorf("during bt handshake: %w", err)
882                 return
883         }
884         cl.lock()
885         t = cl.torrents[ih]
886         cl.unlock()
887         return
888 }
889
890 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
891         res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
892         if err != nil {
893                 return
894         }
895         ret = res.Hash
896         c.PeerExtensionBytes = res.PeerExtensionBits
897         c.PeerID = res.PeerID
898         c.completedHandshake = time.Now()
899         if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
900                 cb(c, res.Hash)
901         }
902         return
903 }
904
905 func (cl *Client) runReceivedConn(c *PeerConn) {
906         err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
907         if err != nil {
908                 panic(err)
909         }
910         t, err := cl.receiveHandshakes(c)
911         if err != nil {
912                 log.Fmsg(
913                         "error receiving handshakes on %v: %s", c, err,
914                 ).SetLevel(log.Debug).
915                         Add(
916                                 "network", c.Network,
917                         ).Log(cl.logger)
918                 torrent.Add("error receiving handshake", 1)
919                 cl.lock()
920                 cl.onBadAccept(c.RemoteAddr)
921                 cl.unlock()
922                 return
923         }
924         if t == nil {
925                 torrent.Add("received handshake for unloaded torrent", 1)
926                 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
927                 cl.lock()
928                 cl.onBadAccept(c.RemoteAddr)
929                 cl.unlock()
930                 return
931         }
932         torrent.Add("received handshake for loaded torrent", 1)
933         cl.lock()
934         defer cl.unlock()
935         t.runHandshookConnLoggingErr(c)
936 }
937
938 // Client lock must be held before entering this.
939 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
940         c.setTorrent(t)
941         if c.PeerID == cl.peerID {
942                 if c.outgoing {
943                         connsToSelf.Add(1)
944                         addr := c.conn.RemoteAddr().String()
945                         cl.dopplegangerAddrs[addr] = struct{}{}
946                 } else {
947                         // Because the remote address is not necessarily the same as its client's torrent listen
948                         // address, we won't record the remote address as a doppleganger. Instead, the initiator
949                         // can record *us* as the doppleganger.
950                 }
951                 return errors.New("local and remote peer ids are the same")
952         }
953         c.conn.SetWriteDeadline(time.Time{})
954         c.r = deadlineReader{c.conn, c.r}
955         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
956         if connIsIpv6(c.conn) {
957                 torrent.Add("completed handshake over ipv6", 1)
958         }
959         if err := t.addPeerConn(c); err != nil {
960                 return fmt.Errorf("adding connection: %w", err)
961         }
962         defer t.dropConnection(c)
963         go c.writer(time.Minute)
964         cl.sendInitialMessages(c, t)
965         err := c.mainReadLoop()
966         if err != nil {
967                 return fmt.Errorf("main read loop: %w", err)
968         }
969         return nil
970 }
971
972 // If peer requests are buffered on read, this instructs the amount of memory that might be used to
973 // cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks.
974 const localClientReqq = 1 << 5
975
976 // See the order given in Transmission's tr_peerMsgsNew.
977 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
978         if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
979                 conn.write(pp.Message{
980                         Type:       pp.Extended,
981                         ExtendedID: pp.HandshakeExtendedID,
982                         ExtendedPayload: func() []byte {
983                                 msg := pp.ExtendedHandshakeMessage{
984                                         M: map[pp.ExtensionName]pp.ExtensionNumber{
985                                                 pp.ExtensionNameMetadata: metadataExtendedId,
986                                         },
987                                         V:            cl.config.ExtendedHandshakeClientVersion,
988                                         Reqq:         localClientReqq,
989                                         YourIp:       pp.CompactIp(conn.remoteIp()),
990                                         Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
991                                         Port:         cl.incomingPeerPort(),
992                                         MetadataSize: torrent.metadataSize(),
993                                         // TODO: We can figured these out specific to the socket
994                                         // used.
995                                         Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
996                                         Ipv6: cl.config.PublicIp6.To16(),
997                                 }
998                                 if !cl.config.DisablePEX {
999                                         msg.M[pp.ExtensionNamePex] = pexExtendedId
1000                                 }
1001                                 return bencode.MustMarshal(msg)
1002                         }(),
1003                 })
1004         }
1005         func() {
1006                 if conn.fastEnabled() {
1007                         if torrent.haveAllPieces() {
1008                                 conn.write(pp.Message{Type: pp.HaveAll})
1009                                 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1010                                 return
1011                         } else if !torrent.haveAnyPieces() {
1012                                 conn.write(pp.Message{Type: pp.HaveNone})
1013                                 conn.sentHaves.Clear()
1014                                 return
1015                         }
1016                 }
1017                 conn.postBitfield()
1018         }()
1019         if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1020                 conn.write(pp.Message{
1021                         Type: pp.Port,
1022                         Port: cl.dhtPort(),
1023                 })
1024         }
1025 }
1026
1027 func (cl *Client) dhtPort() (ret uint16) {
1028         cl.eachDhtServer(func(s DhtServer) {
1029                 ret = uint16(missinggo.AddrPort(s.Addr()))
1030         })
1031         return
1032 }
1033
1034 func (cl *Client) haveDhtServer() (ret bool) {
1035         cl.eachDhtServer(func(_ DhtServer) {
1036                 ret = true
1037         })
1038         return
1039 }
1040
1041 // Process incoming ut_metadata message.
1042 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1043         var d map[string]int
1044         err := bencode.Unmarshal(payload, &d)
1045         if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1046         } else if err != nil {
1047                 return fmt.Errorf("error unmarshalling bencode: %s", err)
1048         }
1049         msgType, ok := d["msg_type"]
1050         if !ok {
1051                 return errors.New("missing msg_type field")
1052         }
1053         piece := d["piece"]
1054         switch msgType {
1055         case pp.DataMetadataExtensionMsgType:
1056                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1057                 if !c.requestedMetadataPiece(piece) {
1058                         return fmt.Errorf("got unexpected piece %d", piece)
1059                 }
1060                 c.metadataRequests[piece] = false
1061                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1062                 if begin < 0 || begin >= len(payload) {
1063                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1064                 }
1065                 t.saveMetadataPiece(piece, payload[begin:])
1066                 c.lastUsefulChunkReceived = time.Now()
1067                 err = t.maybeCompleteMetadata()
1068                 if err != nil {
1069                         // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1070                         // don't know who to blame. TODO: Also errors can be returned here that aren't related
1071                         // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1072                         // log consumers can filter for this message.
1073                         t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1074                 }
1075                 return err
1076         case pp.RequestMetadataExtensionMsgType:
1077                 if !t.haveMetadataPiece(piece) {
1078                         c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1079                         return nil
1080                 }
1081                 start := (1 << 14) * piece
1082                 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1083                 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1084                 return nil
1085         case pp.RejectMetadataExtensionMsgType:
1086                 return nil
1087         default:
1088                 return errors.New("unknown msg_type value")
1089         }
1090 }
1091
1092 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1093         if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1094                 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1095         }
1096         return false
1097 }
1098
1099 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1100         if port == 0 {
1101                 return true
1102         }
1103         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1104                 return true
1105         }
1106         if _, ok := cl.ipBlockRange(ip); ok {
1107                 return true
1108         }
1109         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1110                 return true
1111         }
1112         return false
1113 }
1114
1115 // Return a Torrent ready for insertion into a Client.
1116 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1117         // use provided storage, if provided
1118         storageClient := cl.defaultStorage
1119         if specStorage != nil {
1120                 storageClient = storage.NewClient(specStorage)
1121         }
1122
1123         t = &Torrent{
1124                 cl:       cl,
1125                 infoHash: ih,
1126                 peers: prioritizedPeers{
1127                         om: btree.New(32),
1128                         getPrio: func(p PeerInfo) peerPriority {
1129                                 ipPort := p.addr()
1130                                 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1131                         },
1132                 },
1133                 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1134
1135                 halfOpen:          make(map[string]PeerInfo),
1136                 pieceStateChanges: pubsub.NewPubSub(),
1137
1138                 storageOpener:       storageClient,
1139                 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1140
1141                 networkingEnabled: true,
1142                 metadataChanged: sync.Cond{
1143                         L: cl.locker(),
1144                 },
1145                 webSeeds: make(map[string]*Peer),
1146         }
1147         t._pendingPieces.NewSet = priorityBitmapStableNewSet
1148         t.logger = cl.logger.WithContextValue(t)
1149         t.setChunkSize(defaultChunkSize)
1150         return
1151 }
1152
1153 // A file-like handle to some torrent data resource.
1154 type Handle interface {
1155         io.Reader
1156         io.Seeker
1157         io.Closer
1158         io.ReaderAt
1159 }
1160
1161 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1162         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1163 }
1164
1165 // Adds a torrent by InfoHash with a custom Storage implementation.
1166 // If the torrent already exists then this Storage is ignored and the
1167 // existing torrent returned with `new` set to `false`
1168 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1169         cl.lock()
1170         defer cl.unlock()
1171         t, ok := cl.torrents[infoHash]
1172         if ok {
1173                 return
1174         }
1175         new = true
1176
1177         t = cl.newTorrent(infoHash, specStorage)
1178         cl.eachDhtServer(func(s DhtServer) {
1179                 go t.dhtAnnouncer(s)
1180         })
1181         cl.torrents[infoHash] = t
1182         cl.clearAcceptLimits()
1183         t.updateWantPeersEvent()
1184         // Tickle Client.waitAccept, new torrent may want conns.
1185         cl.event.Broadcast()
1186         return
1187 }
1188
1189 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1190 // Torrent.MergeSpec.
1191 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1192         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1193         err = t.MergeSpec(spec)
1194         if err != nil && new {
1195                 t.Drop()
1196         }
1197         return
1198 }
1199
1200 type stringAddr string
1201
1202 var _ net.Addr = stringAddr("")
1203
1204 func (stringAddr) Network() string   { return "" }
1205 func (me stringAddr) String() string { return string(me) }
1206
1207 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1208 // spec.DisallowDataDownload/Upload will be read and applied
1209 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1210 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1211         if spec.DisplayName != "" {
1212                 t.SetDisplayName(spec.DisplayName)
1213         }
1214         if spec.InfoBytes != nil {
1215                 err := t.SetInfoBytes(spec.InfoBytes)
1216                 if err != nil {
1217                         return err
1218                 }
1219         }
1220         cl := t.cl
1221         cl.AddDhtNodes(spec.DhtNodes)
1222         cl.lock()
1223         defer cl.unlock()
1224         useTorrentSources(spec.Sources, t)
1225         for _, url := range spec.Webseeds {
1226                 t.addWebSeed(url)
1227         }
1228         for _, peerAddr := range spec.PeerAddrs {
1229                 t.addPeer(PeerInfo{
1230                         Addr:    stringAddr(peerAddr),
1231                         Source:  PeerSourceDirect,
1232                         Trusted: true,
1233                 })
1234         }
1235         if spec.ChunkSize != 0 {
1236                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1237         }
1238         t.addTrackers(spec.Trackers)
1239         t.maybeNewConns()
1240         t.dataDownloadDisallowed = spec.DisallowDataDownload
1241         t.dataUploadDisallowed = spec.DisallowDataUpload
1242         return nil
1243 }
1244
1245 func useTorrentSources(sources []string, t *Torrent) {
1246         for _, s := range sources {
1247                 go func(s string) {
1248                         err := useTorrentSource(s, t)
1249                         if err != nil {
1250                                 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1251                         } else {
1252                                 t.logger.Printf("successfully used source %q", s)
1253                         }
1254                 }(s)
1255         }
1256 }
1257
1258 func useTorrentSource(source string, t *Torrent) error {
1259         req, err := http.NewRequest(http.MethodGet, source, nil)
1260         if err != nil {
1261                 panic(err)
1262         }
1263         ctx, cancel := context.WithCancel(context.Background())
1264         defer cancel()
1265         go func() {
1266                 select {
1267                 case <-t.GotInfo():
1268                 case <-t.Closed():
1269                 case <-ctx.Done():
1270                 }
1271                 cancel()
1272         }()
1273         req = req.WithContext(ctx)
1274         resp, err := http.DefaultClient.Do(req)
1275         if err != nil {
1276                 return err
1277         }
1278         mi, err := metainfo.Load(resp.Body)
1279         if err != nil {
1280                 if ctx.Err() != nil {
1281                         return nil
1282                 }
1283                 return err
1284         }
1285         return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1286 }
1287
1288 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1289         t, ok := cl.torrents[infoHash]
1290         if !ok {
1291                 err = fmt.Errorf("no such torrent")
1292                 return
1293         }
1294         err = t.close()
1295         if err != nil {
1296                 panic(err)
1297         }
1298         delete(cl.torrents, infoHash)
1299         return
1300 }
1301
1302 func (cl *Client) allTorrentsCompleted() bool {
1303         for _, t := range cl.torrents {
1304                 if !t.haveInfo() {
1305                         return false
1306                 }
1307                 if !t.haveAllPieces() {
1308                         return false
1309                 }
1310         }
1311         return true
1312 }
1313
1314 // Returns true when all torrents are completely downloaded and false if the
1315 // client is stopped before that.
1316 func (cl *Client) WaitAll() bool {
1317         cl.lock()
1318         defer cl.unlock()
1319         for !cl.allTorrentsCompleted() {
1320                 if cl.closed.IsSet() {
1321                         return false
1322                 }
1323                 cl.event.Wait()
1324         }
1325         return true
1326 }
1327
1328 // Returns handles to all the torrents loaded in the Client.
1329 func (cl *Client) Torrents() []*Torrent {
1330         cl.lock()
1331         defer cl.unlock()
1332         return cl.torrentsAsSlice()
1333 }
1334
1335 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1336         for _, t := range cl.torrents {
1337                 ret = append(ret, t)
1338         }
1339         return
1340 }
1341
1342 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1343         spec, err := TorrentSpecFromMagnetUri(uri)
1344         if err != nil {
1345                 return
1346         }
1347         T, _, err = cl.AddTorrentSpec(spec)
1348         return
1349 }
1350
1351 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1352         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1353         return
1354 }
1355
1356 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1357         mi, err := metainfo.LoadFromFile(filename)
1358         if err != nil {
1359                 return
1360         }
1361         return cl.AddTorrent(mi)
1362 }
1363
1364 func (cl *Client) DhtServers() []DhtServer {
1365         return cl.dhtServers
1366 }
1367
1368 func (cl *Client) AddDhtNodes(nodes []string) {
1369         for _, n := range nodes {
1370                 hmp := missinggo.SplitHostMaybePort(n)
1371                 ip := net.ParseIP(hmp.Host)
1372                 if ip == nil {
1373                         cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1374                         continue
1375                 }
1376                 ni := krpc.NodeInfo{
1377                         Addr: krpc.NodeAddr{
1378                                 IP:   ip,
1379                                 Port: hmp.Port,
1380                         },
1381                 }
1382                 cl.eachDhtServer(func(s DhtServer) {
1383                         s.AddNode(ni)
1384                 })
1385         }
1386 }
1387
1388 func (cl *Client) banPeerIP(ip net.IP) {
1389         cl.logger.Printf("banning ip %v", ip)
1390         if cl.badPeerIPs == nil {
1391                 cl.badPeerIPs = make(map[string]struct{})
1392         }
1393         cl.badPeerIPs[ip.String()] = struct{}{}
1394 }
1395
1396 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1397         if network == "" {
1398                 panic(remoteAddr)
1399         }
1400         c = &PeerConn{
1401                 Peer: Peer{
1402                         outgoing:        outgoing,
1403                         choking:         true,
1404                         peerChoking:     true,
1405                         PeerMaxRequests: 250,
1406
1407                         RemoteAddr: remoteAddr,
1408                         Network:    network,
1409                         callbacks:  &cl.config.Callbacks,
1410                 },
1411                 connString:  connString,
1412                 conn:        nc,
1413                 writeBuffer: new(bytes.Buffer),
1414         }
1415         c.peerImpl = c
1416         c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1417         c.writerCond.L = cl.locker()
1418         c.setRW(connStatsReadWriter{nc, c})
1419         c.r = &rateLimitedReader{
1420                 l: cl.config.DownloadRateLimiter,
1421                 r: c.r,
1422         }
1423         c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1424         for _, f := range cl.config.Callbacks.NewPeer {
1425                 f(&c.Peer)
1426         }
1427         return
1428 }
1429
1430 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1431         cl.lock()
1432         defer cl.unlock()
1433         t := cl.torrent(ih)
1434         if t == nil {
1435                 return
1436         }
1437         t.addPeers([]PeerInfo{{
1438                 Addr:   ipPortAddr{ip, port},
1439                 Source: PeerSourceDhtAnnouncePeer,
1440         }})
1441 }
1442
1443 func firstNotNil(ips ...net.IP) net.IP {
1444         for _, ip := range ips {
1445                 if ip != nil {
1446                         return ip
1447                 }
1448         }
1449         return nil
1450 }
1451
1452 func (cl *Client) eachDialer(f func(Dialer) bool) {
1453         for _, s := range cl.dialers {
1454                 if !f(s) {
1455                         break
1456                 }
1457         }
1458 }
1459
1460 func (cl *Client) eachListener(f func(Listener) bool) {
1461         for _, s := range cl.listeners {
1462                 if !f(s) {
1463                         break
1464                 }
1465         }
1466 }
1467
1468 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1469         cl.eachListener(func(l Listener) bool {
1470                 ret = l
1471                 return !f(l)
1472         })
1473         return
1474 }
1475
1476 func (cl *Client) publicIp(peer net.IP) net.IP {
1477         // TODO: Use BEP 10 to determine how peers are seeing us.
1478         if peer.To4() != nil {
1479                 return firstNotNil(
1480                         cl.config.PublicIp4,
1481                         cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1482                 )
1483         }
1484
1485         return firstNotNil(
1486                 cl.config.PublicIp6,
1487                 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1488         )
1489 }
1490
1491 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1492         l := cl.findListener(
1493                 func(l Listener) bool {
1494                         return f(addrIpOrNil(l.Addr()))
1495                 },
1496         )
1497         if l == nil {
1498                 return nil
1499         }
1500         return addrIpOrNil(l.Addr())
1501 }
1502
1503 // Our IP as a peer should see it.
1504 func (cl *Client) publicAddr(peer net.IP) IpPort {
1505         return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1506 }
1507
1508 // ListenAddrs addresses currently being listened to.
1509 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1510         cl.lock()
1511         defer cl.unlock()
1512         cl.eachListener(func(l Listener) bool {
1513                 ret = append(ret, l.Addr())
1514                 return true
1515         })
1516         return
1517 }
1518
1519 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1520         ipa, ok := tryIpPortFromNetAddr(addr)
1521         if !ok {
1522                 return
1523         }
1524         ip := maskIpForAcceptLimiting(ipa.IP)
1525         if cl.acceptLimiter == nil {
1526                 cl.acceptLimiter = make(map[ipStr]int)
1527         }
1528         cl.acceptLimiter[ipStr(ip.String())]++
1529 }
1530
1531 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1532         if ip4 := ip.To4(); ip4 != nil {
1533                 return ip4.Mask(net.CIDRMask(24, 32))
1534         }
1535         return ip
1536 }
1537
1538 func (cl *Client) clearAcceptLimits() {
1539         cl.acceptLimiter = nil
1540 }
1541
1542 func (cl *Client) acceptLimitClearer() {
1543         for {
1544                 select {
1545                 case <-cl.closed.LockedChan(cl.locker()):
1546                         return
1547                 case <-time.After(15 * time.Minute):
1548                         cl.lock()
1549                         cl.clearAcceptLimits()
1550                         cl.unlock()
1551                 }
1552         }
1553 }
1554
1555 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1556         if cl.config.DisableAcceptRateLimiting {
1557                 return false
1558         }
1559         return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1560 }
1561
1562 func (cl *Client) rLock() {
1563         cl._mu.RLock()
1564 }
1565
1566 func (cl *Client) rUnlock() {
1567         cl._mu.RUnlock()
1568 }
1569
1570 func (cl *Client) lock() {
1571         cl._mu.Lock()
1572 }
1573
1574 func (cl *Client) unlock() {
1575         cl._mu.Unlock()
1576 }
1577
1578 func (cl *Client) locker() *lockWithDeferreds {
1579         return &cl._mu
1580 }
1581
1582 func (cl *Client) String() string {
1583         return fmt.Sprintf("<%[1]T %[1]p>", cl)
1584 }
1585
1586 // Returns connection-level aggregate stats at the Client level. See the comment on
1587 // TorrentStats.ConnStats.
1588 func (cl *Client) ConnStats() ConnStats {
1589         return cl.stats.Copy()
1590 }