17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/davecgh/go-spew/spew"
27 "github.com/dustin/go-humanize"
28 "github.com/google/btree"
29 "golang.org/x/time/rate"
30 "golang.org/x/xerrors"
32 "github.com/anacrolix/missinggo/v2"
33 "github.com/anacrolix/missinggo/v2/conntrack"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/iplist"
37 "github.com/anacrolix/torrent/metainfo"
38 "github.com/anacrolix/torrent/mse"
39 pp "github.com/anacrolix/torrent/peer_protocol"
40 "github.com/anacrolix/torrent/storage"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
62 dhtServers []DhtServer
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 // Returns the port number for the first listener that has one. No longer assumes that all port
95 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
97 func (cl *Client) LocalPort() (port int) {
98 cl.eachListener(func(l Listener) bool {
99 port = addrPortOrZero(l.Addr())
105 func writeDhtServerStatus(w io.Writer, s DhtServer) {
106 dhtStats := s.Stats()
107 fmt.Fprintf(w, " ID: %x\n", s.ID())
108 spew.Fdump(w, dhtStats)
111 // Writes out a human readable status of the client, such as for writing to a
113 func (cl *Client) WriteStatus(_w io.Writer) {
116 w := bufio.NewWriter(_w)
118 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
119 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
120 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
121 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
122 cl.eachDhtServer(func(s DhtServer) {
123 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
124 writeDhtServerStatus(w, s)
126 spew.Fdump(w, &cl.stats)
127 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
129 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
130 return l.InfoHash().AsString() < r.InfoHash().AsString()
133 fmt.Fprint(w, "<unknown name>")
135 fmt.Fprint(w, t.name())
139 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
141 w.WriteString("<missing metainfo>")
149 func (cl *Client) initLogger() {
150 cl.logger = cl.config.Logger.WithValues(cl)
151 if !cl.config.Debug {
152 cl.logger = cl.logger.FilterLevel(log.Info)
156 func (cl *Client) announceKey() int32 {
157 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
160 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
162 cfg = NewDefaultClientConfig()
172 dopplegangerAddrs: make(map[string]struct{}),
173 torrents: make(map[metainfo.Hash]*Torrent),
174 dialRateLimiter: rate.NewLimiter(10, 10),
176 go cl.acceptLimitClearer()
184 cl.extensionBytes = defaultPeerExtensionBytes()
185 cl.event.L = cl.locker()
186 storageImpl := cfg.DefaultStorage
187 if storageImpl == nil {
188 // We'd use mmap by default but HFS+ doesn't support sparse files.
189 storageImplCloser := storage.NewFile(cfg.DataDir)
190 cl.onClose = append(cl.onClose, func() {
191 if err := storageImplCloser.Close(); err != nil {
192 cl.logger.Printf("error closing default storage: %s", err)
195 storageImpl = storageImplCloser
197 cl.defaultStorage = storage.NewClient(storageImpl)
198 if cfg.IPBlocklist != nil {
199 cl.ipBlockList = cfg.IPBlocklist
202 if cfg.PeerID != "" {
203 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
205 o := copy(cl.peerID[:], cfg.Bep20)
206 _, err = rand.Read(cl.peerID[o:])
208 panic("error generating peer id")
212 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
220 for _, _s := range sockets {
221 s := _s // Go is fucking retarded.
222 cl.onClose = append(cl.onClose, func() { s.Close() })
223 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
224 cl.dialers = append(cl.dialers, s)
225 cl.listeners = append(cl.listeners, s)
226 go cl.acceptConnections(s)
232 for _, s := range sockets {
233 if pc, ok := s.(net.PacketConn); ok {
234 ds, err := cl.newAnacrolixDhtServer(pc)
238 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
239 cl.onClose = append(cl.onClose, func() { ds.Close() })
247 func (cl *Client) AddDhtServer(d DhtServer) {
248 cl.dhtServers = append(cl.dhtServers, d)
251 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
252 // given address for any Torrent.
253 func (cl *Client) AddDialer(d Dialer) {
256 cl.dialers = append(cl.dialers, d)
257 for _, t := range cl.torrents {
262 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
264 func (cl *Client) AddListener(l Listener) {
265 cl.listeners = append(cl.listeners, l)
266 go cl.acceptConnections(l)
269 func (cl *Client) firewallCallback(net.Addr) bool {
271 block := !cl.wantConns()
274 torrent.Add("connections firewalled", 1)
276 torrent.Add("connections not firewalled", 1)
281 func (cl *Client) listenOnNetwork(n network) bool {
282 if n.Ipv4 && cl.config.DisableIPv4 {
285 if n.Ipv6 && cl.config.DisableIPv6 {
288 if n.Tcp && cl.config.DisableTCP {
291 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
297 func (cl *Client) listenNetworks() (ns []network) {
298 for _, n := range allPeerNetworks {
299 if cl.listenOnNetwork(n) {
306 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
307 cfg := dht.ServerConfig{
308 IPBlocklist: cl.ipBlockList,
310 OnAnnouncePeer: cl.onDHTAnnouncePeer,
311 PublicIP: func() net.IP {
312 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
313 return cl.config.PublicIp6
315 return cl.config.PublicIp4
317 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
318 ConnectionTracking: cl.config.ConnTracker,
319 OnQuery: cl.config.DHTOnQuery,
320 Logger: cl.logger.WithText(func(m log.Msg) string {
321 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
324 s, err = dht.NewServer(&cfg)
327 ts, err := s.Bootstrap()
329 cl.logger.Printf("error bootstrapping dht: %s", err)
331 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
337 func (cl *Client) Closed() <-chan struct{} {
343 func (cl *Client) eachDhtServer(f func(DhtServer)) {
344 for _, ds := range cl.dhtServers {
349 // Stops the client. All connections to peers are closed and all activity will
351 func (cl *Client) Close() {
355 for _, t := range cl.torrents {
358 for i := range cl.onClose {
359 cl.onClose[len(cl.onClose)-1-i]()
364 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
365 if cl.ipBlockList == nil {
368 return cl.ipBlockList.Lookup(ip)
371 func (cl *Client) ipIsBlocked(ip net.IP) bool {
372 _, blocked := cl.ipBlockRange(ip)
376 func (cl *Client) wantConns() bool {
377 for _, t := range cl.torrents {
385 func (cl *Client) waitAccept() {
387 if cl.closed.IsSet() {
397 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
398 func (cl *Client) rejectAccepted(conn net.Conn) error {
399 ra := conn.RemoteAddr()
400 if rip := addrIpOrNil(ra); rip != nil {
401 if cl.config.DisableIPv4Peers && rip.To4() != nil {
402 return errors.New("ipv4 peers disabled")
404 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
405 return errors.New("ipv4 disabled")
408 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
409 return errors.New("ipv6 disabled")
411 if cl.rateLimitAccept(rip) {
412 return errors.New("source IP accepted rate limited")
414 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
415 return errors.New("bad source addr")
421 func (cl *Client) acceptConnections(l net.Listener) {
423 conn, err := l.Accept()
424 torrent.Add("client listener accepts", 1)
425 conn = pproffd.WrapNetConn(conn)
427 closed := cl.closed.IsSet()
430 reject = cl.rejectAccepted(conn)
440 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
445 torrent.Add("rejected accepted connections", 1)
446 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
449 go cl.incomingConnection(conn)
451 log.Fmsg("accepted %q connection at %q from %q",
455 ).SetLevel(log.Debug).Log(cl.logger)
456 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
457 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
458 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
463 func regularConnString(nc net.Conn) string {
464 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
467 func (cl *Client) incomingConnection(nc net.Conn) {
469 if tc, ok := nc.(*net.TCPConn); ok {
472 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
473 regularConnString(nc))
474 c.Discovery = PeerSourceIncoming
475 cl.runReceivedConn(c)
478 // Returns a handle to the given torrent, if it's present in the client.
479 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
482 t, ok = cl.torrents[ih]
486 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
487 return cl.torrents[ih]
490 type dialResult struct {
495 func countDialResult(err error) {
497 torrent.Add("successful dials", 1)
499 torrent.Add("unsuccessful dials", 1)
503 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
504 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
505 if ret < minDialTimeout {
511 // Returns whether an address is known to connect to a client with our own ID.
512 func (cl *Client) dopplegangerAddr(addr string) bool {
513 _, ok := cl.dopplegangerAddrs[addr]
517 // Returns a connection over UTP or TCP, whichever is first to connect.
518 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
520 t := perf.NewTimer(perf.CallerName(0))
523 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
525 t.Mark("returned conn over " + res.Network)
529 ctx, cancel := context.WithCancel(ctx)
530 // As soon as we return one connection, cancel the others.
533 resCh := make(chan dialResult, left)
537 cl.eachDialer(func(s Dialer) bool {
540 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
543 cl.dialFromSocket(ctx, s, addr),
544 s.LocalAddr().Network(),
551 // Wait for a successful connection.
553 defer perf.ScopeTimer()()
554 for ; left > 0 && res.Conn == nil; left-- {
558 // There are still incompleted dials.
560 for ; left > 0; left-- {
561 conn := (<-resCh).Conn
568 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
570 //if res.Conn != nil {
571 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
573 // cl.logger.Printf("failed to dial %s", addr)
578 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
579 network := s.LocalAddr().Network()
580 cte := cl.config.ConnTracker.Wait(
582 conntrack.Entry{network, s.LocalAddr().String(), addr},
583 "dial torrent client",
586 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
587 // which dial errors allow us to forget the connection tracking entry handle.
588 if ctx.Err() != nil {
594 c, err := s.Dial(ctx, addr)
595 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
596 // it now in case we close the connection forthwith.
597 if tc, ok := c.(*net.TCPConn); ok {
602 if err != nil && forgettableDialError(err) {
609 return closeWrapper{c, func() error {
616 func forgettableDialError(err error) bool {
617 return strings.Contains(err.Error(), "no suitable address found")
620 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
621 if _, ok := t.halfOpen[addr]; !ok {
622 panic("invariant broken")
624 delete(t.halfOpen, addr)
628 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
629 // for valid reasons.
630 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr,
631 network, connString string,
632 ) (c *PeerConn, err error) {
633 c = cl.newConnection(nc, true, remoteAddr, network, connString)
634 c.headerEncrypted = encryptHeader
635 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
637 dl, ok := ctx.Deadline()
641 err = nc.SetDeadline(dl)
645 err = cl.initiateHandshakes(c, t)
649 // Returns nil connection and nil error if no connection could be established for valid reasons.
650 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
651 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
654 return t.dialTimeout()
657 dr := cl.dialFirst(dialCtx, addr.String())
660 if dialCtx.Err() != nil {
661 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
663 return nil, errors.New("dial failed")
665 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
672 // Returns nil connection and nil error if no connection could be established
673 // for valid reasons.
674 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
675 torrent.Add("establish outgoing connection", 1)
676 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
677 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
679 torrent.Add("initiated conn with preferred header obfuscation", 1)
682 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
683 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
684 // We should have just tried with the preferred header obfuscation. If it was required,
685 // there's nothing else to try.
688 // Try again with encryption if we didn't earlier, or without if we did.
689 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
691 torrent.Add("initiated conn with fallback header obfuscation", 1)
693 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
697 // Called to dial out and run a connection. The addr we're given is already
698 // considered half-open.
699 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
700 cl.dialRateLimiter.Wait(context.Background())
701 c, err := cl.establishOutgoingConn(t, addr)
704 // Don't release lock between here and addConnection, unless it's for
706 cl.noLongerHalfOpen(t, addr.String())
709 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
716 cl.runHandshookConn(c, t)
719 // The port number for incoming peer connections. 0 if the client isn't listening.
720 func (cl *Client) incomingPeerPort() int {
721 return cl.LocalPort()
724 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
725 if c.headerEncrypted {
728 rw, c.cryptoMethod, err = mse.InitiateHandshake(
735 cl.config.CryptoProvides,
739 return xerrors.Errorf("header obfuscation handshake: %w", err)
742 ih, err := cl.connBtHandshake(c, &t.infoHash)
744 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
746 if ih != t.infoHash {
747 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
752 // Calls f with any secret keys.
753 func (cl *Client) forSkeys(f func([]byte) bool) {
756 if false { // Emulate the bug from #114
758 for ih := range cl.torrents {
762 for range cl.torrents {
769 for ih := range cl.torrents {
776 // Do encryption and bittorrent handshakes as receiver.
777 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
778 defer perf.ScopeTimerErr(&err)()
780 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
782 if err == nil || err == mse.ErrNoSecretKeyMatch {
783 if c.headerEncrypted {
784 torrent.Add("handshakes received encrypted", 1)
786 torrent.Add("handshakes received unencrypted", 1)
789 torrent.Add("handshakes received with error while handling encryption", 1)
792 if err == mse.ErrNoSecretKeyMatch {
797 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
798 err = errors.New("connection not have required header obfuscation")
801 ih, err := cl.connBtHandshake(c, nil)
803 err = xerrors.Errorf("during bt handshake: %w", err)
812 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
813 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
818 c.PeerExtensionBytes = res.PeerExtensionBits
819 c.PeerID = res.PeerID
820 c.completedHandshake = time.Now()
824 func (cl *Client) runReceivedConn(c *PeerConn) {
825 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
829 t, err := cl.receiveHandshakes(c)
832 "error receiving handshakes on %v: %s", c, err,
833 ).SetLevel(log.Debug).
835 "network", c.network,
837 torrent.Add("error receiving handshake", 1)
839 cl.onBadAccept(c.remoteAddr)
844 torrent.Add("received handshake for unloaded torrent", 1)
845 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
847 cl.onBadAccept(c.remoteAddr)
851 torrent.Add("received handshake for loaded torrent", 1)
854 cl.runHandshookConn(c, t)
857 // Client lock must be held before entering this.
858 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
860 if c.PeerID == cl.peerID {
863 addr := c.conn.RemoteAddr().String()
864 cl.dopplegangerAddrs[addr] = struct{}{}
866 // Because the remote address is not necessarily the same as its client's torrent listen
867 // address, we won't record the remote address as a doppleganger. Instead, the initiator
868 // can record *us* as the doppleganger.
872 c.conn.SetWriteDeadline(time.Time{})
873 c.r = deadlineReader{c.conn, c.r}
874 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
875 if connIsIpv6(c.conn) {
876 torrent.Add("completed handshake over ipv6", 1)
878 if err := t.addConnection(c); err != nil {
879 log.Fmsg("error adding connection: %s", err).AddValues(c).SetLevel(log.Debug).Log(t.logger)
882 defer t.dropConnection(c)
883 go c.writer(time.Minute)
884 cl.sendInitialMessages(c, t)
885 err := c.mainReadLoop()
886 if err != nil && cl.config.Debug {
887 cl.logger.Printf("error during connection main read loop: %s", err)
891 // See the order given in Transmission's tr_peerMsgsNew.
892 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
893 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
894 conn.post(pp.Message{
896 ExtendedID: pp.HandshakeExtendedID,
897 ExtendedPayload: func() []byte {
898 msg := pp.ExtendedHandshakeMessage{
899 M: map[pp.ExtensionName]pp.ExtensionNumber{
900 pp.ExtensionNameMetadata: metadataExtendedId,
902 V: cl.config.ExtendedHandshakeClientVersion,
903 Reqq: 64, // TODO: Really?
904 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
905 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
906 Port: cl.incomingPeerPort(),
907 MetadataSize: torrent.metadataSize(),
908 // TODO: We can figured these out specific to the socket
910 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
911 Ipv6: cl.config.PublicIp6.To16(),
913 if !cl.config.DisablePEX {
914 msg.M[pp.ExtensionNamePex] = pexExtendedId
916 return bencode.MustMarshal(msg)
921 if conn.fastEnabled() {
922 if torrent.haveAllPieces() {
923 conn.post(pp.Message{Type: pp.HaveAll})
924 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
926 } else if !torrent.haveAnyPieces() {
927 conn.post(pp.Message{Type: pp.HaveNone})
928 conn.sentHaves.Clear()
934 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
935 conn.post(pp.Message{
942 func (cl *Client) dhtPort() (ret uint16) {
943 cl.eachDhtServer(func(s DhtServer) {
944 ret = uint16(missinggo.AddrPort(s.Addr()))
949 func (cl *Client) haveDhtServer() (ret bool) {
950 cl.eachDhtServer(func(_ DhtServer) {
956 // Process incoming ut_metadata message.
957 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
959 err := bencode.Unmarshal(payload, &d)
960 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
961 } else if err != nil {
962 return fmt.Errorf("error unmarshalling bencode: %s", err)
964 msgType, ok := d["msg_type"]
966 return errors.New("missing msg_type field")
970 case pp.DataMetadataExtensionMsgType:
971 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
972 if !c.requestedMetadataPiece(piece) {
973 return fmt.Errorf("got unexpected piece %d", piece)
975 c.metadataRequests[piece] = false
976 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
977 if begin < 0 || begin >= len(payload) {
978 return fmt.Errorf("data has bad offset in payload: %d", begin)
980 t.saveMetadataPiece(piece, payload[begin:])
981 c.lastUsefulChunkReceived = time.Now()
982 return t.maybeCompleteMetadata()
983 case pp.RequestMetadataExtensionMsgType:
984 if !t.haveMetadataPiece(piece) {
985 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
988 start := (1 << 14) * piece
989 c.logger.Printf("sending metadata piece %d", piece)
990 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
992 case pp.RejectMetadataExtensionMsgType:
995 return errors.New("unknown msg_type value")
999 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1000 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1001 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1006 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1010 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1013 if _, ok := cl.ipBlockRange(ip); ok {
1016 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1022 // Return a Torrent ready for insertion into a Client.
1023 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1024 // use provided storage, if provided
1025 storageClient := cl.defaultStorage
1026 if specStorage != nil {
1027 storageClient = storage.NewClient(specStorage)
1033 peers: prioritizedPeers{
1035 getPrio: func(p Peer) peerPriority {
1036 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1039 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1041 halfOpen: make(map[string]Peer),
1042 pieceStateChanges: pubsub.NewPubSub(),
1044 storageOpener: storageClient,
1045 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1047 networkingEnabled: true,
1048 metadataChanged: sync.Cond{
1052 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1053 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1054 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1055 return fmt.Sprintf("%v: %s", t, m.Text())
1057 t.setChunkSize(defaultChunkSize)
1061 // A file-like handle to some torrent data resource.
1062 type Handle interface {
1069 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1070 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1073 // Adds a torrent by InfoHash with a custom Storage implementation.
1074 // If the torrent already exists then this Storage is ignored and the
1075 // existing torrent returned with `new` set to `false`
1076 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1079 t, ok := cl.torrents[infoHash]
1085 t = cl.newTorrent(infoHash, specStorage)
1086 cl.eachDhtServer(func(s DhtServer) {
1087 go t.dhtAnnouncer(s)
1089 cl.torrents[infoHash] = t
1090 cl.clearAcceptLimits()
1091 t.updateWantPeersEvent()
1092 // Tickle Client.waitAccept, new torrent may want conns.
1093 cl.event.Broadcast()
1097 // Add or merge a torrent spec. If the torrent is already present, the
1098 // trackers will be merged with the existing ones. If the Info isn't yet
1099 // known, it will be set. The display name is replaced if the new spec
1100 // provides one. Returns new if the torrent wasn't already in the client.
1101 // Note that any `Storage` defined on the spec will be ignored if the
1102 // torrent is already present (i.e. `new` return value is `true`)
1103 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1104 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1105 if spec.DisplayName != "" {
1106 t.SetDisplayName(spec.DisplayName)
1108 if spec.InfoBytes != nil {
1109 err = t.SetInfoBytes(spec.InfoBytes)
1116 if spec.ChunkSize != 0 {
1117 t.setChunkSize(pp.Integer(spec.ChunkSize))
1119 t.addTrackers(spec.Trackers)
1124 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1125 t, ok := cl.torrents[infoHash]
1127 err = fmt.Errorf("no such torrent")
1134 delete(cl.torrents, infoHash)
1138 func (cl *Client) allTorrentsCompleted() bool {
1139 for _, t := range cl.torrents {
1143 if !t.haveAllPieces() {
1150 // Returns true when all torrents are completely downloaded and false if the
1151 // client is stopped before that.
1152 func (cl *Client) WaitAll() bool {
1155 for !cl.allTorrentsCompleted() {
1156 if cl.closed.IsSet() {
1164 // Returns handles to all the torrents loaded in the Client.
1165 func (cl *Client) Torrents() []*Torrent {
1168 return cl.torrentsAsSlice()
1171 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1172 for _, t := range cl.torrents {
1173 ret = append(ret, t)
1178 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1179 spec, err := TorrentSpecFromMagnetURI(uri)
1183 T, _, err = cl.AddTorrentSpec(spec)
1187 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1188 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1190 slices.MakeInto(&ss, mi.Nodes)
1195 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1196 mi, err := metainfo.LoadFromFile(filename)
1200 return cl.AddTorrent(mi)
1203 func (cl *Client) DhtServers() []DhtServer {
1204 return cl.dhtServers
1207 func (cl *Client) AddDHTNodes(nodes []string) {
1208 for _, n := range nodes {
1209 hmp := missinggo.SplitHostMaybePort(n)
1210 ip := net.ParseIP(hmp.Host)
1212 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1215 ni := krpc.NodeInfo{
1216 Addr: krpc.NodeAddr{
1221 cl.eachDhtServer(func(s DhtServer) {
1227 func (cl *Client) banPeerIP(ip net.IP) {
1228 cl.logger.Printf("banning ip %v", ip)
1229 if cl.badPeerIPs == nil {
1230 cl.badPeerIPs = make(map[string]struct{})
1232 cl.badPeerIPs[ip.String()] = struct{}{}
1235 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1241 PeerMaxRequests: 250,
1242 writeBuffer: new(bytes.Buffer),
1243 remoteAddr: remoteAddr,
1245 connString: connString,
1247 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1248 return fmt.Sprintf("%v: %s", c, m.Text())
1250 c.writerCond.L = cl.locker()
1251 c.setRW(connStatsReadWriter{nc, c})
1252 c.r = &rateLimitedReader{
1253 l: cl.config.DownloadRateLimiter,
1256 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1260 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1268 Addr: ipPortAddr{ip, port},
1269 Source: PeerSourceDhtAnnouncePeer,
1273 func firstNotNil(ips ...net.IP) net.IP {
1274 for _, ip := range ips {
1282 func (cl *Client) eachDialer(f func(Dialer) bool) {
1283 for _, s := range cl.dialers {
1290 func (cl *Client) eachListener(f func(Listener) bool) {
1291 for _, s := range cl.listeners {
1298 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1299 cl.eachListener(func(l Listener) bool {
1306 func (cl *Client) publicIp(peer net.IP) net.IP {
1307 // TODO: Use BEP 10 to determine how peers are seeing us.
1308 if peer.To4() != nil {
1310 cl.config.PublicIp4,
1311 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1316 cl.config.PublicIp6,
1317 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1321 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1322 l := cl.findListener(
1323 func(l net.Listener) bool {
1324 return f(addrIpOrNil(l.Addr()))
1330 return addrIpOrNil(l.Addr())
1333 // Our IP as a peer should see it.
1334 func (cl *Client) publicAddr(peer net.IP) IpPort {
1335 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1338 // ListenAddrs addresses currently being listened to.
1339 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1342 cl.eachListener(func(l Listener) bool {
1343 ret = append(ret, l.Addr())
1349 func (cl *Client) onBadAccept(addr net.Addr) {
1350 ipa, ok := tryIpPortFromNetAddr(addr)
1354 ip := maskIpForAcceptLimiting(ipa.IP)
1355 if cl.acceptLimiter == nil {
1356 cl.acceptLimiter = make(map[ipStr]int)
1358 cl.acceptLimiter[ipStr(ip.String())]++
1361 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1362 if ip4 := ip.To4(); ip4 != nil {
1363 return ip4.Mask(net.CIDRMask(24, 32))
1368 func (cl *Client) clearAcceptLimits() {
1369 cl.acceptLimiter = nil
1372 func (cl *Client) acceptLimitClearer() {
1375 case <-cl.closed.LockedChan(cl.locker()):
1377 case <-time.After(15 * time.Minute):
1379 cl.clearAcceptLimits()
1385 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1386 if cl.config.DisableAcceptRateLimiting {
1389 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1392 func (cl *Client) rLock() {
1396 func (cl *Client) rUnlock() {
1400 func (cl *Client) lock() {
1404 func (cl *Client) unlock() {
1408 func (cl *Client) locker() *lockWithDeferreds {
1412 func (cl *Client) String() string {
1413 return fmt.Sprintf("<%[1]T %[1]p>", cl)