20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 updateRequests chansync.BroadcastCond
91 func (cl *Client) BadPeerIPs() (ips []string) {
93 ips = cl.badPeerIPsLocked()
98 func (cl *Client) badPeerIPsLocked() (ips []string) {
99 ips = make([]string, len(cl.badPeerIPs))
101 for k := range cl.badPeerIPs {
108 func (cl *Client) PeerID() PeerID {
112 // Returns the port number for the first listener that has one. No longer assumes that all port
113 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 func (cl *Client) LocalPort() (port int) {
116 for i := 0; i < len(cl.listeners); i += 1 {
117 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
// writeDhtServerStatus writes a description of DHT server s to w: first the value of s.ID()
// formatted as hex, then a spew dump of whatever s.Stats() returned.
124 func writeDhtServerStatus(w io.Writer, s DhtServer) {
125 dhtStats := s.Stats()
126 fmt.Fprintf(w, " ID: %x\n", s.ID())
127 spew.Fdump(w, dhtStats)
130 // Writes out a human readable status of the client, such as for writing to a
132 func (cl *Client) WriteStatus(_w io.Writer) {
135 w := bufio.NewWriter(_w)
137 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
138 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
139 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
140 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
141 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
142 cl.eachDhtServer(func(s DhtServer) {
143 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
144 writeDhtServerStatus(w, s)
146 spew.Fdump(w, &cl.stats)
147 torrentsSlice := cl.torrentsAsSlice()
148 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150 sort.Slice(torrentsSlice, func(l, r int) bool {
151 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153 for _, t := range torrentsSlice {
155 fmt.Fprint(w, "<unknown name>")
157 fmt.Fprint(w, t.name())
163 "%f%% of %d bytes (%s)",
164 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166 humanize.Bytes(uint64(*t.length)))
168 w.WriteString("<missing metainfo>")
176 // Filters things that are less than warning from UPnP discovery.
// Returns true for any message that does not carry UpnpDiscoverLogTag. A tagged message only
// gets true when it has a level (ok) and that level is not less than log.Warning.
177 func upnpDiscoverLogFilter(m log.Msg) bool {
178 level, ok := m.GetLevel()
179 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
182 func (cl *Client) initLogger() {
183 logger := cl.config.Logger
186 if !cl.config.Debug {
187 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
190 cl.logger = logger.WithValues(cl)
// announceKey returns bytes 16 through 19 of the client's peer ID, read as a big-endian
// uint32 and converted to int32.
193 func (cl *Client) announceKey() int32 {
194 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
197 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
198 func (cl *Client) init(cfg *ClientConfig) {
200 cl.dopplegangerAddrs = make(map[string]struct{})
201 cl.torrents = make(map[metainfo.Hash]*Torrent)
202 cl.dialRateLimiter = rate.NewLimiter(10, 10)
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
211 cfg = NewDefaultClientConfig()
217 go cl.acceptLimitClearer()
226 storageImpl := cfg.DefaultStorage
227 if storageImpl == nil {
228 // We'd use mmap by default but HFS+ doesn't support sparse files.
229 storageImplCloser := storage.NewFile(cfg.DataDir)
230 cl.onClose = append(cl.onClose, func() {
231 if err := storageImplCloser.Close(); err != nil {
232 cl.logger.Printf("error closing default storage: %s", err)
235 storageImpl = storageImplCloser
237 cl.defaultStorage = storage.NewClient(storageImpl)
239 if cfg.PeerID != "" {
240 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
242 o := copy(cl.peerID[:], cfg.Bep20)
243 _, err = rand.Read(cl.peerID[o:])
245 panic("error generating peer id")
249 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
257 for _, _s := range sockets {
258 s := _s // Copy the loop variable so that each closure below captures its own socket.
259 cl.onClose = append(cl.onClose, func() { s.Close() })
260 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261 cl.dialers = append(cl.dialers, s)
262 cl.listeners = append(cl.listeners, s)
263 if cl.config.AcceptPeerConnections {
264 go cl.acceptConnections(s)
271 for _, s := range sockets {
272 if pc, ok := s.(net.PacketConn); ok {
273 ds, err := cl.NewAnacrolixDhtServer(pc)
277 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278 cl.onClose = append(cl.onClose, func() { ds.Close() })
283 cl.websocketTrackers = websocketTrackers{
286 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
289 t, ok := cl.torrents[infoHash]
291 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
293 return t.announceRequest(event), nil
295 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
298 t, ok := cl.torrents[dcc.InfoHash]
300 cl.logger.WithDefaultLevel(log.Warning).Printf(
301 "got webrtc conn for unloaded torrent with infohash %x",
307 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the client's list of DHT servers (cl.dhtServers).
314 func (cl *Client) AddDhtServer(d DhtServer) {
315 cl.dhtServers = append(cl.dhtServers, d)
318 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
319 // given address for any Torrent.
320 func (cl *Client) AddDialer(d Dialer) {
323 cl.dialers = append(cl.dialers, d)
324 for _, t := range cl.torrents {
329 func (cl *Client) Listeners() []Listener {
333 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
// yourself. The listener is always appended to cl.listeners; the accept goroutine
// (cl.acceptConnections) is only started when cl.config.AcceptPeerConnections is set.
335 func (cl *Client) AddListener(l Listener) {
336 cl.listeners = append(cl.listeners, l)
337 if cl.config.AcceptPeerConnections {
338 go cl.acceptConnections(l)
342 func (cl *Client) firewallCallback(net.Addr) bool {
344 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
347 torrent.Add("connections firewalled", 1)
349 torrent.Add("connections not firewalled", 1)
354 func (cl *Client) listenOnNetwork(n network) bool {
355 if n.Ipv4 && cl.config.DisableIPv4 {
358 if n.Ipv6 && cl.config.DisableIPv6 {
361 if n.Tcp && cl.config.DisableTCP {
364 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
370 func (cl *Client) listenNetworks() (ns []network) {
371 for _, n := range allPeerNetworks {
372 if cl.listenOnNetwork(n) {
379 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
380 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
381 cfg := dht.ServerConfig{
382 IPBlocklist: cl.ipBlockList,
384 OnAnnouncePeer: cl.onDHTAnnouncePeer,
385 PublicIP: func() net.IP {
386 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
387 return cl.config.PublicIp6
389 return cl.config.PublicIp4
391 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
392 OnQuery: cl.config.DHTOnQuery,
393 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
398 s, err = dht.NewServer(&cfg)
401 ts, err := s.Bootstrap()
403 cl.logger.Printf("error bootstrapping dht: %s", err)
405 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
// Closed returns the events.Done obtained from cl.closed.Done().
// NOTE(review): presumably this is signalled once the Client has been closed — confirm against
// chansync.SetOnce.
411 func (cl *Client) Closed() events.Done {
412 return cl.closed.Done()
415 func (cl *Client) eachDhtServer(f func(DhtServer)) {
416 for _, ds := range cl.dhtServers {
421 // Stops the client. All connections to peers are closed and all activity will
423 func (cl *Client) Close() (errs []error) {
425 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
428 for _, t := range cl.torrents {
429 err := t.close(&closeGroup)
431 errs = append(errs, err)
435 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
444 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
445 if cl.ipBlockList == nil {
448 return cl.ipBlockList.Lookup(ip)
451 func (cl *Client) ipIsBlocked(ip net.IP) bool {
452 _, blocked := cl.ipBlockRange(ip)
456 func (cl *Client) wantConns() bool {
457 if cl.config.AlwaysWantConns {
460 for _, t := range cl.torrents {
468 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
469 func (cl *Client) rejectAccepted(conn net.Conn) error {
471 return errors.New("don't want conns right now")
473 ra := conn.RemoteAddr()
474 if rip := addrIpOrNil(ra); rip != nil {
475 if cl.config.DisableIPv4Peers && rip.To4() != nil {
476 return errors.New("ipv4 peers disabled")
478 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
479 return errors.New("ipv4 disabled")
482 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
483 return errors.New("ipv6 disabled")
485 if cl.rateLimitAccept(rip) {
486 return errors.New("source IP accepted rate limited")
488 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
489 return errors.New("bad source addr")
495 func (cl *Client) acceptConnections(l Listener) {
497 conn, err := l.Accept()
498 torrent.Add("client listener accepts", 1)
499 conn = pproffd.WrapNetConn(conn)
501 closed := cl.closed.IsSet()
504 reject = cl.rejectAccepted(conn)
514 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add("rejected accepted connections", 1)
520 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
523 go cl.incomingConnection(conn)
525 log.Fmsg("accepted %q connection at %q from %q",
529 ).SetLevel(log.Debug).Log(cl.logger)
530 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
531 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
532 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString creates the PeerConn.connString for a regular net.Conn
// PeerConn. The result is the connection's local address, a hyphen, and its remote address,
// each formatted with the %s verb.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
}
542 func (cl *Client) incomingConnection(nc net.Conn) {
544 if tc, ok := nc.(*net.TCPConn); ok {
547 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
548 regularNetConnPeerConnConnString(nc))
554 c.Discovery = PeerSourceIncoming
555 cl.runReceivedConn(c)
558 // Returns a handle to the given torrent, if it's present in the client.
559 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
562 t, ok = cl.torrents[ih]
// torrent returns the entry stored in cl.torrents under ih. When there is no such entry the
// map lookup yields the zero value, i.e. a nil *Torrent.
566 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
567 return cl.torrents[ih]
570 type DialResult struct {
575 func countDialResult(err error) {
577 torrent.Add("successful dials", 1)
579 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shortens the dial timeout as the backlog of pending peers grows.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit (integer division),
// and is never less than minDialTimeout. With no pending peers the divisor is 1 and max is
// returned unchanged (subject to the minimum).
//
// If halfOpenLimit is zero, or the divisor truncates to zero, max is left unreduced instead
// of panicking with an integer divide by zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		// The divisor can still be zero for unusual (negative) operands; guard that too.
		if divisor := time.Duration((pendingPeers + halfOpenLimit) / halfOpenLimit); divisor != 0 {
			ret = max / divisor
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
591 // Returns whether an address is known to connect to a client with our own ID.
592 func (cl *Client) dopplegangerAddr(addr string) bool {
593 _, ok := cl.dopplegangerAddrs[addr]
597 // Returns a connection over UTP or TCP, whichever is first to connect.
// This is DialFirst applied to the client's own dialers (cl.dialers); the DialResult is
// returned unchanged.
598 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
599 return DialFirst(ctx, addr, cl.dialers)
602 // Returns a connection over UTP or TCP, whichever is first to connect.
603 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
605 t := perf.NewTimer(perf.CallerName(0))
608 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
614 ctx, cancel := context.WithCancel(ctx)
615 // As soon as we return one connection, cancel the others.
618 resCh := make(chan DialResult, left)
619 for _, _s := range dialers {
624 dialFromSocket(ctx, s, addr),
629 // Wait for a successful connection.
631 defer perf.ScopeTimer()()
632 for ; left > 0 && res.Conn == nil; left-- {
636 // There are still incomplete dials.
638 for ; left > 0; left-- {
639 conn := (<-resCh).Conn
646 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
651 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err's message contains "no suitable address found".
// A nil err yields false rather than panicking on err.Error().
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
666 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
667 if _, ok := t.halfOpen[addr]; !ok {
668 panic("invariant broken")
670 delete(t.halfOpen, addr)
672 for _, t := range cl.torrents {
677 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
678 // for valid reasons.
679 func (cl *Client) initiateProtocolHandshakes(
683 outgoing, encryptHeader bool,
684 remoteAddr PeerRemoteAddr,
685 network, connString string,
687 c *PeerConn, err error,
689 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
690 c.headerEncrypted = encryptHeader
691 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693 dl, ok := ctx.Deadline()
697 err = nc.SetDeadline(dl)
701 err = cl.initiateHandshakes(c, t)
705 // Returns nil connection and nil error if no connection could be established for valid reasons.
706 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
707 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
710 return t.dialTimeout()
713 dr := cl.dialFirst(dialCtx, addr.String())
716 if dialCtx.Err() != nil {
717 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719 return nil, errors.New("dial failed")
721 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
728 // Returns nil connection and nil error if no connection could be established
729 // for valid reasons.
730 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
731 torrent.Add("establish outgoing connection", 1)
732 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
733 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with preferred header obfuscation", 1)
738 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
739 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
740 // We should have just tried with the preferred header obfuscation. If it was required,
741 // there's nothing else to try.
744 // Try again with encryption if we didn't earlier, or without if we did.
745 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with fallback header obfuscation", 1)
749 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
753 // Called to dial out and run a connection. The addr we're given is already
754 // considered half-open.
755 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
756 cl.dialRateLimiter.Wait(context.Background())
757 c, err := cl.establishOutgoingConn(t, addr)
760 // Don't release lock between here and addPeerConn, unless it's for
762 cl.noLongerHalfOpen(t, addr.String())
765 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
772 t.runHandshookConnLoggingErr(c)
775 // The port number for incoming peer connections. 0 if the client isn't listening.
// The value is whatever cl.LocalPort() reports.
776 func (cl *Client) incomingPeerPort() int {
777 return cl.LocalPort()
780 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
781 if c.headerEncrypted {
784 rw, c.cryptoMethod, err = mse.InitiateHandshake(
791 cl.config.CryptoProvides,
795 return fmt.Errorf("header obfuscation handshake: %w", err)
798 ih, err := cl.connBtHandshake(c, &t.infoHash)
800 return fmt.Errorf("bittorrent protocol handshake: %w", err)
802 if ih != t.infoHash {
803 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
808 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
809 // that won't also try to take the lock. This saves us copying all the infohashes every time.
810 func (cl *Client) forSkeys(f func([]byte) bool) {
813 if false { // Emulate the bug from #114
815 for ih := range cl.torrents {
819 for range cl.torrents {
826 for ih := range cl.torrents {
833 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
834 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
840 // Do encryption and bittorrent handshakes as receiver.
841 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
842 defer perf.ScopeTimerErr(&err)()
844 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
846 if err == nil || err == mse.ErrNoSecretKeyMatch {
847 if c.headerEncrypted {
848 torrent.Add("handshakes received encrypted", 1)
850 torrent.Add("handshakes received unencrypted", 1)
853 torrent.Add("handshakes received with error while handling encryption", 1)
856 if err == mse.ErrNoSecretKeyMatch {
861 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
862 err = errors.New("connection does not have required header obfuscation")
865 ih, err := cl.connBtHandshake(c, nil)
867 return nil, fmt.Errorf("during bt handshake: %w", err)
875 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
879 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
880 &successfulPeerWireProtocolHandshakePeerReservedBytes)
883 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
884 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
888 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
890 c.PeerExtensionBytes = res.PeerExtensionBits
891 c.PeerID = res.PeerID
892 c.completedHandshake = time.Now()
893 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
899 func (cl *Client) runReceivedConn(c *PeerConn) {
900 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
904 t, err := cl.receiveHandshakes(c)
907 "error receiving handshakes on %v: %s", c, err,
908 ).SetLevel(log.Debug).
910 "network", c.Network,
912 torrent.Add("error receiving handshake", 1)
914 cl.onBadAccept(c.RemoteAddr)
919 torrent.Add("received handshake for unloaded torrent", 1)
920 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
922 cl.onBadAccept(c.RemoteAddr)
926 torrent.Add("received handshake for loaded torrent", 1)
929 t.runHandshookConnLoggingErr(c)
932 // Client lock must be held before entering this.
933 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
935 for i, b := range cl.config.MinPeerExtensions {
936 if c.PeerExtensionBytes[i]&b != b {
937 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes)
940 if c.PeerID == cl.peerID {
943 addr := c.conn.RemoteAddr().String()
944 cl.dopplegangerAddrs[addr] = struct{}{}
946 // Because the remote address is not necessarily the same as its client's torrent listen
947 // address, we won't record the remote address as a doppleganger. Instead, the initiator
948 // can record *us* as the doppleganger.
950 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
953 c.conn.SetWriteDeadline(time.Time{})
954 c.r = deadlineReader{c.conn, c.r}
955 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
956 if connIsIpv6(c.conn) {
957 torrent.Add("completed handshake over ipv6", 1)
959 if err := t.addPeerConn(c); err != nil {
960 return fmt.Errorf("adding connection: %w", err)
962 defer t.dropConnection(c)
964 cl.sendInitialMessages(c, t)
965 c.initUpdateRequestsTimer()
966 err := c.mainReadLoop()
968 return fmt.Errorf("main read loop: %w", err)
975 func (p *Peer) initUpdateRequestsTimer() {
977 if p.updateRequestsTimer != nil {
978 panic(p.updateRequestsTimer)
981 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
982 p.updateRequestsTimer.Stop()
985 func (c *Peer) updateRequestsTimerFunc() {
987 defer c.locker().Unlock()
988 if c.closed.IsSet() {
991 if c.needRequestUpdate != "" {
994 if c.isLowOnRequests() {
995 // If there are no outstanding requests, then a request update should have already run.
998 c.updateRequests("updateRequestsTimer")
1001 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1002 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1003 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1004 const localClientReqq = 1 << 5
1006 // See the order given in Transmission's tr_peerMsgsNew.
1007 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1008 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1009 conn.write(pp.Message{
1011 ExtendedID: pp.HandshakeExtendedID,
1012 ExtendedPayload: func() []byte {
1013 msg := pp.ExtendedHandshakeMessage{
1014 M: map[pp.ExtensionName]pp.ExtensionNumber{
1015 pp.ExtensionNameMetadata: metadataExtendedId,
1017 V: cl.config.ExtendedHandshakeClientVersion,
1018 Reqq: localClientReqq,
1019 YourIp: pp.CompactIp(conn.remoteIp()),
1020 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1021 Port: cl.incomingPeerPort(),
1022 MetadataSize: torrent.metadataSize(),
1023 // TODO: We can figure these out specific to the socket
1025 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1026 Ipv6: cl.config.PublicIp6.To16(),
1028 if !cl.config.DisablePEX {
1029 msg.M[pp.ExtensionNamePex] = pexExtendedId
1031 return bencode.MustMarshal(msg)
1036 if conn.fastEnabled() {
1037 if torrent.haveAllPieces() {
1038 conn.write(pp.Message{Type: pp.HaveAll})
1039 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1041 } else if !torrent.haveAnyPieces() {
1042 conn.write(pp.Message{Type: pp.HaveNone})
1043 conn.sentHaves.Clear()
1049 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1050 conn.write(pp.Message{
1057 func (cl *Client) dhtPort() (ret uint16) {
1058 if len(cl.dhtServers) == 0 {
1061 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
// haveDhtServer reports whether cl.dhtServers holds at least one DHT server.
1064 func (cl *Client) haveDhtServer() bool {
1065 return len(cl.dhtServers) > 0
1068 // Process incoming ut_metadata message.
1069 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1070 var d pp.ExtendedMetadataRequestMsg
1071 err := bencode.Unmarshal(payload, &d)
1072 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1073 } else if err != nil {
1074 return fmt.Errorf("error unmarshalling bencode: %s", err)
1078 case pp.DataMetadataExtensionMsgType:
1079 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1080 if !c.requestedMetadataPiece(piece) {
1081 return fmt.Errorf("got unexpected piece %d", piece)
1083 c.metadataRequests[piece] = false
1084 begin := len(payload) - d.PieceSize()
1085 if begin < 0 || begin >= len(payload) {
1086 return fmt.Errorf("data has bad offset in payload: %d", begin)
1088 t.saveMetadataPiece(piece, payload[begin:])
1089 c.lastUsefulChunkReceived = time.Now()
1090 err = t.maybeCompleteMetadata()
1092 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1093 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1094 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1095 // log consumers can filter for this message.
1096 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1099 case pp.RequestMetadataExtensionMsgType:
1100 if !t.haveMetadataPiece(piece) {
1101 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1104 start := (1 << 14) * piece
1105 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1106 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1108 case pp.RejectMetadataExtensionMsgType:
1111 return errors.New("unknown msg_type value")
1115 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1116 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1117 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1122 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1126 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1129 if _, ok := cl.ipBlockRange(ip); ok {
1132 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1138 // Return a Torrent ready for insertion into a Client.
1139 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1140 return cl.newTorrentOpt(addTorrentOpts{
1142 Storage: specStorage,
1146 // Return a Torrent ready for insertion into a Client.
1147 func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) {
1148 // use provided storage, if provided
1149 storageClient := cl.defaultStorage
1150 if opts.Storage != nil {
1151 storageClient = storage.NewClient(opts.Storage)
1156 infoHash: opts.InfoHash,
1157 peers: prioritizedPeers{
1159 getPrio: func(p PeerInfo) peerPriority {
1161 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1164 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1166 halfOpen: make(map[string]PeerInfo),
1167 pieceStateChanges: pubsub.NewPubSub(),
1169 storageOpener: storageClient,
1170 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1172 metadataChanged: sync.Cond{
1175 webSeeds: make(map[string]*Peer),
1176 gotMetainfoC: make(chan struct{}),
1178 t.networkingEnabled.Set()
1179 t.logger = cl.logger.WithContextValue(t)
1180 if opts.ChunkSize == 0 {
1181 opts.ChunkSize = defaultChunkSize
1183 t.setChunkSize(opts.ChunkSize)
1187 // A file-like handle to some torrent data resource.
1188 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without custom storage: it calls
// AddTorrentInfoHashWithStorage with a nil storage.ClientImpl and returns that call's results
// (the Torrent and the `new` flag) unchanged.
1195 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1196 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1199 // Adds a torrent by InfoHash with a custom Storage implementation.
1200 // If the torrent already exists then this Storage is ignored and the
1201 // existing torrent returned with `new` set to `false`
1202 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1205 t, ok := cl.torrents[infoHash]
1211 t = cl.newTorrent(infoHash, specStorage)
1212 cl.eachDhtServer(func(s DhtServer) {
1213 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1214 go t.dhtAnnouncer(s)
1217 cl.torrents[infoHash] = t
1218 cl.clearAcceptLimits()
1219 t.updateWantPeersEvent()
1220 // Tickle Client.waitAccept, new torrent may want conns.
1221 cl.event.Broadcast()
1225 // Adds a torrent described by opts (InfoHash, plus optional Storage and ChunkSize).
1226 // If the torrent already exists then this Storage is ignored and the
1227 // existing torrent returned with `new` set to `false`
1228 func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) {
1229 infoHash := opts.InfoHash
1232 t, ok := cl.torrents[infoHash]
1238 t = cl.newTorrentOpt(opts)
1239 cl.eachDhtServer(func(s DhtServer) {
1240 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1241 go t.dhtAnnouncer(s)
1244 cl.torrents[infoHash] = t
1245 cl.clearAcceptLimits()
1246 t.updateWantPeersEvent()
1247 // Tickle Client.waitAccept, new torrent may want conns.
1248 cl.event.Broadcast()
1252 type addTorrentOpts struct {
1254 Storage storage.ClientImpl
1255 ChunkSize pp.Integer
1258 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1259 // Torrent.MergeSpec.
1260 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1261 t, new = cl.AddTorrentOpt(addTorrentOpts{
1262 InfoHash: spec.InfoHash,
1263 Storage: spec.Storage,
1264 ChunkSize: spec.ChunkSize,
1268 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1270 modSpec.ChunkSize = 0
1272 err = t.MergeSpec(&modSpec)
1273 if err != nil && new {
// stringAddr is a net.Addr whose String form is the underlying string itself.
type stringAddr string

// Compile-time check that stringAddr implements net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string { return "" }

// String returns the address text unchanged.
func (a stringAddr) String() string { return string(a) }
1286 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1287 // spec.DisallowDataDownload/Upload will be read and applied
1288 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1289 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1290 if spec.DisplayName != "" {
1291 t.SetDisplayName(spec.DisplayName)
1293 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1294 if spec.InfoBytes != nil {
1295 err := t.SetInfoBytes(spec.InfoBytes)
1301 cl.AddDhtNodes(spec.DhtNodes)
1304 useTorrentSources(spec.Sources, t)
1305 for _, url := range spec.Webseeds {
1308 for _, peerAddr := range spec.PeerAddrs {
1310 Addr: stringAddr(peerAddr),
1311 Source: PeerSourceDirect,
1315 if spec.ChunkSize != 0 {
1316 panic("chunk size cannot be changed for existing Torrent")
1318 t.addTrackers(spec.Trackers)
1320 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1321 t.dataUploadDisallowed = spec.DisallowDataUpload
// useTorrentSources walks sources and hands each one to useTorrentSource for t, logging the
// outcome per source.
1325 func useTorrentSources(sources []string, t *Torrent) {
1326 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1327 ctx := context.Background()
1328 for i := 0; i < len(sources); i += 1 {
// A failure is reported at warning level, quoting the source and the error.
1331 if err := useTorrentSource(ctx, s, t); err != nil {
1332 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
// NOTE(review): presumably the success path of the check above — confirm.
1334 t.logger.Printf("successfully used source %q", s)
// useTorrentSource issues an HTTP GET for source, decodes the response body as bencoded metainfo
// and merges the spec derived from it into t.
1340 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
// A cancellable child context is derived so the request can be aborted separately from the
// caller's ctx.
1341 ctx, cancel := context.WithCancel(ctx)
1351 var req *http.Request
1352 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1355 var resp *http.Response
// The fetch goes through the shared http.DefaultClient.
// NOTE(review): no status-code check or resp.Body.Close is visible here — confirm both are handled.
1356 if resp, err = http.DefaultClient.Do(req); err != nil {
1359 var mi metainfo.MetaInfo
1360 err = bencode.NewDecoder(resp.Body).Decode(&mi)
// NOTE(review): ctx.Err() is consulted after decoding, presumably to tell a cancelled fetch apart
// from a genuine decode failure — confirm.
1363 if ctx.Err() != nil {
1368 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
// dropTorrent looks up the torrent stored under infoHash and deletes its entry from cl.torrents.
// NOTE(review): how wg is used is not visible here.
1371 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1372 t, ok := cl.torrents[infoHash]
// NOTE(review): presumably set only when the lookup above fails — confirm.
1374 err = fmt.Errorf("no such torrent")
1381 delete(cl.torrents, infoHash)
// allTorrentsCompleted scans every torrent in cl.torrents and checks haveAllPieces on it.
// NOTE(review): the return statements are not visible; presumably any torrent lacking pieces
// makes the result false.
1385 func (cl *Client) allTorrentsCompleted() bool {
1386 for _, t := range cl.torrents {
1390 if !t.haveAllPieces() {
1397 // Returns true when all torrents are completely downloaded and false if the
1398 // client is stopped before that.
1399 func (cl *Client) WaitAll() bool {
// Keep going for as long as at least one torrent is incomplete.
1402 for !cl.allTorrentsCompleted() {
// The closed flag is re-checked on every pass so a stopped client ends the wait.
1403 if cl.closed.IsSet() {
1411 // Returns handles to all the torrents loaded in the Client.
1412 func (cl *Client) Torrents() []*Torrent {
// The result is built by torrentsAsSlice.
1415 return cl.torrentsAsSlice()
// torrentsAsSlice appends every Torrent in cl.torrents to ret. Go does not specify map iteration
// order, so the order of the result is unspecified as well.
1418 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1419 for _, t := range cl.torrents {
1420 ret = append(ret, t)
// AddMagnet parses uri into a TorrentSpec with TorrentSpecFromMagnetUri and adds that spec through
// AddTorrentSpec.
1425 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1426 spec, err := TorrentSpecFromMagnetUri(uri)
// The "new" result of AddTorrentSpec is deliberately discarded.
1430 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent converts mi into a TorrentSpec with TorrentSpecFromMetaInfoErr and adds that spec
// through AddTorrentSpec.
1434 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1435 ts, err := TorrentSpecFromMetaInfoErr(mi)
// The "new" result of AddTorrentSpec is deliberately discarded.
1439 T, _, err = cl.AddTorrentSpec(ts)
// AddTorrentFromFile loads metainfo from filename with metainfo.LoadFromFile and passes it to
// AddTorrent.
1443 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1444 mi, err := metainfo.LoadFromFile(filename)
1448 return cl.AddTorrent(mi)
1451 func (cl *Client) DhtServers() []DhtServer {
1452 return cl.dhtServers
// AddDhtNodes processes each entry of nodes: it is split into host and optional port, the host
// must parse as an IP address, and a krpc.NodeInfo built from it is offered to the DHT servers via
// eachDhtServer.
1455 func (cl *Client) AddDhtNodes(nodes []string) {
1456 for _, n := range nodes {
1457 hmp := missinggo.SplitHostMaybePort(n)
1458 ip := net.ParseIP(hmp.Host)
// Hosts that are not literal IPs are reported here. NOTE(review): presumably such entries are
// then skipped — confirm.
1460 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1463 ni := krpc.NodeInfo{
1464 Addr: krpc.NodeAddr{
1469 cl.eachDhtServer(func(s DhtServer) {
1475 func (cl *Client) banPeerIP(ip net.IP) {
1476 cl.logger.Printf("banning ip %v", ip)
1477 if cl.badPeerIPs == nil {
1478 cl.badPeerIPs = make(map[string]struct{})
1480 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds the PeerConn c for nc, wiring in a logger, a connStatsReadWriter around the
// connection and a rate-limited reader taken from the client's config.
1483 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1492 PeerMaxRequests: 250,
1494 RemoteAddr: remoteAddr,
1496 callbacks: &cl.config.Callbacks,
1498 connString: connString,
// The connection's logger defaults to warning level and carries c as a context value.
1502 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1503 c.setRW(connStatsReadWriter{nc, c})
// Reads go through a rateLimitedReader driven by the configured DownloadRateLimiter.
1504 c.r = &rateLimitedReader{
1505 l: cl.config.DownloadRateLimiter,
1508 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// Iterates the configured NewPeer callbacks (what is done with each one is not visible here).
1509 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer adds the announced ip/port to a torrent as a single PeerInfo whose source is
// PeerSourceDhtAnnouncePeer.
// NOTE(review): how t is resolved from ih, and how portOk is used, is not visible here.
1515 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1522 t.addPeers([]PeerInfo{{
1523 Addr: ipPortAddr{ip, port},
1524 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil iterates over ips in argument order.
// NOTE(review): the loop body is not visible; presumably the first non-nil IP is returned, as
// the name suggests — confirm.
1528 func firstNotNil(ips ...net.IP) net.IP {
1529 for _, ip := range ips {
// eachListener iterates over cl.listeners in order.
// NOTE(review): the loop body is not visible; presumably f is called per listener and its bool
// result decides whether iteration continues — confirm.
1537 func (cl *Client) eachListener(f func(Listener) bool) {
1538 for _, s := range cl.listeners {
// findListener walks cl.listeners in index order, storing each listener in ret and testing it
// with f.
// NOTE(review): presumably the first listener for which f is true is returned — confirm, along
// with what is returned when none matches.
1545 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1546 for i := 0; i < len(cl.listeners); i += 1 {
1547 if ret = cl.listeners[i]; f(ret) {
// publicIp picks candidate addresses from the same IP family as peer: the configured public IP
// for that family, then an IP taken from a listener of that family.
// NOTE(review): the call that chooses between the candidates is not visible here.
1554 func (cl *Client) publicIp(peer net.IP) net.IP {
1555 // TODO: Use BEP 10 to determine how peers are seeing us.
// peer.To4() is non-nil only for IPv4 addresses, so this branch lists the IPv4 candidates.
1556 if peer.To4() != nil {
1558 cl.config.PublicIp4,
1559 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
// Candidates for non-IPv4 peers.
1564 cl.config.PublicIp6,
1565 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp searches for a listener whose address IP (as extracted by addrIpOrNil) satisfies
// f, and returns that listener's IP.
// NOTE(review): handling of the no-match case is not visible here.
1569 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1570 l := cl.findListener(
1571 func(l Listener) bool {
1572 return f(addrIpOrNil(l.Addr()))
1578 return addrIpOrNil(l.Addr())
1581 // Our IP as a peer should see it.
1582 func (cl *Client) publicAddr(peer net.IP) IpPort {
1583 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1586 // ListenAddrs returns the addresses currently being listened to.
// The result holds one entry per listener, in the same order as cl.listeners.
1587 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1589 ret = make([]net.Addr, len(cl.listeners))
1590 for i := 0; i < len(cl.listeners); i += 1 {
1591 ret[i] = cl.listeners[i].Addr()
// onBadAccept bumps the accept-limiter counter for the masked IP derived from addr.
1597 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
// NOTE(review): what happens when addr cannot be converted (ok is false) is not visible here.
1598 ipa, ok := tryIpPortFromNetAddr(addr)
// Counting is keyed by the masked IP, not the exact address.
1602 ip := maskIpForAcceptLimiting(ipa.IP)
// The counter map is created lazily; it may be nil at this point.
1603 if cl.acceptLimiter == nil {
1604 cl.acceptLimiter = make(map[ipStr]int)
1606 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address to its /24 network address, so all hosts in the
// same /24 map to one value.
// NOTE(review): the result for non-IPv4 addresses is not visible here.
1609 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1610 if ip4 := ip.To4(); ip4 != nil {
1611 return ip4.Mask(net.CIDRMask(24, 32))
1616 func (cl *Client) clearAcceptLimits() {
1617 cl.acceptLimiter = nil
// acceptLimitClearer waits on either client closure or a 15-minute timer and calls
// clearAcceptLimits.
// NOTE(review): the enclosing loop/select and any locking are not visible here; presumably the
// clear runs in the timer case and closure ends the routine — confirm.
1620 func (cl *Client) acceptLimitClearer() {
1623 case <-cl.closed.Done():
1625 case <-time.After(15 * time.Minute):
1627 cl.clearAcceptLimits()
// rateLimitAccept reports whether the masked form of ip has a positive count in cl.acceptLimiter.
1633 func (cl *Client) rateLimitAccept(ip net.IP) bool {
// NOTE(review): the result when rate limiting is disabled is not visible; presumably false.
1634 if cl.config.DisableAcceptRateLimiting {
// Indexing a nil map yields zero, so this is safe after the limiter has been cleared.
1637 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1640 func (cl *Client) rLock() {
1644 func (cl *Client) rUnlock() {
1648 func (cl *Client) lock() {
1652 func (cl *Client) unlock() {
1656 func (cl *Client) locker() *lockWithDeferreds {
1660 func (cl *Client) String() string {
1661 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1664 // Returns connection-level aggregate stats at the Client level. See the comment on
1665 // TorrentStats.ConnStats.
1666 func (cl *Client) ConnStats() ConnStats {
// The value comes from cl.stats.Copy(), not from returning cl.stats directly.
1667 return cl.stats.Copy()