17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/anacrolix/missinggo/slices"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed chansync.SetOnce
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() []string {
92 return cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() []string {
96 return slices.FromMapKeys(cl.badPeerIPs).([]string)
99 func (cl *Client) PeerID() PeerID {
103 // Returns the port number for the first listener that has one. No longer assumes that all port
104 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
106 func (cl *Client) LocalPort() (port int) {
107 cl.eachListener(func(l Listener) bool {
108 port = addrPortOrZero(l.Addr())
114 func writeDhtServerStatus(w io.Writer, s DhtServer) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, " ID: %x\n", s.ID())
117 spew.Fdump(w, dhtStats)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s DhtServer) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, &cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
151 "%f%% of %d bytes (%s)",
152 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
154 humanize.Bytes(uint64(*t.length)))
156 w.WriteString("<missing metainfo>")
164 // Filters things that are less than warning from UPnP discovery.
165 func upnpDiscoverLogFilter(m log.Msg) bool {
166 level, ok := m.GetLevel()
167 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
170 func (cl *Client) initLogger() {
171 logger := cl.config.Logger
174 if !cl.config.Debug {
175 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
178 cl.logger = logger.WithValues(cl)
181 func (cl *Client) announceKey() int32 {
182 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
185 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
187 cfg = NewDefaultClientConfig()
197 dopplegangerAddrs: make(map[string]struct{}),
198 torrents: make(map[metainfo.Hash]*Torrent),
199 dialRateLimiter: rate.NewLimiter(10, 10),
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 go cl.acceptLimitClearer()
210 cl.event.L = cl.locker()
211 storageImpl := cfg.DefaultStorage
212 if storageImpl == nil {
213 // We'd use mmap by default but HFS+ doesn't support sparse files.
214 storageImplCloser := storage.NewFile(cfg.DataDir)
215 cl.onClose = append(cl.onClose, func() {
216 if err := storageImplCloser.Close(); err != nil {
217 cl.logger.Printf("error closing default storage: %s", err)
220 storageImpl = storageImplCloser
222 cl.defaultStorage = storage.NewClient(storageImpl)
223 if cfg.IPBlocklist != nil {
224 cl.ipBlockList = cfg.IPBlocklist
227 if cfg.PeerID != "" {
228 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
230 o := copy(cl.peerID[:], cfg.Bep20)
231 _, err = rand.Read(cl.peerID[o:])
233 panic("error generating peer id")
237 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
245 for _, _s := range sockets {
246 s := _s // Go is fucking retarded.
247 cl.onClose = append(cl.onClose, func() { s.Close() })
248 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
249 cl.dialers = append(cl.dialers, s)
250 cl.listeners = append(cl.listeners, s)
251 if cl.config.AcceptPeerConnections {
252 go cl.acceptConnections(s)
259 for _, s := range sockets {
260 if pc, ok := s.(net.PacketConn); ok {
261 ds, err := cl.NewAnacrolixDhtServer(pc)
265 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
266 cl.onClose = append(cl.onClose, func() { ds.Close() })
271 cl.websocketTrackers = websocketTrackers{
274 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
277 t, ok := cl.torrents[infoHash]
279 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
281 return t.announceRequest(event), nil
283 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
286 t, ok := cl.torrents[dcc.InfoHash]
288 cl.logger.WithDefaultLevel(log.Warning).Printf(
289 "got webrtc conn for unloaded torrent with infohash %x",
295 go t.onWebRtcConn(dc, dcc)
304 func (cl *Client) AddDhtServer(d DhtServer) {
305 cl.dhtServers = append(cl.dhtServers, d)
308 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
309 // given address for any Torrent.
310 func (cl *Client) AddDialer(d Dialer) {
313 cl.dialers = append(cl.dialers, d)
314 for _, t := range cl.torrents {
319 func (cl *Client) Listeners() []Listener {
323 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
325 func (cl *Client) AddListener(l Listener) {
326 cl.listeners = append(cl.listeners, l)
327 if cl.config.AcceptPeerConnections {
328 go cl.acceptConnections(l)
332 func (cl *Client) firewallCallback(net.Addr) bool {
334 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
337 torrent.Add("connections firewalled", 1)
339 torrent.Add("connections not firewalled", 1)
344 func (cl *Client) listenOnNetwork(n network) bool {
345 if n.Ipv4 && cl.config.DisableIPv4 {
348 if n.Ipv6 && cl.config.DisableIPv6 {
351 if n.Tcp && cl.config.DisableTCP {
354 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
360 func (cl *Client) listenNetworks() (ns []network) {
361 for _, n := range allPeerNetworks {
362 if cl.listenOnNetwork(n) {
369 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
370 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
371 cfg := dht.ServerConfig{
372 IPBlocklist: cl.ipBlockList,
374 OnAnnouncePeer: cl.onDHTAnnouncePeer,
375 PublicIP: func() net.IP {
376 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
377 return cl.config.PublicIp6
379 return cl.config.PublicIp4
381 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
382 OnQuery: cl.config.DHTOnQuery,
383 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
385 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
388 s, err = dht.NewServer(&cfg)
391 ts, err := s.Bootstrap()
393 cl.logger.Printf("error bootstrapping dht: %s", err)
395 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
401 func (cl *Client) Closed() chansync.Done {
402 return cl.closed.Done()
405 func (cl *Client) eachDhtServer(f func(DhtServer)) {
406 for _, ds := range cl.dhtServers {
411 // Stops the client. All connections to peers are closed and all activity will
413 func (cl *Client) Close() {
417 for _, t := range cl.torrents {
420 for i := range cl.onClose {
421 cl.onClose[len(cl.onClose)-1-i]()
426 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
427 if cl.ipBlockList == nil {
430 return cl.ipBlockList.Lookup(ip)
433 func (cl *Client) ipIsBlocked(ip net.IP) bool {
434 _, blocked := cl.ipBlockRange(ip)
438 func (cl *Client) wantConns() bool {
439 for _, t := range cl.torrents {
447 func (cl *Client) waitAccept() {
449 if cl.closed.IsSet() {
459 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
460 func (cl *Client) rejectAccepted(conn net.Conn) error {
462 return errors.New("don't want conns right now")
464 ra := conn.RemoteAddr()
465 if rip := addrIpOrNil(ra); rip != nil {
466 if cl.config.DisableIPv4Peers && rip.To4() != nil {
467 return errors.New("ipv4 peers disabled")
469 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
470 return errors.New("ipv4 disabled")
473 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
474 return errors.New("ipv6 disabled")
476 if cl.rateLimitAccept(rip) {
477 return errors.New("source IP accepted rate limited")
479 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
480 return errors.New("bad source addr")
486 func (cl *Client) acceptConnections(l Listener) {
488 conn, err := l.Accept()
489 torrent.Add("client listener accepts", 1)
490 conn = pproffd.WrapNetConn(conn)
492 closed := cl.closed.IsSet()
495 reject = cl.rejectAccepted(conn)
505 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
510 torrent.Add("rejected accepted connections", 1)
511 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
514 go cl.incomingConnection(conn)
516 log.Fmsg("accepted %q connection at %q from %q",
520 ).SetLevel(log.Debug).Log(cl.logger)
521 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
522 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
523 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
528 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
529 func regularNetConnPeerConnConnString(nc net.Conn) string {
530 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
533 func (cl *Client) incomingConnection(nc net.Conn) {
535 if tc, ok := nc.(*net.TCPConn); ok {
538 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
539 regularNetConnPeerConnConnString(nc))
541 c.Discovery = PeerSourceIncoming
542 cl.runReceivedConn(c)
545 // Returns a handle to the given torrent, if it's present in the client.
546 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
549 t, ok = cl.torrents[ih]
553 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
554 return cl.torrents[ih]
557 type DialResult struct {
562 func countDialResult(err error) {
564 torrent.Add("successful dials", 1)
566 torrent.Add("unsuccessful dials", 1)
570 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
571 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
572 if ret < minDialTimeout {
578 // Returns whether an address is known to connect to a client with our own ID.
579 func (cl *Client) dopplegangerAddr(addr string) bool {
580 _, ok := cl.dopplegangerAddrs[addr]
584 // Returns a connection over UTP or TCP, whichever is first to connect.
585 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
586 return DialFirst(ctx, addr, cl.dialers)
589 // Returns a connection over UTP or TCP, whichever is first to connect.
590 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
592 t := perf.NewTimer(perf.CallerName(0))
595 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
597 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
601 ctx, cancel := context.WithCancel(ctx)
602 // As soon as we return one connection, cancel the others.
605 resCh := make(chan DialResult, left)
606 for _, _s := range dialers {
611 dialFromSocket(ctx, s, addr),
616 // Wait for a successful connection.
618 defer perf.ScopeTimer()()
619 for ; left > 0 && res.Conn == nil; left-- {
623 // There are still incompleted dials.
625 for ; left > 0; left-- {
626 conn := (<-resCh).Conn
633 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
638 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
639 c, err := s.Dial(ctx, addr)
640 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
641 // it now in case we close the connection forthwith.
642 if tc, ok := c.(*net.TCPConn); ok {
649 func forgettableDialError(err error) bool {
650 return strings.Contains(err.Error(), "no suitable address found")
653 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
654 if _, ok := t.halfOpen[addr]; !ok {
655 panic("invariant broken")
657 delete(t.halfOpen, addr)
659 for _, t := range cl.torrents {
664 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
665 // for valid reasons.
666 func (cl *Client) initiateProtocolHandshakes(
670 outgoing, encryptHeader bool,
671 remoteAddr PeerRemoteAddr,
672 network, connString string,
674 c *PeerConn, err error,
676 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
677 c.headerEncrypted = encryptHeader
678 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
680 dl, ok := ctx.Deadline()
684 err = nc.SetDeadline(dl)
688 err = cl.initiateHandshakes(c, t)
692 // Returns nil connection and nil error if no connection could be established for valid reasons.
693 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
694 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
697 return t.dialTimeout()
700 dr := cl.dialFirst(dialCtx, addr.String())
703 if dialCtx.Err() != nil {
704 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
706 return nil, errors.New("dial failed")
708 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
715 // Returns nil connection and nil error if no connection could be established
716 // for valid reasons.
717 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
718 torrent.Add("establish outgoing connection", 1)
719 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
720 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
722 torrent.Add("initiated conn with preferred header obfuscation", 1)
725 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
726 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
727 // We should have just tried with the preferred header obfuscation. If it was required,
728 // there's nothing else to try.
731 // Try again with encryption if we didn't earlier, or without if we did.
732 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with fallback header obfuscation", 1)
736 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
740 // Called to dial out and run a connection. The addr we're given is already
741 // considered half-open.
742 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
743 cl.dialRateLimiter.Wait(context.Background())
744 c, err := cl.establishOutgoingConn(t, addr)
747 // Don't release lock between here and addPeerConn, unless it's for
749 cl.noLongerHalfOpen(t, addr.String())
752 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
759 t.runHandshookConnLoggingErr(c)
762 // The port number for incoming peer connections. 0 if the client isn't listening.
763 func (cl *Client) incomingPeerPort() int {
764 return cl.LocalPort()
767 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
768 if c.headerEncrypted {
771 rw, c.cryptoMethod, err = mse.InitiateHandshake(
778 cl.config.CryptoProvides,
782 return fmt.Errorf("header obfuscation handshake: %w", err)
785 ih, err := cl.connBtHandshake(c, &t.infoHash)
787 return fmt.Errorf("bittorrent protocol handshake: %w", err)
789 if ih != t.infoHash {
790 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
795 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
796 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
797 func (cl *Client) forSkeys(f func([]byte) bool) {
800 if false { // Emulate the bug from #114
802 for ih := range cl.torrents {
806 for range cl.torrents {
813 for ih := range cl.torrents {
820 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
821 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
827 // Do encryption and bittorrent handshakes as receiver.
828 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
829 defer perf.ScopeTimerErr(&err)()
831 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
833 if err == nil || err == mse.ErrNoSecretKeyMatch {
834 if c.headerEncrypted {
835 torrent.Add("handshakes received encrypted", 1)
837 torrent.Add("handshakes received unencrypted", 1)
840 torrent.Add("handshakes received with error while handling encryption", 1)
843 if err == mse.ErrNoSecretKeyMatch {
848 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
849 err = errors.New("connection does not have required header obfuscation")
852 ih, err := cl.connBtHandshake(c, nil)
854 err = xerrors.Errorf("during bt handshake: %w", err)
863 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
864 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
869 c.PeerExtensionBytes = res.PeerExtensionBits
870 c.PeerID = res.PeerID
871 c.completedHandshake = time.Now()
872 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
878 func (cl *Client) runReceivedConn(c *PeerConn) {
879 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
883 t, err := cl.receiveHandshakes(c)
886 "error receiving handshakes on %v: %s", c, err,
887 ).SetLevel(log.Debug).
889 "network", c.Network,
891 torrent.Add("error receiving handshake", 1)
893 cl.onBadAccept(c.RemoteAddr)
898 torrent.Add("received handshake for unloaded torrent", 1)
899 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
901 cl.onBadAccept(c.RemoteAddr)
905 torrent.Add("received handshake for loaded torrent", 1)
908 t.runHandshookConnLoggingErr(c)
911 // Client lock must be held before entering this.
912 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
914 if c.PeerID == cl.peerID {
917 addr := c.conn.RemoteAddr().String()
918 cl.dopplegangerAddrs[addr] = struct{}{}
920 // Because the remote address is not necessarily the same as its client's torrent listen
921 // address, we won't record the remote address as a doppleganger. Instead, the initiator
922 // can record *us* as the doppleganger.
924 return errors.New("local and remote peer ids are the same")
926 c.conn.SetWriteDeadline(time.Time{})
927 c.r = deadlineReader{c.conn, c.r}
928 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
929 if connIsIpv6(c.conn) {
930 torrent.Add("completed handshake over ipv6", 1)
932 if err := t.addPeerConn(c); err != nil {
933 return fmt.Errorf("adding connection: %w", err)
935 defer t.dropConnection(c)
937 cl.sendInitialMessages(c, t)
938 err := c.mainReadLoop()
940 return fmt.Errorf("main read loop: %w", err)
945 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
946 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
947 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
948 const localClientReqq = 1 << 5
950 // See the order given in Transmission's tr_peerMsgsNew.
951 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
952 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
953 conn.write(pp.Message{
955 ExtendedID: pp.HandshakeExtendedID,
956 ExtendedPayload: func() []byte {
957 msg := pp.ExtendedHandshakeMessage{
958 M: map[pp.ExtensionName]pp.ExtensionNumber{
959 pp.ExtensionNameMetadata: metadataExtendedId,
961 V: cl.config.ExtendedHandshakeClientVersion,
962 Reqq: localClientReqq,
963 YourIp: pp.CompactIp(conn.remoteIp()),
964 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
965 Port: cl.incomingPeerPort(),
966 MetadataSize: torrent.metadataSize(),
967 // TODO: We can figured these out specific to the socket
969 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
970 Ipv6: cl.config.PublicIp6.To16(),
972 if !cl.config.DisablePEX {
973 msg.M[pp.ExtensionNamePex] = pexExtendedId
975 return bencode.MustMarshal(msg)
980 if conn.fastEnabled() {
981 if torrent.haveAllPieces() {
982 conn.write(pp.Message{Type: pp.HaveAll})
983 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
985 } else if !torrent.haveAnyPieces() {
986 conn.write(pp.Message{Type: pp.HaveNone})
987 conn.sentHaves.Clear()
993 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
994 conn.write(pp.Message{
1001 func (cl *Client) dhtPort() (ret uint16) {
1002 cl.eachDhtServer(func(s DhtServer) {
1003 ret = uint16(missinggo.AddrPort(s.Addr()))
1008 func (cl *Client) haveDhtServer() (ret bool) {
1009 cl.eachDhtServer(func(_ DhtServer) {
1015 // Process incoming ut_metadata message.
1016 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1017 var d pp.ExtendedMetadataRequestMsg
1018 err := bencode.Unmarshal(payload, &d)
1019 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1020 } else if err != nil {
1021 return fmt.Errorf("error unmarshalling bencode: %s", err)
1025 case pp.DataMetadataExtensionMsgType:
1026 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1027 if !c.requestedMetadataPiece(piece) {
1028 return fmt.Errorf("got unexpected piece %d", piece)
1030 c.metadataRequests[piece] = false
1031 begin := len(payload) - d.PieceSize()
1032 if begin < 0 || begin >= len(payload) {
1033 return fmt.Errorf("data has bad offset in payload: %d", begin)
1035 t.saveMetadataPiece(piece, payload[begin:])
1036 c.lastUsefulChunkReceived = time.Now()
1037 err = t.maybeCompleteMetadata()
1039 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1040 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1041 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1042 // log consumers can filter for this message.
1043 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1046 case pp.RequestMetadataExtensionMsgType:
1047 if !t.haveMetadataPiece(piece) {
1048 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1051 start := (1 << 14) * piece
1052 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1053 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1055 case pp.RejectMetadataExtensionMsgType:
1058 return errors.New("unknown msg_type value")
1062 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1063 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1064 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1069 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1073 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1076 if _, ok := cl.ipBlockRange(ip); ok {
1079 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1085 // Return a Torrent ready for insertion into a Client.
1086 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1087 // use provided storage, if provided
1088 storageClient := cl.defaultStorage
1089 if specStorage != nil {
1090 storageClient = storage.NewClient(specStorage)
1096 peers: prioritizedPeers{
1098 getPrio: func(p PeerInfo) peerPriority {
1100 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1103 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1105 halfOpen: make(map[string]PeerInfo),
1106 pieceStateChanges: pubsub.NewPubSub(),
1108 storageOpener: storageClient,
1109 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1111 networkingEnabled: true,
1112 metadataChanged: sync.Cond{
1115 webSeeds: make(map[string]*Peer),
1117 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1118 t.logger = cl.logger.WithContextValue(t)
1119 t.setChunkSize(defaultChunkSize)
1123 // A file-like handle to some torrent data resource.
1124 type Handle interface {
1131 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1132 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1135 // Adds a torrent by InfoHash with a custom Storage implementation.
1136 // If the torrent already exists then this Storage is ignored and the
1137 // existing torrent returned with `new` set to `false`
1138 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1141 t, ok := cl.torrents[infoHash]
1147 t = cl.newTorrent(infoHash, specStorage)
1148 cl.eachDhtServer(func(s DhtServer) {
1149 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1150 go t.dhtAnnouncer(s)
1153 cl.torrents[infoHash] = t
1154 cl.clearAcceptLimits()
1155 t.updateWantPeersEvent()
1156 // Tickle Client.waitAccept, new torrent may want conns.
1157 cl.event.Broadcast()
1161 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1162 // Torrent.MergeSpec.
1163 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1164 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1165 err = t.MergeSpec(spec)
1166 if err != nil && new {
1172 type stringAddr string
1174 var _ net.Addr = stringAddr("")
1176 func (stringAddr) Network() string { return "" }
1177 func (me stringAddr) String() string { return string(me) }
1179 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1180 // spec.DisallowDataDownload/Upload will be read and applied
1181 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1182 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1183 if spec.DisplayName != "" {
1184 t.SetDisplayName(spec.DisplayName)
1186 if spec.InfoBytes != nil {
1187 err := t.SetInfoBytes(spec.InfoBytes)
1193 cl.AddDhtNodes(spec.DhtNodes)
1196 useTorrentSources(spec.Sources, t)
1197 for _, url := range spec.Webseeds {
1200 for _, peerAddr := range spec.PeerAddrs {
1202 Addr: stringAddr(peerAddr),
1203 Source: PeerSourceDirect,
1207 if spec.ChunkSize != 0 {
1208 t.setChunkSize(pp.Integer(spec.ChunkSize))
1210 t.addTrackers(spec.Trackers)
1212 t.dataDownloadDisallowed = spec.DisallowDataDownload
1213 t.dataUploadDisallowed = spec.DisallowDataUpload
1217 func useTorrentSources(sources []string, t *Torrent) {
1218 for _, s := range sources {
1220 err := useTorrentSource(s, t)
1222 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1224 t.logger.Printf("successfully used source %q", s)
1230 func useTorrentSource(source string, t *Torrent) error {
1231 req, err := http.NewRequest(http.MethodGet, source, nil)
1235 ctx, cancel := context.WithCancel(context.Background())
1245 req = req.WithContext(ctx)
1246 resp, err := http.DefaultClient.Do(req)
1250 mi, err := metainfo.Load(resp.Body)
1252 if ctx.Err() != nil {
1257 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1260 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1261 t, ok := cl.torrents[infoHash]
1263 err = fmt.Errorf("no such torrent")
1270 delete(cl.torrents, infoHash)
1274 func (cl *Client) allTorrentsCompleted() bool {
1275 for _, t := range cl.torrents {
1279 if !t.haveAllPieces() {
1286 // Returns true when all torrents are completely downloaded and false if the
1287 // client is stopped before that.
1288 func (cl *Client) WaitAll() bool {
1291 for !cl.allTorrentsCompleted() {
1292 if cl.closed.IsSet() {
1300 // Returns handles to all the torrents loaded in the Client.
1301 func (cl *Client) Torrents() []*Torrent {
1304 return cl.torrentsAsSlice()
1307 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1308 for _, t := range cl.torrents {
1309 ret = append(ret, t)
1314 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1315 spec, err := TorrentSpecFromMagnetUri(uri)
1319 T, _, err = cl.AddTorrentSpec(spec)
1323 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1324 ts, err := TorrentSpecFromMetaInfoErr(mi)
1328 T, _, err = cl.AddTorrentSpec(ts)
1332 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1333 mi, err := metainfo.LoadFromFile(filename)
1337 return cl.AddTorrent(mi)
1340 func (cl *Client) DhtServers() []DhtServer {
1341 return cl.dhtServers
1344 func (cl *Client) AddDhtNodes(nodes []string) {
1345 for _, n := range nodes {
1346 hmp := missinggo.SplitHostMaybePort(n)
1347 ip := net.ParseIP(hmp.Host)
1349 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1352 ni := krpc.NodeInfo{
1353 Addr: krpc.NodeAddr{
1358 cl.eachDhtServer(func(s DhtServer) {
1364 func (cl *Client) banPeerIP(ip net.IP) {
1365 cl.logger.Printf("banning ip %v", ip)
1366 if cl.badPeerIPs == nil {
1367 cl.badPeerIPs = make(map[string]struct{})
1369 cl.badPeerIPs[ip.String()] = struct{}{}
1372 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1381 PeerMaxRequests: 250,
1383 RemoteAddr: remoteAddr,
1385 callbacks: &cl.config.Callbacks,
1387 connString: connString,
1391 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1392 c.setRW(connStatsReadWriter{nc, c})
1393 c.r = &rateLimitedReader{
1394 l: cl.config.DownloadRateLimiter,
1397 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1398 for _, f := range cl.config.Callbacks.NewPeer {
1404 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1411 t.addPeers([]PeerInfo{{
1412 Addr: ipPortAddr{ip, port},
1413 Source: PeerSourceDhtAnnouncePeer,
1417 func firstNotNil(ips ...net.IP) net.IP {
1418 for _, ip := range ips {
1426 func (cl *Client) eachDialer(f func(Dialer) bool) {
1427 for _, s := range cl.dialers {
1434 func (cl *Client) eachListener(f func(Listener) bool) {
1435 for _, s := range cl.listeners {
1442 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1443 cl.eachListener(func(l Listener) bool {
1450 func (cl *Client) publicIp(peer net.IP) net.IP {
1451 // TODO: Use BEP 10 to determine how peers are seeing us.
1452 if peer.To4() != nil {
1454 cl.config.PublicIp4,
1455 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1460 cl.config.PublicIp6,
1461 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1465 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1466 l := cl.findListener(
1467 func(l Listener) bool {
1468 return f(addrIpOrNil(l.Addr()))
1474 return addrIpOrNil(l.Addr())
1477 // Our IP as a peer should see it.
1478 func (cl *Client) publicAddr(peer net.IP) IpPort {
1479 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1482 // ListenAddrs addresses currently being listened to.
1483 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1486 cl.eachListener(func(l Listener) bool {
1487 ret = append(ret, l.Addr())
1493 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1494 ipa, ok := tryIpPortFromNetAddr(addr)
1498 ip := maskIpForAcceptLimiting(ipa.IP)
1499 if cl.acceptLimiter == nil {
1500 cl.acceptLimiter = make(map[ipStr]int)
1502 cl.acceptLimiter[ipStr(ip.String())]++
1505 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1506 if ip4 := ip.To4(); ip4 != nil {
1507 return ip4.Mask(net.CIDRMask(24, 32))
1512 func (cl *Client) clearAcceptLimits() {
1513 cl.acceptLimiter = nil
1516 func (cl *Client) acceptLimitClearer() {
1519 case <-cl.closed.Done():
1521 case <-time.After(15 * time.Minute):
1523 cl.clearAcceptLimits()
1529 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1530 if cl.config.DisableAcceptRateLimiting {
1533 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1536 func (cl *Client) rLock() {
1540 func (cl *Client) rUnlock() {
1544 func (cl *Client) lock() {
1548 func (cl *Client) unlock() {
1552 func (cl *Client) locker() *lockWithDeferreds {
1556 func (cl *Client) String() string {
1557 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1560 // Returns connection-level aggregate stats at the Client level. See the comment on
1561 // TorrentStats.ConnStats.
1562 func (cl *Client) ConnStats() ConnStats {
1563 return cl.stats.Copy()