18 "github.com/anacrolix/chansync/events"
19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/v2"
25 "github.com/anacrolix/missinggo/v2/bitmap"
26 "github.com/anacrolix/missinggo/v2/pproffd"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed chansync.SetOnce
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() (ips []string) {
91 ips = cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() (ips []string) {
97 ips = make([]string, len(cl.badPeerIPs))
99 for k := range cl.badPeerIPs {
106 func (cl *Client) PeerID() PeerID {
110 // Returns the port number for the first listener that has one. No longer assumes that all port
111 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
113 func (cl *Client) LocalPort() (port int) {
114 for i := 0; i < len(cl.listeners); i += 1 {
115 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
122 func writeDhtServerStatus(w io.Writer, s DhtServer) {
123 dhtStats := s.Stats()
124 fmt.Fprintf(w, " ID: %x\n", s.ID())
125 spew.Fdump(w, dhtStats)
128 // Writes out a human readable status of the client, such as for writing to a
130 func (cl *Client) WriteStatus(_w io.Writer) {
133 w := bufio.NewWriter(_w)
135 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
136 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
137 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
138 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
139 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
140 cl.eachDhtServer(func(s DhtServer) {
141 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
142 writeDhtServerStatus(w, s)
144 spew.Fdump(w, &cl.stats)
145 torrentsSlice := cl.torrentsAsSlice()
146 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
148 sort.Slice(torrentsSlice, func(l, r int) bool {
149 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
151 for _, t := range torrentsSlice {
153 fmt.Fprint(w, "<unknown name>")
155 fmt.Fprint(w, t.name())
161 "%f%% of %d bytes (%s)",
162 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
164 humanize.Bytes(uint64(*t.length)))
166 w.WriteString("<missing metainfo>")
174 // Filters things that are less than warning from UPnP discovery.
175 func upnpDiscoverLogFilter(m log.Msg) bool {
176 level, ok := m.GetLevel()
177 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
180 func (cl *Client) initLogger() {
181 logger := cl.config.Logger
184 if !cl.config.Debug {
185 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
188 cl.logger = logger.WithValues(cl)
191 func (cl *Client) announceKey() int32 {
192 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
195 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
196 func (cl *Client) init(cfg *ClientConfig) {
198 cl.dopplegangerAddrs = make(map[string]struct{})
199 cl.torrents = make(map[metainfo.Hash]*Torrent)
200 cl.dialRateLimiter = rate.NewLimiter(10, 10)
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 cl.event.L = cl.locker()
204 cl.ipBlockList = cfg.IPBlocklist
207 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
209 cfg = NewDefaultClientConfig()
215 go cl.acceptLimitClearer()
224 storageImpl := cfg.DefaultStorage
225 if storageImpl == nil {
226 // We'd use mmap by default but HFS+ doesn't support sparse files.
227 storageImplCloser := storage.NewFile(cfg.DataDir)
228 cl.onClose = append(cl.onClose, func() {
229 if err := storageImplCloser.Close(); err != nil {
230 cl.logger.Printf("error closing default storage: %s", err)
233 storageImpl = storageImplCloser
235 cl.defaultStorage = storage.NewClient(storageImpl)
237 if cfg.PeerID != "" {
238 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
240 o := copy(cl.peerID[:], cfg.Bep20)
241 _, err = rand.Read(cl.peerID[o:])
243 panic("error generating peer id")
247 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
255 for _, _s := range sockets {
256 s := _s // Go is fucking retarded.
257 cl.onClose = append(cl.onClose, func() { s.Close() })
258 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
259 cl.dialers = append(cl.dialers, s)
260 cl.listeners = append(cl.listeners, s)
261 if cl.config.AcceptPeerConnections {
262 go cl.acceptConnections(s)
269 for _, s := range sockets {
270 if pc, ok := s.(net.PacketConn); ok {
271 ds, err := cl.NewAnacrolixDhtServer(pc)
275 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
276 cl.onClose = append(cl.onClose, func() { ds.Close() })
281 cl.websocketTrackers = websocketTrackers{
284 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
287 t, ok := cl.torrents[infoHash]
289 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
291 return t.announceRequest(event), nil
293 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
296 t, ok := cl.torrents[dcc.InfoHash]
298 cl.logger.WithDefaultLevel(log.Warning).Printf(
299 "got webrtc conn for unloaded torrent with infohash %x",
305 go t.onWebRtcConn(dc, dcc)
316 func (cl *Client) AddDhtServer(d DhtServer) {
317 cl.dhtServers = append(cl.dhtServers, d)
320 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
321 // given address for any Torrent.
322 func (cl *Client) AddDialer(d Dialer) {
325 cl.dialers = append(cl.dialers, d)
326 for _, t := range cl.torrents {
331 func (cl *Client) Listeners() []Listener {
335 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
337 func (cl *Client) AddListener(l Listener) {
338 cl.listeners = append(cl.listeners, l)
339 if cl.config.AcceptPeerConnections {
340 go cl.acceptConnections(l)
344 func (cl *Client) firewallCallback(net.Addr) bool {
346 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
349 torrent.Add("connections firewalled", 1)
351 torrent.Add("connections not firewalled", 1)
356 func (cl *Client) listenOnNetwork(n network) bool {
357 if n.Ipv4 && cl.config.DisableIPv4 {
360 if n.Ipv6 && cl.config.DisableIPv6 {
363 if n.Tcp && cl.config.DisableTCP {
366 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
372 func (cl *Client) listenNetworks() (ns []network) {
373 for _, n := range allPeerNetworks {
374 if cl.listenOnNetwork(n) {
381 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
382 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
383 cfg := dht.ServerConfig{
384 IPBlocklist: cl.ipBlockList,
386 OnAnnouncePeer: cl.onDHTAnnouncePeer,
387 PublicIP: func() net.IP {
388 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
389 return cl.config.PublicIp6
391 return cl.config.PublicIp4
393 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
394 OnQuery: cl.config.DHTOnQuery,
395 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
397 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
400 s, err = dht.NewServer(&cfg)
403 ts, err := s.Bootstrap()
405 cl.logger.Printf("error bootstrapping dht: %s", err)
407 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
413 func (cl *Client) Closed() events.Done {
414 return cl.closed.Done()
417 func (cl *Client) eachDhtServer(f func(DhtServer)) {
418 for _, ds := range cl.dhtServers {
423 // Stops the client. All connections to peers are closed and all activity will
425 func (cl *Client) Close() (errs []error) {
427 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
430 for _, t := range cl.torrents {
431 err := t.close(&closeGroup)
433 errs = append(errs, err)
437 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
439 for i := range cl.onClose {
440 cl.onClose[len(cl.onClose)-1-i]()
446 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
447 if cl.ipBlockList == nil {
450 return cl.ipBlockList.Lookup(ip)
453 func (cl *Client) ipIsBlocked(ip net.IP) bool {
454 _, blocked := cl.ipBlockRange(ip)
458 func (cl *Client) wantConns() bool {
459 if cl.config.AlwaysWantConns {
462 for _, t := range cl.torrents {
470 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
471 func (cl *Client) rejectAccepted(conn net.Conn) error {
473 return errors.New("don't want conns right now")
475 ra := conn.RemoteAddr()
476 if rip := addrIpOrNil(ra); rip != nil {
477 if cl.config.DisableIPv4Peers && rip.To4() != nil {
478 return errors.New("ipv4 peers disabled")
480 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
481 return errors.New("ipv4 disabled")
484 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
485 return errors.New("ipv6 disabled")
487 if cl.rateLimitAccept(rip) {
488 return errors.New("source IP accepted rate limited")
490 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
491 return errors.New("bad source addr")
497 func (cl *Client) acceptConnections(l Listener) {
499 conn, err := l.Accept()
500 torrent.Add("client listener accepts", 1)
501 conn = pproffd.WrapNetConn(conn)
503 closed := cl.closed.IsSet()
506 reject = cl.rejectAccepted(conn)
516 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
521 torrent.Add("rejected accepted connections", 1)
522 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
525 go cl.incomingConnection(conn)
527 log.Fmsg("accepted %q connection at %q from %q",
531 ).SetLevel(log.Debug).Log(cl.logger)
532 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
533 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
534 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
539 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
540 func regularNetConnPeerConnConnString(nc net.Conn) string {
541 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
544 func (cl *Client) incomingConnection(nc net.Conn) {
546 if tc, ok := nc.(*net.TCPConn); ok {
549 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
550 regularNetConnPeerConnConnString(nc))
556 c.Discovery = PeerSourceIncoming
557 cl.runReceivedConn(c)
560 // Returns a handle to the given torrent, if it's present in the client.
561 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
564 t, ok = cl.torrents[ih]
568 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
569 return cl.torrents[ih]
572 type DialResult struct {
577 func countDialResult(err error) {
579 torrent.Add("successful dials", 1)
581 torrent.Add("unsuccessful dials", 1)
585 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
586 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
587 if ret < minDialTimeout {
593 // Returns whether an address is known to connect to a client with our own ID.
594 func (cl *Client) dopplegangerAddr(addr string) bool {
595 _, ok := cl.dopplegangerAddrs[addr]
599 // Returns a connection over UTP or TCP, whichever is first to connect.
600 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
601 return DialFirst(ctx, addr, cl.dialers)
604 // Returns a connection over UTP or TCP, whichever is first to connect.
605 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
607 t := perf.NewTimer(perf.CallerName(0))
610 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
612 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
616 ctx, cancel := context.WithCancel(ctx)
617 // As soon as we return one connection, cancel the others.
620 resCh := make(chan DialResult, left)
621 for _, _s := range dialers {
626 dialFromSocket(ctx, s, addr),
631 // Wait for a successful connection.
633 defer perf.ScopeTimer()()
634 for ; left > 0 && res.Conn == nil; left-- {
638 // There are still incompleted dials.
640 for ; left > 0; left-- {
641 conn := (<-resCh).Conn
648 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
653 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
654 c, err := s.Dial(ctx, addr)
655 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
656 // it now in case we close the connection forthwith.
657 if tc, ok := c.(*net.TCPConn); ok {
664 func forgettableDialError(err error) bool {
665 return strings.Contains(err.Error(), "no suitable address found")
668 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
669 if _, ok := t.halfOpen[addr]; !ok {
670 panic("invariant broken")
672 delete(t.halfOpen, addr)
674 for _, t := range cl.torrents {
679 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
680 // for valid reasons.
681 func (cl *Client) initiateProtocolHandshakes(
685 outgoing, encryptHeader bool,
686 remoteAddr PeerRemoteAddr,
687 network, connString string,
689 c *PeerConn, err error,
691 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
692 c.headerEncrypted = encryptHeader
693 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
695 dl, ok := ctx.Deadline()
699 err = nc.SetDeadline(dl)
703 err = cl.initiateHandshakes(c, t)
707 // Returns nil connection and nil error if no connection could be established for valid reasons.
708 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
709 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
712 return t.dialTimeout()
715 dr := cl.dialFirst(dialCtx, addr.String())
718 if dialCtx.Err() != nil {
719 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
721 return nil, errors.New("dial failed")
723 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
730 // Returns nil connection and nil error if no connection could be established
731 // for valid reasons.
732 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
733 torrent.Add("establish outgoing connection", 1)
734 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
735 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
737 torrent.Add("initiated conn with preferred header obfuscation", 1)
740 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
741 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
742 // We should have just tried with the preferred header obfuscation. If it was required,
743 // there's nothing else to try.
746 // Try again with encryption if we didn't earlier, or without if we did.
747 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
749 torrent.Add("initiated conn with fallback header obfuscation", 1)
751 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
755 // Called to dial out and run a connection. The addr we're given is already
756 // considered half-open.
757 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
758 cl.dialRateLimiter.Wait(context.Background())
759 c, err := cl.establishOutgoingConn(t, addr)
762 // Don't release lock between here and addPeerConn, unless it's for
764 cl.noLongerHalfOpen(t, addr.String())
767 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
774 t.runHandshookConnLoggingErr(c)
777 // The port number for incoming peer connections. 0 if the client isn't listening.
778 func (cl *Client) incomingPeerPort() int {
779 return cl.LocalPort()
782 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
783 if c.headerEncrypted {
786 rw, c.cryptoMethod, err = mse.InitiateHandshake(
793 cl.config.CryptoProvides,
797 return fmt.Errorf("header obfuscation handshake: %w", err)
800 ih, err := cl.connBtHandshake(c, &t.infoHash)
802 return fmt.Errorf("bittorrent protocol handshake: %w", err)
804 if ih != t.infoHash {
805 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
810 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
811 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
812 func (cl *Client) forSkeys(f func([]byte) bool) {
815 if false { // Emulate the bug from #114
817 for ih := range cl.torrents {
821 for range cl.torrents {
828 for ih := range cl.torrents {
835 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
836 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
842 // Do encryption and bittorrent handshakes as receiver.
843 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
844 defer perf.ScopeTimerErr(&err)()
846 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
848 if err == nil || err == mse.ErrNoSecretKeyMatch {
849 if c.headerEncrypted {
850 torrent.Add("handshakes received encrypted", 1)
852 torrent.Add("handshakes received unencrypted", 1)
855 torrent.Add("handshakes received with error while handling encryption", 1)
858 if err == mse.ErrNoSecretKeyMatch {
863 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
864 err = errors.New("connection does not have required header obfuscation")
867 ih, err := cl.connBtHandshake(c, nil)
869 return nil, fmt.Errorf("during bt handshake: %w", err)
877 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
878 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
883 c.PeerExtensionBytes = res.PeerExtensionBits
884 c.PeerID = res.PeerID
885 c.completedHandshake = time.Now()
886 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
892 func (cl *Client) runReceivedConn(c *PeerConn) {
893 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
897 t, err := cl.receiveHandshakes(c)
900 "error receiving handshakes on %v: %s", c, err,
901 ).SetLevel(log.Debug).
903 "network", c.Network,
905 torrent.Add("error receiving handshake", 1)
907 cl.onBadAccept(c.RemoteAddr)
912 torrent.Add("received handshake for unloaded torrent", 1)
913 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
915 cl.onBadAccept(c.RemoteAddr)
919 torrent.Add("received handshake for loaded torrent", 1)
922 t.runHandshookConnLoggingErr(c)
925 // Client lock must be held before entering this.
926 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
928 if c.PeerID == cl.peerID {
931 addr := c.conn.RemoteAddr().String()
932 cl.dopplegangerAddrs[addr] = struct{}{}
934 // Because the remote address is not necessarily the same as its client's torrent listen
935 // address, we won't record the remote address as a doppleganger. Instead, the initiator
936 // can record *us* as the doppleganger.
938 return errors.New("local and remote peer ids are the same")
940 c.conn.SetWriteDeadline(time.Time{})
941 c.r = deadlineReader{c.conn, c.r}
942 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
943 if connIsIpv6(c.conn) {
944 torrent.Add("completed handshake over ipv6", 1)
946 if err := t.addPeerConn(c); err != nil {
947 return fmt.Errorf("adding connection: %w", err)
949 defer t.dropConnection(c)
951 cl.sendInitialMessages(c, t)
952 err := c.mainReadLoop()
954 return fmt.Errorf("main read loop: %w", err)
959 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
960 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
961 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
962 const localClientReqq = 1 << 5
964 // See the order given in Transmission's tr_peerMsgsNew.
965 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
966 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
967 conn.write(pp.Message{
969 ExtendedID: pp.HandshakeExtendedID,
970 ExtendedPayload: func() []byte {
971 msg := pp.ExtendedHandshakeMessage{
972 M: map[pp.ExtensionName]pp.ExtensionNumber{
973 pp.ExtensionNameMetadata: metadataExtendedId,
975 V: cl.config.ExtendedHandshakeClientVersion,
976 Reqq: localClientReqq,
977 YourIp: pp.CompactIp(conn.remoteIp()),
978 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
979 Port: cl.incomingPeerPort(),
980 MetadataSize: torrent.metadataSize(),
981 // TODO: We can figured these out specific to the socket
983 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
984 Ipv6: cl.config.PublicIp6.To16(),
986 if !cl.config.DisablePEX {
987 msg.M[pp.ExtensionNamePex] = pexExtendedId
989 return bencode.MustMarshal(msg)
994 if conn.fastEnabled() {
995 if torrent.haveAllPieces() {
996 conn.write(pp.Message{Type: pp.HaveAll})
997 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
999 } else if !torrent.haveAnyPieces() {
1000 conn.write(pp.Message{Type: pp.HaveNone})
1001 conn.sentHaves.Clear()
1007 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1008 conn.write(pp.Message{
1015 func (cl *Client) dhtPort() (ret uint16) {
1016 if len(cl.dhtServers) == 0 {
1019 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1022 func (cl *Client) haveDhtServer() bool {
1023 return len(cl.dhtServers) > 0
1026 // Process incoming ut_metadata message.
1027 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1028 var d pp.ExtendedMetadataRequestMsg
1029 err := bencode.Unmarshal(payload, &d)
1030 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1031 } else if err != nil {
1032 return fmt.Errorf("error unmarshalling bencode: %s", err)
1036 case pp.DataMetadataExtensionMsgType:
1037 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1038 if !c.requestedMetadataPiece(piece) {
1039 return fmt.Errorf("got unexpected piece %d", piece)
1041 c.metadataRequests[piece] = false
1042 begin := len(payload) - d.PieceSize()
1043 if begin < 0 || begin >= len(payload) {
1044 return fmt.Errorf("data has bad offset in payload: %d", begin)
1046 t.saveMetadataPiece(piece, payload[begin:])
1047 c.lastUsefulChunkReceived = time.Now()
1048 err = t.maybeCompleteMetadata()
1050 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1051 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1052 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1053 // log consumers can filter for this message.
1054 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1057 case pp.RequestMetadataExtensionMsgType:
1058 if !t.haveMetadataPiece(piece) {
1059 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1062 start := (1 << 14) * piece
1063 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1064 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1066 case pp.RejectMetadataExtensionMsgType:
1069 return errors.New("unknown msg_type value")
1073 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1074 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1075 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1080 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1084 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1087 if _, ok := cl.ipBlockRange(ip); ok {
1090 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1096 // Return a Torrent ready for insertion into a Client.
1097 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1098 // use provided storage, if provided
1099 storageClient := cl.defaultStorage
1100 if specStorage != nil {
1101 storageClient = storage.NewClient(specStorage)
1107 peers: prioritizedPeers{
1109 getPrio: func(p PeerInfo) peerPriority {
1111 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1114 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1116 halfOpen: make(map[string]PeerInfo),
1117 pieceStateChanges: pubsub.NewPubSub(),
1119 storageOpener: storageClient,
1120 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1122 metadataChanged: sync.Cond{
1125 webSeeds: make(map[string]*Peer),
1126 gotMetainfoC: make(chan struct{}),
1128 t.networkingEnabled.Set()
1129 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1130 t.logger = cl.logger.WithContextValue(t)
1131 t.setChunkSize(defaultChunkSize)
1135 // A file-like handle to some torrent data resource.
1136 type Handle interface {
1143 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1144 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1147 // Adds a torrent by InfoHash with a custom Storage implementation.
1148 // If the torrent already exists then this Storage is ignored and the
1149 // existing torrent returned with `new` set to `false`
1150 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1153 t, ok := cl.torrents[infoHash]
1159 t = cl.newTorrent(infoHash, specStorage)
1160 cl.eachDhtServer(func(s DhtServer) {
1161 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1162 go t.dhtAnnouncer(s)
1165 cl.torrents[infoHash] = t
1166 cl.clearAcceptLimits()
1167 t.updateWantPeersEvent()
1168 // Tickle Client.waitAccept, new torrent may want conns.
1169 cl.event.Broadcast()
1173 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1174 // Torrent.MergeSpec.
1175 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1176 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1177 err = t.MergeSpec(spec)
1178 if err != nil && new {
1184 type stringAddr string
1186 var _ net.Addr = stringAddr("")
1188 func (stringAddr) Network() string { return "" }
1189 func (me stringAddr) String() string { return string(me) }
1191 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1192 // spec.DisallowDataDownload/Upload will be read and applied
1193 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1194 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1195 if spec.DisplayName != "" {
1196 t.SetDisplayName(spec.DisplayName)
1198 if spec.InfoBytes != nil {
1199 err := t.SetInfoBytes(spec.InfoBytes)
1205 cl.AddDhtNodes(spec.DhtNodes)
1208 useTorrentSources(spec.Sources, t)
1209 for _, url := range spec.Webseeds {
1212 for _, peerAddr := range spec.PeerAddrs {
1214 Addr: stringAddr(peerAddr),
1215 Source: PeerSourceDirect,
1219 if spec.ChunkSize != 0 {
1220 t.setChunkSize(pp.Integer(spec.ChunkSize))
1222 t.addTrackers(spec.Trackers)
1224 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1225 t.dataUploadDisallowed = spec.DisallowDataUpload
1229 func useTorrentSources(sources []string, t *Torrent) {
1230 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1231 ctx := context.Background()
1232 for i := 0; i < len(sources); i += 1 {
1235 if err := useTorrentSource(ctx, s, t); err != nil {
1236 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1238 t.logger.Printf("successfully used source %q", s)
1244 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1245 ctx, cancel := context.WithCancel(ctx)
1255 var req *http.Request
1256 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1259 var resp *http.Response
1260 if resp, err = http.DefaultClient.Do(req); err != nil {
1263 var mi metainfo.MetaInfo
1264 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1267 if ctx.Err() != nil {
1272 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1275 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1276 t, ok := cl.torrents[infoHash]
1278 err = fmt.Errorf("no such torrent")
1285 delete(cl.torrents, infoHash)
1289 func (cl *Client) allTorrentsCompleted() bool {
1290 for _, t := range cl.torrents {
1294 if !t.haveAllPieces() {
1301 // Returns true when all torrents are completely downloaded and false if the
1302 // client is stopped before that.
1303 func (cl *Client) WaitAll() bool {
1306 for !cl.allTorrentsCompleted() {
1307 if cl.closed.IsSet() {
1315 // Returns handles to all the torrents loaded in the Client.
1316 func (cl *Client) Torrents() []*Torrent {
1319 return cl.torrentsAsSlice()
1322 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1323 for _, t := range cl.torrents {
1324 ret = append(ret, t)
1329 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1330 spec, err := TorrentSpecFromMagnetUri(uri)
1334 T, _, err = cl.AddTorrentSpec(spec)
1338 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1339 ts, err := TorrentSpecFromMetaInfoErr(mi)
1343 T, _, err = cl.AddTorrentSpec(ts)
1347 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1348 mi, err := metainfo.LoadFromFile(filename)
1352 return cl.AddTorrent(mi)
1355 func (cl *Client) DhtServers() []DhtServer {
1356 return cl.dhtServers
1359 func (cl *Client) AddDhtNodes(nodes []string) {
1360 for _, n := range nodes {
1361 hmp := missinggo.SplitHostMaybePort(n)
1362 ip := net.ParseIP(hmp.Host)
1364 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1367 ni := krpc.NodeInfo{
1368 Addr: krpc.NodeAddr{
1373 cl.eachDhtServer(func(s DhtServer) {
1379 func (cl *Client) banPeerIP(ip net.IP) {
1380 cl.logger.Printf("banning ip %v", ip)
1381 if cl.badPeerIPs == nil {
1382 cl.badPeerIPs = make(map[string]struct{})
1384 cl.badPeerIPs[ip.String()] = struct{}{}
1387 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1396 PeerMaxRequests: 250,
1398 RemoteAddr: remoteAddr,
1400 callbacks: &cl.config.Callbacks,
1402 connString: connString,
1406 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1407 c.setRW(connStatsReadWriter{nc, c})
1408 c.r = &rateLimitedReader{
1409 l: cl.config.DownloadRateLimiter,
1412 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1413 for _, f := range cl.config.Callbacks.NewPeer {
1419 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1426 t.addPeers([]PeerInfo{{
1427 Addr: ipPortAddr{ip, port},
1428 Source: PeerSourceDhtAnnouncePeer,
1432 func firstNotNil(ips ...net.IP) net.IP {
1433 for _, ip := range ips {
1441 func (cl *Client) eachListener(f func(Listener) bool) {
1442 for _, s := range cl.listeners {
1449 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1450 for i := 0; i < len(cl.listeners); i += 1 {
1451 if ret = cl.listeners[i]; f(ret) {
1458 func (cl *Client) publicIp(peer net.IP) net.IP {
1459 // TODO: Use BEP 10 to determine how peers are seeing us.
1460 if peer.To4() != nil {
1462 cl.config.PublicIp4,
1463 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1468 cl.config.PublicIp6,
1469 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1473 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1474 l := cl.findListener(
1475 func(l Listener) bool {
1476 return f(addrIpOrNil(l.Addr()))
1482 return addrIpOrNil(l.Addr())
1485 // Our IP as a peer should see it.
1486 func (cl *Client) publicAddr(peer net.IP) IpPort {
1487 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1490 // ListenAddrs addresses currently being listened to.
1491 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1493 ret = make([]net.Addr, len(cl.listeners))
1494 for i := 0; i < len(cl.listeners); i += 1 {
1495 ret[i] = cl.listeners[i].Addr()
1501 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1502 ipa, ok := tryIpPortFromNetAddr(addr)
1506 ip := maskIpForAcceptLimiting(ipa.IP)
1507 if cl.acceptLimiter == nil {
1508 cl.acceptLimiter = make(map[ipStr]int)
1510 cl.acceptLimiter[ipStr(ip.String())]++
1513 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1514 if ip4 := ip.To4(); ip4 != nil {
1515 return ip4.Mask(net.CIDRMask(24, 32))
1520 func (cl *Client) clearAcceptLimits() {
1521 cl.acceptLimiter = nil
1524 func (cl *Client) acceptLimitClearer() {
1527 case <-cl.closed.Done():
1529 case <-time.After(15 * time.Minute):
1531 cl.clearAcceptLimits()
1537 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1538 if cl.config.DisableAcceptRateLimiting {
1541 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1544 func (cl *Client) rLock() {
1548 func (cl *Client) rUnlock() {
1552 func (cl *Client) lock() {
1556 func (cl *Client) unlock() {
1560 func (cl *Client) locker() *lockWithDeferreds {
1564 func (cl *Client) String() string {
1565 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1568 // Returns connection-level aggregate stats at the Client level. See the comment on
1569 // TorrentStats.ConnStats.
1570 func (cl *Client) ConnStats() ConnStats {
1571 return cl.stats.Copy()