22 "github.com/anacrolix/chansync/events"
23 "github.com/anacrolix/dht/v2"
24 "github.com/anacrolix/dht/v2/krpc"
25 "github.com/anacrolix/log"
26 "github.com/anacrolix/missinggo/perf"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/v2"
29 "github.com/anacrolix/missinggo/v2/bitmap"
30 "github.com/anacrolix/missinggo/v2/pproffd"
31 "github.com/anacrolix/sync"
32 "github.com/anacrolix/torrent/generics"
33 "github.com/anacrolix/torrent/option"
34 request_strategy "github.com/anacrolix/torrent/request-strategy"
35 "github.com/davecgh/go-spew/spew"
36 "github.com/dustin/go-humanize"
37 "github.com/google/btree"
38 "github.com/pion/datachannel"
39 "golang.org/x/time/rate"
41 "github.com/anacrolix/chansync"
43 "github.com/anacrolix/torrent/bencode"
44 "github.com/anacrolix/torrent/internal/limiter"
45 "github.com/anacrolix/torrent/iplist"
46 "github.com/anacrolix/torrent/metainfo"
47 "github.com/anacrolix/torrent/mse"
48 pp "github.com/anacrolix/torrent/peer_protocol"
49 "github.com/anacrolix/torrent/storage"
50 "github.com/anacrolix/torrent/tracker"
51 "github.com/anacrolix/torrent/webtorrent"
54 // Clients contain zero or more Torrents. A Client manages a blocklist, the
55 // TCP/UDP protocol ports, and DHT as desired.
57 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
63 closed chansync.SetOnce
69 defaultStorage *storage.Client
73 dhtServers []DhtServer
74 ipBlockList iplist.Ranger
76 // Set of addresses that have our client ID. This intentionally will
77 // include ourselves if we end up trying to connect to our own address
78 // through legitimate channels.
79 dopplegangerAddrs map[string]struct{}
80 badPeerIPs map[netip.Addr]struct{}
81 torrents map[InfoHash]*Torrent
82 pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
84 acceptLimiter map[ipStr]int
85 dialRateLimiter *rate.Limiter
88 websocketTrackers websocketTrackers
90 activeAnnounceLimiter limiter.Instance
91 webseedHttpClient *http.Client
96 func (cl *Client) BadPeerIPs() (ips []string) {
98 ips = cl.badPeerIPsLocked()
103 func (cl *Client) badPeerIPsLocked() (ips []string) {
104 ips = make([]string, len(cl.badPeerIPs))
106 for k := range cl.badPeerIPs {
113 func (cl *Client) PeerID() PeerID {
117 // Returns the port number for the first listener that has one. No longer assumes that all port
118 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
120 func (cl *Client) LocalPort() (port int) {
121 for i := 0; i < len(cl.listeners); i += 1 {
122 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
129 func writeDhtServerStatus(w io.Writer, s DhtServer) {
130 dhtStats := s.Stats()
131 fmt.Fprintf(w, " ID: %x\n", s.ID())
132 spew.Fdump(w, dhtStats)
135 // Writes out a human readable status of the client, such as for writing to a
137 func (cl *Client) WriteStatus(_w io.Writer) {
140 w := bufio.NewWriter(_w)
142 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
143 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
144 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
145 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
146 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
147 cl.eachDhtServer(func(s DhtServer) {
148 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
149 writeDhtServerStatus(w, s)
151 spew.Fdump(w, &cl.stats)
152 torrentsSlice := cl.torrentsAsSlice()
153 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
155 sort.Slice(torrentsSlice, func(l, r int) bool {
156 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
158 for _, t := range torrentsSlice {
160 fmt.Fprint(w, "<unknown name>")
162 fmt.Fprint(w, t.name())
168 "%f%% of %d bytes (%s)",
169 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
171 humanize.Bytes(uint64(*t.length)))
173 w.WriteString("<missing metainfo>")
181 // Filters things that are less than warning from UPnP discovery.
182 func upnpDiscoverLogFilter(m log.Msg) bool {
183 level, ok := m.GetLevel()
184 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
187 func (cl *Client) initLogger() {
188 logger := cl.config.Logger
191 if !cl.config.Debug {
192 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
195 cl.logger = logger.WithValues(cl)
198 func (cl *Client) announceKey() int32 {
199 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
202 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
203 func (cl *Client) init(cfg *ClientConfig) {
205 generics.MakeMap(&cl.dopplegangerAddrs)
206 cl.torrents = make(map[metainfo.Hash]*Torrent)
207 cl.dialRateLimiter = rate.NewLimiter(10, 10)
208 cl.activeAnnounceLimiter.SlotsPerKey = 2
209 cl.event.L = cl.locker()
210 cl.ipBlockList = cfg.IPBlocklist
211 cl.webseedHttpClient = &http.Client{
212 Transport: &http.Transport{
213 Proxy: cfg.HTTPProxy,
219 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
221 cfg = NewDefaultClientConfig()
227 go cl.acceptLimitClearer()
236 storageImpl := cfg.DefaultStorage
237 if storageImpl == nil {
238 // We'd use mmap by default but HFS+ doesn't support sparse files.
239 storageImplCloser := storage.NewFile(cfg.DataDir)
240 cl.onClose = append(cl.onClose, func() {
241 if err := storageImplCloser.Close(); err != nil {
242 cl.logger.Printf("error closing default storage: %s", err)
245 storageImpl = storageImplCloser
247 cl.defaultStorage = storage.NewClient(storageImpl)
249 if cfg.PeerID != "" {
250 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
252 o := copy(cl.peerID[:], cfg.Bep20)
253 _, err = rand.Read(cl.peerID[o:])
255 panic("error generating peer id")
259 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
267 for _, _s := range sockets {
268 s := _s // Go is fucking retarded.
269 cl.onClose = append(cl.onClose, func() { go s.Close() })
270 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
271 cl.dialers = append(cl.dialers, s)
272 cl.listeners = append(cl.listeners, s)
273 if cl.config.AcceptPeerConnections {
274 go cl.acceptConnections(s)
281 for _, s := range sockets {
282 if pc, ok := s.(net.PacketConn); ok {
283 ds, err := cl.NewAnacrolixDhtServer(pc)
287 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
288 cl.onClose = append(cl.onClose, func() { ds.Close() })
293 cl.websocketTrackers = websocketTrackers{
296 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
299 t, ok := cl.torrents[infoHash]
301 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
303 return t.announceRequest(event), nil
305 Proxy: cl.config.HTTPProxy,
306 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
309 t, ok := cl.torrents[dcc.InfoHash]
311 cl.logger.WithDefaultLevel(log.Warning).Printf(
312 "got webrtc conn for unloaded torrent with infohash %x",
318 go t.onWebRtcConn(dc, dcc)
325 func (cl *Client) AddDhtServer(d DhtServer) {
326 cl.dhtServers = append(cl.dhtServers, d)
329 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
330 // given address for any Torrent.
331 func (cl *Client) AddDialer(d Dialer) {
334 cl.dialers = append(cl.dialers, d)
335 for _, t := range cl.torrents {
340 func (cl *Client) Listeners() []Listener {
344 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
346 func (cl *Client) AddListener(l Listener) {
347 cl.listeners = append(cl.listeners, l)
348 if cl.config.AcceptPeerConnections {
349 go cl.acceptConnections(l)
353 func (cl *Client) firewallCallback(net.Addr) bool {
355 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
358 torrent.Add("connections firewalled", 1)
360 torrent.Add("connections not firewalled", 1)
365 func (cl *Client) listenOnNetwork(n network) bool {
366 if n.Ipv4 && cl.config.DisableIPv4 {
369 if n.Ipv6 && cl.config.DisableIPv6 {
372 if n.Tcp && cl.config.DisableTCP {
375 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
381 func (cl *Client) listenNetworks() (ns []network) {
382 for _, n := range allPeerNetworks {
383 if cl.listenOnNetwork(n) {
390 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
391 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
392 cfg := dht.ServerConfig{
393 IPBlocklist: cl.ipBlockList,
395 OnAnnouncePeer: cl.onDHTAnnouncePeer,
396 PublicIP: func() net.IP {
397 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
398 return cl.config.PublicIp6
400 return cl.config.PublicIp4
402 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
403 OnQuery: cl.config.DHTOnQuery,
404 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
406 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
409 s, err = dht.NewServer(&cfg)
412 ts, err := s.Bootstrap()
414 cl.logger.Printf("error bootstrapping dht: %s", err)
416 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
422 func (cl *Client) Closed() events.Done {
423 return cl.closed.Done()
426 func (cl *Client) eachDhtServer(f func(DhtServer)) {
427 for _, ds := range cl.dhtServers {
432 // Stops the client. All connections to peers are closed and all activity will come to a halt.
433 func (cl *Client) Close() (errs []error) {
434 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
436 for _, t := range cl.torrents {
437 err := t.close(&closeGroup)
439 errs = append(errs, err)
442 for i := range cl.onClose {
443 cl.onClose[len(cl.onClose)-1-i]()
448 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
452 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
453 if cl.ipBlockList == nil {
456 return cl.ipBlockList.Lookup(ip)
459 func (cl *Client) ipIsBlocked(ip net.IP) bool {
460 _, blocked := cl.ipBlockRange(ip)
464 func (cl *Client) wantConns() bool {
465 if cl.config.AlwaysWantConns {
468 for _, t := range cl.torrents {
476 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
477 func (cl *Client) rejectAccepted(conn net.Conn) error {
479 return errors.New("don't want conns right now")
481 ra := conn.RemoteAddr()
482 if rip := addrIpOrNil(ra); rip != nil {
483 if cl.config.DisableIPv4Peers && rip.To4() != nil {
484 return errors.New("ipv4 peers disabled")
486 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
487 return errors.New("ipv4 disabled")
489 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
490 return errors.New("ipv6 disabled")
492 if cl.rateLimitAccept(rip) {
493 return errors.New("source IP accepted rate limited")
495 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
496 return errors.New("bad source addr")
502 func (cl *Client) acceptConnections(l Listener) {
504 conn, err := l.Accept()
505 torrent.Add("client listener accepts", 1)
506 conn = pproffd.WrapNetConn(conn)
508 closed := cl.closed.IsSet()
510 if !closed && conn != nil {
511 reject = cl.rejectAccepted(conn)
521 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
526 torrent.Add("rejected accepted connections", 1)
527 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
530 go cl.incomingConnection(conn)
532 log.Fmsg("accepted %q connection at %q from %q",
536 ).SetLevel(log.Debug).Log(cl.logger)
537 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
538 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
539 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
544 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
545 func regularNetConnPeerConnConnString(nc net.Conn) string {
546 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
549 func (cl *Client) incomingConnection(nc net.Conn) {
551 if tc, ok := nc.(*net.TCPConn); ok {
554 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
555 regularNetConnPeerConnConnString(nc))
561 c.Discovery = PeerSourceIncoming
562 cl.runReceivedConn(c)
565 // Returns a handle to the given torrent, if it's present in the client.
566 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
569 t, ok = cl.torrents[ih]
573 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
574 return cl.torrents[ih]
577 type DialResult struct {
582 func countDialResult(err error) {
584 torrent.Add("successful dials", 1)
586 torrent.Add("unsuccessful dials", 1)
590 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
591 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
592 if ret < minDialTimeout {
598 // Returns whether an address is known to connect to a client with our own ID.
599 func (cl *Client) dopplegangerAddr(addr string) bool {
600 _, ok := cl.dopplegangerAddrs[addr]
604 // Returns a connection over UTP or TCP, whichever is first to connect.
605 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
606 return DialFirst(ctx, addr, cl.dialers)
609 // Returns a connection over UTP or TCP, whichever is first to connect.
610 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
612 t := perf.NewTimer(perf.CallerName(0))
615 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
617 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
621 ctx, cancel := context.WithCancel(ctx)
622 // As soon as we return one connection, cancel the others.
625 resCh := make(chan DialResult, left)
626 for _, _s := range dialers {
631 dialFromSocket(ctx, s, addr),
636 // Wait for a successful connection.
638 defer perf.ScopeTimer()()
639 for ; left > 0 && res.Conn == nil; left-- {
643 // There are still incomplete dials.
645 for ; left > 0; left-- {
646 conn := (<-resCh).Conn
653 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
658 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
659 c, err := s.Dial(ctx, addr)
660 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
661 // it now in case we close the connection forthwith.
662 if tc, ok := c.(*net.TCPConn); ok {
669 func forgettableDialError(err error) bool {
670 return strings.Contains(err.Error(), "no suitable address found")
673 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
674 if _, ok := t.halfOpen[addr]; !ok {
675 panic("invariant broken")
677 delete(t.halfOpen, addr)
679 for _, t := range cl.torrents {
684 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
685 // could be established for valid reasons.
686 func (cl *Client) initiateProtocolHandshakes(
690 outgoing, encryptHeader bool,
691 remoteAddr PeerRemoteAddr,
692 network, connString string,
694 c *PeerConn, err error,
696 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
697 c.headerEncrypted = encryptHeader
698 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
700 dl, ok := ctx.Deadline()
704 err = nc.SetDeadline(dl)
708 err = cl.initiateHandshakes(c, t)
712 // Returns nil connection and nil error if no connection could be established for valid reasons.
713 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
714 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
717 return t.dialTimeout()
720 dr := cl.dialFirst(dialCtx, addr.String())
723 if dialCtx.Err() != nil {
724 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
726 return nil, errors.New("dial failed")
728 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
735 // Returns nil connection and nil error if no connection could be established
736 // for valid reasons.
737 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
738 torrent.Add("establish outgoing connection", 1)
739 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
740 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
742 torrent.Add("initiated conn with preferred header obfuscation", 1)
745 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
746 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
747 // We should have just tried with the preferred header obfuscation. If it was required,
748 // there's nothing else to try.
751 // Try again with encryption if we didn't earlier, or without if we did.
752 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
754 torrent.Add("initiated conn with fallback header obfuscation", 1)
756 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
760 // Called to dial out and run a connection. The addr we're given is already
761 // considered half-open.
762 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
763 cl.dialRateLimiter.Wait(context.Background())
764 c, err := cl.establishOutgoingConn(t, addr)
766 c.conn.SetWriteDeadline(time.Time{})
770 // Don't release lock between here and addPeerConn, unless it's for
772 cl.noLongerHalfOpen(t, addr.String())
775 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
782 t.runHandshookConnLoggingErr(c)
785 // The port number for incoming peer connections. 0 if the client isn't listening.
786 func (cl *Client) incomingPeerPort() int {
787 return cl.LocalPort()
790 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
791 if c.headerEncrypted {
794 rw, c.cryptoMethod, err = mse.InitiateHandshake(
801 cl.config.CryptoProvides,
805 return fmt.Errorf("header obfuscation handshake: %w", err)
808 ih, err := cl.connBtHandshake(c, &t.infoHash)
810 return fmt.Errorf("bittorrent protocol handshake: %w", err)
812 if ih != t.infoHash {
813 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
818 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
819 // that won't also try to take the lock. This saves us copying all the infohashes every time.
820 func (cl *Client) forSkeys(f func([]byte) bool) {
823 if false { // Emulate the bug from #114
825 for ih := range cl.torrents {
829 for range cl.torrents {
836 for ih := range cl.torrents {
843 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
844 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
850 // Do encryption and bittorrent handshakes as receiver.
851 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
852 defer perf.ScopeTimerErr(&err)()
854 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
856 if err == nil || err == mse.ErrNoSecretKeyMatch {
857 if c.headerEncrypted {
858 torrent.Add("handshakes received encrypted", 1)
860 torrent.Add("handshakes received unencrypted", 1)
863 torrent.Add("handshakes received with error while handling encryption", 1)
866 if err == mse.ErrNoSecretKeyMatch {
871 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
872 err = errors.New("connection does not have required header obfuscation")
875 ih, err := cl.connBtHandshake(c, nil)
877 return nil, fmt.Errorf("during bt handshake: %w", err)
885 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
889 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
890 &successfulPeerWireProtocolHandshakePeerReservedBytes)
893 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
894 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
898 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
900 c.PeerExtensionBytes = res.PeerExtensionBits
901 c.PeerID = res.PeerID
902 c.completedHandshake = time.Now()
903 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
909 func (cl *Client) runReceivedConn(c *PeerConn) {
910 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
914 t, err := cl.receiveHandshakes(c)
917 "error receiving handshakes on %v: %s", c, err,
918 ).SetLevel(log.Debug).
920 "network", c.Network,
922 torrent.Add("error receiving handshake", 1)
924 cl.onBadAccept(c.RemoteAddr)
929 torrent.Add("received handshake for unloaded torrent", 1)
930 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
932 cl.onBadAccept(c.RemoteAddr)
936 torrent.Add("received handshake for loaded torrent", 1)
937 c.conn.SetWriteDeadline(time.Time{})
940 t.runHandshookConnLoggingErr(c)
943 // Client lock must be held before entering this.
944 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
946 for i, b := range cl.config.MinPeerExtensions {
947 if c.PeerExtensionBytes[i]&b != b {
948 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
951 if c.PeerID == cl.peerID {
954 addr := c.RemoteAddr.String()
955 cl.dopplegangerAddrs[addr] = struct{}{}
957 // Because the remote address is not necessarily the same as its client's torrent listen
958 // address, we won't record the remote address as a doppleganger. Instead, the initiator
959 // can record *us* as the doppleganger.
961 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
964 c.r = deadlineReader{c.conn, c.r}
965 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
966 if connIsIpv6(c.conn) {
967 torrent.Add("completed handshake over ipv6", 1)
969 if err := t.addPeerConn(c); err != nil {
970 return fmt.Errorf("adding connection: %w", err)
972 defer t.dropConnection(c)
974 cl.sendInitialMessages(c, t)
975 c.initUpdateRequestsTimer()
976 err := c.mainReadLoop()
978 return fmt.Errorf("main read loop: %w", err)
985 func (p *Peer) initUpdateRequestsTimer() {
987 if p.updateRequestsTimer != nil {
988 panic(p.updateRequestsTimer)
991 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
994 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
996 func (c *Peer) updateRequestsTimerFunc() {
998 defer c.locker().Unlock()
999 if c.closed.IsSet() {
1002 if c.isLowOnRequests() {
1003 // If there are no outstanding requests, then a request update should have already run.
1006 if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1007 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1008 // already been fired.
1009 torrent.Add("spurious timer requests updates", 1)
1012 c.updateRequests(peerUpdateRequestsTimerReason)
1015 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1016 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
1017 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1018 const localClientReqq = 1 << 5
1020 // See the order given in Transmission's tr_peerMsgsNew.
1021 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1022 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1023 conn.write(pp.Message{
1025 ExtendedID: pp.HandshakeExtendedID,
1026 ExtendedPayload: func() []byte {
1027 msg := pp.ExtendedHandshakeMessage{
1028 M: map[pp.ExtensionName]pp.ExtensionNumber{
1029 pp.ExtensionNameMetadata: metadataExtendedId,
1031 V: cl.config.ExtendedHandshakeClientVersion,
1032 Reqq: localClientReqq,
1033 YourIp: pp.CompactIp(conn.remoteIp()),
1034 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1035 Port: cl.incomingPeerPort(),
1036 MetadataSize: torrent.metadataSize(),
1037 // TODO: We can figure these out specific to the socket
1039 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1040 Ipv6: cl.config.PublicIp6.To16(),
1042 if !cl.config.DisablePEX {
1043 msg.M[pp.ExtensionNamePex] = pexExtendedId
1045 return bencode.MustMarshal(msg)
1050 if conn.fastEnabled() {
1051 if torrent.haveAllPieces() {
1052 conn.write(pp.Message{Type: pp.HaveAll})
1053 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1055 } else if !torrent.haveAnyPieces() {
1056 conn.write(pp.Message{Type: pp.HaveNone})
1057 conn.sentHaves.Clear()
1063 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1064 conn.write(pp.Message{
1071 func (cl *Client) dhtPort() (ret uint16) {
1072 if len(cl.dhtServers) == 0 {
1075 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1078 func (cl *Client) haveDhtServer() bool {
1079 return len(cl.dhtServers) > 0
1082 // Process incoming ut_metadata message.
1083 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1084 var d pp.ExtendedMetadataRequestMsg
1085 err := bencode.Unmarshal(payload, &d)
1086 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1087 } else if err != nil {
1088 return fmt.Errorf("error unmarshalling bencode: %s", err)
1092 case pp.DataMetadataExtensionMsgType:
1093 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1094 if !c.requestedMetadataPiece(piece) {
1095 return fmt.Errorf("got unexpected piece %d", piece)
1097 c.metadataRequests[piece] = false
1098 begin := len(payload) - d.PieceSize()
1099 if begin < 0 || begin >= len(payload) {
1100 return fmt.Errorf("data has bad offset in payload: %d", begin)
1102 t.saveMetadataPiece(piece, payload[begin:])
1103 c.lastUsefulChunkReceived = time.Now()
1104 err = t.maybeCompleteMetadata()
1106 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1107 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1108 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1109 // log consumers can filter for this message.
1110 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1113 case pp.RequestMetadataExtensionMsgType:
1114 if !t.haveMetadataPiece(piece) {
1115 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1118 start := (1 << 14) * piece
1119 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1120 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1122 case pp.RejectMetadataExtensionMsgType:
1125 return errors.New("unknown msg_type value")
1129 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1130 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1131 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1136 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1140 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1143 if _, ok := cl.ipBlockRange(ip); ok {
1146 ipAddr, ok := netip.AddrFromSlice(ip)
1150 if _, ok := cl.badPeerIPs[ipAddr]; ok {
1156 // Return a Torrent ready for insertion into a Client.
1157 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1158 return cl.newTorrentOpt(AddTorrentOpts{
1160 Storage: specStorage,
1164 // Return a Torrent ready for insertion into a Client.
1165 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1166 // use provided storage, if provided
1167 storageClient := cl.defaultStorage
1168 if opts.Storage != nil {
1169 storageClient = storage.NewClient(opts.Storage)
1174 infoHash: opts.InfoHash,
1175 peers: prioritizedPeers{
1177 getPrio: func(p PeerInfo) peerPriority {
1179 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1182 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1184 halfOpen: make(map[string]PeerInfo),
1185 pieceStateChanges: pubsub.NewPubSub(),
1187 storageOpener: storageClient,
1188 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1190 metadataChanged: sync.Cond{
1193 webSeeds: make(map[string]*Peer),
1194 gotMetainfoC: make(chan struct{}),
1196 t.smartBanCache.Hash = sha1.Sum
1197 t.smartBanCache.Init()
1198 t.networkingEnabled.Set()
1199 t.logger = cl.logger.WithContextValue(t)
1200 if opts.ChunkSize == 0 {
1201 opts.ChunkSize = defaultChunkSize
1203 t.setChunkSize(opts.ChunkSize)
1207 // A file-like handle to some torrent data resource.
1208 type Handle interface {
1215 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1216 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1219 // Adds a torrent by InfoHash with a custom Storage implementation.
1220 // If the torrent already exists then this Storage is ignored and the
1221 // existing torrent returned with `new` set to `false`
1222 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1225 t, ok := cl.torrents[infoHash]
1231 t = cl.newTorrent(infoHash, specStorage)
1232 cl.eachDhtServer(func(s DhtServer) {
1233 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1234 go t.dhtAnnouncer(s)
1237 cl.torrents[infoHash] = t
1238 cl.clearAcceptLimits()
1239 t.updateWantPeersEvent()
1240 // Tickle Client.waitAccept, new torrent may want conns.
1241 cl.event.Broadcast()
1245 // Adds a torrent by InfoHash with a custom Storage implementation.
1246 // If the torrent already exists then this Storage is ignored and the
1247 // existing torrent returned with `new` set to `false`
1248 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1249 infoHash := opts.InfoHash
1252 t, ok := cl.torrents[infoHash]
1258 t = cl.newTorrentOpt(opts)
1259 cl.eachDhtServer(func(s DhtServer) {
1260 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1261 go t.dhtAnnouncer(s)
1264 cl.torrents[infoHash] = t
1265 cl.clearAcceptLimits()
1266 t.updateWantPeersEvent()
1267 // Tickle Client.waitAccept, new torrent may want conns.
1268 cl.event.Broadcast()
1272 type AddTorrentOpts struct {
1274 Storage storage.ClientImpl
1275 ChunkSize pp.Integer
1278 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1279 // Torrent.MergeSpec.
1280 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1281 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1282 InfoHash: spec.InfoHash,
1283 Storage: spec.Storage,
1284 ChunkSize: spec.ChunkSize,
1288 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1290 modSpec.ChunkSize = 0
1292 err = t.MergeSpec(&modSpec)
1293 if err != nil && new {
1299 type stringAddr string
1301 var _ net.Addr = stringAddr("")
1303 func (stringAddr) Network() string { return "" }
1304 func (me stringAddr) String() string { return string(me) }
1306 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1307 // spec.DisallowDataDownload/Upload will be read and applied.
1308 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1309 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1310 if spec.DisplayName != "" {
1311 t.SetDisplayName(spec.DisplayName)
1313 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1314 if spec.InfoBytes != nil {
1315 err := t.SetInfoBytes(spec.InfoBytes)
1321 cl.AddDhtNodes(spec.DhtNodes)
1324 useTorrentSources(spec.Sources, t)
1325 for _, url := range spec.Webseeds {
1328 for _, peerAddr := range spec.PeerAddrs {
1330 Addr: stringAddr(peerAddr),
1331 Source: PeerSourceDirect,
1335 if spec.ChunkSize != 0 {
1336 panic("chunk size cannot be changed for existing Torrent")
1338 t.addTrackers(spec.Trackers)
1340 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1341 t.dataUploadDisallowed = spec.DisallowDataUpload
// useTorrentSources runs useTorrentSource against t for the entries of sources. Failures are
// logged at warning level rather than returned, and successes are logged as well.
1345 func useTorrentSources(sources []string, t *Torrent) {
1346 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1347 ctx := context.Background()
1348 for i := 0; i < len(sources); i += 1 {
1351 if err := useTorrentSource(ctx, s, t); err != nil {
1352 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1354 t.logger.Printf("successfully used source %q", s)
// useTorrentSource downloads source with an HTTP GET through http.DefaultClient, decodes the
// response body as a bencoded metainfo.MetaInfo, and merges the spec derived from it into t with
// MergeSpec.
1360 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
// Derive a cancellable context for the request.
1361 ctx, cancel := context.WithCancel(ctx)
1371 var req *http.Request
1372 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1375 var resp *http.Response
1376 if resp, err = http.DefaultClient.Do(req); err != nil {
1379 var mi metainfo.MetaInfo
1380 err = bencode.NewDecoder(resp.Body).Decode(&mi)
// NOTE(review): presumably a decode error caused by context cancellation is not reported as a
// failure — confirm.
1383 if ctx.Err() != nil {
1388 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
// dropTorrent looks up infoHash in cl.torrents and removes the entry from the map. An error with
// the text "no such torrent" is produced for the lookup-miss case.
// NOTE(review): presumably wg is handed to the torrent's close logic — confirm.
1391 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1392 t, ok := cl.torrents[infoHash]
1394 err = fmt.Errorf("no such torrent")
1401 delete(cl.torrents, infoHash)
// allTorrentsCompleted walks cl.torrents and checks each one with haveAllPieces.
// NOTE(review): presumably returns false as soon as one torrent is incomplete — confirm.
1405 func (cl *Client) allTorrentsCompleted() bool {
1406 for _, t := range cl.torrents {
1410 if !t.haveAllPieces() {
1417 // WaitAll returns true when all torrents are completely downloaded and false if the
1418 // client is stopped before that.
1419 func (cl *Client) WaitAll() bool {
// Keep going until allTorrentsCompleted reports true.
1422 for !cl.allTorrentsCompleted() {
// The client was closed before every torrent completed.
1423 if cl.closed.IsSet() {
1431 // Torrents returns handles to all the torrents loaded in the Client. The slice is built by
// torrentsAsSlice.
1432 func (cl *Client) Torrents() []*Torrent {
1435 return cl.torrentsAsSlice()
// torrentsAsSlice collects the values of the cl.torrents map into ret. The order of the result
// is unspecified because it follows map iteration order.
1438 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1439 for _, t := range cl.torrents {
1440 ret = append(ret, t)
// AddMagnet parses uri with TorrentSpecFromMagnetUri and adds the resulting spec to the Client
// through AddTorrentSpec.
1445 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1446 spec, err := TorrentSpecFromMagnetUri(uri)
// Whether the torrent was newly added is discarded.
1450 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent converts mi to a TorrentSpec with TorrentSpecFromMetaInfoErr and adds it to the
// Client through AddTorrentSpec.
1454 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1455 ts, err := TorrentSpecFromMetaInfoErr(mi)
// Whether the torrent was newly added is discarded.
1459 T, _, err = cl.AddTorrentSpec(ts)
// AddTorrentFromFile loads the metainfo stored at filename with metainfo.LoadFromFile and adds
// it to the Client through AddTorrent.
1463 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1464 mi, err := metainfo.LoadFromFile(filename)
1468 return cl.AddTorrent(mi)
// DhtServers returns the Client's DHT servers. The cl.dhtServers slice is returned as-is, not
// copied.
1471 func (cl *Client) DhtServers() []DhtServer {
1472 return cl.dhtServers
// AddDhtNodes processes each entry of nodes as a host with an optional port. The host must be an
// IP literal, since it is parsed with net.ParseIP; entries with an unparseable host are logged
// with "won't add DHT node with bad IP". A krpc.NodeInfo is built for each remaining entry and
// eachDhtServer is called for it.
1475 func (cl *Client) AddDhtNodes(nodes []string) {
1476 for _, n := range nodes {
1477 hmp := missinggo.SplitHostMaybePort(n)
1478 ip := net.ParseIP(hmp.Host)
1480 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1483 ni := krpc.NodeInfo{
1484 Addr: krpc.NodeAddr{
// NOTE(review): presumably the callback adds ni to each DHT server — confirm.
1489 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP records ip as a key in cl.badPeerIPs. The net.IP is converted to a netip.Addr by
// round-tripping through its string form; netip.MustParseAddr panics if that string is not a
// valid address (for example when ip is nil).
1495 func (cl *Client) banPeerIP(ip net.IP) {
// NOTE(review): per its name, MakeMapIfNilAndSet presumably allocates the map on first use —
// confirm.
1496 generics.MakeMapIfNilAndSet(&cl.badPeerIPs, netip.MustParseAddr(ip.String()), struct{}{})
// newConnection builds the PeerConn for nc. It records the remote address and connString, sets a
// bannable address from remoteAddr, attaches a logger, wraps nc in a connStatsReadWriter, reads
// through a rateLimitedReader using the configured download rate limiter, and iterates over the
// configured NewPeer callbacks.
1499 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1508 PeerMaxRequests: 250,
1510 RemoteAddr: remoteAddr,
1512 callbacks: &cl.config.Callbacks,
1514 connString: connString,
1517 // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
1518 if remoteAddr != nil {
// The remote address is parsed from its string form as an address:port pair.
// NOTE(review): presumably bannableAddr is only set when parsing succeeds — confirm.
1519 netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String())
1521 c.bannableAddr = option.Some(netipAddrPort.Addr())
// The connection's logger defaults to warning level and carries c as a context value.
1525 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1526 c.setRW(connStatsReadWriter{nc, c})
1527 c.r = &rateLimitedReader{
1528 l: cl.config.DownloadRateLimiter,
1531 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1532 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer adds the announcing peer (ip, port) to a torrent as a PeerInfo with source
// PeerSourceDhtAnnouncePeer.
// NOTE(review): presumably t is the torrent registered for ih and portOk gates the call —
// confirm.
1538 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1545 t.addPeers([]PeerInfo{{
1546 Addr: ipPortAddr{ip, port},
1547 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil iterates over ips in argument order.
// NOTE(review): per its name, presumably returns the first non-nil IP, or nil when there is
// none — confirm.
1551 func firstNotNil(ips ...net.IP) net.IP {
1552 for _, ip := range ips {
// eachListener iterates over cl.listeners for the callback f.
// NOTE(review): presumably f's bool result controls whether iteration continues — confirm.
1560 func (cl *Client) eachListener(f func(Listener) bool) {
1561 for _, s := range cl.listeners {
// findListener scans cl.listeners in order, assigning each listener to ret and testing it with f.
// NOTE(review): presumably returns the first listener accepted by f, or nil if none matches —
// confirm.
1568 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1569 for i := 0; i < len(cl.listeners); i += 1 {
1570 if ret = cl.listeners[i]; f(ret) {
// publicIp chooses the IP to advertise to peer, based on peer's address family. For an IPv4 peer
// the candidates are cl.config.PublicIp4 followed by the IP of an IPv4 listener; otherwise they
// are cl.config.PublicIp6 followed by the IP of a non-IPv4 listener.
// NOTE(review): presumably the first non-nil candidate is returned — confirm.
1577 func (cl *Client) publicIp(peer net.IP) net.IP {
1578 // TODO: Use BEP 10 to determine how peers are seeing us.
1579 if peer.To4() != nil {
1581 cl.config.PublicIp4,
1582 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1587 cl.config.PublicIp6,
1588 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp searches the listeners, via findListener, for one whose address IP (as given by
// addrIpOrNil) satisfies f, and returns that listener's IP.
// NOTE(review): presumably returns nil when no listener matches — confirm.
1592 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1593 l := cl.findListener(
1594 func(l Listener) bool {
1595 return f(addrIpOrNil(l.Addr()))
1601 return addrIpOrNil(l.Addr())
1604 // publicAddr returns our address as the given peer should see it: the IP comes from publicIp
// and the port is cl.incomingPeerPort() converted to uint16.
1605 func (cl *Client) publicAddr(peer net.IP) IpPort {
1606 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1609 // ListenAddrs returns the addresses currently being listened to, one per entry of cl.listeners
// and in the same order.
1610 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1612 ret = make([]net.Addr, len(cl.listeners))
1613 for i := 0; i < len(cl.listeners); i += 1 {
1614 ret[i] = cl.listeners[i].Addr()
// onBadAccept increments the accept limiter count for the masked IP of addr. The IP is masked
// with maskIpForAcceptLimiting, and the map is allocated lazily on first use.
1620 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1621 ipa, ok := tryIpPortFromNetAddr(addr)
1625 ip := maskIpForAcceptLimiting(ipa.IP)
1626 if cl.acceptLimiter == nil {
1627 cl.acceptLimiter = make(map[ipStr]int)
// Counts are keyed by the string form of the masked IP.
1629 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces ip to the granularity used for accept limiting. IPv4 addresses
// are masked to their /24 network.
// NOTE(review): presumably other addresses are returned unchanged — confirm.
1632 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1633 if ip4 := ip.To4(); ip4 != nil {
1634 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all accept limiter counts by setting the map to nil.
1639 func (cl *Client) clearAcceptLimits() {
1640 cl.acceptLimiter = nil
// acceptLimitClearer waits on either the Client being closed or a 15 minute timer, and clears
// the accept limits on the timer branch.
// NOTE(review): presumably this loops until the Client is closed — confirm.
1643 func (cl *Client) acceptLimitClearer() {
1646 case <-cl.closed.Done():
1648 case <-time.After(15 * time.Minute):
1650 cl.clearAcceptLimits()
// rateLimitAccept reports whether the masked form of ip (see maskIpForAcceptLimiting) has a
// positive count in cl.acceptLimiter. The lookup is safe on a nil map and then yields zero.
// NOTE(review): presumably returns false when DisableAcceptRateLimiting is set — confirm.
1656 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1657 if cl.config.DisableAcceptRateLimiting {
1660 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// NOTE(review): per its name, rLock presumably takes the Client lock for reading — confirm.
1663 func (cl *Client) rLock() {
// NOTE(review): per its name, rUnlock presumably releases a read lock taken with rLock — confirm.
1667 func (cl *Client) rUnlock() {
// NOTE(review): per its name, lock presumably takes the Client lock exclusively — confirm.
1671 func (cl *Client) lock() {
// NOTE(review): per its name, unlock presumably releases the lock taken with lock — confirm.
1675 func (cl *Client) unlock() {
// locker returns a *lockWithDeferreds.
// NOTE(review): presumably this is the Client's own lock — confirm.
1679 func (cl *Client) locker() *lockWithDeferreds {
// String formats the Client as "<TYPE POINTER>", using the %T and %p verbs on cl.
1683 func (cl *Client) String() string {
1684 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1687 // ConnStats returns connection-level aggregate stats at the Client level, as a copy of cl.stats.
1688 // See the comment on TorrentStats.ConnStats.
1689 func (cl *Client) ConnStats() ConnStats {
1690 return cl.stats.Copy()