20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
89 func (cl *Client) BadPeerIPs() (ips []string) {
91 ips = cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() (ips []string) {
97 ips = make([]string, len(cl.badPeerIPs))
99 for k := range cl.badPeerIPs {
106 func (cl *Client) PeerID() PeerID {
110 // Returns the port number for the first listener that has one. No longer assumes that all port
111 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
113 func (cl *Client) LocalPort() (port int) {
114 for i := 0; i < len(cl.listeners); i += 1 {
115 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
122 func writeDhtServerStatus(w io.Writer, s DhtServer) {
123 dhtStats := s.Stats()
124 fmt.Fprintf(w, " ID: %x\n", s.ID())
125 spew.Fdump(w, dhtStats)
128 // Writes out a human readable status of the client, such as for writing to a
130 func (cl *Client) WriteStatus(_w io.Writer) {
133 w := bufio.NewWriter(_w)
135 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
136 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
137 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
138 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
139 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
140 cl.eachDhtServer(func(s DhtServer) {
141 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
142 writeDhtServerStatus(w, s)
144 spew.Fdump(w, &cl.stats)
145 torrentsSlice := cl.torrentsAsSlice()
146 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
148 sort.Slice(torrentsSlice, func(l, r int) bool {
149 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
151 for _, t := range torrentsSlice {
153 fmt.Fprint(w, "<unknown name>")
155 fmt.Fprint(w, t.name())
161 "%f%% of %d bytes (%s)",
162 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
164 humanize.Bytes(uint64(*t.length)))
166 w.WriteString("<missing metainfo>")
174 // Filters things that are less than warning from UPnP discovery.
175 func upnpDiscoverLogFilter(m log.Msg) bool {
176 level, ok := m.GetLevel()
177 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
180 func (cl *Client) initLogger() {
181 logger := cl.config.Logger
184 if !cl.config.Debug {
185 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
188 cl.logger = logger.WithValues(cl)
191 func (cl *Client) announceKey() int32 {
192 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
195 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
196 func (cl *Client) init(cfg *ClientConfig) {
198 cl.dopplegangerAddrs = make(map[string]struct{})
199 cl.torrents = make(map[metainfo.Hash]*Torrent)
200 cl.dialRateLimiter = rate.NewLimiter(10, 10)
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 cl.event.L = cl.locker()
204 cl.ipBlockList = cfg.IPBlocklist
207 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
209 cfg = NewDefaultClientConfig()
215 go cl.acceptLimitClearer()
224 storageImpl := cfg.DefaultStorage
225 if storageImpl == nil {
226 // We'd use mmap by default but HFS+ doesn't support sparse files.
227 storageImplCloser := storage.NewFile(cfg.DataDir)
228 cl.onClose = append(cl.onClose, func() {
229 if err := storageImplCloser.Close(); err != nil {
230 cl.logger.Printf("error closing default storage: %s", err)
233 storageImpl = storageImplCloser
235 cl.defaultStorage = storage.NewClient(storageImpl)
237 if cfg.PeerID != "" {
238 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
240 o := copy(cl.peerID[:], cfg.Bep20)
241 _, err = rand.Read(cl.peerID[o:])
243 panic("error generating peer id")
247 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
255 for _, _s := range sockets {
256 s := _s // Go is fucking retarded.
257 cl.onClose = append(cl.onClose, func() { s.Close() })
258 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
259 cl.dialers = append(cl.dialers, s)
260 cl.listeners = append(cl.listeners, s)
261 if cl.config.AcceptPeerConnections {
262 go cl.acceptConnections(s)
269 for _, s := range sockets {
270 if pc, ok := s.(net.PacketConn); ok {
271 ds, err := cl.NewAnacrolixDhtServer(pc)
275 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
276 cl.onClose = append(cl.onClose, func() { ds.Close() })
281 cl.websocketTrackers = websocketTrackers{
284 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
287 t, ok := cl.torrents[infoHash]
289 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
291 return t.announceRequest(event), nil
293 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
296 t, ok := cl.torrents[dcc.InfoHash]
298 cl.logger.WithDefaultLevel(log.Warning).Printf(
299 "got webrtc conn for unloaded torrent with infohash %x",
305 go t.onWebRtcConn(dc, dcc)
312 func (cl *Client) AddDhtServer(d DhtServer) {
313 cl.dhtServers = append(cl.dhtServers, d)
316 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
317 // given address for any Torrent.
318 func (cl *Client) AddDialer(d Dialer) {
321 cl.dialers = append(cl.dialers, d)
322 for _, t := range cl.torrents {
327 func (cl *Client) Listeners() []Listener {
331 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
333 func (cl *Client) AddListener(l Listener) {
334 cl.listeners = append(cl.listeners, l)
335 if cl.config.AcceptPeerConnections {
336 go cl.acceptConnections(l)
340 func (cl *Client) firewallCallback(net.Addr) bool {
342 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
345 torrent.Add("connections firewalled", 1)
347 torrent.Add("connections not firewalled", 1)
352 func (cl *Client) listenOnNetwork(n network) bool {
353 if n.Ipv4 && cl.config.DisableIPv4 {
356 if n.Ipv6 && cl.config.DisableIPv6 {
359 if n.Tcp && cl.config.DisableTCP {
362 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
368 func (cl *Client) listenNetworks() (ns []network) {
369 for _, n := range allPeerNetworks {
370 if cl.listenOnNetwork(n) {
377 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
378 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
379 cfg := dht.ServerConfig{
380 IPBlocklist: cl.ipBlockList,
382 OnAnnouncePeer: cl.onDHTAnnouncePeer,
383 PublicIP: func() net.IP {
384 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
385 return cl.config.PublicIp6
387 return cl.config.PublicIp4
389 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
390 OnQuery: cl.config.DHTOnQuery,
391 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
393 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
396 s, err = dht.NewServer(&cfg)
399 ts, err := s.Bootstrap()
401 cl.logger.Printf("error bootstrapping dht: %s", err)
403 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
409 func (cl *Client) Closed() events.Done {
410 return cl.closed.Done()
413 func (cl *Client) eachDhtServer(f func(DhtServer)) {
414 for _, ds := range cl.dhtServers {
419 // Stops the client. All connections to peers are closed and all activity will
421 func (cl *Client) Close() (errs []error) {
423 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
426 for _, t := range cl.torrents {
427 err := t.close(&closeGroup)
429 errs = append(errs, err)
433 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
435 for i := range cl.onClose {
436 cl.onClose[len(cl.onClose)-1-i]()
442 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
443 if cl.ipBlockList == nil {
446 return cl.ipBlockList.Lookup(ip)
449 func (cl *Client) ipIsBlocked(ip net.IP) bool {
450 _, blocked := cl.ipBlockRange(ip)
454 func (cl *Client) wantConns() bool {
455 if cl.config.AlwaysWantConns {
458 for _, t := range cl.torrents {
466 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
467 func (cl *Client) rejectAccepted(conn net.Conn) error {
469 return errors.New("don't want conns right now")
471 ra := conn.RemoteAddr()
472 if rip := addrIpOrNil(ra); rip != nil {
473 if cl.config.DisableIPv4Peers && rip.To4() != nil {
474 return errors.New("ipv4 peers disabled")
476 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
477 return errors.New("ipv4 disabled")
479 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
480 return errors.New("ipv6 disabled")
482 if cl.rateLimitAccept(rip) {
483 return errors.New("source IP accepted rate limited")
485 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
486 return errors.New("bad source addr")
492 func (cl *Client) acceptConnections(l Listener) {
494 conn, err := l.Accept()
495 torrent.Add("client listener accepts", 1)
496 conn = pproffd.WrapNetConn(conn)
498 closed := cl.closed.IsSet()
501 reject = cl.rejectAccepted(conn)
511 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
516 torrent.Add("rejected accepted connections", 1)
517 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
520 go cl.incomingConnection(conn)
522 log.Fmsg("accepted %q connection at %q from %q",
526 ).SetLevel(log.Debug).Log(cl.logger)
527 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
528 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
529 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
534 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
535 func regularNetConnPeerConnConnString(nc net.Conn) string {
536 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
539 func (cl *Client) incomingConnection(nc net.Conn) {
541 if tc, ok := nc.(*net.TCPConn); ok {
544 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
545 regularNetConnPeerConnConnString(nc))
551 c.Discovery = PeerSourceIncoming
552 cl.runReceivedConn(c)
555 // Returns a handle to the given torrent, if it's present in the client.
556 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
559 t, ok = cl.torrents[ih]
563 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
564 return cl.torrents[ih]
567 type DialResult struct {
572 func countDialResult(err error) {
574 torrent.Add("successful dials", 1)
576 torrent.Add("unsuccessful dials", 1)
580 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
581 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
582 if ret < minDialTimeout {
588 // Returns whether an address is known to connect to a client with our own ID.
589 func (cl *Client) dopplegangerAddr(addr string) bool {
590 _, ok := cl.dopplegangerAddrs[addr]
594 // Returns a connection over UTP or TCP, whichever is first to connect.
595 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
596 return DialFirst(ctx, addr, cl.dialers)
599 // Returns a connection over UTP or TCP, whichever is first to connect.
600 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
602 t := perf.NewTimer(perf.CallerName(0))
605 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
607 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
611 ctx, cancel := context.WithCancel(ctx)
612 // As soon as we return one connection, cancel the others.
615 resCh := make(chan DialResult, left)
616 for _, _s := range dialers {
621 dialFromSocket(ctx, s, addr),
626 // Wait for a successful connection.
628 defer perf.ScopeTimer()()
629 for ; left > 0 && res.Conn == nil; left-- {
633 // There are still incompleted dials.
635 for ; left > 0; left-- {
636 conn := (<-resCh).Conn
643 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
648 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
649 c, err := s.Dial(ctx, addr)
650 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
651 // it now in case we close the connection forthwith.
652 if tc, ok := c.(*net.TCPConn); ok {
659 func forgettableDialError(err error) bool {
660 return strings.Contains(err.Error(), "no suitable address found")
663 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
664 if _, ok := t.halfOpen[addr]; !ok {
665 panic("invariant broken")
667 delete(t.halfOpen, addr)
669 for _, t := range cl.torrents {
674 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
675 // for valid reasons.
676 func (cl *Client) initiateProtocolHandshakes(
680 outgoing, encryptHeader bool,
681 remoteAddr PeerRemoteAddr,
682 network, connString string,
684 c *PeerConn, err error,
686 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
687 c.headerEncrypted = encryptHeader
688 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
690 dl, ok := ctx.Deadline()
694 err = nc.SetDeadline(dl)
698 err = cl.initiateHandshakes(c, t)
702 // Returns nil connection and nil error if no connection could be established for valid reasons.
703 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
704 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
707 return t.dialTimeout()
710 dr := cl.dialFirst(dialCtx, addr.String())
713 if dialCtx.Err() != nil {
714 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
716 return nil, errors.New("dial failed")
718 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
725 // Returns nil connection and nil error if no connection could be established
726 // for valid reasons.
727 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
728 torrent.Add("establish outgoing connection", 1)
729 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
730 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
732 torrent.Add("initiated conn with preferred header obfuscation", 1)
735 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
736 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
737 // We should have just tried with the preferred header obfuscation. If it was required,
738 // there's nothing else to try.
741 // Try again with encryption if we didn't earlier, or without if we did.
742 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
744 torrent.Add("initiated conn with fallback header obfuscation", 1)
746 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
750 // Called to dial out and run a connection. The addr we're given is already
751 // considered half-open.
752 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
753 cl.dialRateLimiter.Wait(context.Background())
754 c, err := cl.establishOutgoingConn(t, addr)
757 // Don't release lock between here and addPeerConn, unless it's for
759 cl.noLongerHalfOpen(t, addr.String())
762 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
769 t.runHandshookConnLoggingErr(c)
772 // The port number for incoming peer connections. 0 if the client isn't listening.
773 func (cl *Client) incomingPeerPort() int {
774 return cl.LocalPort()
777 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
778 if c.headerEncrypted {
781 rw, c.cryptoMethod, err = mse.InitiateHandshake(
788 cl.config.CryptoProvides,
792 return fmt.Errorf("header obfuscation handshake: %w", err)
795 ih, err := cl.connBtHandshake(c, &t.infoHash)
797 return fmt.Errorf("bittorrent protocol handshake: %w", err)
799 if ih != t.infoHash {
800 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
805 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
806 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
807 func (cl *Client) forSkeys(f func([]byte) bool) {
810 if false { // Emulate the bug from #114
812 for ih := range cl.torrents {
816 for range cl.torrents {
823 for ih := range cl.torrents {
830 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
831 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
837 // Do encryption and bittorrent handshakes as receiver.
838 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
839 defer perf.ScopeTimerErr(&err)()
841 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
843 if err == nil || err == mse.ErrNoSecretKeyMatch {
844 if c.headerEncrypted {
845 torrent.Add("handshakes received encrypted", 1)
847 torrent.Add("handshakes received unencrypted", 1)
850 torrent.Add("handshakes received with error while handling encryption", 1)
853 if err == mse.ErrNoSecretKeyMatch {
858 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
859 err = errors.New("connection does not have required header obfuscation")
862 ih, err := cl.connBtHandshake(c, nil)
864 return nil, fmt.Errorf("during bt handshake: %w", err)
872 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
876 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
877 &successfulPeerWireProtocolHandshakePeerReservedBytes)
880 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
881 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
885 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
887 c.PeerExtensionBytes = res.PeerExtensionBits
888 c.PeerID = res.PeerID
889 c.completedHandshake = time.Now()
890 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
896 func (cl *Client) runReceivedConn(c *PeerConn) {
897 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
901 t, err := cl.receiveHandshakes(c)
904 "error receiving handshakes on %v: %s", c, err,
905 ).SetLevel(log.Debug).
907 "network", c.Network,
909 torrent.Add("error receiving handshake", 1)
911 cl.onBadAccept(c.RemoteAddr)
916 torrent.Add("received handshake for unloaded torrent", 1)
917 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
919 cl.onBadAccept(c.RemoteAddr)
923 torrent.Add("received handshake for loaded torrent", 1)
926 t.runHandshookConnLoggingErr(c)
929 // Client lock must be held before entering this.
930 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
932 for i, b := range cl.config.MinPeerExtensions {
933 if c.PeerExtensionBytes[i]&b != b {
934 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes)
937 if c.PeerID == cl.peerID {
940 addr := c.conn.RemoteAddr().String()
941 cl.dopplegangerAddrs[addr] = struct{}{}
943 // Because the remote address is not necessarily the same as its client's torrent listen
944 // address, we won't record the remote address as a doppleganger. Instead, the initiator
945 // can record *us* as the doppleganger.
947 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
950 c.conn.SetWriteDeadline(time.Time{})
951 c.r = deadlineReader{c.conn, c.r}
952 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
953 if connIsIpv6(c.conn) {
954 torrent.Add("completed handshake over ipv6", 1)
956 if err := t.addPeerConn(c); err != nil {
957 return fmt.Errorf("adding connection: %w", err)
959 defer t.dropConnection(c)
961 cl.sendInitialMessages(c, t)
962 c.initUpdateRequestsTimer()
963 err := c.mainReadLoop()
965 return fmt.Errorf("main read loop: %w", err)
972 func (p *Peer) initUpdateRequestsTimer() {
974 if p.updateRequestsTimer != nil {
975 panic(p.updateRequestsTimer)
978 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
979 p.updateRequestsTimer.Stop()
982 func (c *Peer) updateRequestsTimerFunc() {
984 defer c.locker().Unlock()
985 if c.closed.IsSet() {
988 if c.needRequestUpdate != "" {
991 if c.isLowOnRequests() {
992 // If there are no outstanding requests, then a request update should have already run.
995 c.updateRequests("updateRequestsTimer")
998 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
999 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1000 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1001 const localClientReqq = 1 << 5
1003 // See the order given in Transmission's tr_peerMsgsNew.
1004 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1005 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1006 conn.write(pp.Message{
1008 ExtendedID: pp.HandshakeExtendedID,
1009 ExtendedPayload: func() []byte {
1010 msg := pp.ExtendedHandshakeMessage{
1011 M: map[pp.ExtensionName]pp.ExtensionNumber{
1012 pp.ExtensionNameMetadata: metadataExtendedId,
1014 V: cl.config.ExtendedHandshakeClientVersion,
1015 Reqq: localClientReqq,
1016 YourIp: pp.CompactIp(conn.remoteIp()),
1017 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1018 Port: cl.incomingPeerPort(),
1019 MetadataSize: torrent.metadataSize(),
1020 // TODO: We can figured these out specific to the socket
1022 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1023 Ipv6: cl.config.PublicIp6.To16(),
1025 if !cl.config.DisablePEX {
1026 msg.M[pp.ExtensionNamePex] = pexExtendedId
1028 return bencode.MustMarshal(msg)
1033 if conn.fastEnabled() {
1034 if torrent.haveAllPieces() {
1035 conn.write(pp.Message{Type: pp.HaveAll})
1036 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1038 } else if !torrent.haveAnyPieces() {
1039 conn.write(pp.Message{Type: pp.HaveNone})
1040 conn.sentHaves.Clear()
1046 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1047 conn.write(pp.Message{
1054 func (cl *Client) dhtPort() (ret uint16) {
1055 if len(cl.dhtServers) == 0 {
1058 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1061 func (cl *Client) haveDhtServer() bool {
1062 return len(cl.dhtServers) > 0
1065 // Process incoming ut_metadata message.
1066 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1067 var d pp.ExtendedMetadataRequestMsg
1068 err := bencode.Unmarshal(payload, &d)
1069 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1070 } else if err != nil {
1071 return fmt.Errorf("error unmarshalling bencode: %s", err)
1075 case pp.DataMetadataExtensionMsgType:
1076 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1077 if !c.requestedMetadataPiece(piece) {
1078 return fmt.Errorf("got unexpected piece %d", piece)
1080 c.metadataRequests[piece] = false
1081 begin := len(payload) - d.PieceSize()
1082 if begin < 0 || begin >= len(payload) {
1083 return fmt.Errorf("data has bad offset in payload: %d", begin)
1085 t.saveMetadataPiece(piece, payload[begin:])
1086 c.lastUsefulChunkReceived = time.Now()
1087 err = t.maybeCompleteMetadata()
1089 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1090 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1091 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1092 // log consumers can filter for this message.
1093 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1096 case pp.RequestMetadataExtensionMsgType:
1097 if !t.haveMetadataPiece(piece) {
1098 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1101 start := (1 << 14) * piece
1102 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1103 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1105 case pp.RejectMetadataExtensionMsgType:
1108 return errors.New("unknown msg_type value")
1112 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1113 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1114 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1119 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1123 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1126 if _, ok := cl.ipBlockRange(ip); ok {
1129 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1135 // Return a Torrent ready for insertion into a Client.
1136 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1137 return cl.newTorrentOpt(AddTorrentOpts{
1139 Storage: specStorage,
1143 // Return a Torrent ready for insertion into a Client.
1144 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1145 // use provided storage, if provided
1146 storageClient := cl.defaultStorage
1147 if opts.Storage != nil {
1148 storageClient = storage.NewClient(opts.Storage)
1153 infoHash: opts.InfoHash,
1154 peers: prioritizedPeers{
1156 getPrio: func(p PeerInfo) peerPriority {
1158 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1161 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1163 halfOpen: make(map[string]PeerInfo),
1164 pieceStateChanges: pubsub.NewPubSub(),
1166 storageOpener: storageClient,
1167 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1169 metadataChanged: sync.Cond{
1172 webSeeds: make(map[string]*Peer),
1173 gotMetainfoC: make(chan struct{}),
1175 t.networkingEnabled.Set()
1176 t.logger = cl.logger.WithContextValue(t)
1177 if opts.ChunkSize == 0 {
1178 opts.ChunkSize = defaultChunkSize
1180 t.setChunkSize(opts.ChunkSize)
1184 // A file-like handle to some torrent data resource.
1185 type Handle interface {
1192 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1193 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1196 // Adds a torrent by InfoHash with a custom Storage implementation.
1197 // If the torrent already exists then this Storage is ignored and the
1198 // existing torrent returned with `new` set to `false`
1199 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1202 t, ok := cl.torrents[infoHash]
1208 t = cl.newTorrent(infoHash, specStorage)
1209 cl.eachDhtServer(func(s DhtServer) {
1210 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1211 go t.dhtAnnouncer(s)
1214 cl.torrents[infoHash] = t
1215 cl.clearAcceptLimits()
1216 t.updateWantPeersEvent()
1217 // Tickle Client.waitAccept, new torrent may want conns.
1218 cl.event.Broadcast()
1222 // Adds a torrent by InfoHash with a custom Storage implementation.
1223 // If the torrent already exists then this Storage is ignored and the
1224 // existing torrent returned with `new` set to `false`
1225 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1226 infoHash := opts.InfoHash
1229 t, ok := cl.torrents[infoHash]
1235 t = cl.newTorrentOpt(opts)
1236 cl.eachDhtServer(func(s DhtServer) {
1237 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1238 go t.dhtAnnouncer(s)
1241 cl.torrents[infoHash] = t
1242 cl.clearAcceptLimits()
1243 t.updateWantPeersEvent()
1244 // Tickle Client.waitAccept, new torrent may want conns.
1245 cl.event.Broadcast()
1249 type AddTorrentOpts struct {
1251 Storage storage.ClientImpl
1252 ChunkSize pp.Integer
1255 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1256 // Torrent.MergeSpec.
1257 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1258 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1259 InfoHash: spec.InfoHash,
1260 Storage: spec.Storage,
1261 ChunkSize: spec.ChunkSize,
1265 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1267 modSpec.ChunkSize = 0
1269 err = t.MergeSpec(&modSpec)
1270 if err != nil && new {
1276 type stringAddr string
1278 var _ net.Addr = stringAddr("")
1280 func (stringAddr) Network() string { return "" }
1281 func (me stringAddr) String() string { return string(me) }
1283 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1284 // spec.DisallowDataDownload/Upload will be read and applied
1285 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1286 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1287 if spec.DisplayName != "" {
1288 t.SetDisplayName(spec.DisplayName)
1290 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1291 if spec.InfoBytes != nil {
1292 err := t.SetInfoBytes(spec.InfoBytes)
1298 cl.AddDhtNodes(spec.DhtNodes)
1301 useTorrentSources(spec.Sources, t)
1302 for _, url := range spec.Webseeds {
1305 for _, peerAddr := range spec.PeerAddrs {
1307 Addr: stringAddr(peerAddr),
1308 Source: PeerSourceDirect,
1312 if spec.ChunkSize != 0 {
1313 panic("chunk size cannot be changed for existing Torrent")
1315 t.addTrackers(spec.Trackers)
1317 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1318 t.dataUploadDisallowed = spec.DisallowDataUpload
1322 func useTorrentSources(sources []string, t *Torrent) {
1323 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1324 ctx := context.Background()
1325 for i := 0; i < len(sources); i += 1 {
1328 if err := useTorrentSource(ctx, s, t); err != nil {
1329 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1331 t.logger.Printf("successfully used source %q", s)
1337 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1338 ctx, cancel := context.WithCancel(ctx)
1348 var req *http.Request
1349 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1352 var resp *http.Response
1353 if resp, err = http.DefaultClient.Do(req); err != nil {
1356 var mi metainfo.MetaInfo
1357 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1360 if ctx.Err() != nil {
1365 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1368 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1369 t, ok := cl.torrents[infoHash]
1371 err = fmt.Errorf("no such torrent")
1378 delete(cl.torrents, infoHash)
1382 func (cl *Client) allTorrentsCompleted() bool {
1383 for _, t := range cl.torrents {
1387 if !t.haveAllPieces() {
1394 // Returns true when all torrents are completely downloaded and false if the
1395 // client is stopped before that.
1396 func (cl *Client) WaitAll() bool {
1399 for !cl.allTorrentsCompleted() {
1400 if cl.closed.IsSet() {
1408 // Returns handles to all the torrents loaded in the Client.
1409 func (cl *Client) Torrents() []*Torrent {
1412 return cl.torrentsAsSlice()
1415 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1416 for _, t := range cl.torrents {
1417 ret = append(ret, t)
1422 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1423 spec, err := TorrentSpecFromMagnetUri(uri)
1427 T, _, err = cl.AddTorrentSpec(spec)
1431 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1432 ts, err := TorrentSpecFromMetaInfoErr(mi)
1436 T, _, err = cl.AddTorrentSpec(ts)
1440 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1441 mi, err := metainfo.LoadFromFile(filename)
1445 return cl.AddTorrent(mi)
1448 func (cl *Client) DhtServers() []DhtServer {
1449 return cl.dhtServers
1452 func (cl *Client) AddDhtNodes(nodes []string) {
1453 for _, n := range nodes {
1454 hmp := missinggo.SplitHostMaybePort(n)
1455 ip := net.ParseIP(hmp.Host)
1457 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1460 ni := krpc.NodeInfo{
1461 Addr: krpc.NodeAddr{
1466 cl.eachDhtServer(func(s DhtServer) {
1472 func (cl *Client) banPeerIP(ip net.IP) {
1473 cl.logger.Printf("banning ip %v", ip)
1474 if cl.badPeerIPs == nil {
1475 cl.badPeerIPs = make(map[string]struct{})
1477 cl.badPeerIPs[ip.String()] = struct{}{}
1480 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1489 PeerMaxRequests: 250,
1491 RemoteAddr: remoteAddr,
1493 callbacks: &cl.config.Callbacks,
1495 connString: connString,
1499 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1500 c.setRW(connStatsReadWriter{nc, c})
1501 c.r = &rateLimitedReader{
1502 l: cl.config.DownloadRateLimiter,
1505 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1506 for _, f := range cl.config.Callbacks.NewPeer {
1512 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1519 t.addPeers([]PeerInfo{{
1520 Addr: ipPortAddr{ip, port},
1521 Source: PeerSourceDhtAnnouncePeer,
1525 func firstNotNil(ips ...net.IP) net.IP {
1526 for _, ip := range ips {
1534 func (cl *Client) eachListener(f func(Listener) bool) {
1535 for _, s := range cl.listeners {
1542 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1543 for i := 0; i < len(cl.listeners); i += 1 {
1544 if ret = cl.listeners[i]; f(ret) {
1551 func (cl *Client) publicIp(peer net.IP) net.IP {
1552 // TODO: Use BEP 10 to determine how peers are seeing us.
1553 if peer.To4() != nil {
1555 cl.config.PublicIp4,
1556 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1561 cl.config.PublicIp6,
1562 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1566 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1567 l := cl.findListener(
1568 func(l Listener) bool {
1569 return f(addrIpOrNil(l.Addr()))
1575 return addrIpOrNil(l.Addr())
1578 // Our IP as a peer should see it.
1579 func (cl *Client) publicAddr(peer net.IP) IpPort {
1580 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1583 // ListenAddrs addresses currently being listened to.
1584 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1586 ret = make([]net.Addr, len(cl.listeners))
1587 for i := 0; i < len(cl.listeners); i += 1 {
1588 ret[i] = cl.listeners[i].Addr()
1594 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1595 ipa, ok := tryIpPortFromNetAddr(addr)
1599 ip := maskIpForAcceptLimiting(ipa.IP)
1600 if cl.acceptLimiter == nil {
1601 cl.acceptLimiter = make(map[ipStr]int)
1603 cl.acceptLimiter[ipStr(ip.String())]++
1606 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1607 if ip4 := ip.To4(); ip4 != nil {
1608 return ip4.Mask(net.CIDRMask(24, 32))
1613 func (cl *Client) clearAcceptLimits() {
1614 cl.acceptLimiter = nil
1617 func (cl *Client) acceptLimitClearer() {
1620 case <-cl.closed.Done():
1622 case <-time.After(15 * time.Minute):
1624 cl.clearAcceptLimits()
1630 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1631 if cl.config.DisableAcceptRateLimiting {
1634 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1637 func (cl *Client) rLock() {
1641 func (cl *Client) rUnlock() {
1645 func (cl *Client) lock() {
1649 func (cl *Client) unlock() {
1653 func (cl *Client) locker() *lockWithDeferreds {
1657 func (cl *Client) String() string {
1658 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1661 // Returns connection-level aggregate stats at the Client level. See the comment on
1662 // TorrentStats.ConnStats.
1663 func (cl *Client) ConnStats() ConnStats {
1664 return cl.stats.Copy()