20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 request_strategy "github.com/anacrolix/torrent/request-strategy"
31 "github.com/davecgh/go-spew/spew"
32 "github.com/dustin/go-humanize"
33 "github.com/google/btree"
34 "github.com/pion/datachannel"
35 "golang.org/x/time/rate"
37 "github.com/anacrolix/chansync"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/internal/limiter"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
46 "github.com/anacrolix/torrent/tracker"
47 "github.com/anacrolix/torrent/webtorrent"
50 // Clients contain zero or more Torrents. A Client manages a blocklist, the
51 // TCP/UDP protocol ports, and DHT as desired.
53 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59 closed chansync.SetOnce
65 defaultStorage *storage.Client
69 dhtServers []DhtServer
70 ipBlockList iplist.Ranger
72 // Set of addresses that have our client ID. This intentionally will
73 // include ourselves if we end up trying to connect to our own address
74 // through legitimate channels.
75 dopplegangerAddrs map[string]struct{}
76 badPeerIPs map[string]struct{}
77 torrents map[InfoHash]*Torrent
78 pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
80 acceptLimiter map[ipStr]int
81 dialRateLimiter *rate.Limiter
84 websocketTrackers websocketTrackers
86 activeAnnounceLimiter limiter.Instance
87 webseedHttpClient *http.Client
92 func (cl *Client) BadPeerIPs() (ips []string) {
94 ips = cl.badPeerIPsLocked()
99 func (cl *Client) badPeerIPsLocked() (ips []string) {
100 ips = make([]string, len(cl.badPeerIPs))
102 for k := range cl.badPeerIPs {
109 func (cl *Client) PeerID() PeerID {
113 // Returns the port number for the first listener that has one. No longer assumes that all port
114 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
116 func (cl *Client) LocalPort() (port int) {
117 for i := 0; i < len(cl.listeners); i += 1 {
118 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
125 func writeDhtServerStatus(w io.Writer, s DhtServer) {
126 dhtStats := s.Stats()
127 fmt.Fprintf(w, " ID: %x\n", s.ID())
128 spew.Fdump(w, dhtStats)
131 // Writes out a human readable status of the client, such as for writing to a
133 func (cl *Client) WriteStatus(_w io.Writer) {
136 w := bufio.NewWriter(_w)
138 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
139 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
140 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
141 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
142 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
143 cl.eachDhtServer(func(s DhtServer) {
144 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
145 writeDhtServerStatus(w, s)
147 spew.Fdump(w, &cl.stats)
148 torrentsSlice := cl.torrentsAsSlice()
149 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
151 sort.Slice(torrentsSlice, func(l, r int) bool {
152 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
154 for _, t := range torrentsSlice {
156 fmt.Fprint(w, "<unknown name>")
158 fmt.Fprint(w, t.name())
164 "%f%% of %d bytes (%s)",
165 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
167 humanize.Bytes(uint64(*t.length)))
169 w.WriteString("<missing metainfo>")
177 // Filters things that are less than warning from UPnP discovery.
178 func upnpDiscoverLogFilter(m log.Msg) bool {
179 level, ok := m.GetLevel()
180 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
183 func (cl *Client) initLogger() {
184 logger := cl.config.Logger
187 if !cl.config.Debug {
188 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
191 cl.logger = logger.WithValues(cl)
194 func (cl *Client) announceKey() int32 {
195 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
198 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
199 func (cl *Client) init(cfg *ClientConfig) {
201 cl.dopplegangerAddrs = make(map[string]struct{})
202 cl.torrents = make(map[metainfo.Hash]*Torrent)
203 cl.dialRateLimiter = rate.NewLimiter(10, 10)
204 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
207 cl.webseedHttpClient = &http.Client{
208 Transport: &http.Transport{
209 Proxy: cfg.HTTPProxy,
215 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
217 cfg = NewDefaultClientConfig()
223 go cl.acceptLimitClearer()
232 storageImpl := cfg.DefaultStorage
233 if storageImpl == nil {
234 // We'd use mmap by default but HFS+ doesn't support sparse files.
235 storageImplCloser := storage.NewFile(cfg.DataDir)
236 cl.onClose = append(cl.onClose, func() {
237 if err := storageImplCloser.Close(); err != nil {
238 cl.logger.Printf("error closing default storage: %s", err)
241 storageImpl = storageImplCloser
243 cl.defaultStorage = storage.NewClient(storageImpl)
245 if cfg.PeerID != "" {
246 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
248 o := copy(cl.peerID[:], cfg.Bep20)
249 _, err = rand.Read(cl.peerID[o:])
251 panic("error generating peer id")
255 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
263 for _, _s := range sockets {
264 s := _s // Go is fucking retarded.
265 cl.onClose = append(cl.onClose, func() { go s.Close() })
266 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
267 cl.dialers = append(cl.dialers, s)
268 cl.listeners = append(cl.listeners, s)
269 if cl.config.AcceptPeerConnections {
270 go cl.acceptConnections(s)
277 for _, s := range sockets {
278 if pc, ok := s.(net.PacketConn); ok {
279 ds, err := cl.NewAnacrolixDhtServer(pc)
283 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
284 cl.onClose = append(cl.onClose, func() { ds.Close() })
289 cl.websocketTrackers = websocketTrackers{
292 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
295 t, ok := cl.torrents[infoHash]
297 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
299 return t.announceRequest(event), nil
301 Proxy: cl.config.HTTPProxy,
302 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
305 t, ok := cl.torrents[dcc.InfoHash]
307 cl.logger.WithDefaultLevel(log.Warning).Printf(
308 "got webrtc conn for unloaded torrent with infohash %x",
314 go t.onWebRtcConn(dc, dcc)
321 func (cl *Client) AddDhtServer(d DhtServer) {
322 cl.dhtServers = append(cl.dhtServers, d)
325 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
326 // given address for any Torrent.
327 func (cl *Client) AddDialer(d Dialer) {
330 cl.dialers = append(cl.dialers, d)
331 for _, t := range cl.torrents {
336 func (cl *Client) Listeners() []Listener {
340 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
342 func (cl *Client) AddListener(l Listener) {
343 cl.listeners = append(cl.listeners, l)
344 if cl.config.AcceptPeerConnections {
345 go cl.acceptConnections(l)
349 func (cl *Client) firewallCallback(net.Addr) bool {
351 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
354 torrent.Add("connections firewalled", 1)
356 torrent.Add("connections not firewalled", 1)
361 func (cl *Client) listenOnNetwork(n network) bool {
362 if n.Ipv4 && cl.config.DisableIPv4 {
365 if n.Ipv6 && cl.config.DisableIPv6 {
368 if n.Tcp && cl.config.DisableTCP {
371 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
377 func (cl *Client) listenNetworks() (ns []network) {
378 for _, n := range allPeerNetworks {
379 if cl.listenOnNetwork(n) {
386 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
387 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
388 cfg := dht.ServerConfig{
389 IPBlocklist: cl.ipBlockList,
391 OnAnnouncePeer: cl.onDHTAnnouncePeer,
392 PublicIP: func() net.IP {
393 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
394 return cl.config.PublicIp6
396 return cl.config.PublicIp4
398 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
399 OnQuery: cl.config.DHTOnQuery,
400 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
402 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
405 s, err = dht.NewServer(&cfg)
408 ts, err := s.Bootstrap()
410 cl.logger.Printf("error bootstrapping dht: %s", err)
412 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
418 func (cl *Client) Closed() events.Done {
419 return cl.closed.Done()
422 func (cl *Client) eachDhtServer(f func(DhtServer)) {
423 for _, ds := range cl.dhtServers {
428 // Stops the client. All connections to peers are closed and all activity will come to a halt.
429 func (cl *Client) Close() (errs []error) {
430 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
432 for _, t := range cl.torrents {
433 err := t.close(&closeGroup)
435 errs = append(errs, err)
438 for i := range cl.onClose {
439 cl.onClose[len(cl.onClose)-1-i]()
444 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
448 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
449 if cl.ipBlockList == nil {
452 return cl.ipBlockList.Lookup(ip)
455 func (cl *Client) ipIsBlocked(ip net.IP) bool {
456 _, blocked := cl.ipBlockRange(ip)
460 func (cl *Client) wantConns() bool {
461 if cl.config.AlwaysWantConns {
464 for _, t := range cl.torrents {
472 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
473 func (cl *Client) rejectAccepted(conn net.Conn) error {
475 return errors.New("don't want conns right now")
477 ra := conn.RemoteAddr()
478 if rip := addrIpOrNil(ra); rip != nil {
479 if cl.config.DisableIPv4Peers && rip.To4() != nil {
480 return errors.New("ipv4 peers disabled")
482 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
483 return errors.New("ipv4 disabled")
485 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
486 return errors.New("ipv6 disabled")
488 if cl.rateLimitAccept(rip) {
489 return errors.New("source IP accepted rate limited")
491 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
492 return errors.New("bad source addr")
498 func (cl *Client) acceptConnections(l Listener) {
500 conn, err := l.Accept()
501 torrent.Add("client listener accepts", 1)
502 conn = pproffd.WrapNetConn(conn)
504 closed := cl.closed.IsSet()
506 if !closed && conn != nil {
507 reject = cl.rejectAccepted(conn)
517 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
522 torrent.Add("rejected accepted connections", 1)
523 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
526 go cl.incomingConnection(conn)
528 log.Fmsg("accepted %q connection at %q from %q",
532 ).SetLevel(log.Debug).Log(cl.logger)
533 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
534 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
535 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
540 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
541 func regularNetConnPeerConnConnString(nc net.Conn) string {
542 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
545 func (cl *Client) incomingConnection(nc net.Conn) {
547 if tc, ok := nc.(*net.TCPConn); ok {
550 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
551 regularNetConnPeerConnConnString(nc))
557 c.Discovery = PeerSourceIncoming
558 cl.runReceivedConn(c)
561 // Returns a handle to the given torrent, if it's present in the client.
562 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
565 t, ok = cl.torrents[ih]
569 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
570 return cl.torrents[ih]
573 type DialResult struct {
578 func countDialResult(err error) {
580 torrent.Add("successful dials", 1)
582 torrent.Add("unsuccessful dials", 1)
586 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
587 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
588 if ret < minDialTimeout {
594 // Returns whether an address is known to connect to a client with our own ID.
595 func (cl *Client) dopplegangerAddr(addr string) bool {
596 _, ok := cl.dopplegangerAddrs[addr]
600 // Returns a connection over UTP or TCP, whichever is first to connect.
601 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
602 return DialFirst(ctx, addr, cl.dialers)
605 // Returns a connection over UTP or TCP, whichever is first to connect.
606 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
608 t := perf.NewTimer(perf.CallerName(0))
611 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
613 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
617 ctx, cancel := context.WithCancel(ctx)
618 // As soon as we return one connection, cancel the others.
621 resCh := make(chan DialResult, left)
622 for _, _s := range dialers {
627 dialFromSocket(ctx, s, addr),
632 // Wait for a successful connection.
634 defer perf.ScopeTimer()()
635 for ; left > 0 && res.Conn == nil; left-- {
639 // There are still incompleted dials.
641 for ; left > 0; left-- {
642 conn := (<-resCh).Conn
649 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
654 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
655 c, err := s.Dial(ctx, addr)
656 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
657 // it now in case we close the connection forthwith.
658 if tc, ok := c.(*net.TCPConn); ok {
665 func forgettableDialError(err error) bool {
666 return strings.Contains(err.Error(), "no suitable address found")
669 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
670 if _, ok := t.halfOpen[addr]; !ok {
671 panic("invariant broken")
673 delete(t.halfOpen, addr)
675 for _, t := range cl.torrents {
680 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
681 // for valid reasons.
682 func (cl *Client) initiateProtocolHandshakes(
686 outgoing, encryptHeader bool,
687 remoteAddr PeerRemoteAddr,
688 network, connString string,
690 c *PeerConn, err error,
692 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
693 c.headerEncrypted = encryptHeader
694 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
696 dl, ok := ctx.Deadline()
700 err = nc.SetDeadline(dl)
704 err = cl.initiateHandshakes(c, t)
708 // Returns nil connection and nil error if no connection could be established for valid reasons.
709 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
710 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
713 return t.dialTimeout()
716 dr := cl.dialFirst(dialCtx, addr.String())
719 if dialCtx.Err() != nil {
720 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
722 return nil, errors.New("dial failed")
724 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
731 // Returns nil connection and nil error if no connection could be established
732 // for valid reasons.
733 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
734 torrent.Add("establish outgoing connection", 1)
735 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
736 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
738 torrent.Add("initiated conn with preferred header obfuscation", 1)
741 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
742 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
743 // We should have just tried with the preferred header obfuscation. If it was required,
744 // there's nothing else to try.
747 // Try again with encryption if we didn't earlier, or without if we did.
748 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
750 torrent.Add("initiated conn with fallback header obfuscation", 1)
752 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
756 // Called to dial out and run a connection. The addr we're given is already
757 // considered half-open.
758 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
759 cl.dialRateLimiter.Wait(context.Background())
760 c, err := cl.establishOutgoingConn(t, addr)
762 c.conn.SetWriteDeadline(time.Time{})
766 // Don't release lock between here and addPeerConn, unless it's for
768 cl.noLongerHalfOpen(t, addr.String())
771 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
778 t.runHandshookConnLoggingErr(c)
781 // The port number for incoming peer connections. 0 if the client isn't listening.
782 func (cl *Client) incomingPeerPort() int {
783 return cl.LocalPort()
786 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
787 if c.headerEncrypted {
790 rw, c.cryptoMethod, err = mse.InitiateHandshake(
797 cl.config.CryptoProvides,
801 return fmt.Errorf("header obfuscation handshake: %w", err)
804 ih, err := cl.connBtHandshake(c, &t.infoHash)
806 return fmt.Errorf("bittorrent protocol handshake: %w", err)
808 if ih != t.infoHash {
809 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
814 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
815 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
816 func (cl *Client) forSkeys(f func([]byte) bool) {
819 if false { // Emulate the bug from #114
821 for ih := range cl.torrents {
825 for range cl.torrents {
832 for ih := range cl.torrents {
839 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
840 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
846 // Do encryption and bittorrent handshakes as receiver.
847 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
848 defer perf.ScopeTimerErr(&err)()
850 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
852 if err == nil || err == mse.ErrNoSecretKeyMatch {
853 if c.headerEncrypted {
854 torrent.Add("handshakes received encrypted", 1)
856 torrent.Add("handshakes received unencrypted", 1)
859 torrent.Add("handshakes received with error while handling encryption", 1)
862 if err == mse.ErrNoSecretKeyMatch {
867 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
868 err = errors.New("connection does not have required header obfuscation")
871 ih, err := cl.connBtHandshake(c, nil)
873 return nil, fmt.Errorf("during bt handshake: %w", err)
881 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
885 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
886 &successfulPeerWireProtocolHandshakePeerReservedBytes)
889 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
890 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
894 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
896 c.PeerExtensionBytes = res.PeerExtensionBits
897 c.PeerID = res.PeerID
898 c.completedHandshake = time.Now()
899 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
905 func (cl *Client) runReceivedConn(c *PeerConn) {
906 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
910 t, err := cl.receiveHandshakes(c)
913 "error receiving handshakes on %v: %s", c, err,
914 ).SetLevel(log.Debug).
916 "network", c.Network,
918 torrent.Add("error receiving handshake", 1)
920 cl.onBadAccept(c.RemoteAddr)
925 torrent.Add("received handshake for unloaded torrent", 1)
926 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
928 cl.onBadAccept(c.RemoteAddr)
932 torrent.Add("received handshake for loaded torrent", 1)
933 c.conn.SetWriteDeadline(time.Time{})
936 t.runHandshookConnLoggingErr(c)
939 // Client lock must be held before entering this.
940 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
942 for i, b := range cl.config.MinPeerExtensions {
943 if c.PeerExtensionBytes[i]&b != b {
944 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
947 if c.PeerID == cl.peerID {
950 addr := c.RemoteAddr.String()
951 cl.dopplegangerAddrs[addr] = struct{}{}
953 // Because the remote address is not necessarily the same as its client's torrent listen
954 // address, we won't record the remote address as a doppleganger. Instead, the initiator
955 // can record *us* as the doppleganger.
957 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
960 c.r = deadlineReader{c.conn, c.r}
961 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
962 if connIsIpv6(c.conn) {
963 torrent.Add("completed handshake over ipv6", 1)
965 if err := t.addPeerConn(c); err != nil {
966 return fmt.Errorf("adding connection: %w", err)
968 defer t.dropConnection(c)
970 cl.sendInitialMessages(c, t)
971 c.initUpdateRequestsTimer()
972 err := c.mainReadLoop()
974 return fmt.Errorf("main read loop: %w", err)
981 func (p *Peer) initUpdateRequestsTimer() {
983 if p.updateRequestsTimer != nil {
984 panic(p.updateRequestsTimer)
987 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
990 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
992 func (c *Peer) updateRequestsTimerFunc() {
994 defer c.locker().Unlock()
995 if c.closed.IsSet() {
998 if c.isLowOnRequests() {
999 // If there are no outstanding requests, then a request update should have already run.
1002 if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1003 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1004 // already been fired.
1005 torrent.Add("spurious timer requests updates", 1)
1008 c.updateRequests(peerUpdateRequestsTimerReason)
1011 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1012 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1013 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1014 const localClientReqq = 1 << 5
1016 // See the order given in Transmission's tr_peerMsgsNew.
1017 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1018 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1019 conn.write(pp.Message{
1021 ExtendedID: pp.HandshakeExtendedID,
1022 ExtendedPayload: func() []byte {
1023 msg := pp.ExtendedHandshakeMessage{
1024 M: map[pp.ExtensionName]pp.ExtensionNumber{
1025 pp.ExtensionNameMetadata: metadataExtendedId,
1027 V: cl.config.ExtendedHandshakeClientVersion,
1028 Reqq: localClientReqq,
1029 YourIp: pp.CompactIp(conn.remoteIp()),
1030 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1031 Port: cl.incomingPeerPort(),
1032 MetadataSize: torrent.metadataSize(),
1033 // TODO: We can figured these out specific to the socket
1035 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1036 Ipv6: cl.config.PublicIp6.To16(),
1038 if !cl.config.DisablePEX {
1039 msg.M[pp.ExtensionNamePex] = pexExtendedId
1041 return bencode.MustMarshal(msg)
1046 if conn.fastEnabled() {
1047 if torrent.haveAllPieces() {
1048 conn.write(pp.Message{Type: pp.HaveAll})
1049 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1051 } else if !torrent.haveAnyPieces() {
1052 conn.write(pp.Message{Type: pp.HaveNone})
1053 conn.sentHaves.Clear()
1059 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1060 conn.write(pp.Message{
1067 func (cl *Client) dhtPort() (ret uint16) {
1068 if len(cl.dhtServers) == 0 {
1071 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1074 func (cl *Client) haveDhtServer() bool {
1075 return len(cl.dhtServers) > 0
1078 // Process incoming ut_metadata message.
1079 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1080 var d pp.ExtendedMetadataRequestMsg
1081 err := bencode.Unmarshal(payload, &d)
1082 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1083 } else if err != nil {
1084 return fmt.Errorf("error unmarshalling bencode: %s", err)
1088 case pp.DataMetadataExtensionMsgType:
1089 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1090 if !c.requestedMetadataPiece(piece) {
1091 return fmt.Errorf("got unexpected piece %d", piece)
1093 c.metadataRequests[piece] = false
1094 begin := len(payload) - d.PieceSize()
1095 if begin < 0 || begin >= len(payload) {
1096 return fmt.Errorf("data has bad offset in payload: %d", begin)
1098 t.saveMetadataPiece(piece, payload[begin:])
1099 c.lastUsefulChunkReceived = time.Now()
1100 err = t.maybeCompleteMetadata()
1102 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1103 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1104 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1105 // log consumers can filter for this message.
1106 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1109 case pp.RequestMetadataExtensionMsgType:
1110 if !t.haveMetadataPiece(piece) {
1111 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1114 start := (1 << 14) * piece
1115 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1116 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1118 case pp.RejectMetadataExtensionMsgType:
1121 return errors.New("unknown msg_type value")
1125 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1126 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1127 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1132 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1136 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1139 if _, ok := cl.ipBlockRange(ip); ok {
1142 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1148 // Return a Torrent ready for insertion into a Client.
1149 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1150 return cl.newTorrentOpt(AddTorrentOpts{
1152 Storage: specStorage,
1156 // Return a Torrent ready for insertion into a Client.
1157 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1158 // use provided storage, if provided
1159 storageClient := cl.defaultStorage
1160 if opts.Storage != nil {
1161 storageClient = storage.NewClient(opts.Storage)
1166 infoHash: opts.InfoHash,
1167 peers: prioritizedPeers{
1169 getPrio: func(p PeerInfo) peerPriority {
1171 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1174 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1176 halfOpen: make(map[string]PeerInfo),
1177 pieceStateChanges: pubsub.NewPubSub(),
1179 storageOpener: storageClient,
1180 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1182 metadataChanged: sync.Cond{
1185 webSeeds: make(map[string]*Peer),
1186 gotMetainfoC: make(chan struct{}),
1188 t.networkingEnabled.Set()
1189 t.logger = cl.logger.WithContextValue(t)
1190 if opts.ChunkSize == 0 {
1191 opts.ChunkSize = defaultChunkSize
1193 t.setChunkSize(opts.ChunkSize)
1197 // A file-like handle to some torrent data resource.
1198 type Handle interface {
1205 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1206 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1209 // Adds a torrent by InfoHash with a custom Storage implementation.
1210 // If the torrent already exists then this Storage is ignored and the
1211 // existing torrent returned with `new` set to `false`
1212 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1215 t, ok := cl.torrents[infoHash]
1221 t = cl.newTorrent(infoHash, specStorage)
1222 cl.eachDhtServer(func(s DhtServer) {
1223 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1224 go t.dhtAnnouncer(s)
1227 cl.torrents[infoHash] = t
1228 cl.clearAcceptLimits()
1229 t.updateWantPeersEvent()
1230 // Tickle Client.waitAccept, new torrent may want conns.
1231 cl.event.Broadcast()
1235 // Adds a torrent by InfoHash with a custom Storage implementation.
1236 // If the torrent already exists then this Storage is ignored and the
1237 // existing torrent returned with `new` set to `false`
1238 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1239 infoHash := opts.InfoHash
1242 t, ok := cl.torrents[infoHash]
1248 t = cl.newTorrentOpt(opts)
1249 cl.eachDhtServer(func(s DhtServer) {
1250 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1251 go t.dhtAnnouncer(s)
1254 cl.torrents[infoHash] = t
1255 cl.clearAcceptLimits()
1256 t.updateWantPeersEvent()
1257 // Tickle Client.waitAccept, new torrent may want conns.
1258 cl.event.Broadcast()
1262 type AddTorrentOpts struct {
1264 Storage storage.ClientImpl
1265 ChunkSize pp.Integer
1268 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1269 // Torrent.MergeSpec.
1270 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1271 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1272 InfoHash: spec.InfoHash,
1273 Storage: spec.Storage,
1274 ChunkSize: spec.ChunkSize,
1278 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1280 modSpec.ChunkSize = 0
1282 err = t.MergeSpec(&modSpec)
1283 if err != nil && new {
1289 type stringAddr string
1291 var _ net.Addr = stringAddr("")
1293 func (stringAddr) Network() string { return "" }
1294 func (me stringAddr) String() string { return string(me) }
1296 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1297 // spec.DisallowDataDownload/Upload will be read and applied
1298 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1299 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1300 if spec.DisplayName != "" {
1301 t.SetDisplayName(spec.DisplayName)
1303 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1304 if spec.InfoBytes != nil {
1305 err := t.SetInfoBytes(spec.InfoBytes)
1311 cl.AddDhtNodes(spec.DhtNodes)
1314 useTorrentSources(spec.Sources, t)
1315 for _, url := range spec.Webseeds {
1318 for _, peerAddr := range spec.PeerAddrs {
1320 Addr: stringAddr(peerAddr),
1321 Source: PeerSourceDirect,
1325 if spec.ChunkSize != 0 {
1326 panic("chunk size cannot be changed for existing Torrent")
1328 t.addTrackers(spec.Trackers)
1330 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1331 t.dataUploadDisallowed = spec.DisallowDataUpload
1335 func useTorrentSources(sources []string, t *Torrent) {
1336 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1337 ctx := context.Background()
1338 for i := 0; i < len(sources); i += 1 {
1341 if err := useTorrentSource(ctx, s, t); err != nil {
1342 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1344 t.logger.Printf("successfully used source %q", s)
1350 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1351 ctx, cancel := context.WithCancel(ctx)
1361 var req *http.Request
1362 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1365 var resp *http.Response
1366 if resp, err = http.DefaultClient.Do(req); err != nil {
1369 var mi metainfo.MetaInfo
1370 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1373 if ctx.Err() != nil {
1378 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1381 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1382 t, ok := cl.torrents[infoHash]
1384 err = fmt.Errorf("no such torrent")
1391 delete(cl.torrents, infoHash)
1395 func (cl *Client) allTorrentsCompleted() bool {
1396 for _, t := range cl.torrents {
1400 if !t.haveAllPieces() {
1407 // Returns true when all torrents are completely downloaded and false if the
1408 // client is stopped before that.
1409 func (cl *Client) WaitAll() bool {
1412 for !cl.allTorrentsCompleted() {
1413 if cl.closed.IsSet() {
1421 // Returns handles to all the torrents loaded in the Client.
1422 func (cl *Client) Torrents() []*Torrent {
1425 return cl.torrentsAsSlice()
1428 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1429 for _, t := range cl.torrents {
1430 ret = append(ret, t)
1435 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1436 spec, err := TorrentSpecFromMagnetUri(uri)
1440 T, _, err = cl.AddTorrentSpec(spec)
1444 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1445 ts, err := TorrentSpecFromMetaInfoErr(mi)
1449 T, _, err = cl.AddTorrentSpec(ts)
1453 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1454 mi, err := metainfo.LoadFromFile(filename)
1458 return cl.AddTorrent(mi)
1461 func (cl *Client) DhtServers() []DhtServer {
1462 return cl.dhtServers
1465 func (cl *Client) AddDhtNodes(nodes []string) {
1466 for _, n := range nodes {
1467 hmp := missinggo.SplitHostMaybePort(n)
1468 ip := net.ParseIP(hmp.Host)
1470 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1473 ni := krpc.NodeInfo{
1474 Addr: krpc.NodeAddr{
1479 cl.eachDhtServer(func(s DhtServer) {
1485 func (cl *Client) banPeerIP(ip net.IP) {
1486 cl.logger.Printf("banning ip %v", ip)
1487 if cl.badPeerIPs == nil {
1488 cl.badPeerIPs = make(map[string]struct{})
1490 cl.badPeerIPs[ip.String()] = struct{}{}
1493 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1502 PeerMaxRequests: 250,
1504 RemoteAddr: remoteAddr,
1506 callbacks: &cl.config.Callbacks,
1508 connString: connString,
1512 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1513 c.setRW(connStatsReadWriter{nc, c})
1514 c.r = &rateLimitedReader{
1515 l: cl.config.DownloadRateLimiter,
1518 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1519 for _, f := range cl.config.Callbacks.NewPeer {
1525 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1532 t.addPeers([]PeerInfo{{
1533 Addr: ipPortAddr{ip, port},
1534 Source: PeerSourceDhtAnnouncePeer,
1538 func firstNotNil(ips ...net.IP) net.IP {
1539 for _, ip := range ips {
1547 func (cl *Client) eachListener(f func(Listener) bool) {
1548 for _, s := range cl.listeners {
1555 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1556 for i := 0; i < len(cl.listeners); i += 1 {
1557 if ret = cl.listeners[i]; f(ret) {
1564 func (cl *Client) publicIp(peer net.IP) net.IP {
1565 // TODO: Use BEP 10 to determine how peers are seeing us.
1566 if peer.To4() != nil {
1568 cl.config.PublicIp4,
1569 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1574 cl.config.PublicIp6,
1575 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1579 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1580 l := cl.findListener(
1581 func(l Listener) bool {
1582 return f(addrIpOrNil(l.Addr()))
1588 return addrIpOrNil(l.Addr())
1591 // Our IP as a peer should see it.
1592 func (cl *Client) publicAddr(peer net.IP) IpPort {
1593 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1596 // ListenAddrs addresses currently being listened to.
1597 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1599 ret = make([]net.Addr, len(cl.listeners))
1600 for i := 0; i < len(cl.listeners); i += 1 {
1601 ret[i] = cl.listeners[i].Addr()
1607 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1608 ipa, ok := tryIpPortFromNetAddr(addr)
1612 ip := maskIpForAcceptLimiting(ipa.IP)
1613 if cl.acceptLimiter == nil {
1614 cl.acceptLimiter = make(map[ipStr]int)
1616 cl.acceptLimiter[ipStr(ip.String())]++
1619 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1620 if ip4 := ip.To4(); ip4 != nil {
1621 return ip4.Mask(net.CIDRMask(24, 32))
1626 func (cl *Client) clearAcceptLimits() {
1627 cl.acceptLimiter = nil
1630 func (cl *Client) acceptLimitClearer() {
1633 case <-cl.closed.Done():
1635 case <-time.After(15 * time.Minute):
1637 cl.clearAcceptLimits()
1643 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1644 if cl.config.DisableAcceptRateLimiting {
1647 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1650 func (cl *Client) rLock() {
1654 func (cl *Client) rUnlock() {
1658 func (cl *Client) lock() {
1662 func (cl *Client) unlock() {
1666 func (cl *Client) locker() *lockWithDeferreds {
1670 func (cl *Client) String() string {
1671 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1674 // Returns connection-level aggregate stats at the Client level. See the comment on
1675 // TorrentStats.ConnStats.
1676 func (cl *Client) ConnStats() ConnStats {
1677 return cl.stats.Copy()