20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 request_strategy "github.com/anacrolix/torrent/request-strategy"
31 "github.com/davecgh/go-spew/spew"
32 "github.com/dustin/go-humanize"
33 "github.com/google/btree"
34 "github.com/pion/datachannel"
35 "golang.org/x/time/rate"
37 "github.com/anacrolix/chansync"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/internal/limiter"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
46 "github.com/anacrolix/torrent/tracker"
47 "github.com/anacrolix/torrent/webtorrent"
50 // Clients contain zero or more Torrents. A Client manages a blocklist, the
51 // TCP/UDP protocol ports, and DHT as desired.
53 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59 closed chansync.SetOnce
65 defaultStorage *storage.Client
69 dhtServers []DhtServer
70 ipBlockList iplist.Ranger
72 // Set of addresses that have our client ID. This intentionally will
73 // include ourselves if we end up trying to connect to our own address
74 // through legitimate channels.
75 dopplegangerAddrs map[string]struct{}
76 badPeerIPs map[string]struct{}
77 torrents map[InfoHash]*Torrent
78 pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
80 acceptLimiter map[ipStr]int
81 dialRateLimiter *rate.Limiter
84 websocketTrackers websocketTrackers
86 activeAnnounceLimiter limiter.Instance
87 webseedHttpClient *http.Client
92 func (cl *Client) BadPeerIPs() (ips []string) {
94 ips = cl.badPeerIPsLocked()
99 func (cl *Client) badPeerIPsLocked() (ips []string) {
100 ips = make([]string, len(cl.badPeerIPs))
102 for k := range cl.badPeerIPs {
109 func (cl *Client) PeerID() PeerID {
113 // Returns the port number for the first listener that has one. No longer assumes that all port
114 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
116 func (cl *Client) LocalPort() (port int) {
117 for i := 0; i < len(cl.listeners); i += 1 {
118 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
// writeDhtServerStatus writes a status summary for the DHT server s to w: the
// server's ID formatted as hex, followed by a spew dump of the value returned
// by s.Stats().
125 func writeDhtServerStatus(w io.Writer, s DhtServer) {
126 dhtStats := s.Stats()
127 fmt.Fprintf(w, " ID: %x\n", s.ID())
128 spew.Fdump(w, dhtStats)
131 // Writes out a human readable status of the client, such as for writing to a
133 func (cl *Client) WriteStatus(_w io.Writer) {
136 w := bufio.NewWriter(_w)
138 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
139 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
140 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
141 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
142 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
143 cl.eachDhtServer(func(s DhtServer) {
144 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
145 writeDhtServerStatus(w, s)
147 spew.Fdump(w, &cl.stats)
148 torrentsSlice := cl.torrentsAsSlice()
149 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
151 sort.Slice(torrentsSlice, func(l, r int) bool {
152 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
154 for _, t := range torrentsSlice {
156 fmt.Fprint(w, "<unknown name>")
158 fmt.Fprint(w, t.name())
164 "%f%% of %d bytes (%s)",
165 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
167 humanize.Bytes(uint64(*t.length)))
169 w.WriteString("<missing metainfo>")
177 // upnpDiscoverLogFilter filters out UPnP discovery messages that are below warning level.
// It reports whether m should be kept: messages that do not carry
// UpnpDiscoverLogTag always pass; tagged messages pass only when they have a
// level set (ok) and that level is not less than log.Warning.
178 func upnpDiscoverLogFilter(m log.Msg) bool {
179 level, ok := m.GetLevel()
180 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
183 func (cl *Client) initLogger() {
184 logger := cl.config.Logger
187 if !cl.config.Debug {
188 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
191 cl.logger = logger.WithValues(cl)
// announceKey derives the Client's announce key from its peer ID: bytes 16
// through 19 of cl.peerID are read as a big-endian uint32 and converted to
// int32 (so the result may be negative).
194 func (cl *Client) announceKey() int32 {
195 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
198 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
199 func (cl *Client) init(cfg *ClientConfig) {
201 cl.dopplegangerAddrs = make(map[string]struct{})
202 cl.torrents = make(map[metainfo.Hash]*Torrent)
203 cl.dialRateLimiter = rate.NewLimiter(10, 10)
204 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
207 cl.webseedHttpClient = &http.Client{
208 Transport: &http.Transport{
209 Proxy: cfg.HTTPProxy,
215 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
217 cfg = NewDefaultClientConfig()
223 go cl.acceptLimitClearer()
232 storageImpl := cfg.DefaultStorage
233 if storageImpl == nil {
234 // We'd use mmap by default but HFS+ doesn't support sparse files.
235 storageImplCloser := storage.NewFile(cfg.DataDir)
236 cl.onClose = append(cl.onClose, func() {
237 if err := storageImplCloser.Close(); err != nil {
238 cl.logger.Printf("error closing default storage: %s", err)
241 storageImpl = storageImplCloser
243 cl.defaultStorage = storage.NewClient(storageImpl)
245 if cfg.PeerID != "" {
246 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
248 o := copy(cl.peerID[:], cfg.Bep20)
249 _, err = rand.Read(cl.peerID[o:])
251 panic("error generating peer id")
255 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
263 for _, _s := range sockets {
264 s := _s // Copy the loop variable so each closure below captures its own socket.
265 cl.onClose = append(cl.onClose, func() { go s.Close() })
266 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
267 cl.dialers = append(cl.dialers, s)
268 cl.listeners = append(cl.listeners, s)
269 if cl.config.AcceptPeerConnections {
270 go cl.acceptConnections(s)
277 for _, s := range sockets {
278 if pc, ok := s.(net.PacketConn); ok {
279 ds, err := cl.NewAnacrolixDhtServer(pc)
283 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
284 cl.onClose = append(cl.onClose, func() { ds.Close() })
289 cl.websocketTrackers = websocketTrackers{
292 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
295 t, ok := cl.torrents[infoHash]
297 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
299 return t.announceRequest(event), nil
301 Proxy: cl.config.HTTPProxy,
302 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
305 t, ok := cl.torrents[dcc.InfoHash]
307 cl.logger.WithDefaultLevel(log.Warning).Printf(
308 "got webrtc conn for unloaded torrent with infohash %x",
314 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the Client's list of DHT servers.
// NOTE(review): the visible body takes no lock before mutating cl.dhtServers —
// confirm whether callers are expected to synchronize.
321 func (cl *Client) AddDhtServer(d DhtServer) {
322 cl.dhtServers = append(cl.dhtServers, d)
325 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
326 // given address for any Torrent.
327 func (cl *Client) AddDialer(d Dialer) {
330 cl.dialers = append(cl.dialers, d)
331 for _, t := range cl.torrents {
336 func (cl *Client) Listeners() []Listener {
340 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
342 func (cl *Client) AddListener(l Listener) {
343 cl.listeners = append(cl.listeners, l)
344 if cl.config.AcceptPeerConnections {
345 go cl.acceptConnections(l)
349 func (cl *Client) firewallCallback(net.Addr) bool {
351 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
354 torrent.Add("connections firewalled", 1)
356 torrent.Add("connections not firewalled", 1)
361 func (cl *Client) listenOnNetwork(n network) bool {
362 if n.Ipv4 && cl.config.DisableIPv4 {
365 if n.Ipv6 && cl.config.DisableIPv6 {
368 if n.Tcp && cl.config.DisableTCP {
371 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
377 func (cl *Client) listenNetworks() (ns []network) {
378 for _, n := range allPeerNetworks {
379 if cl.listenOnNetwork(n) {
386 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
387 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
388 cfg := dht.ServerConfig{
389 IPBlocklist: cl.ipBlockList,
391 OnAnnouncePeer: cl.onDHTAnnouncePeer,
392 PublicIP: func() net.IP {
393 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
394 return cl.config.PublicIp6
396 return cl.config.PublicIp4
398 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
399 OnQuery: cl.config.DHTOnQuery,
400 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
402 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
405 s, err = dht.NewServer(&cfg)
408 ts, err := s.Bootstrap()
410 cl.logger.Printf("error bootstrapping dht: %s", err)
412 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
// Closed returns the events.Done associated with the Client's closed flag
// (a chansync.SetOnce).
// NOTE(review): presumably this is signalled once the Client has been closed —
// confirm against Close.
418 func (cl *Client) Closed() events.Done {
419 return cl.closed.Done()
422 func (cl *Client) eachDhtServer(f func(DhtServer)) {
423 for _, ds := range cl.dhtServers {
428 // Stops the client. All connections to peers are closed and all activity will come to a halt.
429 func (cl *Client) Close() (errs []error) {
430 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
432 for _, t := range cl.torrents {
433 err := t.close(&closeGroup)
435 errs = append(errs, err)
438 for i := range cl.onClose {
439 cl.onClose[len(cl.onClose)-1-i]()
444 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
448 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
449 if cl.ipBlockList == nil {
452 return cl.ipBlockList.Lookup(ip)
455 func (cl *Client) ipIsBlocked(ip net.IP) bool {
456 _, blocked := cl.ipBlockRange(ip)
460 func (cl *Client) wantConns() bool {
461 if cl.config.AlwaysWantConns {
464 for _, t := range cl.torrents {
472 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
473 func (cl *Client) rejectAccepted(conn net.Conn) error {
475 return errors.New("don't want conns right now")
477 ra := conn.RemoteAddr()
478 if rip := addrIpOrNil(ra); rip != nil {
479 if cl.config.DisableIPv4Peers && rip.To4() != nil {
480 return errors.New("ipv4 peers disabled")
482 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
483 return errors.New("ipv4 disabled")
485 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
486 return errors.New("ipv6 disabled")
488 if cl.rateLimitAccept(rip) {
489 return errors.New("source IP accepted rate limited")
491 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
492 return errors.New("bad source addr")
498 func (cl *Client) acceptConnections(l Listener) {
500 conn, err := l.Accept()
501 torrent.Add("client listener accepts", 1)
502 conn = pproffd.WrapNetConn(conn)
504 closed := cl.closed.IsSet()
506 if !closed && conn != nil {
507 reject = cl.rejectAccepted(conn)
517 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
522 torrent.Add("rejected accepted connections", 1)
523 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
526 go cl.incomingConnection(conn)
528 log.Fmsg("accepted %q connection at %q from %q",
532 ).SetLevel(log.Debug).Log(cl.logger)
533 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
534 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
535 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
540 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
// The result is the connection's local address and remote address joined by
// a hyphen, in that order.
541 func regularNetConnPeerConnConnString(nc net.Conn) string {
542 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
545 func (cl *Client) incomingConnection(nc net.Conn) {
547 if tc, ok := nc.(*net.TCPConn); ok {
550 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
551 regularNetConnPeerConnConnString(nc))
557 c.Discovery = PeerSourceIncoming
558 cl.runReceivedConn(c)
561 // Returns a handle to the given torrent, if it's present in the client.
562 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
565 t, ok = cl.torrents[ih]
// torrent looks up ih in cl.torrents and returns the matching *Torrent, or nil
// if the info hash is not present in the map.
// NOTE(review): no lock is taken in the visible body — presumably intended for
// callers that already hold the Client lock; confirm.
569 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
570 return cl.torrents[ih]
573 type DialResult struct {
578 func countDialResult(err error) {
580 torrent.Add("successful dials", 1)
582 torrent.Add("unsuccessful dials", 1)
586 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
587 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
588 if ret < minDialTimeout {
594 // Returns whether an address is known to connect to a client with our own ID.
595 func (cl *Client) dopplegangerAddr(addr string) bool {
596 _, ok := cl.dopplegangerAddrs[addr]
600 // Returns a connection over UTP or TCP, whichever is first to connect.
// This is a thin wrapper that calls DialFirst with the Client's own dialers
// and returns its result unchanged.
601 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
602 return DialFirst(ctx, addr, cl.dialers)
605 // Returns a connection over UTP or TCP, whichever is first to connect.
606 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
608 t := perf.NewTimer(perf.CallerName(0))
611 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
613 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
617 ctx, cancel := context.WithCancel(ctx)
618 // As soon as we return one connection, cancel the others.
621 resCh := make(chan DialResult, left)
622 for _, _s := range dialers {
627 dialFromSocket(ctx, s, addr),
632 // Wait for a successful connection.
634 defer perf.ScopeTimer()()
635 for ; left > 0 && res.Conn == nil; left-- {
639 // There are still incomplete dials.
641 for ; left > 0; left-- {
642 conn := (<-resCh).Conn
649 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
654 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
655 c, err := s.Dial(ctx, addr)
656 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
657 // it now in case we close the connection forthwith.
658 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found". err must be non-nil: err.Error() is called
// unconditionally.
665 func forgettableDialError(err error) bool {
666 return strings.Contains(err.Error(), "no suitable address found")
669 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
670 if _, ok := t.halfOpen[addr]; !ok {
671 panic("invariant broken")
673 delete(t.halfOpen, addr)
675 for _, t := range cl.torrents {
680 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no connection
681 // was established for valid reasons.
682 func (cl *Client) initiateProtocolHandshakes(
686 outgoing, encryptHeader bool,
687 remoteAddr PeerRemoteAddr,
688 network, connString string,
690 c *PeerConn, err error,
692 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
693 c.headerEncrypted = encryptHeader
694 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
696 dl, ok := ctx.Deadline()
700 err = nc.SetDeadline(dl)
704 err = cl.initiateHandshakes(c, t)
708 // Returns nil connection and nil error if no connection could be established for valid reasons.
709 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
710 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
713 return t.dialTimeout()
716 dr := cl.dialFirst(dialCtx, addr.String())
719 if dialCtx.Err() != nil {
720 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
722 return nil, errors.New("dial failed")
724 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
731 // Returns nil connection and nil error if no connection could be established
732 // for valid reasons.
733 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
734 torrent.Add("establish outgoing connection", 1)
735 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
736 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
738 torrent.Add("initiated conn with preferred header obfuscation", 1)
741 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
742 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
743 // We should have just tried with the preferred header obfuscation. If it was required,
744 // there's nothing else to try.
747 // Try again with encryption if we didn't earlier, or without if we did.
748 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
750 torrent.Add("initiated conn with fallback header obfuscation", 1)
752 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
756 // Called to dial out and run a connection. The addr we're given is already
757 // considered half-open.
758 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
759 cl.dialRateLimiter.Wait(context.Background())
760 c, err := cl.establishOutgoingConn(t, addr)
762 c.conn.SetWriteDeadline(time.Time{})
766 // Don't release lock between here and addPeerConn, unless it's for
768 cl.noLongerHalfOpen(t, addr.String())
771 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
778 t.runHandshookConnLoggingErr(c)
781 // The port number for incoming peer connections. 0 if the client isn't listening.
// The value is taken directly from LocalPort.
782 func (cl *Client) incomingPeerPort() int {
783 return cl.LocalPort()
786 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
787 if c.headerEncrypted {
790 rw, c.cryptoMethod, err = mse.InitiateHandshake(
797 cl.config.CryptoProvides,
801 return fmt.Errorf("header obfuscation handshake: %w", err)
804 ih, err := cl.connBtHandshake(c, &t.infoHash)
806 return fmt.Errorf("bittorrent protocol handshake: %w", err)
808 if ih != t.infoHash {
809 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
814 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
815 // that won't also try to take the lock. This saves us copying all the infohashes every time.
816 func (cl *Client) forSkeys(f func([]byte) bool) {
819 if false { // Emulate the bug from #114
821 for ih := range cl.torrents {
825 for range cl.torrents {
832 for ih := range cl.torrents {
839 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
840 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
846 // Do encryption and bittorrent handshakes as receiver.
847 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
848 defer perf.ScopeTimerErr(&err)()
850 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
852 if err == nil || err == mse.ErrNoSecretKeyMatch {
853 if c.headerEncrypted {
854 torrent.Add("handshakes received encrypted", 1)
856 torrent.Add("handshakes received unencrypted", 1)
859 torrent.Add("handshakes received with error while handling encryption", 1)
862 if err == mse.ErrNoSecretKeyMatch {
867 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
868 err = errors.New("connection does not have required header obfuscation")
871 ih, err := cl.connBtHandshake(c, nil)
873 return nil, fmt.Errorf("during bt handshake: %w", err)
881 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
885 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
886 &successfulPeerWireProtocolHandshakePeerReservedBytes)
889 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
890 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
894 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
896 c.PeerExtensionBytes = res.PeerExtensionBits
897 c.PeerID = res.PeerID
898 c.completedHandshake = time.Now()
899 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
905 func (cl *Client) runReceivedConn(c *PeerConn) {
906 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
910 t, err := cl.receiveHandshakes(c)
913 "error receiving handshakes on %v: %s", c, err,
914 ).SetLevel(log.Debug).
916 "network", c.Network,
918 torrent.Add("error receiving handshake", 1)
920 cl.onBadAccept(c.RemoteAddr)
925 torrent.Add("received handshake for unloaded torrent", 1)
926 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
928 cl.onBadAccept(c.RemoteAddr)
932 torrent.Add("received handshake for loaded torrent", 1)
933 c.conn.SetWriteDeadline(time.Time{})
936 t.runHandshookConnLoggingErr(c)
939 // Client lock must be held before entering this.
940 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
942 for i, b := range cl.config.MinPeerExtensions {
943 if c.PeerExtensionBytes[i]&b != b {
944 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
947 if c.PeerID == cl.peerID {
950 addr := c.RemoteAddr.String()
951 cl.dopplegangerAddrs[addr] = struct{}{}
953 // Because the remote address is not necessarily the same as its client's torrent listen
954 // address, we won't record the remote address as a doppleganger. Instead, the initiator
955 // can record *us* as the doppleganger.
957 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
960 c.r = deadlineReader{c.conn, c.r}
961 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
962 if connIsIpv6(c.conn) {
963 torrent.Add("completed handshake over ipv6", 1)
965 if err := t.addPeerConn(c); err != nil {
966 return fmt.Errorf("adding connection: %w", err)
968 defer t.dropConnection(c)
970 cl.sendInitialMessages(c, t)
971 c.initUpdateRequestsTimer()
972 err := c.mainReadLoop()
974 return fmt.Errorf("main read loop: %w", err)
981 func (p *Peer) initUpdateRequestsTimer() {
983 if p.updateRequestsTimer != nil {
984 panic(p.updateRequestsTimer)
987 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
988 p.updateRequestsTimer.Stop()
991 func (c *Peer) updateRequestsTimerFunc() {
993 defer c.locker().Unlock()
994 if c.closed.IsSet() {
997 if c.needRequestUpdate != "" {
1000 if c.isLowOnRequests() {
1001 // If there are no outstanding requests, then a request update should have already run.
1004 c.updateRequests("updateRequestsTimer")
1007 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1008 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
1009 // (1<<19) cached for sending and 16KiB (1<<14) chunks, that is (1<<19)/(1<<14) = 1<<5 requests.
// The value is advertised to peers as Reqq in the extended handshake message.
1010 const localClientReqq = 1 << 5
1012 // See the order given in Transmission's tr_peerMsgsNew.
1013 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1014 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1015 conn.write(pp.Message{
1017 ExtendedID: pp.HandshakeExtendedID,
1018 ExtendedPayload: func() []byte {
1019 msg := pp.ExtendedHandshakeMessage{
1020 M: map[pp.ExtensionName]pp.ExtensionNumber{
1021 pp.ExtensionNameMetadata: metadataExtendedId,
1023 V: cl.config.ExtendedHandshakeClientVersion,
1024 Reqq: localClientReqq,
1025 YourIp: pp.CompactIp(conn.remoteIp()),
1026 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1027 Port: cl.incomingPeerPort(),
1028 MetadataSize: torrent.metadataSize(),
1029 // TODO: We can figure these out specific to the socket
1031 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1032 Ipv6: cl.config.PublicIp6.To16(),
1034 if !cl.config.DisablePEX {
1035 msg.M[pp.ExtensionNamePex] = pexExtendedId
1037 return bencode.MustMarshal(msg)
1042 if conn.fastEnabled() {
1043 if torrent.haveAllPieces() {
1044 conn.write(pp.Message{Type: pp.HaveAll})
1045 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1047 } else if !torrent.haveAnyPieces() {
1048 conn.write(pp.Message{Type: pp.HaveNone})
1049 conn.sentHaves.Clear()
1055 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1056 conn.write(pp.Message{
1063 func (cl *Client) dhtPort() (ret uint16) {
1064 if len(cl.dhtServers) == 0 {
1067 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
// haveDhtServer reports whether the Client has at least one DHT server.
1070 func (cl *Client) haveDhtServer() bool {
1071 return len(cl.dhtServers) > 0
1074 // Process incoming ut_metadata message.
1075 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1076 var d pp.ExtendedMetadataRequestMsg
1077 err := bencode.Unmarshal(payload, &d)
1078 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1079 } else if err != nil {
1080 return fmt.Errorf("error unmarshalling bencode: %s", err)
1084 case pp.DataMetadataExtensionMsgType:
1085 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1086 if !c.requestedMetadataPiece(piece) {
1087 return fmt.Errorf("got unexpected piece %d", piece)
1089 c.metadataRequests[piece] = false
1090 begin := len(payload) - d.PieceSize()
1091 if begin < 0 || begin >= len(payload) {
1092 return fmt.Errorf("data has bad offset in payload: %d", begin)
1094 t.saveMetadataPiece(piece, payload[begin:])
1095 c.lastUsefulChunkReceived = time.Now()
1096 err = t.maybeCompleteMetadata()
1098 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1099 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1100 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1101 // log consumers can filter for this message.
1102 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1105 case pp.RequestMetadataExtensionMsgType:
1106 if !t.haveMetadataPiece(piece) {
1107 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1110 start := (1 << 14) * piece
1111 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1112 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1114 case pp.RejectMetadataExtensionMsgType:
1117 return errors.New("unknown msg_type value")
1121 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1122 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1123 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1128 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1132 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1135 if _, ok := cl.ipBlockRange(ip); ok {
1138 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1144 // Return a Torrent ready for insertion into a Client.
1145 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1146 return cl.newTorrentOpt(AddTorrentOpts{
1148 Storage: specStorage,
1152 // Return a Torrent ready for insertion into a Client.
1153 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1154 // use provided storage, if provided
1155 storageClient := cl.defaultStorage
1156 if opts.Storage != nil {
1157 storageClient = storage.NewClient(opts.Storage)
1162 infoHash: opts.InfoHash,
1163 peers: prioritizedPeers{
1165 getPrio: func(p PeerInfo) peerPriority {
1167 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1170 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1172 halfOpen: make(map[string]PeerInfo),
1173 pieceStateChanges: pubsub.NewPubSub(),
1175 storageOpener: storageClient,
1176 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1178 metadataChanged: sync.Cond{
1181 webSeeds: make(map[string]*Peer),
1182 gotMetainfoC: make(chan struct{}),
1184 t.networkingEnabled.Set()
1185 t.logger = cl.logger.WithContextValue(t)
1186 if opts.ChunkSize == 0 {
1187 opts.ChunkSize = defaultChunkSize
1189 t.setChunkSize(opts.ChunkSize)
1193 // A file-like handle to some torrent data resource.
1194 type Handle interface {
// AddTorrentInfoHash adds a torrent identified only by its info hash. It calls
// AddTorrentInfoHashWithStorage with a nil storage implementation and returns
// that call's results: the Torrent, and whether it was newly added.
// NOTE(review): with nil storage the torrent presumably falls back to the
// Client's default storage — confirm against newTorrentOpt.
1201 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1202 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1205 // Adds a torrent by InfoHash with a custom Storage implementation.
1206 // If the torrent already exists then this Storage is ignored and the
1207 // existing torrent returned with `new` set to `false`
1208 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1211 t, ok := cl.torrents[infoHash]
1217 t = cl.newTorrent(infoHash, specStorage)
1218 cl.eachDhtServer(func(s DhtServer) {
1219 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1220 go t.dhtAnnouncer(s)
1223 cl.torrents[infoHash] = t
1224 cl.clearAcceptLimits()
1225 t.updateWantPeersEvent()
1226 // Tickle Client.waitAccept, new torrent may want conns.
1227 cl.event.Broadcast()
1231 // Adds a torrent described by the given AddTorrentOpts, keyed by opts.InfoHash.
1232 // If the torrent already exists then the Storage in opts is ignored and the
1233 // existing torrent is returned with `new` set to `false`.
1234 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1235 infoHash := opts.InfoHash
1238 t, ok := cl.torrents[infoHash]
1244 t = cl.newTorrentOpt(opts)
1245 cl.eachDhtServer(func(s DhtServer) {
1246 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1247 go t.dhtAnnouncer(s)
1250 cl.torrents[infoHash] = t
1251 cl.clearAcceptLimits()
1252 t.updateWantPeersEvent()
1253 // Tickle Client.waitAccept, new torrent may want conns.
1254 cl.event.Broadcast()
1258 type AddTorrentOpts struct {
1260 Storage storage.ClientImpl
1261 ChunkSize pp.Integer
1264 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1265 // Torrent.MergeSpec.
1266 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1267 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1268 InfoHash: spec.InfoHash,
1269 Storage: spec.Storage,
1270 ChunkSize: spec.ChunkSize,
1274 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1276 modSpec.ChunkSize = 0
1278 err = t.MergeSpec(&modSpec)
1279 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string: String returns the text
// as-is and Network is always empty.
1285 type stringAddr string
// Compile-time assertion that stringAddr implements net.Addr.
1287 var _ net.Addr = stringAddr("")
// Network returns the empty string; a stringAddr carries no network name.
1289 func (stringAddr) Network() string { return "" }
// String returns the underlying address text unchanged.
1290 func (me stringAddr) String() string { return string(me) }
1292 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1293 // spec.DisallowDataDownload/Upload will be read and applied.
1294 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1295 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1296 if spec.DisplayName != "" {
1297 t.SetDisplayName(spec.DisplayName)
1299 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1300 if spec.InfoBytes != nil {
1301 err := t.SetInfoBytes(spec.InfoBytes)
1307 cl.AddDhtNodes(spec.DhtNodes)
1310 useTorrentSources(spec.Sources, t)
1311 for _, url := range spec.Webseeds {
1314 for _, peerAddr := range spec.PeerAddrs {
1316 Addr: stringAddr(peerAddr),
1317 Source: PeerSourceDirect,
1321 if spec.ChunkSize != 0 {
1322 panic("chunk size cannot be changed for existing Torrent")
1324 t.addTrackers(spec.Trackers)
1326 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1327 t.dataUploadDisallowed = spec.DisallowDataUpload
// useTorrentSources tries each of the given sources for t via useTorrentSource. The outcome of each
// attempt is only logged (a warning on failure); nothing is returned to the caller.
1331 func useTorrentSources(sources []string, t *Torrent) {
1332 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1333 ctx := context.Background()
1334 for i := 0; i < len(sources); i += 1 {
1337 if err := useTorrentSource(ctx, s, t); err != nil {
1338 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1340 t.logger.Printf("successfully used source %q", s)
// useTorrentSource fetches source with an HTTP GET on http.DefaultClient, bencode-decodes the
// response body as a metainfo.MetaInfo, and merges the TorrentSpec derived from it into t.
1346 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
// Derive a cancellable context for the request.
1347 ctx, cancel := context.WithCancel(ctx)
1357 var req *http.Request
1358 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1361 var resp *http.Response
1362 if resp, err = http.DefaultClient.Do(req); err != nil {
1365 var mi metainfo.MetaInfo
// Decode straight from the response stream.
1366 err = bencode.NewDecoder(resp.Body).Decode(&mi)
// A non-nil ctx.Err() means the context ended while the request was in progress.
1369 if ctx.Err() != nil {
1374 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
// dropTorrent looks up the torrent for infoHash in cl.torrents and deletes its entry from the map.
// A "no such torrent" error is produced, presumably when the lookup fails — TODO confirm.
1377 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1378 t, ok := cl.torrents[infoHash]
1380 err = fmt.Errorf("no such torrent")
1387 delete(cl.torrents, infoHash)
// allTorrentsCompleted reports whether the torrents in cl.torrents are complete. A torrent that
// does not have all of its pieces counts as incomplete.
1391 func (cl *Client) allTorrentsCompleted() bool {
1392 for _, t := range cl.torrents {
1396 if !t.haveAllPieces() {
1403 // WaitAll returns true when all torrents are completely downloaded and false if the
1404 // client is stopped before that.
1405 func (cl *Client) WaitAll() bool {
// Keep re-checking until every torrent is complete.
1408 for !cl.allTorrentsCompleted() {
// The client being closed ends the wait early.
1409 if cl.closed.IsSet() {
1417 // Torrents returns handles to all the torrents loaded in the Client, as collected by torrentsAsSlice.
1418 func (cl *Client) Torrents() []*Torrent {
1421 return cl.torrentsAsSlice()
1424 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1425 for _, t := range cl.torrents {
1426 ret = append(ret, t)
// AddMagnet parses the magnet URI into a TorrentSpec with TorrentSpecFromMagnetUri and adds it
// through AddTorrentSpec. Whether the torrent was newly added is not reported.
1431 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1432 spec, err := TorrentSpecFromMagnetUri(uri)
1436 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent converts mi into a TorrentSpec with TorrentSpecFromMetaInfoErr and adds it through
// AddTorrentSpec. Whether the torrent was newly added is not reported.
1440 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1441 ts, err := TorrentSpecFromMetaInfoErr(mi)
1445 T, _, err = cl.AddTorrentSpec(ts)
// AddTorrentFromFile loads metainfo from the named file with metainfo.LoadFromFile and adds it
// through AddTorrent.
1449 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1450 mi, err := metainfo.LoadFromFile(filename)
1454 return cl.AddTorrent(mi)
1457 func (cl *Client) DhtServers() []DhtServer {
1458 return cl.dhtServers
// AddDhtNodes takes node addresses given as host with an optional port. The host of each entry
// must parse as an IP address; an entry that doesn't is logged as a bad IP. A krpc.NodeInfo is
// built for the node and eachDhtServer is invoked with it.
1461 func (cl *Client) AddDhtNodes(nodes []string) {
1462 for _, n := range nodes {
1463 hmp := missinggo.SplitHostMaybePort(n)
// Hostnames are not resolved here: only what net.ParseIP accepts counts as an IP.
1464 ip := net.ParseIP(hmp.Host)
1466 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1469 ni := krpc.NodeInfo{
1470 Addr: krpc.NodeAddr{
1475 cl.eachDhtServer(func(s DhtServer) {
1481 func (cl *Client) banPeerIP(ip net.IP) {
1482 cl.logger.Printf("banning ip %v", ip)
1483 if cl.badPeerIPs == nil {
1484 cl.badPeerIPs = make(map[string]struct{})
1486 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds the PeerConn for nc. It records the remote address and connection string,
// points the connection at the Client's configured callbacks, gives it a logger and sets up its
// read/write wrappers, then iterates over the configured NewPeer callbacks.
1489 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
// PeerMaxRequests starts out at 250.
1498 PeerMaxRequests: 250,
1500 RemoteAddr: remoteAddr,
1502 callbacks: &cl.config.Callbacks,
1504 connString: connString,
// Per-connection logger: defaults to warning level and carries c as a context value.
1508 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1509 c.setRW(connStatsReadWriter{nc, c})
// Reads go through a reader bound to the configured download rate limiter.
1510 c.r = &rateLimitedReader{
1511 l: cl.config.DownloadRateLimiter,
1514 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1515 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer handles a peer announced over the DHT for info hash ih. The announced ip and
// port are added to a torrent as a peer with source PeerSourceDhtAnnouncePeer.
1521 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1528 t.addPeers([]PeerInfo{{
1529 Addr: ipPortAddr{ip, port},
1530 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil scans ips in argument order.
// NOTE(review): going by the name, it presumably returns the first non-nil IP — confirm.
1534 func firstNotNil(ips ...net.IP) net.IP {
1535 for _, ip := range ips {
// eachListener iterates over cl.listeners for the callback f.
// NOTE(review): f's bool result presumably decides whether iteration continues — confirm.
1543 func (cl *Client) eachListener(f func(Listener) bool) {
1544 for _, s := range cl.listeners {
// findListener walks cl.listeners in order and applies f to each one, looking for a listener for
// which f reports true.
1551 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1552 for i := 0; i < len(cl.listeners); i += 1 {
// ret is assigned before f is consulted, so it always holds the listener under test.
1553 if ret = cl.listeners[i]; f(ret) {
// publicIp chooses our IP for the given peer according to the peer's address family. For an IPv4
// peer the candidates are config.PublicIp4 and the IP of an IPv4 listener; otherwise they are
// config.PublicIp6 and the IP of a non-IPv4 listener.
1560 func (cl *Client) publicIp(peer net.IP) net.IP {
1561 // TODO: Use BEP 10 to determine how peers are seeing us.
// To4 returns non-nil only for addresses representable as IPv4.
1562 if peer.To4() != nil {
1564 cl.config.PublicIp4,
1565 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1570 cl.config.PublicIp6,
1571 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp uses findListener to look for a listener whose address IP (as extracted by
// addrIpOrNil) satisfies f, and returns that listener's IP.
1575 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1576 l := cl.findListener(
1577 func(l Listener) bool {
1578 return f(addrIpOrNil(l.Addr()))
1584 return addrIpOrNil(l.Addr())
1587 // Our IP as a peer should see it.
1588 func (cl *Client) publicAddr(peer net.IP) IpPort {
1589 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1592 // ListenAddrs returns the addresses currently being listened to, one per entry in cl.listeners.
1593 func (cl *Client) ListenAddrs() (ret []net.Addr) {
// One slot per listener, filled in listener order.
1595 ret = make([]net.Addr, len(cl.listeners))
1596 for i := 0; i < len(cl.listeners); i += 1 {
1597 ret[i] = cl.listeners[i].Addr()
// onBadAccept records a bad accept from addr. The address's IP is masked with
// maskIpForAcceptLimiting and the counter for the masked IP in acceptLimiter is incremented.
1603 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1604 ipa, ok := tryIpPortFromNetAddr(addr)
// Counting is per masked IP, so IPv4 addresses in the same /24 share one counter.
1608 ip := maskIpForAcceptLimiting(ipa.IP)
// The map is allocated lazily; clearAcceptLimits resets it to nil.
1609 if cl.acceptLimiter == nil {
1610 cl.acceptLimiter = make(map[ipStr]int)
1612 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces ip to the form used as a key for accept limiting. IPv4 addresses
// are masked down to their /24 network.
1615 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1616 if ip4 := ip.To4(); ip4 != nil {
// Keep the first 24 of the 32 address bits.
1617 return ip4.Mask(net.CIDRMask(24, 32))
1622 func (cl *Client) clearAcceptLimits() {
1623 cl.acceptLimiter = nil
// acceptLimitClearer resets the accept limits through clearAcceptLimits once a 15 minute wait
// elapses. It also watches for the Client being closed.
1626 func (cl *Client) acceptLimitClearer() {
1629 case <-cl.closed.Done():
1631 case <-time.After(15 * time.Minute):
1633 cl.clearAcceptLimits()
// rateLimitAccept reports whether an accept from ip should be rate limited, which is the case when
// the masked IP has a positive count in acceptLimiter. The DisableAcceptRateLimiting config option
// is consulted first.
1639 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1640 if cl.config.DisableAcceptRateLimiting {
// A nil acceptLimiter is fine here: looking up a key in a nil map yields the zero count.
1643 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// rLock presumably acquires the Client's lock for reading — TODO confirm.
1646 func (cl *Client) rLock() {
// rUnlock presumably releases a read lock taken with rLock — TODO confirm.
1650 func (cl *Client) rUnlock() {
// lock presumably acquires the Client's lock for writing — TODO confirm.
1654 func (cl *Client) lock() {
// unlock presumably releases the lock taken with lock — TODO confirm.
1658 func (cl *Client) unlock() {
// locker returns a *lockWithDeferreds for the Client; presumably the lock used by lock and unlock
// — TODO confirm.
1662 func (cl *Client) locker() *lockWithDeferreds {
1666 func (cl *Client) String() string {
1667 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1670 // Returns connection-level aggregate stats at the Client level. See the comment on
1671 // TorrentStats.ConnStats.
1672 func (cl *Client) ConnStats() ConnStats {
1673 return cl.stats.Copy()