20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 updateRequests chansync.BroadcastCond
91 func (cl *Client) BadPeerIPs() (ips []string) {
93 ips = cl.badPeerIPsLocked()
98 func (cl *Client) badPeerIPsLocked() (ips []string) {
99 ips = make([]string, len(cl.badPeerIPs))
101 for k := range cl.badPeerIPs {
108 func (cl *Client) PeerID() PeerID {
112 // Returns the port number for the first listener that has one. No longer assumes that all port
113 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 func (cl *Client) LocalPort() (port int) {
116 for i := 0; i < len(cl.listeners); i += 1 {
117 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
124 func writeDhtServerStatus(w io.Writer, s DhtServer) {
125 dhtStats := s.Stats()
126 fmt.Fprintf(w, " ID: %x\n", s.ID())
127 spew.Fdump(w, dhtStats)
130 // Writes out a human readable status of the client, such as for writing to a
132 func (cl *Client) WriteStatus(_w io.Writer) {
135 w := bufio.NewWriter(_w)
137 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
138 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
139 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
140 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
141 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
142 cl.eachDhtServer(func(s DhtServer) {
143 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
144 writeDhtServerStatus(w, s)
146 spew.Fdump(w, &cl.stats)
147 torrentsSlice := cl.torrentsAsSlice()
148 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150 sort.Slice(torrentsSlice, func(l, r int) bool {
151 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153 for _, t := range torrentsSlice {
155 fmt.Fprint(w, "<unknown name>")
157 fmt.Fprint(w, t.name())
163 "%f%% of %d bytes (%s)",
164 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166 humanize.Bytes(uint64(*t.length)))
168 w.WriteString("<missing metainfo>")
176 // Filters things that are less than warning from UPnP discovery.
177 func upnpDiscoverLogFilter(m log.Msg) bool {
178 level, ok := m.GetLevel()
179 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
182 func (cl *Client) initLogger() {
183 logger := cl.config.Logger
186 if !cl.config.Debug {
187 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
190 cl.logger = logger.WithValues(cl)
193 func (cl *Client) announceKey() int32 {
194 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
197 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
198 func (cl *Client) init(cfg *ClientConfig) {
200 cl.dopplegangerAddrs = make(map[string]struct{})
201 cl.torrents = make(map[metainfo.Hash]*Torrent)
202 cl.dialRateLimiter = rate.NewLimiter(10, 10)
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
211 cfg = NewDefaultClientConfig()
217 go cl.acceptLimitClearer()
226 storageImpl := cfg.DefaultStorage
227 if storageImpl == nil {
228 // We'd use mmap by default but HFS+ doesn't support sparse files.
229 storageImplCloser := storage.NewFile(cfg.DataDir)
230 cl.onClose = append(cl.onClose, func() {
231 if err := storageImplCloser.Close(); err != nil {
232 cl.logger.Printf("error closing default storage: %s", err)
235 storageImpl = storageImplCloser
237 cl.defaultStorage = storage.NewClient(storageImpl)
239 if cfg.PeerID != "" {
240 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
242 o := copy(cl.peerID[:], cfg.Bep20)
243 _, err = rand.Read(cl.peerID[o:])
245 panic("error generating peer id")
249 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
257 for _, _s := range sockets {
258 s := _s // Copy the loop variable so the closure below captures its own socket.
259 cl.onClose = append(cl.onClose, func() { s.Close() })
260 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261 cl.dialers = append(cl.dialers, s)
262 cl.listeners = append(cl.listeners, s)
263 if cl.config.AcceptPeerConnections {
264 go cl.acceptConnections(s)
271 for _, s := range sockets {
272 if pc, ok := s.(net.PacketConn); ok {
273 ds, err := cl.NewAnacrolixDhtServer(pc)
277 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278 cl.onClose = append(cl.onClose, func() { ds.Close() })
283 cl.websocketTrackers = websocketTrackers{
286 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
289 t, ok := cl.torrents[infoHash]
291 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
293 return t.announceRequest(event), nil
295 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
298 t, ok := cl.torrents[dcc.InfoHash]
300 cl.logger.WithDefaultLevel(log.Warning).Printf(
301 "got webrtc conn for unloaded torrent with infohash %x",
307 go t.onWebRtcConn(dc, dcc)
314 func (cl *Client) AddDhtServer(d DhtServer) {
315 cl.dhtServers = append(cl.dhtServers, d)
318 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
319 // given address for any Torrent.
320 func (cl *Client) AddDialer(d Dialer) {
323 cl.dialers = append(cl.dialers, d)
324 for _, t := range cl.torrents {
329 func (cl *Client) Listeners() []Listener {
333 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
335 func (cl *Client) AddListener(l Listener) {
336 cl.listeners = append(cl.listeners, l)
337 if cl.config.AcceptPeerConnections {
338 go cl.acceptConnections(l)
342 func (cl *Client) firewallCallback(net.Addr) bool {
344 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
347 torrent.Add("connections firewalled", 1)
349 torrent.Add("connections not firewalled", 1)
354 func (cl *Client) listenOnNetwork(n network) bool {
355 if n.Ipv4 && cl.config.DisableIPv4 {
358 if n.Ipv6 && cl.config.DisableIPv6 {
361 if n.Tcp && cl.config.DisableTCP {
364 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
370 func (cl *Client) listenNetworks() (ns []network) {
371 for _, n := range allPeerNetworks {
372 if cl.listenOnNetwork(n) {
379 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
380 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
381 cfg := dht.ServerConfig{
382 IPBlocklist: cl.ipBlockList,
384 OnAnnouncePeer: cl.onDHTAnnouncePeer,
385 PublicIP: func() net.IP {
386 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
387 return cl.config.PublicIp6
389 return cl.config.PublicIp4
391 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
392 OnQuery: cl.config.DHTOnQuery,
393 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
398 s, err = dht.NewServer(&cfg)
401 ts, err := s.Bootstrap()
403 cl.logger.Printf("error bootstrapping dht: %s", err)
405 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
411 func (cl *Client) Closed() events.Done {
412 return cl.closed.Done()
415 func (cl *Client) eachDhtServer(f func(DhtServer)) {
416 for _, ds := range cl.dhtServers {
421 // Stops the client. All connections to peers are closed and all activity will
423 func (cl *Client) Close() (errs []error) {
425 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
428 for _, t := range cl.torrents {
429 err := t.close(&closeGroup)
431 errs = append(errs, err)
435 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
444 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
445 if cl.ipBlockList == nil {
448 return cl.ipBlockList.Lookup(ip)
451 func (cl *Client) ipIsBlocked(ip net.IP) bool {
452 _, blocked := cl.ipBlockRange(ip)
456 func (cl *Client) wantConns() bool {
457 if cl.config.AlwaysWantConns {
460 for _, t := range cl.torrents {
468 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
469 func (cl *Client) rejectAccepted(conn net.Conn) error {
471 return errors.New("don't want conns right now")
473 ra := conn.RemoteAddr()
474 if rip := addrIpOrNil(ra); rip != nil {
475 if cl.config.DisableIPv4Peers && rip.To4() != nil {
476 return errors.New("ipv4 peers disabled")
478 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
479 return errors.New("ipv4 disabled")
481 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
482 return errors.New("ipv6 disabled")
484 if cl.rateLimitAccept(rip) {
485 return errors.New("source IP accepted rate limited")
487 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
488 return errors.New("bad source addr")
494 func (cl *Client) acceptConnections(l Listener) {
496 conn, err := l.Accept()
497 torrent.Add("client listener accepts", 1)
498 conn = pproffd.WrapNetConn(conn)
500 closed := cl.closed.IsSet()
503 reject = cl.rejectAccepted(conn)
513 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
518 torrent.Add("rejected accepted connections", 1)
519 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
522 go cl.incomingConnection(conn)
524 log.Fmsg("accepted %q connection at %q from %q",
528 ).SetLevel(log.Debug).Log(cl.logger)
529 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
530 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
531 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a regular
// net.Conn PeerConn, formatted as "<local addr>-<remote addr>".
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local := nc.LocalAddr()
	remote := nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
541 func (cl *Client) incomingConnection(nc net.Conn) {
543 if tc, ok := nc.(*net.TCPConn); ok {
546 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
547 regularNetConnPeerConnConnString(nc))
553 c.Discovery = PeerSourceIncoming
554 cl.runReceivedConn(c)
557 // Returns a handle to the given torrent, if it's present in the client.
558 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
561 t, ok = cl.torrents[ih]
565 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
566 return cl.torrents[ih]
569 type DialResult struct {
574 func countDialResult(err error) {
576 torrent.Add("successful dials", 1)
578 torrent.Add("unsuccessful dials", 1)
582 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
583 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
584 if ret < minDialTimeout {
590 // Returns whether an address is known to connect to a client with our own ID.
591 func (cl *Client) dopplegangerAddr(addr string) bool {
592 _, ok := cl.dopplegangerAddrs[addr]
596 // Returns a connection over UTP or TCP, whichever is first to connect.
597 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
598 return DialFirst(ctx, addr, cl.dialers)
601 // Returns a connection over UTP or TCP, whichever is first to connect.
602 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
604 t := perf.NewTimer(perf.CallerName(0))
607 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
609 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
613 ctx, cancel := context.WithCancel(ctx)
614 // As soon as we return one connection, cancel the others.
617 resCh := make(chan DialResult, left)
618 for _, _s := range dialers {
623 dialFromSocket(ctx, s, addr),
628 // Wait for a successful connection.
630 defer perf.ScopeTimer()()
631 for ; left > 0 && res.Conn == nil; left-- {
635 // There are still incomplete dials.
637 for ; left > 0; left-- {
638 conn := (<-resCh).Conn
645 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
650 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
651 c, err := s.Dial(ctx, addr)
652 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
653 // it now in case we close the connection forthwith.
654 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err is a dial error whose message
// contains "no suitable address found". A nil err is not a dial error and
// yields false rather than panicking on err.Error().
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
665 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
666 if _, ok := t.halfOpen[addr]; !ok {
667 panic("invariant broken")
669 delete(t.halfOpen, addr)
671 for _, t := range cl.torrents {
676 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
677 // for valid reasons.
678 func (cl *Client) initiateProtocolHandshakes(
682 outgoing, encryptHeader bool,
683 remoteAddr PeerRemoteAddr,
684 network, connString string,
686 c *PeerConn, err error,
688 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
689 c.headerEncrypted = encryptHeader
690 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
692 dl, ok := ctx.Deadline()
696 err = nc.SetDeadline(dl)
700 err = cl.initiateHandshakes(c, t)
704 // Returns nil connection and nil error if no connection could be established for valid reasons.
705 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
706 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
709 return t.dialTimeout()
712 dr := cl.dialFirst(dialCtx, addr.String())
715 if dialCtx.Err() != nil {
716 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
718 return nil, errors.New("dial failed")
720 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
727 // Returns nil connection and nil error if no connection could be established
728 // for valid reasons.
729 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
730 torrent.Add("establish outgoing connection", 1)
731 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
732 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with preferred header obfuscation", 1)
737 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
738 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
739 // We should have just tried with the preferred header obfuscation. If it was required,
740 // there's nothing else to try.
743 // Try again with encryption if we didn't earlier, or without if we did.
744 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
746 torrent.Add("initiated conn with fallback header obfuscation", 1)
748 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
752 // Called to dial out and run a connection. The addr we're given is already
753 // considered half-open.
754 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
755 cl.dialRateLimiter.Wait(context.Background())
756 c, err := cl.establishOutgoingConn(t, addr)
759 // Don't release lock between here and addPeerConn, unless it's for
761 cl.noLongerHalfOpen(t, addr.String())
764 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
771 t.runHandshookConnLoggingErr(c)
774 // The port number for incoming peer connections. 0 if the client isn't listening.
775 func (cl *Client) incomingPeerPort() int {
776 return cl.LocalPort()
779 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
780 if c.headerEncrypted {
783 rw, c.cryptoMethod, err = mse.InitiateHandshake(
790 cl.config.CryptoProvides,
794 return fmt.Errorf("header obfuscation handshake: %w", err)
797 ih, err := cl.connBtHandshake(c, &t.infoHash)
799 return fmt.Errorf("bittorrent protocol handshake: %w", err)
801 if ih != t.infoHash {
802 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
807 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
808 // that won't also try to take the lock. This saves us copying all the infohashes every time.
809 func (cl *Client) forSkeys(f func([]byte) bool) {
812 if false { // Emulate the bug from #114
814 for ih := range cl.torrents {
818 for range cl.torrents {
825 for ih := range cl.torrents {
832 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
833 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
839 // Do encryption and bittorrent handshakes as receiver.
840 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
841 defer perf.ScopeTimerErr(&err)()
843 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
845 if err == nil || err == mse.ErrNoSecretKeyMatch {
846 if c.headerEncrypted {
847 torrent.Add("handshakes received encrypted", 1)
849 torrent.Add("handshakes received unencrypted", 1)
852 torrent.Add("handshakes received with error while handling encryption", 1)
855 if err == mse.ErrNoSecretKeyMatch {
860 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
861 err = errors.New("connection does not have required header obfuscation")
864 ih, err := cl.connBtHandshake(c, nil)
866 return nil, fmt.Errorf("during bt handshake: %w", err)
874 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
878 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
879 &successfulPeerWireProtocolHandshakePeerReservedBytes)
882 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
883 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
887 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
889 c.PeerExtensionBytes = res.PeerExtensionBits
890 c.PeerID = res.PeerID
891 c.completedHandshake = time.Now()
892 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
898 func (cl *Client) runReceivedConn(c *PeerConn) {
899 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
903 t, err := cl.receiveHandshakes(c)
906 "error receiving handshakes on %v: %s", c, err,
907 ).SetLevel(log.Debug).
909 "network", c.Network,
911 torrent.Add("error receiving handshake", 1)
913 cl.onBadAccept(c.RemoteAddr)
918 torrent.Add("received handshake for unloaded torrent", 1)
919 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
921 cl.onBadAccept(c.RemoteAddr)
925 torrent.Add("received handshake for loaded torrent", 1)
928 t.runHandshookConnLoggingErr(c)
931 // Client lock must be held before entering this.
932 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
934 for i, b := range cl.config.MinPeerExtensions {
935 if c.PeerExtensionBytes[i]&b != b {
936 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes)
939 if c.PeerID == cl.peerID {
942 addr := c.conn.RemoteAddr().String()
943 cl.dopplegangerAddrs[addr] = struct{}{}
945 // Because the remote address is not necessarily the same as its client's torrent listen
946 // address, we won't record the remote address as a doppleganger. Instead, the initiator
947 // can record *us* as the doppleganger.
949 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
952 c.conn.SetWriteDeadline(time.Time{})
953 c.r = deadlineReader{c.conn, c.r}
954 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
955 if connIsIpv6(c.conn) {
956 torrent.Add("completed handshake over ipv6", 1)
958 if err := t.addPeerConn(c); err != nil {
959 return fmt.Errorf("adding connection: %w", err)
961 defer t.dropConnection(c)
963 cl.sendInitialMessages(c, t)
964 c.initUpdateRequestsTimer()
965 err := c.mainReadLoop()
967 return fmt.Errorf("main read loop: %w", err)
974 func (p *Peer) initUpdateRequestsTimer() {
976 if p.updateRequestsTimer != nil {
977 panic(p.updateRequestsTimer)
980 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
981 p.updateRequestsTimer.Stop()
984 func (c *Peer) updateRequestsTimerFunc() {
986 defer c.locker().Unlock()
987 if c.closed.IsSet() {
990 if c.needRequestUpdate != "" {
993 if c.isLowOnRequests() {
994 // If there are no outstanding requests, then a request update should have already run.
997 c.updateRequests("updateRequestsTimer")
1000 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1001 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
1002 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1003 const localClientReqq = 1 << 5
1005 // See the order given in Transmission's tr_peerMsgsNew.
1006 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1007 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1008 conn.write(pp.Message{
1010 ExtendedID: pp.HandshakeExtendedID,
1011 ExtendedPayload: func() []byte {
1012 msg := pp.ExtendedHandshakeMessage{
1013 M: map[pp.ExtensionName]pp.ExtensionNumber{
1014 pp.ExtensionNameMetadata: metadataExtendedId,
1016 V: cl.config.ExtendedHandshakeClientVersion,
1017 Reqq: localClientReqq,
1018 YourIp: pp.CompactIp(conn.remoteIp()),
1019 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1020 Port: cl.incomingPeerPort(),
1021 MetadataSize: torrent.metadataSize(),
1022 // TODO: We can figure these out specific to the socket
1024 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1025 Ipv6: cl.config.PublicIp6.To16(),
1027 if !cl.config.DisablePEX {
1028 msg.M[pp.ExtensionNamePex] = pexExtendedId
1030 return bencode.MustMarshal(msg)
1035 if conn.fastEnabled() {
1036 if torrent.haveAllPieces() {
1037 conn.write(pp.Message{Type: pp.HaveAll})
1038 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1040 } else if !torrent.haveAnyPieces() {
1041 conn.write(pp.Message{Type: pp.HaveNone})
1042 conn.sentHaves.Clear()
1048 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1049 conn.write(pp.Message{
1056 func (cl *Client) dhtPort() (ret uint16) {
1057 if len(cl.dhtServers) == 0 {
1060 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1063 func (cl *Client) haveDhtServer() bool {
1064 return len(cl.dhtServers) > 0
1067 // Process incoming ut_metadata message.
1068 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1069 var d pp.ExtendedMetadataRequestMsg
1070 err := bencode.Unmarshal(payload, &d)
1071 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1072 } else if err != nil {
1073 return fmt.Errorf("error unmarshalling bencode: %s", err)
1077 case pp.DataMetadataExtensionMsgType:
1078 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1079 if !c.requestedMetadataPiece(piece) {
1080 return fmt.Errorf("got unexpected piece %d", piece)
1082 c.metadataRequests[piece] = false
1083 begin := len(payload) - d.PieceSize()
1084 if begin < 0 || begin >= len(payload) {
1085 return fmt.Errorf("data has bad offset in payload: %d", begin)
1087 t.saveMetadataPiece(piece, payload[begin:])
1088 c.lastUsefulChunkReceived = time.Now()
1089 err = t.maybeCompleteMetadata()
1091 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1092 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1093 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1094 // log consumers can filter for this message.
1095 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1098 case pp.RequestMetadataExtensionMsgType:
1099 if !t.haveMetadataPiece(piece) {
1100 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1103 start := (1 << 14) * piece
1104 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1105 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1107 case pp.RejectMetadataExtensionMsgType:
1110 return errors.New("unknown msg_type value")
1114 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1115 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1116 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1121 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1125 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1128 if _, ok := cl.ipBlockRange(ip); ok {
1131 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1137 // Return a Torrent ready for insertion into a Client.
1138 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1139 return cl.newTorrentOpt(AddTorrentOpts{
1141 Storage: specStorage,
1145 // Return a Torrent ready for insertion into a Client.
1146 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1147 // use provided storage, if provided
1148 storageClient := cl.defaultStorage
1149 if opts.Storage != nil {
1150 storageClient = storage.NewClient(opts.Storage)
1155 infoHash: opts.InfoHash,
1156 peers: prioritizedPeers{
1158 getPrio: func(p PeerInfo) peerPriority {
1160 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1163 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1165 halfOpen: make(map[string]PeerInfo),
1166 pieceStateChanges: pubsub.NewPubSub(),
1168 storageOpener: storageClient,
1169 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1171 metadataChanged: sync.Cond{
1174 webSeeds: make(map[string]*Peer),
1175 gotMetainfoC: make(chan struct{}),
1177 t.networkingEnabled.Set()
1178 t.logger = cl.logger.WithContextValue(t)
1179 if opts.ChunkSize == 0 {
1180 opts.ChunkSize = defaultChunkSize
1182 t.setChunkSize(opts.ChunkSize)
1186 // A file-like handle to some torrent data resource.
1187 type Handle interface {
1194 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1195 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1198 // Adds a torrent by InfoHash with a custom Storage implementation.
1199 // If the torrent already exists then this Storage is ignored and the
1200 // existing torrent returned with `new` set to `false`
1201 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1204 t, ok := cl.torrents[infoHash]
1210 t = cl.newTorrent(infoHash, specStorage)
1211 cl.eachDhtServer(func(s DhtServer) {
1212 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1213 go t.dhtAnnouncer(s)
1216 cl.torrents[infoHash] = t
1217 cl.clearAcceptLimits()
1218 t.updateWantPeersEvent()
1219 // Tickle Client.waitAccept, new torrent may want conns.
1220 cl.event.Broadcast()
1224 // Adds a torrent by InfoHash with a custom Storage implementation.
1225 // If the torrent already exists then this Storage is ignored and the
1226 // existing torrent returned with `new` set to `false`
1227 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1228 infoHash := opts.InfoHash
1231 t, ok := cl.torrents[infoHash]
1237 t = cl.newTorrentOpt(opts)
1238 cl.eachDhtServer(func(s DhtServer) {
1239 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1240 go t.dhtAnnouncer(s)
1243 cl.torrents[infoHash] = t
1244 cl.clearAcceptLimits()
1245 t.updateWantPeersEvent()
1246 // Tickle Client.waitAccept, new torrent may want conns.
1247 cl.event.Broadcast()
1251 type AddTorrentOpts struct {
1253 Storage storage.ClientImpl
1254 ChunkSize pp.Integer
1257 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1258 // Torrent.MergeSpec.
1259 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1260 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1261 InfoHash: spec.InfoHash,
1262 Storage: spec.Storage,
1263 ChunkSize: spec.ChunkSize,
1267 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1269 modSpec.ChunkSize = 0
1271 err = t.MergeSpec(&modSpec)
1272 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string: String returns the string
// itself and Network is always empty.
type stringAddr string

// Compile-time check that stringAddr implements net.Addr.
var _ net.Addr = stringAddr("")

// Network returns the empty string; a stringAddr carries no network name.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (sa stringAddr) String() string {
	return string(sa)
}
1285 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1286 // spec.DisallowDataDownload/Upload will be read and applied
1287 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1288 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1289 if spec.DisplayName != "" {
1290 t.SetDisplayName(spec.DisplayName)
1292 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1293 if spec.InfoBytes != nil {
1294 err := t.SetInfoBytes(spec.InfoBytes)
1300 cl.AddDhtNodes(spec.DhtNodes)
1303 useTorrentSources(spec.Sources, t)
1304 for _, url := range spec.Webseeds {
1307 for _, peerAddr := range spec.PeerAddrs {
1309 Addr: stringAddr(peerAddr),
1310 Source: PeerSourceDirect,
1314 if spec.ChunkSize != 0 {
1315 panic("chunk size cannot be changed for existing Torrent")
1317 t.addTrackers(spec.Trackers)
1319 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1320 t.dataUploadDisallowed = spec.DisallowDataUpload
// useTorrentSources tries useTorrentSource for the entries of sources against t, using a
// background context. A failed attempt is logged at warning level together with the source and
// the error; there is also a log message for a source that was used successfully.
//
// NOTE(review): the embedded listing numbers skip (1327 -> 1330, 1331 -> 1333), so where s is
// bound, how each attempt is scheduled and the branch structure around the success message are
// not visible in this excerpt; presumably the success message is the else branch of the error
// check — confirm against the full source.
1324 func useTorrentSources(sources []string, t *Torrent) {
1325 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1326 ctx := context.Background()
1327 for i := 0; i < len(sources); i += 1 {
1330 if err := useTorrentSource(ctx, s, t); err != nil {
1331 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1333 t.logger.Printf("successfully used source %q", s)
// useTorrentSource fetches source with an HTTP GET through http.DefaultClient, decodes the
// response body as a bencoded metainfo.MetaInfo and merges the spec derived from it into t via
// MergeSpec. The request is bound to a cancellable context derived from ctx.
//
// NOTE(review): the embedded listing numbers skip (1340 -> 1350, 1351 -> 1354, 1355 -> 1358,
// 1359 -> 1362, 1362 -> 1367), so what triggers cancel, how the request-construction and
// transport errors are handled, whether the response body is closed, and what the ctx.Err()
// check leads to are not visible in this excerpt.
1339 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
// Shadow ctx with a cancellable child so the request can be aborted independently of the caller.
1340 ctx, cancel := context.WithCancel(ctx)
1350 var req *http.Request
1351 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1354 var resp *http.Response
1355 if resp, err = http.DefaultClient.Do(req); err != nil {
1358 var mi metainfo.MetaInfo
1359 err = bencode.NewDecoder(resp.Body).Decode(&mi)
// The context state is consulted after decoding. NOTE(review): presumably to tell a cancelled
// fetch apart from a genuine decode failure — confirm.
1362 if ctx.Err() != nil {
1367 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
// dropTorrent looks infoHash up in cl.torrents and deletes the entry from the map. The error
// "no such torrent" is produced on the lookup path, presumably when the hash is absent.
//
// NOTE(review): the embedded listing numbers skip (1371 -> 1373, 1373 -> 1380), so the condition
// on ok, whatever is done with t and wg before the delete, and the return are not visible in this
// excerpt.
1370 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1371 t, ok := cl.torrents[infoHash]
1373 err = fmt.Errorf("no such torrent")
1380 delete(cl.torrents, infoHash)
// allTorrentsCompleted walks cl.torrents and consults haveAllPieces on each one.
//
// NOTE(review): the embedded listing numbers skip (1385 -> 1389, and nothing after 1389 is
// shown), so any earlier per-torrent check and the actual return values are not visible in this
// excerpt; presumably it returns false as soon as a torrent is incomplete and true otherwise.
1384 func (cl *Client) allTorrentsCompleted() bool {
1385 for _, t := range cl.torrents {
1389 if !t.haveAllPieces() {
1396 // Returns true when all torrents are completely downloaded and false if the
1397 // client is stopped before that.
//
// The loop re-evaluates allTorrentsCompleted until it holds, checking cl.closed on every pass.
//
// NOTE(review): the embedded listing numbers skip (1398 -> 1401, and nothing after 1402 is
// shown), so any locking, the mechanism used to wait between passes and the return statements
// are not visible in this excerpt.
1398 func (cl *Client) WaitAll() bool {
1401 for !cl.allTorrentsCompleted() {
1402 if cl.closed.IsSet() {
1410 // Returns handles to all the torrents loaded in the Client.
// The slice is produced by torrentsAsSlice, so its order follows map iteration and is unspecified.
//
// NOTE(review): the embedded listing numbers skip from 1411 to 1414; presumably the omitted
// lines take and release the Client lock — confirm against the full source.
1411 func (cl *Client) Torrents() []*Torrent {
1414 return cl.torrentsAsSlice()
1417 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1418 for _, t := range cl.torrents {
1419 ret = append(ret, t)
1424 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1425 spec, err := TorrentSpecFromMagnetUri(uri)
1429 T, _, err = cl.AddTorrentSpec(spec)
1433 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1434 ts, err := TorrentSpecFromMetaInfoErr(mi)
1438 T, _, err = cl.AddTorrentSpec(ts)
1442 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1443 mi, err := metainfo.LoadFromFile(filename)
1447 return cl.AddTorrent(mi)
1450 func (cl *Client) DhtServers() []DhtServer {
1451 return cl.dhtServers
// AddDhtNodes processes each entry of nodes: the entry is split into host and optional port, the
// host is parsed as a literal IP address, and a krpc.NodeInfo is built and passed over every DHT
// server via eachDhtServer. A message is logged for a host that is not a valid IP; host names are
// not resolved here.
//
// NOTE(review): the embedded listing numbers skip (1457 -> 1459, 1459 -> 1462, 1463 -> 1468, and
// nothing after 1468 is shown), so the condition guarding the log line, what follows it, the
// fields of the node address and the body of the per-server callback are not visible in this
// excerpt.
1454 func (cl *Client) AddDhtNodes(nodes []string) {
1455 for _, n := range nodes {
1456 hmp := missinggo.SplitHostMaybePort(n)
1457 ip := net.ParseIP(hmp.Host)
1459 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1462 ni := krpc.NodeInfo{
1463 Addr: krpc.NodeAddr{
1468 cl.eachDhtServer(func(s DhtServer) {
1474 func (cl *Client) banPeerIP(ip net.IP) {
1475 cl.logger.Printf("banning ip %v", ip)
1476 if cl.badPeerIPs == nil {
1477 cl.badPeerIPs = make(map[string]struct{})
1479 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds the PeerConn for nc. From the visible lines: the peer's request limit
// starts at 250, the remote address, the client's callbacks and connString are recorded, a logger
// defaulting to warning level and carrying the connection as context is attached, reads and
// writes are routed through connStatsReadWriter, reads are additionally wrapped in a
// rateLimitedReader driven by the configured DownloadRateLimiter, a debug line describing the
// connection is logged, and the configured NewPeer callbacks are iterated.
//
// NOTE(review): the embedded listing numbers skip heavily (1482 -> 1491, 1491 -> 1493,
// 1493 -> 1495, 1495 -> 1497, 1497 -> 1501, 1504 -> 1507, and nothing after 1508 is shown), so
// the opening of the struct literal, its remaining fields, the rest of the reader wrapper, the
// body of the callback loop and the return are not visible in this excerpt.
1482 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1491 PeerMaxRequests: 250,
1493 RemoteAddr: remoteAddr,
1495 callbacks: &cl.config.Callbacks,
1497 connString: connString,
1501 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1502 c.setRW(connStatsReadWriter{nc, c})
1503 c.r = &rateLimitedReader{
1504 l: cl.config.DownloadRateLimiter,
1507 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1508 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer adds the announced ip/port pair to a torrent t as a single peer whose source
// is PeerSourceDhtAnnouncePeer.
//
// NOTE(review): the embedded listing numbers skip from 1514 to 1521, so how t is obtained from
// ih, any locking, and whether portOk is consulted are not visible in this excerpt.
1514 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1521 t.addPeers([]PeerInfo{{
1522 Addr: ipPortAddr{ip, port},
1523 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first non-nil IP among ips, in argument order. It returns nil when ips
// is empty or every entry is nil.
func firstNotNil(ips ...net.IP) net.IP {
	for _, ip := range ips {
		if ip != nil {
			return ip
		}
	}
	return nil
}
// eachListener iterates over cl.listeners on behalf of the callback f.
//
// NOTE(review): nothing after the loop header (listing number 1537) is shown, so how f is
// invoked and how its boolean result affects iteration are not visible in this excerpt;
// presumably a false result stops the walk — confirm against the full source.
1536 func (cl *Client) eachListener(f func(Listener) bool) {
1537 for _, s := range cl.listeners {
1544 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1545 for i := 0; i < len(cl.listeners); i += 1 {
1546 if ret = cl.listeners[i]; f(ret) {
1553 func (cl *Client) publicIp(peer net.IP) net.IP {
1554 // TODO: Use BEP 10 to determine how peers are seeing us.
1555 if peer.To4() != nil {
1557 cl.config.PublicIp4,
1558 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1563 cl.config.PublicIp6,
1564 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1568 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1569 l := cl.findListener(
1570 func(l Listener) bool {
1571 return f(addrIpOrNil(l.Addr()))
1577 return addrIpOrNil(l.Addr())
1580 // Our IP as a peer should see it.
1581 func (cl *Client) publicAddr(peer net.IP) IpPort {
1582 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1585 // ListenAddrs returns the addresses currently being listened on, one per entry of
// cl.listeners and in the same order.
//
// NOTE(review): the embedded listing numbers skip (1586 -> 1588, and nothing after 1590 is
// shown); presumably the omitted lines take and release the Client lock around the copy —
// confirm against the full source.
1586 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1588 ret = make([]net.Addr, len(cl.listeners))
1589 for i := 0; i < len(cl.listeners); i += 1 {
1590 ret[i] = cl.listeners[i].Addr()
1596 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1597 ipa, ok := tryIpPortFromNetAddr(addr)
1601 ip := maskIpForAcceptLimiting(ipa.IP)
1602 if cl.acceptLimiter == nil {
1603 cl.acceptLimiter = make(map[ipStr]int)
1605 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting returns the key address used for accept rate limiting. IPv4 addresses
// (including IPv4-mapped 16-byte forms) are reduced to their /24 network in 4-byte form; any
// other value, including nil, is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	if ip4 := ip.To4(); ip4 != nil {
		return ip4.Mask(net.CIDRMask(24, 32))
	}
	return ip
}
1615 func (cl *Client) clearAcceptLimits() {
1616 cl.acceptLimiter = nil
// acceptLimitClearer reacts to two events: the Client being closed, and a 15 minute timer, after
// which clearAcceptLimits is called.
//
// NOTE(review): the embedded listing numbers skip (1619 -> 1622, 1622 -> 1624, 1624 -> 1626, and
// nothing after 1626 is shown), so the enclosing loop/select, what the closed case does and any
// locking around the reset are not visible in this excerpt; presumably this runs as a
// long-lived goroutine that returns on close — confirm against the full source.
1619 func (cl *Client) acceptLimitClearer() {
1622 case <-cl.closed.Done():
1624 case <-time.After(15 * time.Minute):
1626 cl.clearAcceptLimits()
1632 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1633 if cl.config.DisableAcceptRateLimiting {
1636 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// NOTE(review): only the signature of rLock is visible in this excerpt (the embedded listing
// numbers skip from 1639 to 1643); presumably it acquires the Client's lock for reading —
// confirm against the full source.
1639 func (cl *Client) rLock() {
// NOTE(review): only the signature of rUnlock is visible in this excerpt (the embedded listing
// numbers skip from 1643 to 1647); presumably it releases the lock taken by rLock — confirm
// against the full source.
1643 func (cl *Client) rUnlock() {
// NOTE(review): only the signature of lock is visible in this excerpt (the embedded listing
// numbers skip from 1647 to 1651); presumably it acquires the Client's lock exclusively —
// confirm against the full source.
1647 func (cl *Client) lock() {
// NOTE(review): only the signature of unlock is visible in this excerpt (the embedded listing
// numbers skip from 1651 to 1655); presumably it releases the lock taken by lock — confirm
// against the full source.
1651 func (cl *Client) unlock() {
// locker returns a *lockWithDeferreds.
// NOTE(review): the body is not visible in this excerpt (the embedded listing numbers skip from
// 1655 to 1659); presumably the returned value is the Client's own lock — confirm against the
// full source.
1655 func (cl *Client) locker() *lockWithDeferreds {
1659 func (cl *Client) String() string {
1660 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1663 // Returns connection-level aggregate stats at the Client level. See the comment on
1664 // TorrentStats.ConnStats.
1665 func (cl *Client) ConnStats() ConnStats {
1666 return cl.stats.Copy()