17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/missinggo/slices"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() []string {
92 return cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() []string {
96 return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 func (cl *Client) PeerID() PeerID {
103 // Returns the port number for the first listener that has one. No longer assumes that all port
104 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
106 func (cl *Client) LocalPort() (port int) {
107 cl.eachListener(func(l Listener) bool {
108 port = addrPortOrZero(l.Addr())
114 func writeDhtServerStatus(w io.Writer, s DhtServer) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, " ID: %x\n", s.ID())
117 spew.Fdump(w, dhtStats)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s DhtServer) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, &cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
151 "%f%% of %d bytes (%s)",
152 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
154 humanize.Bytes(uint64(*t.length)))
156 w.WriteString("<missing metainfo>")
164 // Filters things that are less than warning from UPnP discovery.
165 func upnpDiscoverLogFilter(m log.Msg) bool {
166 level, ok := m.GetLevel()
167 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 func (cl *Client) initLogger() {
171 logger := cl.config.Logger
174 if !cl.config.Debug {
175 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178 cl.logger = logger.WithValues(cl)
181 func (cl *Client) announceKey() int32 {
182 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
187 cfg = NewDefaultClientConfig()
197 dopplegangerAddrs: make(map[string]struct{}),
198 torrents: make(map[metainfo.Hash]*Torrent),
199 dialRateLimiter: rate.NewLimiter(10, 10),
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 go cl.acceptLimitClearer()
210 cl.event.L = cl.locker()
211 storageImpl := cfg.DefaultStorage
212 if storageImpl == nil {
213 // We'd use mmap by default but HFS+ doesn't support sparse files.
214 storageImplCloser := storage.NewFile(cfg.DataDir)
215 cl.onClose = append(cl.onClose, func() {
216 if err := storageImplCloser.Close(); err != nil {
217 cl.logger.Printf("error closing default storage: %s", err)
220 storageImpl = storageImplCloser
222 cl.defaultStorage = storage.NewClient(storageImpl)
223 if cfg.IPBlocklist != nil {
224 cl.ipBlockList = cfg.IPBlocklist
227 if cfg.PeerID != "" {
228 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230 o := copy(cl.peerID[:], cfg.Bep20)
231 _, err = rand.Read(cl.peerID[o:])
233 panic("error generating peer id")
237 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
245 for _, _s := range sockets {
246 s := _s // Go is fucking retarded.
247 cl.onClose = append(cl.onClose, func() { s.Close() })
248 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
249 cl.dialers = append(cl.dialers, s)
250 cl.listeners = append(cl.listeners, s)
251 if cl.config.AcceptPeerConnections {
252 go cl.acceptConnections(s)
259 for _, s := range sockets {
260 if pc, ok := s.(net.PacketConn); ok {
261 ds, err := cl.NewAnacrolixDhtServer(pc)
265 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
266 cl.onClose = append(cl.onClose, func() { ds.Close() })
271 cl.websocketTrackers = websocketTrackers{
274 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
277 t, ok := cl.torrents[infoHash]
279 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
281 return t.announceRequest(event), nil
283 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
286 t, ok := cl.torrents[dcc.InfoHash]
288 cl.logger.WithDefaultLevel(log.Warning).Printf(
289 "got webrtc conn for unloaded torrent with infohash %x",
295 go t.onWebRtcConn(dc, dcc)
304 func (cl *Client) AddDhtServer(d DhtServer) {
305 cl.dhtServers = append(cl.dhtServers, d)
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
313 cl.dialers = append(cl.dialers, d)
314 for _, t := range cl.torrents {
319 func (cl *Client) Listeners() []Listener {
323 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
325 func (cl *Client) AddListener(l Listener) {
326 cl.listeners = append(cl.listeners, l)
327 if cl.config.AcceptPeerConnections {
328 go cl.acceptConnections(l)
332 func (cl *Client) firewallCallback(net.Addr) bool {
334 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
337 torrent.Add("connections firewalled", 1)
339 torrent.Add("connections not firewalled", 1)
344 func (cl *Client) listenOnNetwork(n network) bool {
345 if n.Ipv4 && cl.config.DisableIPv4 {
348 if n.Ipv6 && cl.config.DisableIPv6 {
351 if n.Tcp && cl.config.DisableTCP {
354 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
360 func (cl *Client) listenNetworks() (ns []network) {
361 for _, n := range allPeerNetworks {
362 if cl.listenOnNetwork(n) {
369 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
370 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
371 cfg := dht.ServerConfig{
372 IPBlocklist: cl.ipBlockList,
374 OnAnnouncePeer: cl.onDHTAnnouncePeer,
375 PublicIP: func() net.IP {
376 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
377 return cl.config.PublicIp6
379 return cl.config.PublicIp4
381 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
382 OnQuery: cl.config.DHTOnQuery,
383 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
385 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
388 s, err = dht.NewServer(&cfg)
391 ts, err := s.Bootstrap()
393 cl.logger.Printf("error bootstrapping dht: %s", err)
395 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
401 func (cl *Client) Closed() <-chan struct{} {
407 func (cl *Client) eachDhtServer(f func(DhtServer)) {
408 for _, ds := range cl.dhtServers {
413 // Stops the client. All connections to peers are closed and all activity will
415 func (cl *Client) Close() {
419 for _, t := range cl.torrents {
422 for i := range cl.onClose {
423 cl.onClose[len(cl.onClose)-1-i]()
428 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
429 if cl.ipBlockList == nil {
432 return cl.ipBlockList.Lookup(ip)
435 func (cl *Client) ipIsBlocked(ip net.IP) bool {
436 _, blocked := cl.ipBlockRange(ip)
440 func (cl *Client) wantConns() bool {
441 for _, t := range cl.torrents {
449 func (cl *Client) waitAccept() {
451 if cl.closed.IsSet() {
461 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
462 func (cl *Client) rejectAccepted(conn net.Conn) error {
464 return errors.New("don't want conns right now")
466 ra := conn.RemoteAddr()
467 if rip := addrIpOrNil(ra); rip != nil {
468 if cl.config.DisableIPv4Peers && rip.To4() != nil {
469 return errors.New("ipv4 peers disabled")
471 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
472 return errors.New("ipv4 disabled")
475 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
476 return errors.New("ipv6 disabled")
478 if cl.rateLimitAccept(rip) {
479 return errors.New("source IP accepted rate limited")
481 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
482 return errors.New("bad source addr")
488 func (cl *Client) acceptConnections(l Listener) {
490 conn, err := l.Accept()
491 torrent.Add("client listener accepts", 1)
492 conn = pproffd.WrapNetConn(conn)
494 closed := cl.closed.IsSet()
497 reject = cl.rejectAccepted(conn)
507 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
512 torrent.Add("rejected accepted connections", 1)
513 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
516 go cl.incomingConnection(conn)
518 log.Fmsg("accepted %q connection at %q from %q",
522 ).SetLevel(log.Debug).Log(cl.logger)
523 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
524 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
525 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
530 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
531 func regularNetConnPeerConnConnString(nc net.Conn) string {
532 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
535 func (cl *Client) incomingConnection(nc net.Conn) {
537 if tc, ok := nc.(*net.TCPConn); ok {
540 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
541 regularNetConnPeerConnConnString(nc))
543 c.Discovery = PeerSourceIncoming
544 cl.runReceivedConn(c)
547 // Returns a handle to the given torrent, if it's present in the client.
548 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
551 t, ok = cl.torrents[ih]
555 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
556 return cl.torrents[ih]
559 type DialResult struct {
564 func countDialResult(err error) {
566 torrent.Add("successful dials", 1)
568 torrent.Add("unsuccessful dials", 1)
572 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
573 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
574 if ret < minDialTimeout {
580 // Returns whether an address is known to connect to a client with our own ID.
581 func (cl *Client) dopplegangerAddr(addr string) bool {
582 _, ok := cl.dopplegangerAddrs[addr]
586 // Returns a connection over UTP or TCP, whichever is first to connect.
587 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
588 return DialFirst(ctx, addr, cl.dialers)
591 // Returns a connection over UTP or TCP, whichever is first to connect.
592 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
594 t := perf.NewTimer(perf.CallerName(0))
597 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
599 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
603 ctx, cancel := context.WithCancel(ctx)
604 // As soon as we return one connection, cancel the others.
607 resCh := make(chan DialResult, left)
608 for _, _s := range dialers {
613 dialFromSocket(ctx, s, addr),
618 // Wait for a successful connection.
620 defer perf.ScopeTimer()()
621 for ; left > 0 && res.Conn == nil; left-- {
625 // There are still incompleted dials.
627 for ; left > 0; left-- {
628 conn := (<-resCh).Conn
635 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
640 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
641 c, err := s.Dial(ctx, addr)
642 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
643 // it now in case we close the connection forthwith.
644 if tc, ok := c.(*net.TCPConn); ok {
651 func forgettableDialError(err error) bool {
652 return strings.Contains(err.Error(), "no suitable address found")
655 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
656 if _, ok := t.halfOpen[addr]; !ok {
657 panic("invariant broken")
659 delete(t.halfOpen, addr)
661 for _, t := range cl.torrents {
666 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
667 // for valid reasons.
668 func (cl *Client) initiateProtocolHandshakes(
672 outgoing, encryptHeader bool,
673 remoteAddr PeerRemoteAddr,
674 network, connString string,
676 c *PeerConn, err error,
678 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
679 c.headerEncrypted = encryptHeader
680 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
682 dl, ok := ctx.Deadline()
686 err = nc.SetDeadline(dl)
690 err = cl.initiateHandshakes(c, t)
694 // Returns nil connection and nil error if no connection could be established for valid reasons.
695 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
696 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
699 return t.dialTimeout()
702 dr := cl.dialFirst(dialCtx, addr.String())
705 if dialCtx.Err() != nil {
706 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
708 return nil, errors.New("dial failed")
710 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
717 // Returns nil connection and nil error if no connection could be established
718 // for valid reasons.
719 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
720 torrent.Add("establish outgoing connection", 1)
721 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
722 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
724 torrent.Add("initiated conn with preferred header obfuscation", 1)
727 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
728 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
729 // We should have just tried with the preferred header obfuscation. If it was required,
730 // there's nothing else to try.
733 // Try again with encryption if we didn't earlier, or without if we did.
734 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
736 torrent.Add("initiated conn with fallback header obfuscation", 1)
738 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
742 // Called to dial out and run a connection. The addr we're given is already
743 // considered half-open.
744 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
745 cl.dialRateLimiter.Wait(context.Background())
746 c, err := cl.establishOutgoingConn(t, addr)
749 // Don't release lock between here and addPeerConn, unless it's for
751 cl.noLongerHalfOpen(t, addr.String())
754 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
761 t.runHandshookConnLoggingErr(c)
764 // The port number for incoming peer connections. 0 if the client isn't listening.
765 func (cl *Client) incomingPeerPort() int {
766 return cl.LocalPort()
769 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
770 if c.headerEncrypted {
773 rw, c.cryptoMethod, err = mse.InitiateHandshake(
780 cl.config.CryptoProvides,
784 return fmt.Errorf("header obfuscation handshake: %w", err)
787 ih, err := cl.connBtHandshake(c, &t.infoHash)
789 return fmt.Errorf("bittorrent protocol handshake: %w", err)
791 if ih != t.infoHash {
792 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
797 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
798 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
799 func (cl *Client) forSkeys(f func([]byte) bool) {
802 if false { // Emulate the bug from #114
804 for ih := range cl.torrents {
808 for range cl.torrents {
815 for ih := range cl.torrents {
822 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
823 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
829 // Do encryption and bittorrent handshakes as receiver.
830 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
831 defer perf.ScopeTimerErr(&err)()
833 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
835 if err == nil || err == mse.ErrNoSecretKeyMatch {
836 if c.headerEncrypted {
837 torrent.Add("handshakes received encrypted", 1)
839 torrent.Add("handshakes received unencrypted", 1)
842 torrent.Add("handshakes received with error while handling encryption", 1)
845 if err == mse.ErrNoSecretKeyMatch {
850 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
851 err = errors.New("connection does not have required header obfuscation")
854 ih, err := cl.connBtHandshake(c, nil)
856 err = xerrors.Errorf("during bt handshake: %w", err)
865 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
866 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
871 c.PeerExtensionBytes = res.PeerExtensionBits
872 c.PeerID = res.PeerID
873 c.completedHandshake = time.Now()
874 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
880 func (cl *Client) runReceivedConn(c *PeerConn) {
881 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
885 t, err := cl.receiveHandshakes(c)
888 "error receiving handshakes on %v: %s", c, err,
889 ).SetLevel(log.Debug).
891 "network", c.Network,
893 torrent.Add("error receiving handshake", 1)
895 cl.onBadAccept(c.RemoteAddr)
900 torrent.Add("received handshake for unloaded torrent", 1)
901 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
903 cl.onBadAccept(c.RemoteAddr)
907 torrent.Add("received handshake for loaded torrent", 1)
910 t.runHandshookConnLoggingErr(c)
913 // Client lock must be held before entering this.
914 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
916 if c.PeerID == cl.peerID {
919 addr := c.conn.RemoteAddr().String()
920 cl.dopplegangerAddrs[addr] = struct{}{}
922 // Because the remote address is not necessarily the same as its client's torrent listen
923 // address, we won't record the remote address as a doppleganger. Instead, the initiator
924 // can record *us* as the doppleganger.
926 return errors.New("local and remote peer ids are the same")
928 c.conn.SetWriteDeadline(time.Time{})
929 c.r = deadlineReader{c.conn, c.r}
930 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
931 if connIsIpv6(c.conn) {
932 torrent.Add("completed handshake over ipv6", 1)
934 if err := t.addPeerConn(c); err != nil {
935 return fmt.Errorf("adding connection: %w", err)
937 defer t.dropConnection(c)
939 cl.sendInitialMessages(c, t)
940 err := c.mainReadLoop()
942 return fmt.Errorf("main read loop: %w", err)
947 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
948 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
949 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
950 const localClientReqq = 1 << 5
952 // See the order given in Transmission's tr_peerMsgsNew.
953 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
954 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
955 conn.write(pp.Message{
957 ExtendedID: pp.HandshakeExtendedID,
958 ExtendedPayload: func() []byte {
959 msg := pp.ExtendedHandshakeMessage{
960 M: map[pp.ExtensionName]pp.ExtensionNumber{
961 pp.ExtensionNameMetadata: metadataExtendedId,
963 V: cl.config.ExtendedHandshakeClientVersion,
964 Reqq: localClientReqq,
965 YourIp: pp.CompactIp(conn.remoteIp()),
966 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
967 Port: cl.incomingPeerPort(),
968 MetadataSize: torrent.metadataSize(),
969 // TODO: We can figured these out specific to the socket
971 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
972 Ipv6: cl.config.PublicIp6.To16(),
974 if !cl.config.DisablePEX {
975 msg.M[pp.ExtensionNamePex] = pexExtendedId
977 return bencode.MustMarshal(msg)
982 if conn.fastEnabled() {
983 if torrent.haveAllPieces() {
984 conn.write(pp.Message{Type: pp.HaveAll})
985 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
987 } else if !torrent.haveAnyPieces() {
988 conn.write(pp.Message{Type: pp.HaveNone})
989 conn.sentHaves.Clear()
995 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
996 conn.write(pp.Message{
1003 func (cl *Client) dhtPort() (ret uint16) {
1004 cl.eachDhtServer(func(s DhtServer) {
1005 ret = uint16(missinggo.AddrPort(s.Addr()))
1010 func (cl *Client) haveDhtServer() (ret bool) {
1011 cl.eachDhtServer(func(_ DhtServer) {
1017 // Process incoming ut_metadata message.
1018 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1019 var d pp.ExtendedMetadataRequestMsg
1020 err := bencode.Unmarshal(payload, &d)
1021 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1022 } else if err != nil {
1023 return fmt.Errorf("error unmarshalling bencode: %s", err)
1027 case pp.DataMetadataExtensionMsgType:
1028 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1029 if !c.requestedMetadataPiece(piece) {
1030 return fmt.Errorf("got unexpected piece %d", piece)
1032 c.metadataRequests[piece] = false
1033 begin := len(payload) - d.PieceSize()
1034 if begin < 0 || begin >= len(payload) {
1035 return fmt.Errorf("data has bad offset in payload: %d", begin)
1037 t.saveMetadataPiece(piece, payload[begin:])
1038 c.lastUsefulChunkReceived = time.Now()
1039 err = t.maybeCompleteMetadata()
1041 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1042 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1043 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1044 // log consumers can filter for this message.
1045 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1048 case pp.RequestMetadataExtensionMsgType:
1049 if !t.haveMetadataPiece(piece) {
1050 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1053 start := (1 << 14) * piece
1054 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1055 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1057 case pp.RejectMetadataExtensionMsgType:
1060 return errors.New("unknown msg_type value")
1064 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1065 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1066 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1071 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1075 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1078 if _, ok := cl.ipBlockRange(ip); ok {
1081 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1087 // Return a Torrent ready for insertion into a Client.
1088 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1089 // use provided storage, if provided
1090 storageClient := cl.defaultStorage
1091 if specStorage != nil {
1092 storageClient = storage.NewClient(specStorage)
1098 peers: prioritizedPeers{
1100 getPrio: func(p PeerInfo) peerPriority {
1102 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1105 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1107 halfOpen: make(map[string]PeerInfo),
1108 pieceStateChanges: pubsub.NewPubSub(),
1110 storageOpener: storageClient,
1111 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1113 networkingEnabled: true,
1114 metadataChanged: sync.Cond{
1117 webSeeds: make(map[string]*Peer),
1119 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1120 t.logger = cl.logger.WithContextValue(t)
1121 t.setChunkSize(defaultChunkSize)
1125 // A file-like handle to some torrent data resource.
1126 type Handle interface {
1133 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1134 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1137 // Adds a torrent by InfoHash with a custom Storage implementation.
1138 // If the torrent already exists then this Storage is ignored and the
1139 // existing torrent returned with `new` set to `false`
1140 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1143 t, ok := cl.torrents[infoHash]
1149 t = cl.newTorrent(infoHash, specStorage)
1150 cl.eachDhtServer(func(s DhtServer) {
1151 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1152 go t.dhtAnnouncer(s)
1155 cl.torrents[infoHash] = t
1156 cl.clearAcceptLimits()
1157 t.updateWantPeersEvent()
1158 // Tickle Client.waitAccept, new torrent may want conns.
1159 cl.event.Broadcast()
1163 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1164 // Torrent.MergeSpec.
1165 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1166 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1167 err = t.MergeSpec(spec)
1168 if err != nil && new {
1174 type stringAddr string
1176 var _ net.Addr = stringAddr("")
1178 func (stringAddr) Network() string { return "" }
1179 func (me stringAddr) String() string { return string(me) }
1181 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1182 // spec.DisallowDataDownload/Upload will be read and applied
1183 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1184 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1185 if spec.DisplayName != "" {
1186 t.SetDisplayName(spec.DisplayName)
1188 if spec.InfoBytes != nil {
1189 err := t.SetInfoBytes(spec.InfoBytes)
1195 cl.AddDhtNodes(spec.DhtNodes)
1198 useTorrentSources(spec.Sources, t)
1199 for _, url := range spec.Webseeds {
1202 for _, peerAddr := range spec.PeerAddrs {
1204 Addr: stringAddr(peerAddr),
1205 Source: PeerSourceDirect,
1209 if spec.ChunkSize != 0 {
1210 t.setChunkSize(pp.Integer(spec.ChunkSize))
1212 t.addTrackers(spec.Trackers)
1214 t.dataDownloadDisallowed = spec.DisallowDataDownload
1215 t.dataUploadDisallowed = spec.DisallowDataUpload
1219 func useTorrentSources(sources []string, t *Torrent) {
1220 for _, s := range sources {
1222 err := useTorrentSource(s, t)
1224 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1226 t.logger.Printf("successfully used source %q", s)
1232 func useTorrentSource(source string, t *Torrent) error {
1233 req, err := http.NewRequest(http.MethodGet, source, nil)
1237 ctx, cancel := context.WithCancel(context.Background())
1247 req = req.WithContext(ctx)
1248 resp, err := http.DefaultClient.Do(req)
1252 mi, err := metainfo.Load(resp.Body)
1254 if ctx.Err() != nil {
1259 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1262 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1263 t, ok := cl.torrents[infoHash]
1265 err = fmt.Errorf("no such torrent")
1272 delete(cl.torrents, infoHash)
1276 func (cl *Client) allTorrentsCompleted() bool {
1277 for _, t := range cl.torrents {
1281 if !t.haveAllPieces() {
1288 // Returns true when all torrents are completely downloaded and false if the
1289 // client is stopped before that.
1290 func (cl *Client) WaitAll() bool {
1293 for !cl.allTorrentsCompleted() {
1294 if cl.closed.IsSet() {
1302 // Returns handles to all the torrents loaded in the Client.
1303 func (cl *Client) Torrents() []*Torrent {
1306 return cl.torrentsAsSlice()
1309 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1310 for _, t := range cl.torrents {
1311 ret = append(ret, t)
1316 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1317 spec, err := TorrentSpecFromMagnetUri(uri)
1321 T, _, err = cl.AddTorrentSpec(spec)
1325 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1326 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1330 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1331 mi, err := metainfo.LoadFromFile(filename)
1335 return cl.AddTorrent(mi)
1338 func (cl *Client) DhtServers() []DhtServer {
1339 return cl.dhtServers
1342 func (cl *Client) AddDhtNodes(nodes []string) {
1343 for _, n := range nodes {
1344 hmp := missinggo.SplitHostMaybePort(n)
1345 ip := net.ParseIP(hmp.Host)
1347 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1350 ni := krpc.NodeInfo{
1351 Addr: krpc.NodeAddr{
1356 cl.eachDhtServer(func(s DhtServer) {
1362 func (cl *Client) banPeerIP(ip net.IP) {
1363 cl.logger.Printf("banning ip %v", ip)
1364 if cl.badPeerIPs == nil {
1365 cl.badPeerIPs = make(map[string]struct{})
1367 cl.badPeerIPs[ip.String()] = struct{}{}
1370 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1379 PeerMaxRequests: 250,
1381 RemoteAddr: remoteAddr,
1383 callbacks: &cl.config.Callbacks,
1385 connString: connString,
1389 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1390 c.setRW(connStatsReadWriter{nc, c})
1391 c.r = &rateLimitedReader{
1392 l: cl.config.DownloadRateLimiter,
1395 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1396 for _, f := range cl.config.Callbacks.NewPeer {
1402 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1409 t.addPeers([]PeerInfo{{
1410 Addr: ipPortAddr{ip, port},
1411 Source: PeerSourceDhtAnnouncePeer,
1415 func firstNotNil(ips ...net.IP) net.IP {
1416 for _, ip := range ips {
1424 func (cl *Client) eachDialer(f func(Dialer) bool) {
1425 for _, s := range cl.dialers {
1432 func (cl *Client) eachListener(f func(Listener) bool) {
1433 for _, s := range cl.listeners {
1440 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1441 cl.eachListener(func(l Listener) bool {
1448 func (cl *Client) publicIp(peer net.IP) net.IP {
1449 // TODO: Use BEP 10 to determine how peers are seeing us.
1450 if peer.To4() != nil {
1452 cl.config.PublicIp4,
1453 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1458 cl.config.PublicIp6,
1459 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1463 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1464 l := cl.findListener(
1465 func(l Listener) bool {
1466 return f(addrIpOrNil(l.Addr()))
1472 return addrIpOrNil(l.Addr())
1475 // Our IP as a peer should see it.
1476 func (cl *Client) publicAddr(peer net.IP) IpPort {
1477 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1480 // ListenAddrs addresses currently being listened to.
1481 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1484 cl.eachListener(func(l Listener) bool {
1485 ret = append(ret, l.Addr())
1491 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1492 ipa, ok := tryIpPortFromNetAddr(addr)
1496 ip := maskIpForAcceptLimiting(ipa.IP)
1497 if cl.acceptLimiter == nil {
1498 cl.acceptLimiter = make(map[ipStr]int)
1500 cl.acceptLimiter[ipStr(ip.String())]++
1503 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1504 if ip4 := ip.To4(); ip4 != nil {
1505 return ip4.Mask(net.CIDRMask(24, 32))
1510 func (cl *Client) clearAcceptLimits() {
1511 cl.acceptLimiter = nil
1514 func (cl *Client) acceptLimitClearer() {
1517 case <-cl.closed.LockedChan(cl.locker()):
1519 case <-time.After(15 * time.Minute):
1521 cl.clearAcceptLimits()
1527 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1528 if cl.config.DisableAcceptRateLimiting {
1531 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1534 func (cl *Client) rLock() {
1538 func (cl *Client) rUnlock() {
1542 func (cl *Client) lock() {
1546 func (cl *Client) unlock() {
1550 func (cl *Client) locker() *lockWithDeferreds {
1554 func (cl *Client) String() string {
1555 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1558 // Returns connection-level aggregate stats at the Client level. See the comment on
1559 // TorrentStats.ConnStats.
1560 func (cl *Client) ConnStats() ConnStats {
1561 return cl.stats.Copy()