20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
85 webseedHttpClient *http.Client
90 func (cl *Client) BadPeerIPs() (ips []string) {
92 ips = cl.badPeerIPsLocked()
97 func (cl *Client) badPeerIPsLocked() (ips []string) {
98 ips = make([]string, len(cl.badPeerIPs))
100 for k := range cl.badPeerIPs {
107 func (cl *Client) PeerID() PeerID {
111 // Returns the port number for the first listener that has one. No longer assumes that all port
112 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
114 func (cl *Client) LocalPort() (port int) {
115 for i := 0; i < len(cl.listeners); i += 1 {
116 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
123 func writeDhtServerStatus(w io.Writer, s DhtServer) {
124 dhtStats := s.Stats()
125 fmt.Fprintf(w, " ID: %x\n", s.ID())
126 spew.Fdump(w, dhtStats)
129 // Writes out a human readable status of the client, such as for writing to a
131 func (cl *Client) WriteStatus(_w io.Writer) {
134 w := bufio.NewWriter(_w)
136 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
137 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
138 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
139 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
140 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
141 cl.eachDhtServer(func(s DhtServer) {
142 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
143 writeDhtServerStatus(w, s)
145 spew.Fdump(w, &cl.stats)
146 torrentsSlice := cl.torrentsAsSlice()
147 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
149 sort.Slice(torrentsSlice, func(l, r int) bool {
150 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
152 for _, t := range torrentsSlice {
154 fmt.Fprint(w, "<unknown name>")
156 fmt.Fprint(w, t.name())
162 "%f%% of %d bytes (%s)",
163 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
165 humanize.Bytes(uint64(*t.length)))
167 w.WriteString("<missing metainfo>")
175 // Filters things that are less than warning from UPnP discovery.
176 func upnpDiscoverLogFilter(m log.Msg) bool {
177 level, ok := m.GetLevel()
178 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
181 func (cl *Client) initLogger() {
182 logger := cl.config.Logger
185 if !cl.config.Debug {
186 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
189 cl.logger = logger.WithValues(cl)
192 func (cl *Client) announceKey() int32 {
193 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
196 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
197 func (cl *Client) init(cfg *ClientConfig) {
199 cl.dopplegangerAddrs = make(map[string]struct{})
200 cl.torrents = make(map[metainfo.Hash]*Torrent)
201 cl.dialRateLimiter = rate.NewLimiter(10, 10)
202 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 cl.event.L = cl.locker()
204 cl.ipBlockList = cfg.IPBlocklist
205 cl.webseedHttpClient = &http.Client{
206 Transport: &http.Transport{
207 Proxy: cfg.HTTPProxy,
213 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
215 cfg = NewDefaultClientConfig()
221 go cl.acceptLimitClearer()
230 storageImpl := cfg.DefaultStorage
231 if storageImpl == nil {
232 // We'd use mmap by default but HFS+ doesn't support sparse files.
233 storageImplCloser := storage.NewFile(cfg.DataDir)
234 cl.onClose = append(cl.onClose, func() {
235 if err := storageImplCloser.Close(); err != nil {
236 cl.logger.Printf("error closing default storage: %s", err)
239 storageImpl = storageImplCloser
241 cl.defaultStorage = storage.NewClient(storageImpl)
243 if cfg.PeerID != "" {
244 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
246 o := copy(cl.peerID[:], cfg.Bep20)
247 _, err = rand.Read(cl.peerID[o:])
249 panic("error generating peer id")
253 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
261 for _, _s := range sockets {
262 s := _s // Copy the loop variable so the closures below each capture their own socket.
263 cl.onClose = append(cl.onClose, func() { go s.Close() })
264 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
265 cl.dialers = append(cl.dialers, s)
266 cl.listeners = append(cl.listeners, s)
267 if cl.config.AcceptPeerConnections {
268 go cl.acceptConnections(s)
275 for _, s := range sockets {
276 if pc, ok := s.(net.PacketConn); ok {
277 ds, err := cl.NewAnacrolixDhtServer(pc)
281 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
282 cl.onClose = append(cl.onClose, func() { ds.Close() })
287 cl.websocketTrackers = websocketTrackers{
290 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
293 t, ok := cl.torrents[infoHash]
295 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
297 return t.announceRequest(event), nil
299 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
302 t, ok := cl.torrents[dcc.InfoHash]
304 cl.logger.WithDefaultLevel(log.Warning).Printf(
305 "got webrtc conn for unloaded torrent with infohash %x",
311 go t.onWebRtcConn(dc, dcc)
318 func (cl *Client) AddDhtServer(d DhtServer) {
319 cl.dhtServers = append(cl.dhtServers, d)
322 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
323 // given address for any Torrent.
324 func (cl *Client) AddDialer(d Dialer) {
327 cl.dialers = append(cl.dialers, d)
328 for _, t := range cl.torrents {
333 func (cl *Client) Listeners() []Listener {
337 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
339 func (cl *Client) AddListener(l Listener) {
340 cl.listeners = append(cl.listeners, l)
341 if cl.config.AcceptPeerConnections {
342 go cl.acceptConnections(l)
346 func (cl *Client) firewallCallback(net.Addr) bool {
348 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
351 torrent.Add("connections firewalled", 1)
353 torrent.Add("connections not firewalled", 1)
358 func (cl *Client) listenOnNetwork(n network) bool {
359 if n.Ipv4 && cl.config.DisableIPv4 {
362 if n.Ipv6 && cl.config.DisableIPv6 {
365 if n.Tcp && cl.config.DisableTCP {
368 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
374 func (cl *Client) listenNetworks() (ns []network) {
375 for _, n := range allPeerNetworks {
376 if cl.listenOnNetwork(n) {
383 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
384 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
385 cfg := dht.ServerConfig{
386 IPBlocklist: cl.ipBlockList,
388 OnAnnouncePeer: cl.onDHTAnnouncePeer,
389 PublicIP: func() net.IP {
390 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
391 return cl.config.PublicIp6
393 return cl.config.PublicIp4
395 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
396 OnQuery: cl.config.DHTOnQuery,
397 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
399 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
402 s, err = dht.NewServer(&cfg)
405 ts, err := s.Bootstrap()
407 cl.logger.Printf("error bootstrapping dht: %s", err)
409 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
415 func (cl *Client) Closed() events.Done {
416 return cl.closed.Done()
419 func (cl *Client) eachDhtServer(f func(DhtServer)) {
420 for _, ds := range cl.dhtServers {
425 // Stops the client. All connections to peers are closed and all activity will come to a halt.
426 func (cl *Client) Close() (errs []error) {
427 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
429 for _, t := range cl.torrents {
430 err := t.close(&closeGroup)
432 errs = append(errs, err)
435 for i := range cl.onClose {
436 cl.onClose[len(cl.onClose)-1-i]()
441 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
445 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
446 if cl.ipBlockList == nil {
449 return cl.ipBlockList.Lookup(ip)
452 func (cl *Client) ipIsBlocked(ip net.IP) bool {
453 _, blocked := cl.ipBlockRange(ip)
457 func (cl *Client) wantConns() bool {
458 if cl.config.AlwaysWantConns {
461 for _, t := range cl.torrents {
469 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
470 func (cl *Client) rejectAccepted(conn net.Conn) error {
472 return errors.New("don't want conns right now")
474 ra := conn.RemoteAddr()
475 if rip := addrIpOrNil(ra); rip != nil {
476 if cl.config.DisableIPv4Peers && rip.To4() != nil {
477 return errors.New("ipv4 peers disabled")
479 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
480 return errors.New("ipv4 disabled")
482 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
483 return errors.New("ipv6 disabled")
485 if cl.rateLimitAccept(rip) {
486 return errors.New("source IP accepted rate limited")
488 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
489 return errors.New("bad source addr")
495 func (cl *Client) acceptConnections(l Listener) {
497 conn, err := l.Accept()
498 torrent.Add("client listener accepts", 1)
499 conn = pproffd.WrapNetConn(conn)
501 closed := cl.closed.IsSet()
503 if !closed && conn != nil {
504 reject = cl.rejectAccepted(conn)
514 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add("rejected accepted connections", 1)
520 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
523 go cl.incomingConnection(conn)
525 log.Fmsg("accepted %q connection at %q from %q",
529 ).SetLevel(log.Debug).Log(cl.logger)
530 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
531 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
532 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
537 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
538 func regularNetConnPeerConnConnString(nc net.Conn) string {
539 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
542 func (cl *Client) incomingConnection(nc net.Conn) {
544 if tc, ok := nc.(*net.TCPConn); ok {
547 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
548 regularNetConnPeerConnConnString(nc))
554 c.Discovery = PeerSourceIncoming
555 cl.runReceivedConn(c)
558 // Returns a handle to the given torrent, if it's present in the client.
559 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
562 t, ok = cl.torrents[ih]
566 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
567 return cl.torrents[ih]
570 type DialResult struct {
575 func countDialResult(err error) {
577 torrent.Add("successful dials", 1)
579 torrent.Add("unsuccessful dials", 1)
583 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
584 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
585 if ret < minDialTimeout {
591 // Returns whether an address is known to connect to a client with our own ID.
592 func (cl *Client) dopplegangerAddr(addr string) bool {
593 _, ok := cl.dopplegangerAddrs[addr]
597 // Returns a connection over UTP or TCP, whichever is first to connect.
598 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
599 return DialFirst(ctx, addr, cl.dialers)
602 // Dials addr with each of the given dialers and returns the result from whichever is first to connect.
603 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
605 t := perf.NewTimer(perf.CallerName(0))
608 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
614 ctx, cancel := context.WithCancel(ctx)
615 // As soon as we return one connection, cancel the others.
618 resCh := make(chan DialResult, left)
619 for _, _s := range dialers {
624 dialFromSocket(ctx, s, addr),
629 // Wait for a successful connection.
631 defer perf.ScopeTimer()()
632 for ; left > 0 && res.Conn == nil; left-- {
636 // There are still incomplete dials.
638 for ; left > 0; left-- {
639 conn := (<-resCh).Conn
646 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
651 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
662 func forgettableDialError(err error) bool {
663 return strings.Contains(err.Error(), "no suitable address found")
666 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
667 if _, ok := t.halfOpen[addr]; !ok {
668 panic("invariant broken")
670 delete(t.halfOpen, addr)
672 for _, t := range cl.torrents {
677 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
678 // for valid reasons.
679 func (cl *Client) initiateProtocolHandshakes(
683 outgoing, encryptHeader bool,
684 remoteAddr PeerRemoteAddr,
685 network, connString string,
687 c *PeerConn, err error,
689 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
690 c.headerEncrypted = encryptHeader
691 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693 dl, ok := ctx.Deadline()
697 err = nc.SetDeadline(dl)
701 err = cl.initiateHandshakes(c, t)
705 // Returns nil connection and nil error if no connection could be established for valid reasons.
706 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
707 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
710 return t.dialTimeout()
713 dr := cl.dialFirst(dialCtx, addr.String())
716 if dialCtx.Err() != nil {
717 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719 return nil, errors.New("dial failed")
721 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
728 // Returns nil connection and nil error if no connection could be established
729 // for valid reasons.
730 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
731 torrent.Add("establish outgoing connection", 1)
732 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
733 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with preferred header obfuscation", 1)
738 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
739 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
740 // We should have just tried with the preferred header obfuscation. If it was required,
741 // there's nothing else to try.
744 // Try again with encryption if we didn't earlier, or without if we did.
745 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with fallback header obfuscation", 1)
749 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
753 // Called to dial out and run a connection. The addr we're given is already
754 // considered half-open.
755 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
756 cl.dialRateLimiter.Wait(context.Background())
757 c, err := cl.establishOutgoingConn(t, addr)
759 c.conn.SetWriteDeadline(time.Time{})
763 // Don't release lock between here and addPeerConn, unless it's for
765 cl.noLongerHalfOpen(t, addr.String())
768 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
775 t.runHandshookConnLoggingErr(c)
778 // The port number for incoming peer connections. 0 if the client isn't listening.
779 func (cl *Client) incomingPeerPort() int {
780 return cl.LocalPort()
783 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
784 if c.headerEncrypted {
787 rw, c.cryptoMethod, err = mse.InitiateHandshake(
794 cl.config.CryptoProvides,
798 return fmt.Errorf("header obfuscation handshake: %w", err)
801 ih, err := cl.connBtHandshake(c, &t.infoHash)
803 return fmt.Errorf("bittorrent protocol handshake: %w", err)
805 if ih != t.infoHash {
806 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
811 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
812 // that won't also try to take the lock. This saves us copying all the infohashes every time.
813 func (cl *Client) forSkeys(f func([]byte) bool) {
816 if false { // Emulate the bug from #114
818 for ih := range cl.torrents {
822 for range cl.torrents {
829 for ih := range cl.torrents {
836 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
837 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
843 // Do encryption and bittorrent handshakes as receiver.
844 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
845 defer perf.ScopeTimerErr(&err)()
847 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
849 if err == nil || err == mse.ErrNoSecretKeyMatch {
850 if c.headerEncrypted {
851 torrent.Add("handshakes received encrypted", 1)
853 torrent.Add("handshakes received unencrypted", 1)
856 torrent.Add("handshakes received with error while handling encryption", 1)
859 if err == mse.ErrNoSecretKeyMatch {
864 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
865 err = errors.New("connection does not have required header obfuscation")
868 ih, err := cl.connBtHandshake(c, nil)
870 return nil, fmt.Errorf("during bt handshake: %w", err)
878 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
882 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
883 &successfulPeerWireProtocolHandshakePeerReservedBytes)
886 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
887 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
891 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
893 c.PeerExtensionBytes = res.PeerExtensionBits
894 c.PeerID = res.PeerID
895 c.completedHandshake = time.Now()
896 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
902 func (cl *Client) runReceivedConn(c *PeerConn) {
903 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
907 t, err := cl.receiveHandshakes(c)
910 "error receiving handshakes on %v: %s", c, err,
911 ).SetLevel(log.Debug).
913 "network", c.Network,
915 torrent.Add("error receiving handshake", 1)
917 cl.onBadAccept(c.RemoteAddr)
922 torrent.Add("received handshake for unloaded torrent", 1)
923 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
925 cl.onBadAccept(c.RemoteAddr)
929 torrent.Add("received handshake for loaded torrent", 1)
930 c.conn.SetWriteDeadline(time.Time{})
933 t.runHandshookConnLoggingErr(c)
936 // Client lock must be held before entering this.
937 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
939 for i, b := range cl.config.MinPeerExtensions {
940 if c.PeerExtensionBytes[i]&b != b {
941 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
944 if c.PeerID == cl.peerID {
947 addr := c.RemoteAddr.String()
948 cl.dopplegangerAddrs[addr] = struct{}{}
950 // Because the remote address is not necessarily the same as its client's torrent listen
951 // address, we won't record the remote address as a doppleganger. Instead, the initiator
952 // can record *us* as the doppleganger.
954 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
957 c.r = deadlineReader{c.conn, c.r}
958 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
959 if connIsIpv6(c.conn) {
960 torrent.Add("completed handshake over ipv6", 1)
962 if err := t.addPeerConn(c); err != nil {
963 return fmt.Errorf("adding connection: %w", err)
965 defer t.dropConnection(c)
967 cl.sendInitialMessages(c, t)
968 c.initUpdateRequestsTimer()
969 err := c.mainReadLoop()
971 return fmt.Errorf("main read loop: %w", err)
978 func (p *Peer) initUpdateRequestsTimer() {
980 if p.updateRequestsTimer != nil {
981 panic(p.updateRequestsTimer)
984 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
985 p.updateRequestsTimer.Stop()
988 func (c *Peer) updateRequestsTimerFunc() {
990 defer c.locker().Unlock()
991 if c.closed.IsSet() {
994 if c.needRequestUpdate != "" {
997 if c.isLowOnRequests() {
998 // If there are no outstanding requests, then a request update should have already run.
1001 c.updateRequests("updateRequestsTimer")
1004 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1005 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
1006 // (1<<19) cached for sending, for 16KiB (1<<14) chunks, that allows 1<<5 requests.
1007 const localClientReqq = 1 << 5
1009 // See the order given in Transmission's tr_peerMsgsNew.
1010 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1011 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1012 conn.write(pp.Message{
1014 ExtendedID: pp.HandshakeExtendedID,
1015 ExtendedPayload: func() []byte {
1016 msg := pp.ExtendedHandshakeMessage{
1017 M: map[pp.ExtensionName]pp.ExtensionNumber{
1018 pp.ExtensionNameMetadata: metadataExtendedId,
1020 V: cl.config.ExtendedHandshakeClientVersion,
1021 Reqq: localClientReqq,
1022 YourIp: pp.CompactIp(conn.remoteIp()),
1023 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1024 Port: cl.incomingPeerPort(),
1025 MetadataSize: torrent.metadataSize(),
1026 // TODO: We can figure these out specific to the socket
1028 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1029 Ipv6: cl.config.PublicIp6.To16(),
1031 if !cl.config.DisablePEX {
1032 msg.M[pp.ExtensionNamePex] = pexExtendedId
1034 return bencode.MustMarshal(msg)
1039 if conn.fastEnabled() {
1040 if torrent.haveAllPieces() {
1041 conn.write(pp.Message{Type: pp.HaveAll})
1042 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1044 } else if !torrent.haveAnyPieces() {
1045 conn.write(pp.Message{Type: pp.HaveNone})
1046 conn.sentHaves.Clear()
1052 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1053 conn.write(pp.Message{
1060 func (cl *Client) dhtPort() (ret uint16) {
1061 if len(cl.dhtServers) == 0 {
1064 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1067 func (cl *Client) haveDhtServer() bool {
1068 return len(cl.dhtServers) > 0
1071 // Process incoming ut_metadata message.
1072 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1073 var d pp.ExtendedMetadataRequestMsg
1074 err := bencode.Unmarshal(payload, &d)
1075 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1076 } else if err != nil {
1077 return fmt.Errorf("error unmarshalling bencode: %s", err)
1081 case pp.DataMetadataExtensionMsgType:
1082 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1083 if !c.requestedMetadataPiece(piece) {
1084 return fmt.Errorf("got unexpected piece %d", piece)
1086 c.metadataRequests[piece] = false
1087 begin := len(payload) - d.PieceSize()
1088 if begin < 0 || begin >= len(payload) {
1089 return fmt.Errorf("data has bad offset in payload: %d", begin)
1091 t.saveMetadataPiece(piece, payload[begin:])
1092 c.lastUsefulChunkReceived = time.Now()
1093 err = t.maybeCompleteMetadata()
1095 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1096 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1097 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1098 // log consumers can filter for this message.
1099 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1102 case pp.RequestMetadataExtensionMsgType:
1103 if !t.haveMetadataPiece(piece) {
1104 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1107 start := (1 << 14) * piece
1108 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1109 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1111 case pp.RejectMetadataExtensionMsgType:
1114 return errors.New("unknown msg_type value")
1118 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1119 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1120 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1125 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1129 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1132 if _, ok := cl.ipBlockRange(ip); ok {
1135 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1141 // Return a Torrent ready for insertion into a Client.
1142 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1143 return cl.newTorrentOpt(AddTorrentOpts{
1145 Storage: specStorage,
1149 // Return a Torrent ready for insertion into a Client.
1150 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1151 // use provided storage, if provided
1152 storageClient := cl.defaultStorage
1153 if opts.Storage != nil {
1154 storageClient = storage.NewClient(opts.Storage)
1159 infoHash: opts.InfoHash,
1160 peers: prioritizedPeers{
1162 getPrio: func(p PeerInfo) peerPriority {
1164 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1167 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1169 halfOpen: make(map[string]PeerInfo),
1170 pieceStateChanges: pubsub.NewPubSub(),
1172 storageOpener: storageClient,
1173 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1175 metadataChanged: sync.Cond{
1178 webSeeds: make(map[string]*Peer),
1179 gotMetainfoC: make(chan struct{}),
1181 t.networkingEnabled.Set()
1182 t.logger = cl.logger.WithContextValue(t)
1183 if opts.ChunkSize == 0 {
1184 opts.ChunkSize = defaultChunkSize
1186 t.setChunkSize(opts.ChunkSize)
1190 // A file-like handle to some torrent data resource.
1191 type Handle interface {
1198 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1199 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1202 // Adds a torrent by InfoHash with a custom Storage implementation.
1203 // If the torrent already exists then this Storage is ignored and the
1204 // existing torrent returned with `new` set to `false`
1205 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1208 t, ok := cl.torrents[infoHash]
1214 t = cl.newTorrent(infoHash, specStorage)
1215 cl.eachDhtServer(func(s DhtServer) {
1216 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1217 go t.dhtAnnouncer(s)
1220 cl.torrents[infoHash] = t
1221 cl.clearAcceptLimits()
1222 t.updateWantPeersEvent()
1223 // Tickle Client.waitAccept, new torrent may want conns.
1224 cl.event.Broadcast()
1228 // Adds a torrent by InfoHash using the given options (custom Storage and ChunkSize).
1229 // If the torrent already exists then this Storage is ignored and the
1230 // existing torrent returned with `new` set to `false`
1231 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1232 infoHash := opts.InfoHash
1235 t, ok := cl.torrents[infoHash]
1241 t = cl.newTorrentOpt(opts)
1242 cl.eachDhtServer(func(s DhtServer) {
1243 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1244 go t.dhtAnnouncer(s)
1247 cl.torrents[infoHash] = t
1248 cl.clearAcceptLimits()
1249 t.updateWantPeersEvent()
1250 // Tickle Client.waitAccept, new torrent may want conns.
1251 cl.event.Broadcast()
1255 type AddTorrentOpts struct {
1257 Storage storage.ClientImpl
1258 ChunkSize pp.Integer
1261 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1262 // Torrent.MergeSpec.
1263 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1264 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1265 InfoHash: spec.InfoHash,
1266 Storage: spec.Storage,
1267 ChunkSize: spec.ChunkSize,
1271 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1273 modSpec.ChunkSize = 0
1275 err = t.MergeSpec(&modSpec)
1276 if err != nil && new {
1282 type stringAddr string
1284 var _ net.Addr = stringAddr("")
1286 func (stringAddr) Network() string { return "" }
1287 func (me stringAddr) String() string { return string(me) }
1289 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1290 // spec.DisallowDataDownload/Upload will be read and applied
1291 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1292 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1293 if spec.DisplayName != "" {
1294 t.SetDisplayName(spec.DisplayName)
1296 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1297 if spec.InfoBytes != nil {
1298 err := t.SetInfoBytes(spec.InfoBytes)
1304 cl.AddDhtNodes(spec.DhtNodes)
1307 useTorrentSources(spec.Sources, t)
1308 for _, url := range spec.Webseeds {
1311 for _, peerAddr := range spec.PeerAddrs {
1313 Addr: stringAddr(peerAddr),
1314 Source: PeerSourceDirect,
1318 if spec.ChunkSize != 0 {
1319 panic("chunk size cannot be changed for existing Torrent")
1321 t.addTrackers(spec.Trackers)
1323 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1324 t.dataUploadDisallowed = spec.DisallowDataUpload
// useTorrentSources tries each entry of sources for t by calling useTorrentSource with a
// background context. A failure is logged at warning level with the source and error; success
// is logged with the source.
// NOTE(review): lines are elided in this excerpt (the definition of s and the statements around
// the call), so whether sources are processed concurrently cannot be confirmed from here.
1328 func useTorrentSources(sources []string, t *Torrent) {
1329 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1330 ctx := context.Background()
1331 for i := 0; i < len(sources); i += 1 {
1334 if err := useTorrentSource(ctx, s, t); err != nil {
1335 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1337 t.logger.Printf("successfully used source %q", s)
// useTorrentSource issues an HTTP GET for source using http.DefaultClient, bencode-decodes the
// response body into a metainfo.MetaInfo, and merges the resulting spec into t with MergeSpec.
// The request is bound to a cancellable context derived from ctx.
// NOTE(review): lines are elided in this excerpt — what triggers cancel, how request-construction
// and transport errors are handled, whether the response body is closed and whether the HTTP
// status code is checked are not visible here.
1343 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1344 ctx, cancel := context.WithCancel(ctx)
1354 var req *http.Request
1355 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1358 var resp *http.Response
1359 if resp, err = http.DefaultClient.Do(req); err != nil {
1362 var mi metainfo.MetaInfo
1363 err = bencode.NewDecoder(resp.Body).Decode(&mi)
// The context state is consulted after decoding; presumably to tell a cancelled fetch apart from
// a genuine decode failure — confirm against the elided lines.
1366 if ctx.Err() != nil {
1371 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
// dropTorrent looks up infoHash in cl.torrents and deletes that entry from the map. err is set to
// "no such torrent" (presumably when the lookup fails — the guarding condition is elided).
// NOTE(review): how the looked-up Torrent is shut down and how wg is used are not visible in this
// excerpt.
1374 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1375 t, ok := cl.torrents[infoHash]
1377 err = fmt.Errorf("no such torrent")
1384 delete(cl.torrents, infoHash)
// allTorrentsCompleted walks cl.torrents and checks haveAllPieces on each one.
// NOTE(review): lines are elided between the loop header and the haveAllPieces check, and after
// it; presumably the function reports false as soon as any torrent is incomplete — confirm.
1388 func (cl *Client) allTorrentsCompleted() bool {
1389 for _, t := range cl.torrents {
1393 if !t.haveAllPieces() {
1400 // Returns true when all torrents are completely downloaded and false if the
1401 // client is stopped before that.
//
// The method loops while allTorrentsCompleted reports false, checking cl.closed on each pass.
// NOTE(review): the locking and the wait between passes are elided in this excerpt.
1402 func (cl *Client) WaitAll() bool {
1405 for !cl.allTorrentsCompleted() {
1406 if cl.closed.IsSet() {
1414 // Returns handles to all the torrents loaded in the Client.
// The slice is built by torrentsAsSlice, so its order follows map iteration and is unspecified.
// NOTE(review): two lines between the signature and the return are elided in this excerpt
// (presumably lock handling — confirm).
1415 func (cl *Client) Torrents() []*Torrent {
1418 return cl.torrentsAsSlice()
1421 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1422 for _, t := range cl.torrents {
1423 ret = append(ret, t)
1428 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1429 spec, err := TorrentSpecFromMagnetUri(uri)
1433 T, _, err = cl.AddTorrentSpec(spec)
1437 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1438 ts, err := TorrentSpecFromMetaInfoErr(mi)
1442 T, _, err = cl.AddTorrentSpec(ts)
1446 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1447 mi, err := metainfo.LoadFromFile(filename)
1451 return cl.AddTorrent(mi)
1454 func (cl *Client) DhtServers() []DhtServer {
1455 return cl.dhtServers
// AddDhtNodes processes each entry of nodes: the entry is split into host and optional port, the
// host is parsed as an IP, and a krpc.NodeInfo is built and offered to every DHT server through
// eachDhtServer. An entry whose host is not a valid IP is logged.
// NOTE(review): lines are elided in this excerpt (the bad-IP branch apart from its log call, the
// NodeAddr fields, and the body of the per-server callback); confirm the details against the
// full file.
1458 func (cl *Client) AddDhtNodes(nodes []string) {
1459 for _, n := range nodes {
1460 hmp := missinggo.SplitHostMaybePort(n)
1461 ip := net.ParseIP(hmp.Host)
1463 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1466 ni := krpc.NodeInfo{
1467 Addr: krpc.NodeAddr{
1472 cl.eachDhtServer(func(s DhtServer) {
1478 func (cl *Client) banPeerIP(ip net.IP) {
1479 cl.logger.Printf("banning ip %v", ip)
1480 if cl.badPeerIPs == nil {
1481 cl.badPeerIPs = make(map[string]struct{})
1483 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds the PeerConn for nc. From the visible lines it records the remote address
// and connection string, points callbacks at cl.config.Callbacks, sets PeerMaxRequests to 250,
// gives the connection its own logger, wraps nc in a connStatsReadWriter, installs a reader rate
// limited by cl.config.DownloadRateLimiter, logs the initialization at debug level and then
// iterates over the NewPeer callbacks.
// NOTE(review): most of the struct literal, the rest of the rateLimitedReader literal and the
// body of the callback loop are elided in this excerpt.
1486 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1495 PeerMaxRequests: 250,
1497 RemoteAddr: remoteAddr,
1499 callbacks: &cl.config.Callbacks,
1501 connString: connString,
// Connection loggers default to warning level and carry the connection as a context value.
1505 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1506 c.setRW(connStatsReadWriter{nc, c})
1507 c.r = &rateLimitedReader{
1508 l: cl.config.DownloadRateLimiter,
1511 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1512 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer adds the announcing ip and port to a Torrent as a peer whose source is
// PeerSourceDhtAnnouncePeer.
// NOTE(review): the lines that obtain t (presumably by looking up ih) and any use of portOk are
// elided in this excerpt.
1518 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1525 t.addPeers([]PeerInfo{{
1526 Addr: ipPortAddr{ip, port},
1527 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first element of ips that is not nil, or nil when every element is nil
// or ips is empty.
func firstNotNil(ips ...net.IP) net.IP {
	for _, ip := range ips {
		if ip != nil {
			return ip
		}
	}
	return nil
}
// eachListener iterates over cl.listeners, passing listeners to f.
// NOTE(review): the loop body is elided in this excerpt; presumably f's bool result controls
// whether iteration continues — confirm against the full file.
1540 func (cl *Client) eachListener(f func(Listener) bool) {
1541 for _, s := range cl.listeners {
1548 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1549 for i := 0; i < len(cl.listeners); i += 1 {
1550 if ret = cl.listeners[i]; f(ret) {
1557 func (cl *Client) publicIp(peer net.IP) net.IP {
1558 // TODO: Use BEP 10 to determine how peers are seeing us.
1559 if peer.To4() != nil {
1561 cl.config.PublicIp4,
1562 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1567 cl.config.PublicIp6,
1568 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1572 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1573 l := cl.findListener(
1574 func(l Listener) bool {
1575 return f(addrIpOrNil(l.Addr()))
1581 return addrIpOrNil(l.Addr())
1584 // Our IP as a peer should see it.
1585 func (cl *Client) publicAddr(peer net.IP) IpPort {
1586 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1589 // ListenAddrs addresses currently being listened to.
// The result holds the Addr() of each entry of cl.listeners, in the same order as that slice.
// NOTE(review): lines are elided in this excerpt before the allocation and after the loop
// (presumably lock handling and the return) — confirm against the full file.
1590 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1592 ret = make([]net.Addr, len(cl.listeners))
1593 for i := 0; i < len(cl.listeners); i += 1 {
1594 ret[i] = cl.listeners[i].Addr()
1600 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1601 ipa, ok := tryIpPortFromNetAddr(addr)
1605 ip := maskIpForAcceptLimiting(ipa.IP)
1606 if cl.acceptLimiter == nil {
1607 cl.acceptLimiter = make(map[ipStr]int)
1609 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting maps ip to the key used for accept rate limiting. IPv4 addresses
// (including IPv4-mapped ones, as recognised by To4) are reduced to their /24 network; any other
// value is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	if ip4 := ip.To4(); ip4 != nil {
		return ip4.Mask(net.CIDRMask(24, 32))
	}
	return ip
}
1619 func (cl *Client) clearAcceptLimits() {
1620 cl.acceptLimiter = nil
// acceptLimitClearer waits on two events: the Client being closed, and a 15 minute timer after
// which clearAcceptLimits is called.
// NOTE(review): the enclosing loop/select lines, the action taken on close and any locking
// around clearAcceptLimits are elided in this excerpt — confirm against the full file.
1623 func (cl *Client) acceptLimitClearer() {
1626 case <-cl.closed.Done():
1628 case <-time.After(15 * time.Minute):
1630 cl.clearAcceptLimits()
1636 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1637 if cl.config.DisableAcceptRateLimiting {
1640 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// NOTE(review): the body of rLock is elided in this excerpt; presumably it takes the Client's
// lock for reading — confirm against the full file.
1643 func (cl *Client) rLock() {
// NOTE(review): the body of rUnlock is elided in this excerpt; presumably it releases the lock
// taken by rLock — confirm against the full file.
1647 func (cl *Client) rUnlock() {
// NOTE(review): the body of lock is elided in this excerpt; presumably it takes the Client's
// lock exclusively — confirm against the full file.
1651 func (cl *Client) lock() {
// NOTE(review): the body of unlock is elided in this excerpt; presumably it releases the lock
// taken by lock — confirm against the full file.
1655 func (cl *Client) unlock() {
// locker returns a *lockWithDeferreds for the Client.
// NOTE(review): the body is elided in this excerpt; presumably the returned value is the
// Client's own lock — confirm against the full file.
1659 func (cl *Client) locker() *lockWithDeferreds {
1663 func (cl *Client) String() string {
1664 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1667 // Returns connection-level aggregate stats at the Client level. See the comment on
1668 // TorrentStats.ConnStats.
// The returned value is produced by cl.stats.Copy().
1669 func (cl *Client) ConnStats() ConnStats {
1670 return cl.stats.Copy()