21 "github.com/anacrolix/chansync/events"
22 "github.com/anacrolix/dht/v2"
23 "github.com/anacrolix/dht/v2/krpc"
24 "github.com/anacrolix/log"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pubsub"
27 "github.com/anacrolix/missinggo/v2"
28 "github.com/anacrolix/missinggo/v2/bitmap"
29 "github.com/anacrolix/missinggo/v2/pproffd"
30 "github.com/anacrolix/sync"
31 "github.com/anacrolix/torrent/option"
32 request_strategy "github.com/anacrolix/torrent/request-strategy"
33 "github.com/davecgh/go-spew/spew"
34 "github.com/dustin/go-humanize"
35 "github.com/google/btree"
36 "github.com/pion/datachannel"
37 "golang.org/x/time/rate"
39 "github.com/anacrolix/chansync"
41 "github.com/anacrolix/torrent/bencode"
42 "github.com/anacrolix/torrent/internal/limiter"
43 "github.com/anacrolix/torrent/iplist"
44 "github.com/anacrolix/torrent/metainfo"
45 "github.com/anacrolix/torrent/mse"
46 pp "github.com/anacrolix/torrent/peer_protocol"
47 "github.com/anacrolix/torrent/storage"
48 "github.com/anacrolix/torrent/tracker"
49 "github.com/anacrolix/torrent/webtorrent"
52 // Clients contain zero or more Torrents. A Client manages a blocklist, the
53 // TCP/UDP protocol ports, and DHT as desired.
55 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
61 closed chansync.SetOnce
67 defaultStorage *storage.Client
71 dhtServers []DhtServer
72 ipBlockList iplist.Ranger
74 // Set of addresses that have our client ID. This intentionally will
75 // include ourselves if we end up trying to connect to our own address
76 // through legitimate channels.
77 dopplegangerAddrs map[string]struct{}
78 badPeerIPs map[string]struct{}
79 bannedPrefixes map[string]struct{}
80 torrents map[InfoHash]*Torrent
81 pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
83 acceptLimiter map[ipStr]int
84 dialRateLimiter *rate.Limiter
87 websocketTrackers websocketTrackers
89 activeAnnounceLimiter limiter.Instance
90 webseedHttpClient *http.Client
95 func (cl *Client) BadPeerIPs() (ips []string) {
97 ips = cl.badPeerIPsLocked()
102 func (cl *Client) badPeerIPsLocked() (ips []string) {
103 ips = make([]string, len(cl.badPeerIPs))
105 for k := range cl.badPeerIPs {
112 func (cl *Client) PeerID() PeerID {
116 // Returns the port number for the first listener that has one. No longer assumes that all port
117 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
119 func (cl *Client) LocalPort() (port int) {
120 for i := 0; i < len(cl.listeners); i += 1 {
121 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
128 func writeDhtServerStatus(w io.Writer, s DhtServer) {
129 dhtStats := s.Stats()
130 fmt.Fprintf(w, " ID: %x\n", s.ID())
131 spew.Fdump(w, dhtStats)
134 // Writes out a human readable status of the client, such as for writing to a
136 func (cl *Client) WriteStatus(_w io.Writer) {
139 w := bufio.NewWriter(_w)
141 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
142 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
143 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
144 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
145 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
146 cl.eachDhtServer(func(s DhtServer) {
147 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
148 writeDhtServerStatus(w, s)
150 spew.Fdump(w, &cl.stats)
151 torrentsSlice := cl.torrentsAsSlice()
152 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
154 sort.Slice(torrentsSlice, func(l, r int) bool {
155 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
157 for _, t := range torrentsSlice {
159 fmt.Fprint(w, "<unknown name>")
161 fmt.Fprint(w, t.name())
167 "%f%% of %d bytes (%s)",
168 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
170 humanize.Bytes(uint64(*t.length)))
172 w.WriteString("<missing metainfo>")
180 // Filters things that are less than warning from UPnP discovery.
181 func upnpDiscoverLogFilter(m log.Msg) bool {
182 level, ok := m.GetLevel()
183 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
186 func (cl *Client) initLogger() {
187 logger := cl.config.Logger
190 if !cl.config.Debug {
191 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
194 cl.logger = logger.WithValues(cl)
197 func (cl *Client) announceKey() int32 {
198 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
201 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
202 func (cl *Client) init(cfg *ClientConfig) {
204 cl.dopplegangerAddrs = make(map[string]struct{})
205 cl.torrents = make(map[metainfo.Hash]*Torrent)
206 cl.dialRateLimiter = rate.NewLimiter(10, 10)
207 cl.activeAnnounceLimiter.SlotsPerKey = 2
208 cl.event.L = cl.locker()
209 cl.ipBlockList = cfg.IPBlocklist
210 cl.webseedHttpClient = &http.Client{
211 Transport: &http.Transport{
212 Proxy: cfg.HTTPProxy,
216 cl.bannedPrefixes = make(map[banPrefix]struct{})
219 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
221 cfg = NewDefaultClientConfig()
227 go cl.acceptLimitClearer()
236 storageImpl := cfg.DefaultStorage
237 if storageImpl == nil {
238 // We'd use mmap by default but HFS+ doesn't support sparse files.
239 storageImplCloser := storage.NewFile(cfg.DataDir)
240 cl.onClose = append(cl.onClose, func() {
241 if err := storageImplCloser.Close(); err != nil {
242 cl.logger.Printf("error closing default storage: %s", err)
245 storageImpl = storageImplCloser
247 cl.defaultStorage = storage.NewClient(storageImpl)
249 if cfg.PeerID != "" {
250 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
252 o := copy(cl.peerID[:], cfg.Bep20)
253 _, err = rand.Read(cl.peerID[o:])
255 panic("error generating peer id")
259 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
267 for _, _s := range sockets {
268 s := _s // Go is fucking retarded.
269 cl.onClose = append(cl.onClose, func() { go s.Close() })
270 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
271 cl.dialers = append(cl.dialers, s)
272 cl.listeners = append(cl.listeners, s)
273 if cl.config.AcceptPeerConnections {
274 go cl.acceptConnections(s)
281 for _, s := range sockets {
282 if pc, ok := s.(net.PacketConn); ok {
283 ds, err := cl.NewAnacrolixDhtServer(pc)
287 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
288 cl.onClose = append(cl.onClose, func() { ds.Close() })
293 cl.websocketTrackers = websocketTrackers{
296 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
299 t, ok := cl.torrents[infoHash]
301 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
303 return t.announceRequest(event), nil
305 Proxy: cl.config.HTTPProxy,
306 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
309 t, ok := cl.torrents[dcc.InfoHash]
311 cl.logger.WithDefaultLevel(log.Warning).Printf(
312 "got webrtc conn for unloaded torrent with infohash %x",
318 go t.onWebRtcConn(dc, dcc)
325 func (cl *Client) AddDhtServer(d DhtServer) {
326 cl.dhtServers = append(cl.dhtServers, d)
329 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
330 // given address for any Torrent.
331 func (cl *Client) AddDialer(d Dialer) {
334 cl.dialers = append(cl.dialers, d)
335 for _, t := range cl.torrents {
340 func (cl *Client) Listeners() []Listener {
344 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
346 func (cl *Client) AddListener(l Listener) {
347 cl.listeners = append(cl.listeners, l)
348 if cl.config.AcceptPeerConnections {
349 go cl.acceptConnections(l)
353 func (cl *Client) firewallCallback(net.Addr) bool {
355 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
358 torrent.Add("connections firewalled", 1)
360 torrent.Add("connections not firewalled", 1)
365 func (cl *Client) listenOnNetwork(n network) bool {
366 if n.Ipv4 && cl.config.DisableIPv4 {
369 if n.Ipv6 && cl.config.DisableIPv6 {
372 if n.Tcp && cl.config.DisableTCP {
375 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
381 func (cl *Client) listenNetworks() (ns []network) {
382 for _, n := range allPeerNetworks {
383 if cl.listenOnNetwork(n) {
390 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
391 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
392 cfg := dht.ServerConfig{
393 IPBlocklist: cl.ipBlockList,
395 OnAnnouncePeer: cl.onDHTAnnouncePeer,
396 PublicIP: func() net.IP {
397 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
398 return cl.config.PublicIp6
400 return cl.config.PublicIp4
402 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
403 OnQuery: cl.config.DHTOnQuery,
404 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
406 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
409 s, err = dht.NewServer(&cfg)
412 ts, err := s.Bootstrap()
414 cl.logger.Printf("error bootstrapping dht: %s", err)
416 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
422 func (cl *Client) Closed() events.Done {
423 return cl.closed.Done()
426 func (cl *Client) eachDhtServer(f func(DhtServer)) {
427 for _, ds := range cl.dhtServers {
432 // Stops the client. All connections to peers are closed and all activity will come to a halt.
433 func (cl *Client) Close() (errs []error) {
434 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
436 for _, t := range cl.torrents {
437 err := t.close(&closeGroup)
439 errs = append(errs, err)
442 for i := range cl.onClose {
443 cl.onClose[len(cl.onClose)-1-i]()
448 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
452 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
453 if cl.ipBlockList == nil {
456 return cl.ipBlockList.Lookup(ip)
459 func (cl *Client) ipIsBlocked(ip net.IP) bool {
460 _, blocked := cl.ipBlockRange(ip)
464 func (cl *Client) wantConns() bool {
465 if cl.config.AlwaysWantConns {
468 for _, t := range cl.torrents {
476 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
477 func (cl *Client) rejectAccepted(conn net.Conn) error {
479 return errors.New("don't want conns right now")
481 ra := conn.RemoteAddr()
482 if rip := addrIpOrNil(ra); rip != nil {
483 if cl.config.DisableIPv4Peers && rip.To4() != nil {
484 return errors.New("ipv4 peers disabled")
486 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
487 return errors.New("ipv4 disabled")
489 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
490 return errors.New("ipv6 disabled")
492 if cl.rateLimitAccept(rip) {
493 return errors.New("source IP accepted rate limited")
495 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
496 return errors.New("bad source addr")
502 func (cl *Client) acceptConnections(l Listener) {
504 conn, err := l.Accept()
505 torrent.Add("client listener accepts", 1)
506 conn = pproffd.WrapNetConn(conn)
508 closed := cl.closed.IsSet()
510 if !closed && conn != nil {
511 reject = cl.rejectAccepted(conn)
521 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
526 torrent.Add("rejected accepted connections", 1)
527 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
530 go cl.incomingConnection(conn)
532 log.Fmsg("accepted %q connection at %q from %q",
536 ).SetLevel(log.Debug).Log(cl.logger)
537 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
538 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
539 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
544 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
545 func regularNetConnPeerConnConnString(nc net.Conn) string {
546 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
549 func (cl *Client) incomingConnection(nc net.Conn) {
551 if tc, ok := nc.(*net.TCPConn); ok {
554 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
555 regularNetConnPeerConnConnString(nc))
561 c.Discovery = PeerSourceIncoming
562 cl.runReceivedConn(c)
565 // Returns a handle to the given torrent, if it's present in the client.
566 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
569 t, ok = cl.torrents[ih]
573 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
574 return cl.torrents[ih]
577 type DialResult struct {
582 func countDialResult(err error) {
584 torrent.Add("successful dials", 1)
586 torrent.Add("unsuccessful dials", 1)
590 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
591 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
592 if ret < minDialTimeout {
598 // Returns whether an address is known to connect to a client with our own ID.
599 func (cl *Client) dopplegangerAddr(addr string) bool {
600 _, ok := cl.dopplegangerAddrs[addr]
604 // Returns a connection over UTP or TCP, whichever is first to connect.
605 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
606 return DialFirst(ctx, addr, cl.dialers)
609 // Returns a connection over UTP or TCP, whichever is first to connect.
610 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
612 t := perf.NewTimer(perf.CallerName(0))
615 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
617 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
621 ctx, cancel := context.WithCancel(ctx)
622 // As soon as we return one connection, cancel the others.
625 resCh := make(chan DialResult, left)
626 for _, _s := range dialers {
631 dialFromSocket(ctx, s, addr),
636 // Wait for a successful connection.
638 defer perf.ScopeTimer()()
639 for ; left > 0 && res.Conn == nil; left-- {
643 // There are still incompleted dials.
645 for ; left > 0; left-- {
646 conn := (<-resCh).Conn
653 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
658 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
659 c, err := s.Dial(ctx, addr)
660 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
661 // it now in case we close the connection forthwith.
662 if tc, ok := c.(*net.TCPConn); ok {
669 func forgettableDialError(err error) bool {
670 return strings.Contains(err.Error(), "no suitable address found")
673 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
674 if _, ok := t.halfOpen[addr]; !ok {
675 panic("invariant broken")
677 delete(t.halfOpen, addr)
679 for _, t := range cl.torrents {
684 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
685 // for valid reasons.
686 func (cl *Client) initiateProtocolHandshakes(
690 outgoing, encryptHeader bool,
691 remoteAddr PeerRemoteAddr,
692 network, connString string,
694 c *PeerConn, err error,
696 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
697 c.headerEncrypted = encryptHeader
698 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
700 dl, ok := ctx.Deadline()
704 err = nc.SetDeadline(dl)
708 err = cl.initiateHandshakes(c, t)
712 // Returns nil connection and nil error if no connection could be established for valid reasons.
713 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
714 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
717 return t.dialTimeout()
720 dr := cl.dialFirst(dialCtx, addr.String())
723 if dialCtx.Err() != nil {
724 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
726 return nil, errors.New("dial failed")
728 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
735 // Returns nil connection and nil error if no connection could be established
736 // for valid reasons.
737 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
738 torrent.Add("establish outgoing connection", 1)
739 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
740 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
742 torrent.Add("initiated conn with preferred header obfuscation", 1)
745 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
746 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
747 // We should have just tried with the preferred header obfuscation. If it was required,
748 // there's nothing else to try.
751 // Try again with encryption if we didn't earlier, or without if we did.
752 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
754 torrent.Add("initiated conn with fallback header obfuscation", 1)
756 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
760 // Called to dial out and run a connection. The addr we're given is already
761 // considered half-open.
762 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
763 cl.dialRateLimiter.Wait(context.Background())
764 c, err := cl.establishOutgoingConn(t, addr)
766 c.conn.SetWriteDeadline(time.Time{})
770 // Don't release lock between here and addPeerConn, unless it's for
772 cl.noLongerHalfOpen(t, addr.String())
775 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
782 t.runHandshookConnLoggingErr(c)
785 // The port number for incoming peer connections. 0 if the client isn't listening.
786 func (cl *Client) incomingPeerPort() int {
787 return cl.LocalPort()
790 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
791 if c.headerEncrypted {
794 rw, c.cryptoMethod, err = mse.InitiateHandshake(
801 cl.config.CryptoProvides,
805 return fmt.Errorf("header obfuscation handshake: %w", err)
808 ih, err := cl.connBtHandshake(c, &t.infoHash)
810 return fmt.Errorf("bittorrent protocol handshake: %w", err)
812 if ih != t.infoHash {
813 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
818 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
819 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
820 func (cl *Client) forSkeys(f func([]byte) bool) {
823 if false { // Emulate the bug from #114
825 for ih := range cl.torrents {
829 for range cl.torrents {
836 for ih := range cl.torrents {
843 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
844 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
850 // Do encryption and bittorrent handshakes as receiver.
851 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
852 defer perf.ScopeTimerErr(&err)()
854 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
856 if err == nil || err == mse.ErrNoSecretKeyMatch {
857 if c.headerEncrypted {
858 torrent.Add("handshakes received encrypted", 1)
860 torrent.Add("handshakes received unencrypted", 1)
863 torrent.Add("handshakes received with error while handling encryption", 1)
866 if err == mse.ErrNoSecretKeyMatch {
871 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
872 err = errors.New("connection does not have required header obfuscation")
875 ih, err := cl.connBtHandshake(c, nil)
877 return nil, fmt.Errorf("during bt handshake: %w", err)
885 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
889 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
890 &successfulPeerWireProtocolHandshakePeerReservedBytes)
893 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
894 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
898 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
900 c.PeerExtensionBytes = res.PeerExtensionBits
901 c.PeerID = res.PeerID
902 c.completedHandshake = time.Now()
903 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
909 func (cl *Client) runReceivedConn(c *PeerConn) {
910 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
914 t, err := cl.receiveHandshakes(c)
917 "error receiving handshakes on %v: %s", c, err,
918 ).SetLevel(log.Debug).
920 "network", c.Network,
922 torrent.Add("error receiving handshake", 1)
924 cl.onBadAccept(c.RemoteAddr)
929 torrent.Add("received handshake for unloaded torrent", 1)
930 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
932 cl.onBadAccept(c.RemoteAddr)
936 torrent.Add("received handshake for loaded torrent", 1)
937 c.conn.SetWriteDeadline(time.Time{})
940 t.runHandshookConnLoggingErr(c)
943 // Client lock must be held before entering this.
944 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
946 for i, b := range cl.config.MinPeerExtensions {
947 if c.PeerExtensionBytes[i]&b != b {
948 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
951 if c.PeerID == cl.peerID {
954 addr := c.RemoteAddr.String()
955 cl.dopplegangerAddrs[addr] = struct{}{}
957 // Because the remote address is not necessarily the same as its client's torrent listen
958 // address, we won't record the remote address as a doppleganger. Instead, the initiator
959 // can record *us* as the doppleganger.
961 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
964 c.r = deadlineReader{c.conn, c.r}
965 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
966 if connIsIpv6(c.conn) {
967 torrent.Add("completed handshake over ipv6", 1)
969 if err := t.addPeerConn(c); err != nil {
970 return fmt.Errorf("adding connection: %w", err)
972 defer t.dropConnection(c)
974 cl.sendInitialMessages(c, t)
975 c.initUpdateRequestsTimer()
976 err := c.mainReadLoop()
978 return fmt.Errorf("main read loop: %w", err)
985 func (p *Peer) initUpdateRequestsTimer() {
987 if p.updateRequestsTimer != nil {
988 panic(p.updateRequestsTimer)
991 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
994 const peerUpdateRequestsTimerReason = "updateRequestsTimer"
996 func (c *Peer) updateRequestsTimerFunc() {
998 defer c.locker().Unlock()
999 if c.closed.IsSet() {
1002 if c.isLowOnRequests() {
1003 // If there are no outstanding requests, then a request update should have already run.
1006 if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
1007 // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
1008 // already been fired.
1009 torrent.Add("spurious timer requests updates", 1)
1012 c.updateRequests(peerUpdateRequestsTimerReason)
1015 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1016 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1017 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1018 const localClientReqq = 1 << 5
1020 // See the order given in Transmission's tr_peerMsgsNew.
1021 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1022 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1023 conn.write(pp.Message{
1025 ExtendedID: pp.HandshakeExtendedID,
1026 ExtendedPayload: func() []byte {
1027 msg := pp.ExtendedHandshakeMessage{
1028 M: map[pp.ExtensionName]pp.ExtensionNumber{
1029 pp.ExtensionNameMetadata: metadataExtendedId,
1031 V: cl.config.ExtendedHandshakeClientVersion,
1032 Reqq: localClientReqq,
1033 YourIp: pp.CompactIp(conn.remoteIp()),
1034 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1035 Port: cl.incomingPeerPort(),
1036 MetadataSize: torrent.metadataSize(),
1037 // TODO: We can figured these out specific to the socket
1039 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1040 Ipv6: cl.config.PublicIp6.To16(),
1042 if !cl.config.DisablePEX {
1043 msg.M[pp.ExtensionNamePex] = pexExtendedId
1045 return bencode.MustMarshal(msg)
1050 if conn.fastEnabled() {
1051 if torrent.haveAllPieces() {
1052 conn.write(pp.Message{Type: pp.HaveAll})
1053 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1055 } else if !torrent.haveAnyPieces() {
1056 conn.write(pp.Message{Type: pp.HaveNone})
1057 conn.sentHaves.Clear()
1063 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1064 conn.write(pp.Message{
1071 func (cl *Client) dhtPort() (ret uint16) {
1072 if len(cl.dhtServers) == 0 {
1075 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1078 func (cl *Client) haveDhtServer() bool {
1079 return len(cl.dhtServers) > 0
1082 // Process incoming ut_metadata message.
1083 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1084 var d pp.ExtendedMetadataRequestMsg
1085 err := bencode.Unmarshal(payload, &d)
1086 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1087 } else if err != nil {
1088 return fmt.Errorf("error unmarshalling bencode: %s", err)
1092 case pp.DataMetadataExtensionMsgType:
1093 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1094 if !c.requestedMetadataPiece(piece) {
1095 return fmt.Errorf("got unexpected piece %d", piece)
1097 c.metadataRequests[piece] = false
1098 begin := len(payload) - d.PieceSize()
1099 if begin < 0 || begin >= len(payload) {
1100 return fmt.Errorf("data has bad offset in payload: %d", begin)
1102 t.saveMetadataPiece(piece, payload[begin:])
1103 c.lastUsefulChunkReceived = time.Now()
1104 err = t.maybeCompleteMetadata()
1106 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1107 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1108 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1109 // log consumers can filter for this message.
1110 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1113 case pp.RequestMetadataExtensionMsgType:
1114 if !t.haveMetadataPiece(piece) {
1115 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1118 start := (1 << 14) * piece
1119 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1120 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1122 case pp.RejectMetadataExtensionMsgType:
1125 return errors.New("unknown msg_type value")
1129 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1130 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1131 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1133 addrStr := addr.String()
1134 for prefix := range cl.bannedPrefixes {
1135 if strings.HasPrefix(addrStr, prefix) {
1142 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1146 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1149 if _, ok := cl.ipBlockRange(ip); ok {
1152 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1158 // Return a Torrent ready for insertion into a Client.
1159 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1160 return cl.newTorrentOpt(AddTorrentOpts{
1162 Storage: specStorage,
1166 // Return a Torrent ready for insertion into a Client.
1167 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1168 // use provided storage, if provided
1169 storageClient := cl.defaultStorage
1170 if opts.Storage != nil {
1171 storageClient = storage.NewClient(opts.Storage)
1176 infoHash: opts.InfoHash,
1177 peers: prioritizedPeers{
1179 getPrio: func(p PeerInfo) peerPriority {
1181 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1184 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1186 halfOpen: make(map[string]PeerInfo),
1187 pieceStateChanges: pubsub.NewPubSub(),
1189 storageOpener: storageClient,
1190 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1192 metadataChanged: sync.Cond{
1195 webSeeds: make(map[string]*Peer),
1196 gotMetainfoC: make(chan struct{}),
1198 t.smartBanCache.Hash = sha1.Sum
1199 t.smartBanCache.Init()
1200 t.networkingEnabled.Set()
1201 t.logger = cl.logger.WithContextValue(t)
1202 if opts.ChunkSize == 0 {
1203 opts.ChunkSize = defaultChunkSize
1205 t.setChunkSize(opts.ChunkSize)
1209 // A file-like handle to some torrent data resource.
1210 type Handle interface {
1217 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1218 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1221 // Adds a torrent by InfoHash with a custom Storage implementation.
1222 // If the torrent already exists then this Storage is ignored and the
1223 // existing torrent returned with `new` set to `false`
1224 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1227 t, ok := cl.torrents[infoHash]
1233 t = cl.newTorrent(infoHash, specStorage)
1234 cl.eachDhtServer(func(s DhtServer) {
1235 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1236 go t.dhtAnnouncer(s)
1239 cl.torrents[infoHash] = t
1240 cl.clearAcceptLimits()
1241 t.updateWantPeersEvent()
1242 // Tickle Client.waitAccept, new torrent may want conns.
1243 cl.event.Broadcast()
1247 // Adds a torrent by InfoHash with a custom Storage implementation.
1248 // If the torrent already exists then this Storage is ignored and the
1249 // existing torrent returned with `new` set to `false`
1250 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1251 infoHash := opts.InfoHash
1254 t, ok := cl.torrents[infoHash]
1260 t = cl.newTorrentOpt(opts)
1261 cl.eachDhtServer(func(s DhtServer) {
1262 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1263 go t.dhtAnnouncer(s)
1266 cl.torrents[infoHash] = t
1267 cl.clearAcceptLimits()
1268 t.updateWantPeersEvent()
1269 // Tickle Client.waitAccept, new torrent may want conns.
1270 cl.event.Broadcast()
1274 type AddTorrentOpts struct {
1276 Storage storage.ClientImpl
1277 ChunkSize pp.Integer
1280 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1281 // Torrent.MergeSpec.
1282 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1283 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1284 InfoHash: spec.InfoHash,
1285 Storage: spec.Storage,
1286 ChunkSize: spec.ChunkSize,
1290 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1292 modSpec.ChunkSize = 0
1294 err = t.MergeSpec(&modSpec)
1295 if err != nil && new {
1301 type stringAddr string
1303 var _ net.Addr = stringAddr("")
1305 func (stringAddr) Network() string { return "" }
1306 func (me stringAddr) String() string { return string(me) }
1308 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1309 // spec.DisallowDataDownload/Upload will be read and applied
1310 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1311 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1312 if spec.DisplayName != "" {
1313 t.SetDisplayName(spec.DisplayName)
1315 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1316 if spec.InfoBytes != nil {
1317 err := t.SetInfoBytes(spec.InfoBytes)
1323 cl.AddDhtNodes(spec.DhtNodes)
1326 useTorrentSources(spec.Sources, t)
1327 for _, url := range spec.Webseeds {
1330 for _, peerAddr := range spec.PeerAddrs {
1332 Addr: stringAddr(peerAddr),
1333 Source: PeerSourceDirect,
1337 if spec.ChunkSize != 0 {
1338 panic("chunk size cannot be changed for existing Torrent")
1340 t.addTrackers(spec.Trackers)
1342 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1343 t.dataUploadDisallowed = spec.DisallowDataUpload
1347 func useTorrentSources(sources []string, t *Torrent) {
1348 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1349 ctx := context.Background()
1350 for i := 0; i < len(sources); i += 1 {
1353 if err := useTorrentSource(ctx, s, t); err != nil {
1354 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1356 t.logger.Printf("successfully used source %q", s)
1362 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1363 ctx, cancel := context.WithCancel(ctx)
1373 var req *http.Request
1374 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1377 var resp *http.Response
1378 if resp, err = http.DefaultClient.Do(req); err != nil {
1381 var mi metainfo.MetaInfo
1382 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1385 if ctx.Err() != nil {
1390 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1393 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1394 t, ok := cl.torrents[infoHash]
1396 err = fmt.Errorf("no such torrent")
1403 delete(cl.torrents, infoHash)
1407 func (cl *Client) allTorrentsCompleted() bool {
1408 for _, t := range cl.torrents {
1412 if !t.haveAllPieces() {
1419 // Returns true when all torrents are completely downloaded and false if the
1420 // client is stopped before that.
1421 func (cl *Client) WaitAll() bool {
1424 for !cl.allTorrentsCompleted() {
1425 if cl.closed.IsSet() {
1433 // Returns handles to all the torrents loaded in the Client.
1434 func (cl *Client) Torrents() []*Torrent {
1437 return cl.torrentsAsSlice()
1440 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1441 for _, t := range cl.torrents {
1442 ret = append(ret, t)
1447 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1448 spec, err := TorrentSpecFromMagnetUri(uri)
1452 T, _, err = cl.AddTorrentSpec(spec)
1456 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1457 ts, err := TorrentSpecFromMetaInfoErr(mi)
1461 T, _, err = cl.AddTorrentSpec(ts)
1465 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1466 mi, err := metainfo.LoadFromFile(filename)
1470 return cl.AddTorrent(mi)
1473 func (cl *Client) DhtServers() []DhtServer {
1474 return cl.dhtServers
1477 func (cl *Client) AddDhtNodes(nodes []string) {
1478 for _, n := range nodes {
1479 hmp := missinggo.SplitHostMaybePort(n)
1480 ip := net.ParseIP(hmp.Host)
1482 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1485 ni := krpc.NodeInfo{
1486 Addr: krpc.NodeAddr{
1491 cl.eachDhtServer(func(s DhtServer) {
1497 func (cl *Client) banPeerIP(ip net.IP) {
1498 cl.logger.Printf("banning ip %v", ip)
1499 if cl.badPeerIPs == nil {
1500 cl.badPeerIPs = make(map[string]struct{})
1502 cl.badPeerIPs[ip.String()] = struct{}{}
1505 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1514 PeerMaxRequests: 250,
1516 RemoteAddr: remoteAddr,
1518 callbacks: &cl.config.Callbacks,
1520 connString: connString,
1523 if remoteAddr != nil {
1524 c.banPrefix = option.Some(remoteAddr.String())
1527 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1528 c.setRW(connStatsReadWriter{nc, c})
1529 c.r = &rateLimitedReader{
1530 l: cl.config.DownloadRateLimiter,
1533 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1534 for _, f := range cl.config.Callbacks.NewPeer {
1540 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1547 t.addPeers([]PeerInfo{{
1548 Addr: ipPortAddr{ip, port},
1549 Source: PeerSourceDhtAnnouncePeer,
1553 func firstNotNil(ips ...net.IP) net.IP {
1554 for _, ip := range ips {
1562 func (cl *Client) eachListener(f func(Listener) bool) {
1563 for _, s := range cl.listeners {
1570 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1571 for i := 0; i < len(cl.listeners); i += 1 {
1572 if ret = cl.listeners[i]; f(ret) {
1579 func (cl *Client) publicIp(peer net.IP) net.IP {
1580 // TODO: Use BEP 10 to determine how peers are seeing us.
1581 if peer.To4() != nil {
1583 cl.config.PublicIp4,
1584 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1589 cl.config.PublicIp6,
1590 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1594 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1595 l := cl.findListener(
1596 func(l Listener) bool {
1597 return f(addrIpOrNil(l.Addr()))
1603 return addrIpOrNil(l.Addr())
1606 // Our IP as a peer should see it.
1607 func (cl *Client) publicAddr(peer net.IP) IpPort {
1608 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1611 // ListenAddrs addresses currently being listened to.
1612 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1614 ret = make([]net.Addr, len(cl.listeners))
1615 for i := 0; i < len(cl.listeners); i += 1 {
1616 ret[i] = cl.listeners[i].Addr()
1622 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1623 ipa, ok := tryIpPortFromNetAddr(addr)
1627 ip := maskIpForAcceptLimiting(ipa.IP)
1628 if cl.acceptLimiter == nil {
1629 cl.acceptLimiter = make(map[ipStr]int)
1631 cl.acceptLimiter[ipStr(ip.String())]++
1634 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1635 if ip4 := ip.To4(); ip4 != nil {
1636 return ip4.Mask(net.CIDRMask(24, 32))
1641 func (cl *Client) clearAcceptLimits() {
1642 cl.acceptLimiter = nil
1645 func (cl *Client) acceptLimitClearer() {
1648 case <-cl.closed.Done():
1650 case <-time.After(15 * time.Minute):
1652 cl.clearAcceptLimits()
1658 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1659 if cl.config.DisableAcceptRateLimiting {
1662 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1665 func (cl *Client) rLock() {
1669 func (cl *Client) rUnlock() {
1673 func (cl *Client) lock() {
1677 func (cl *Client) unlock() {
1681 func (cl *Client) locker() *lockWithDeferreds {
1685 func (cl *Client) String() string {
1686 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1689 // Returns connection-level aggregate stats at the Client level. See the comment on
1690 // TorrentStats.ConnStats.
1691 func (cl *Client) ConnStats() ConnStats {
1692 return cl.stats.Copy()
1695 func (cl *Client) banPrefix(prefix banPrefix) {
1696 cl.bannedPrefixes[prefix] = struct{}{}