18 "github.com/anacrolix/chansync/events"
19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/v2"
25 "github.com/anacrolix/missinggo/v2/bitmap"
26 "github.com/anacrolix/missinggo/v2/pproffd"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed chansync.SetOnce
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() (ips []string) {
91 ips = cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() (ips []string) {
97 ips = make([]string, len(cl.badPeerIPs))
99 for k := range cl.badPeerIPs {
106 func (cl *Client) PeerID() PeerID {
110 // Returns the port number for the first listener that has one. No longer assumes that all port
111 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
113 func (cl *Client) LocalPort() (port int) {
114 for i := 0; i < len(cl.listeners); i += 1 {
115 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
122 func writeDhtServerStatus(w io.Writer, s DhtServer) {
123 dhtStats := s.Stats()
124 fmt.Fprintf(w, " ID: %x\n", s.ID())
125 spew.Fdump(w, dhtStats)
128 // Writes out a human readable status of the client, such as for writing to a
130 func (cl *Client) WriteStatus(_w io.Writer) {
133 w := bufio.NewWriter(_w)
135 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
136 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
137 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
138 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
139 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
140 cl.eachDhtServer(func(s DhtServer) {
141 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
142 writeDhtServerStatus(w, s)
144 spew.Fdump(w, &cl.stats)
145 torrentsSlice := cl.torrentsAsSlice()
146 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
148 sort.Slice(torrentsSlice, func(l, r int) bool {
149 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
151 for _, t := range torrentsSlice {
153 fmt.Fprint(w, "<unknown name>")
155 fmt.Fprint(w, t.name())
161 "%f%% of %d bytes (%s)",
162 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
164 humanize.Bytes(uint64(*t.length)))
166 w.WriteString("<missing metainfo>")
174 // Filters things that are less than warning from UPnP discovery.
175 func upnpDiscoverLogFilter(m log.Msg) bool {
176 level, ok := m.GetLevel()
177 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
180 func (cl *Client) initLogger() {
181 logger := cl.config.Logger
184 if !cl.config.Debug {
185 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
188 cl.logger = logger.WithValues(cl)
191 func (cl *Client) announceKey() int32 {
192 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
195 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
196 func (cl *Client) init(cfg *ClientConfig) {
198 cl.dopplegangerAddrs = make(map[string]struct{})
199 cl.torrents = make(map[metainfo.Hash]*Torrent)
200 cl.dialRateLimiter = rate.NewLimiter(10, 10)
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 cl.event.L = cl.locker()
204 cl.ipBlockList = cfg.IPBlocklist
207 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
209 cfg = NewDefaultClientConfig()
215 go cl.acceptLimitClearer()
224 storageImpl := cfg.DefaultStorage
225 if storageImpl == nil {
226 // We'd use mmap by default but HFS+ doesn't support sparse files.
227 storageImplCloser := storage.NewFile(cfg.DataDir)
228 cl.onClose = append(cl.onClose, func() {
229 if err := storageImplCloser.Close(); err != nil {
230 cl.logger.Printf("error closing default storage: %s", err)
233 storageImpl = storageImplCloser
235 cl.defaultStorage = storage.NewClient(storageImpl)
237 if cfg.PeerID != "" {
238 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
240 o := copy(cl.peerID[:], cfg.Bep20)
241 _, err = rand.Read(cl.peerID[o:])
243 panic("error generating peer id")
247 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
255 for _, _s := range sockets {
256 s := _s // Go is fucking retarded.
257 cl.onClose = append(cl.onClose, func() { s.Close() })
258 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
259 cl.dialers = append(cl.dialers, s)
260 cl.listeners = append(cl.listeners, s)
261 if cl.config.AcceptPeerConnections {
262 go cl.acceptConnections(s)
269 for _, s := range sockets {
270 if pc, ok := s.(net.PacketConn); ok {
271 ds, err := cl.NewAnacrolixDhtServer(pc)
275 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
276 cl.onClose = append(cl.onClose, func() { ds.Close() })
281 cl.websocketTrackers = websocketTrackers{
284 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
287 t, ok := cl.torrents[infoHash]
289 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
291 return t.announceRequest(event), nil
293 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
296 t, ok := cl.torrents[dcc.InfoHash]
298 cl.logger.WithDefaultLevel(log.Warning).Printf(
299 "got webrtc conn for unloaded torrent with infohash %x",
305 go t.onWebRtcConn(dc, dcc)
316 func (cl *Client) AddDhtServer(d DhtServer) {
317 cl.dhtServers = append(cl.dhtServers, d)
320 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
321 // given address for any Torrent.
322 func (cl *Client) AddDialer(d Dialer) {
325 cl.dialers = append(cl.dialers, d)
326 for _, t := range cl.torrents {
331 func (cl *Client) Listeners() []Listener {
335 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
337 func (cl *Client) AddListener(l Listener) {
338 cl.listeners = append(cl.listeners, l)
339 if cl.config.AcceptPeerConnections {
340 go cl.acceptConnections(l)
344 func (cl *Client) firewallCallback(net.Addr) bool {
346 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
349 torrent.Add("connections firewalled", 1)
351 torrent.Add("connections not firewalled", 1)
356 func (cl *Client) listenOnNetwork(n network) bool {
357 if n.Ipv4 && cl.config.DisableIPv4 {
360 if n.Ipv6 && cl.config.DisableIPv6 {
363 if n.Tcp && cl.config.DisableTCP {
366 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
372 func (cl *Client) listenNetworks() (ns []network) {
373 for _, n := range allPeerNetworks {
374 if cl.listenOnNetwork(n) {
381 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
382 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
383 cfg := dht.ServerConfig{
384 IPBlocklist: cl.ipBlockList,
386 OnAnnouncePeer: cl.onDHTAnnouncePeer,
387 PublicIP: func() net.IP {
388 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
389 return cl.config.PublicIp6
391 return cl.config.PublicIp4
393 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
394 OnQuery: cl.config.DHTOnQuery,
395 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
397 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
400 s, err = dht.NewServer(&cfg)
403 ts, err := s.Bootstrap()
405 cl.logger.Printf("error bootstrapping dht: %s", err)
407 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
413 func (cl *Client) Closed() events.Done {
414 return cl.closed.Done()
417 func (cl *Client) eachDhtServer(f func(DhtServer)) {
418 for _, ds := range cl.dhtServers {
423 // Stops the client. All connections to peers are closed and all activity will
425 func (cl *Client) Close() {
427 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
430 for _, t := range cl.torrents {
434 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
436 for i := range cl.onClose {
437 cl.onClose[len(cl.onClose)-1-i]()
442 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
443 if cl.ipBlockList == nil {
446 return cl.ipBlockList.Lookup(ip)
449 func (cl *Client) ipIsBlocked(ip net.IP) bool {
450 _, blocked := cl.ipBlockRange(ip)
454 func (cl *Client) wantConns() bool {
455 if cl.config.AlwaysWantConns {
458 for _, t := range cl.torrents {
466 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
467 func (cl *Client) rejectAccepted(conn net.Conn) error {
469 return errors.New("don't want conns right now")
471 ra := conn.RemoteAddr()
472 if rip := addrIpOrNil(ra); rip != nil {
473 if cl.config.DisableIPv4Peers && rip.To4() != nil {
474 return errors.New("ipv4 peers disabled")
476 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
477 return errors.New("ipv4 disabled")
480 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
481 return errors.New("ipv6 disabled")
483 if cl.rateLimitAccept(rip) {
484 return errors.New("source IP accepted rate limited")
486 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
487 return errors.New("bad source addr")
493 func (cl *Client) acceptConnections(l Listener) {
495 conn, err := l.Accept()
496 torrent.Add("client listener accepts", 1)
497 conn = pproffd.WrapNetConn(conn)
499 closed := cl.closed.IsSet()
502 reject = cl.rejectAccepted(conn)
512 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
517 torrent.Add("rejected accepted connections", 1)
518 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
521 go cl.incomingConnection(conn)
523 log.Fmsg("accepted %q connection at %q from %q",
527 ).SetLevel(log.Debug).Log(cl.logger)
528 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
529 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
530 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
535 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
536 func regularNetConnPeerConnConnString(nc net.Conn) string {
537 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
540 func (cl *Client) incomingConnection(nc net.Conn) {
542 if tc, ok := nc.(*net.TCPConn); ok {
545 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
546 regularNetConnPeerConnConnString(nc))
552 c.Discovery = PeerSourceIncoming
553 cl.runReceivedConn(c)
556 // Returns a handle to the given torrent, if it's present in the client.
557 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
560 t, ok = cl.torrents[ih]
564 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
565 return cl.torrents[ih]
568 type DialResult struct {
573 func countDialResult(err error) {
575 torrent.Add("successful dials", 1)
577 torrent.Add("unsuccessful dials", 1)
581 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
582 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
583 if ret < minDialTimeout {
589 // Returns whether an address is known to connect to a client with our own ID.
590 func (cl *Client) dopplegangerAddr(addr string) bool {
591 _, ok := cl.dopplegangerAddrs[addr]
595 // Returns a connection over UTP or TCP, whichever is first to connect.
596 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
597 return DialFirst(ctx, addr, cl.dialers)
600 // Returns a connection over UTP or TCP, whichever is first to connect.
601 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
603 t := perf.NewTimer(perf.CallerName(0))
606 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
608 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
612 ctx, cancel := context.WithCancel(ctx)
613 // As soon as we return one connection, cancel the others.
616 resCh := make(chan DialResult, left)
617 for _, _s := range dialers {
622 dialFromSocket(ctx, s, addr),
627 // Wait for a successful connection.
629 defer perf.ScopeTimer()()
630 for ; left > 0 && res.Conn == nil; left-- {
634 // There are still incompleted dials.
636 for ; left > 0; left-- {
637 conn := (<-resCh).Conn
644 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
649 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
650 c, err := s.Dial(ctx, addr)
651 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
652 // it now in case we close the connection forthwith.
653 if tc, ok := c.(*net.TCPConn); ok {
660 func forgettableDialError(err error) bool {
661 return strings.Contains(err.Error(), "no suitable address found")
664 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
665 if _, ok := t.halfOpen[addr]; !ok {
666 panic("invariant broken")
668 delete(t.halfOpen, addr)
670 for _, t := range cl.torrents {
675 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
676 // for valid reasons.
677 func (cl *Client) initiateProtocolHandshakes(
681 outgoing, encryptHeader bool,
682 remoteAddr PeerRemoteAddr,
683 network, connString string,
685 c *PeerConn, err error,
687 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
688 c.headerEncrypted = encryptHeader
689 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
691 dl, ok := ctx.Deadline()
695 err = nc.SetDeadline(dl)
699 err = cl.initiateHandshakes(c, t)
703 // Returns nil connection and nil error if no connection could be established for valid reasons.
704 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
705 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
708 return t.dialTimeout()
711 dr := cl.dialFirst(dialCtx, addr.String())
714 if dialCtx.Err() != nil {
715 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
717 return nil, errors.New("dial failed")
719 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
726 // Returns nil connection and nil error if no connection could be established
727 // for valid reasons.
728 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
729 torrent.Add("establish outgoing connection", 1)
730 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
731 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
733 torrent.Add("initiated conn with preferred header obfuscation", 1)
736 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
737 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
738 // We should have just tried with the preferred header obfuscation. If it was required,
739 // there's nothing else to try.
742 // Try again with encryption if we didn't earlier, or without if we did.
743 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
745 torrent.Add("initiated conn with fallback header obfuscation", 1)
747 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
751 // Called to dial out and run a connection. The addr we're given is already
752 // considered half-open.
753 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
754 cl.dialRateLimiter.Wait(context.Background())
755 c, err := cl.establishOutgoingConn(t, addr)
758 // Don't release lock between here and addPeerConn, unless it's for
760 cl.noLongerHalfOpen(t, addr.String())
763 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
770 t.runHandshookConnLoggingErr(c)
773 // The port number for incoming peer connections. 0 if the client isn't listening.
774 func (cl *Client) incomingPeerPort() int {
775 return cl.LocalPort()
778 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
779 if c.headerEncrypted {
782 rw, c.cryptoMethod, err = mse.InitiateHandshake(
789 cl.config.CryptoProvides,
793 return fmt.Errorf("header obfuscation handshake: %w", err)
796 ih, err := cl.connBtHandshake(c, &t.infoHash)
798 return fmt.Errorf("bittorrent protocol handshake: %w", err)
800 if ih != t.infoHash {
801 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
806 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
807 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
808 func (cl *Client) forSkeys(f func([]byte) bool) {
811 if false { // Emulate the bug from #114
813 for ih := range cl.torrents {
817 for range cl.torrents {
824 for ih := range cl.torrents {
831 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
832 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
838 // Do encryption and bittorrent handshakes as receiver.
839 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
840 defer perf.ScopeTimerErr(&err)()
842 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
844 if err == nil || err == mse.ErrNoSecretKeyMatch {
845 if c.headerEncrypted {
846 torrent.Add("handshakes received encrypted", 1)
848 torrent.Add("handshakes received unencrypted", 1)
851 torrent.Add("handshakes received with error while handling encryption", 1)
854 if err == mse.ErrNoSecretKeyMatch {
859 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
860 err = errors.New("connection does not have required header obfuscation")
863 ih, err := cl.connBtHandshake(c, nil)
865 return nil, fmt.Errorf("during bt handshake: %w", err)
873 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
874 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
879 c.PeerExtensionBytes = res.PeerExtensionBits
880 c.PeerID = res.PeerID
881 c.completedHandshake = time.Now()
882 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
888 func (cl *Client) runReceivedConn(c *PeerConn) {
889 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
893 t, err := cl.receiveHandshakes(c)
896 "error receiving handshakes on %v: %s", c, err,
897 ).SetLevel(log.Debug).
899 "network", c.Network,
901 torrent.Add("error receiving handshake", 1)
903 cl.onBadAccept(c.RemoteAddr)
908 torrent.Add("received handshake for unloaded torrent", 1)
909 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
911 cl.onBadAccept(c.RemoteAddr)
915 torrent.Add("received handshake for loaded torrent", 1)
918 t.runHandshookConnLoggingErr(c)
921 // Client lock must be held before entering this.
922 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
924 if c.PeerID == cl.peerID {
927 addr := c.conn.RemoteAddr().String()
928 cl.dopplegangerAddrs[addr] = struct{}{}
930 // Because the remote address is not necessarily the same as its client's torrent listen
931 // address, we won't record the remote address as a doppleganger. Instead, the initiator
932 // can record *us* as the doppleganger.
934 return errors.New("local and remote peer ids are the same")
936 c.conn.SetWriteDeadline(time.Time{})
937 c.r = deadlineReader{c.conn, c.r}
938 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
939 if connIsIpv6(c.conn) {
940 torrent.Add("completed handshake over ipv6", 1)
942 if err := t.addPeerConn(c); err != nil {
943 return fmt.Errorf("adding connection: %w", err)
945 defer t.dropConnection(c)
947 cl.sendInitialMessages(c, t)
948 err := c.mainReadLoop()
950 return fmt.Errorf("main read loop: %w", err)
955 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
956 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
957 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
958 const localClientReqq = 1 << 5
960 // See the order given in Transmission's tr_peerMsgsNew.
961 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
962 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
963 conn.write(pp.Message{
965 ExtendedID: pp.HandshakeExtendedID,
966 ExtendedPayload: func() []byte {
967 msg := pp.ExtendedHandshakeMessage{
968 M: map[pp.ExtensionName]pp.ExtensionNumber{
969 pp.ExtensionNameMetadata: metadataExtendedId,
971 V: cl.config.ExtendedHandshakeClientVersion,
972 Reqq: localClientReqq,
973 YourIp: pp.CompactIp(conn.remoteIp()),
974 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
975 Port: cl.incomingPeerPort(),
976 MetadataSize: torrent.metadataSize(),
977 // TODO: We can figured these out specific to the socket
979 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
980 Ipv6: cl.config.PublicIp6.To16(),
982 if !cl.config.DisablePEX {
983 msg.M[pp.ExtensionNamePex] = pexExtendedId
985 return bencode.MustMarshal(msg)
990 if conn.fastEnabled() {
991 if torrent.haveAllPieces() {
992 conn.write(pp.Message{Type: pp.HaveAll})
993 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
995 } else if !torrent.haveAnyPieces() {
996 conn.write(pp.Message{Type: pp.HaveNone})
997 conn.sentHaves.Clear()
1003 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1004 conn.write(pp.Message{
1011 func (cl *Client) dhtPort() (ret uint16) {
1012 if len(cl.dhtServers) == 0 {
1015 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1018 func (cl *Client) haveDhtServer() bool {
1019 return len(cl.dhtServers) > 0
1022 // Process incoming ut_metadata message.
1023 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1024 var d pp.ExtendedMetadataRequestMsg
1025 err := bencode.Unmarshal(payload, &d)
1026 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1027 } else if err != nil {
1028 return fmt.Errorf("error unmarshalling bencode: %s", err)
1032 case pp.DataMetadataExtensionMsgType:
1033 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1034 if !c.requestedMetadataPiece(piece) {
1035 return fmt.Errorf("got unexpected piece %d", piece)
1037 c.metadataRequests[piece] = false
1038 begin := len(payload) - d.PieceSize()
1039 if begin < 0 || begin >= len(payload) {
1040 return fmt.Errorf("data has bad offset in payload: %d", begin)
1042 t.saveMetadataPiece(piece, payload[begin:])
1043 c.lastUsefulChunkReceived = time.Now()
1044 err = t.maybeCompleteMetadata()
1046 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1047 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1048 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1049 // log consumers can filter for this message.
1050 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1053 case pp.RequestMetadataExtensionMsgType:
1054 if !t.haveMetadataPiece(piece) {
1055 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1058 start := (1 << 14) * piece
1059 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1060 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1062 case pp.RejectMetadataExtensionMsgType:
1065 return errors.New("unknown msg_type value")
1069 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1070 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1071 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1076 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1080 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1083 if _, ok := cl.ipBlockRange(ip); ok {
1086 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1092 // Return a Torrent ready for insertion into a Client.
1093 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1094 // use provided storage, if provided
1095 storageClient := cl.defaultStorage
1096 if specStorage != nil {
1097 storageClient = storage.NewClient(specStorage)
1103 peers: prioritizedPeers{
1105 getPrio: func(p PeerInfo) peerPriority {
1107 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1110 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1112 halfOpen: make(map[string]PeerInfo),
1113 pieceStateChanges: pubsub.NewPubSub(),
1115 storageOpener: storageClient,
1116 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1118 metadataChanged: sync.Cond{
1121 webSeeds: make(map[string]*Peer),
1122 gotMetainfoC: make(chan struct{}),
1124 t.networkingEnabled.Set()
1125 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1126 t.logger = cl.logger.WithContextValue(t)
1127 t.setChunkSize(defaultChunkSize)
1131 // A file-like handle to some torrent data resource.
1132 type Handle interface {
1139 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1140 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1143 // Adds a torrent by InfoHash with a custom Storage implementation.
1144 // If the torrent already exists then this Storage is ignored and the
1145 // existing torrent returned with `new` set to `false`
1146 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1149 t, ok := cl.torrents[infoHash]
1155 t = cl.newTorrent(infoHash, specStorage)
1156 cl.eachDhtServer(func(s DhtServer) {
1157 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1158 go t.dhtAnnouncer(s)
1161 cl.torrents[infoHash] = t
1162 cl.clearAcceptLimits()
1163 t.updateWantPeersEvent()
1164 // Tickle Client.waitAccept, new torrent may want conns.
1165 cl.event.Broadcast()
1169 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1170 // Torrent.MergeSpec.
1171 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1172 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1173 err = t.MergeSpec(spec)
1174 if err != nil && new {
1180 type stringAddr string
1182 var _ net.Addr = stringAddr("")
1184 func (stringAddr) Network() string { return "" }
1185 func (me stringAddr) String() string { return string(me) }
1187 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1188 // spec.DisallowDataDownload/Upload will be read and applied
1189 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1190 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1191 if spec.DisplayName != "" {
1192 t.SetDisplayName(spec.DisplayName)
1194 if spec.InfoBytes != nil {
1195 err := t.SetInfoBytes(spec.InfoBytes)
1201 cl.AddDhtNodes(spec.DhtNodes)
1204 useTorrentSources(spec.Sources, t)
1205 for _, url := range spec.Webseeds {
1208 for _, peerAddr := range spec.PeerAddrs {
1210 Addr: stringAddr(peerAddr),
1211 Source: PeerSourceDirect,
1215 if spec.ChunkSize != 0 {
1216 t.setChunkSize(pp.Integer(spec.ChunkSize))
1218 t.addTrackers(spec.Trackers)
1220 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1221 t.dataUploadDisallowed = spec.DisallowDataUpload
1225 func useTorrentSources(sources []string, t *Torrent) {
1226 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1227 ctx := context.Background()
1228 for i := 0; i < len(sources); i += 1 {
1231 if err := useTorrentSource(ctx, s, t); err != nil {
1232 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1234 t.logger.Printf("successfully used source %q", s)
1240 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1241 ctx, cancel := context.WithCancel(ctx)
1251 var req *http.Request
1252 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1255 var resp *http.Response
1256 if resp, err = http.DefaultClient.Do(req); err != nil {
1259 var mi metainfo.MetaInfo
1260 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1263 if ctx.Err() != nil {
1268 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1271 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1272 t, ok := cl.torrents[infoHash]
1274 err = fmt.Errorf("no such torrent")
1281 delete(cl.torrents, infoHash)
1285 func (cl *Client) allTorrentsCompleted() bool {
1286 for _, t := range cl.torrents {
1290 if !t.haveAllPieces() {
1297 // Returns true when all torrents are completely downloaded and false if the
1298 // client is stopped before that.
1299 func (cl *Client) WaitAll() bool {
1302 for !cl.allTorrentsCompleted() {
1303 if cl.closed.IsSet() {
1311 // Returns handles to all the torrents loaded in the Client.
1312 func (cl *Client) Torrents() []*Torrent {
1315 return cl.torrentsAsSlice()
1318 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1319 for _, t := range cl.torrents {
1320 ret = append(ret, t)
1325 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1326 spec, err := TorrentSpecFromMagnetUri(uri)
1330 T, _, err = cl.AddTorrentSpec(spec)
1334 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1335 ts, err := TorrentSpecFromMetaInfoErr(mi)
1339 T, _, err = cl.AddTorrentSpec(ts)
1343 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1344 mi, err := metainfo.LoadFromFile(filename)
1348 return cl.AddTorrent(mi)
1351 func (cl *Client) DhtServers() []DhtServer {
1352 return cl.dhtServers
1355 func (cl *Client) AddDhtNodes(nodes []string) {
1356 for _, n := range nodes {
1357 hmp := missinggo.SplitHostMaybePort(n)
1358 ip := net.ParseIP(hmp.Host)
1360 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1363 ni := krpc.NodeInfo{
1364 Addr: krpc.NodeAddr{
1369 cl.eachDhtServer(func(s DhtServer) {
1375 func (cl *Client) banPeerIP(ip net.IP) {
1376 cl.logger.Printf("banning ip %v", ip)
1377 if cl.badPeerIPs == nil {
1378 cl.badPeerIPs = make(map[string]struct{})
1380 cl.badPeerIPs[ip.String()] = struct{}{}
1383 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1392 PeerMaxRequests: 250,
1394 RemoteAddr: remoteAddr,
1396 callbacks: &cl.config.Callbacks,
1398 connString: connString,
1402 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1403 c.setRW(connStatsReadWriter{nc, c})
1404 c.r = &rateLimitedReader{
1405 l: cl.config.DownloadRateLimiter,
1408 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1409 for _, f := range cl.config.Callbacks.NewPeer {
1415 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1422 t.addPeers([]PeerInfo{{
1423 Addr: ipPortAddr{ip, port},
1424 Source: PeerSourceDhtAnnouncePeer,
1428 func firstNotNil(ips ...net.IP) net.IP {
1429 for _, ip := range ips {
1437 func (cl *Client) eachListener(f func(Listener) bool) {
1438 for _, s := range cl.listeners {
1445 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1446 for i := 0; i < len(cl.listeners); i += 1 {
1447 if ret = cl.listeners[i]; f(ret) {
1454 func (cl *Client) publicIp(peer net.IP) net.IP {
1455 // TODO: Use BEP 10 to determine how peers are seeing us.
1456 if peer.To4() != nil {
1458 cl.config.PublicIp4,
1459 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1464 cl.config.PublicIp6,
1465 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1469 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1470 l := cl.findListener(
1471 func(l Listener) bool {
1472 return f(addrIpOrNil(l.Addr()))
1478 return addrIpOrNil(l.Addr())
1481 // Our IP as a peer should see it.
1482 func (cl *Client) publicAddr(peer net.IP) IpPort {
1483 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1486 // ListenAddrs addresses currently being listened to.
1487 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1489 ret = make([]net.Addr, len(cl.listeners))
1490 for i := 0; i < len(cl.listeners); i += 1 {
1491 ret[i] = cl.listeners[i].Addr()
1497 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1498 ipa, ok := tryIpPortFromNetAddr(addr)
1502 ip := maskIpForAcceptLimiting(ipa.IP)
1503 if cl.acceptLimiter == nil {
1504 cl.acceptLimiter = make(map[ipStr]int)
1506 cl.acceptLimiter[ipStr(ip.String())]++
1509 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1510 if ip4 := ip.To4(); ip4 != nil {
1511 return ip4.Mask(net.CIDRMask(24, 32))
1516 func (cl *Client) clearAcceptLimits() {
1517 cl.acceptLimiter = nil
1520 func (cl *Client) acceptLimitClearer() {
1523 case <-cl.closed.Done():
1525 case <-time.After(15 * time.Minute):
1527 cl.clearAcceptLimits()
1533 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1534 if cl.config.DisableAcceptRateLimiting {
1537 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1540 func (cl *Client) rLock() {
1544 func (cl *Client) rUnlock() {
1548 func (cl *Client) lock() {
1552 func (cl *Client) unlock() {
1556 func (cl *Client) locker() *lockWithDeferreds {
1560 func (cl *Client) String() string {
1561 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1564 // Returns connection-level aggregate stats at the Client level. See the comment on
1565 // TorrentStats.ConnStats.
1566 func (cl *Client) ConnStats() ConnStats {
1567 return cl.stats.Copy()