17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/davecgh/go-spew/spew"
27 "github.com/dustin/go-humanize"
28 "github.com/google/btree"
29 "golang.org/x/time/rate"
30 "golang.org/x/xerrors"
32 "github.com/anacrolix/missinggo/v2"
33 "github.com/anacrolix/missinggo/v2/conntrack"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/iplist"
37 "github.com/anacrolix/torrent/metainfo"
38 "github.com/anacrolix/torrent/mse"
39 pp "github.com/anacrolix/torrent/peer_protocol"
40 "github.com/anacrolix/torrent/storage"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
62 dhtServers []DhtServer
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 // Returns the port number for the first listener that has one. No longer assumes that all port
95 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
97 func (cl *Client) LocalPort() (port int) {
98 cl.eachListener(func(l Listener) bool {
99 port = addrPortOrZero(l.Addr())
105 func writeDhtServerStatus(w io.Writer, s DhtServer) {
106 dhtStats := s.Stats()
107 fmt.Fprintf(w, " ID: %x\n", s.ID())
108 spew.Fdump(w, dhtStats)
111 // Writes out a human readable status of the client, such as for writing to a
113 func (cl *Client) WriteStatus(_w io.Writer) {
116 w := bufio.NewWriter(_w)
118 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
119 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
120 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
121 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
122 cl.eachDhtServer(func(s DhtServer) {
123 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
124 writeDhtServerStatus(w, s)
126 spew.Fdump(w, &cl.stats)
127 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
129 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
130 return l.InfoHash().AsString() < r.InfoHash().AsString()
133 fmt.Fprint(w, "<unknown name>")
135 fmt.Fprint(w, t.name())
139 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
141 w.WriteString("<missing metainfo>")
149 const debugLogValue = log.Debug
151 func (cl *Client) debugLogFilter(m log.Msg) bool {
155 return !m.HasValue(debugLogValue)
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
162 func (cl *Client) announceKey() int32 {
163 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
166 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
168 cfg = NewDefaultClientConfig()
178 dopplegangerAddrs: make(map[string]struct{}),
179 torrents: make(map[metainfo.Hash]*Torrent),
180 dialRateLimiter: rate.NewLimiter(10, 10),
182 go cl.acceptLimitClearer()
190 cl.extensionBytes = defaultPeerExtensionBytes()
191 cl.event.L = cl.locker()
192 storageImpl := cfg.DefaultStorage
193 if storageImpl == nil {
194 // We'd use mmap by default but HFS+ doesn't support sparse files.
195 storageImplCloser := storage.NewFile(cfg.DataDir)
196 cl.onClose = append(cl.onClose, func() {
197 if err := storageImplCloser.Close(); err != nil {
198 cl.logger.Printf("error closing default storage: %s", err)
201 storageImpl = storageImplCloser
203 cl.defaultStorage = storage.NewClient(storageImpl)
204 if cfg.IPBlocklist != nil {
205 cl.ipBlockList = cfg.IPBlocklist
208 if cfg.PeerID != "" {
209 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
211 o := copy(cl.peerID[:], cfg.Bep20)
212 _, err = rand.Read(cl.peerID[o:])
214 panic("error generating peer id")
218 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
226 for _, _s := range sockets {
227 s := _s // Go is fucking retarded.
228 cl.onClose = append(cl.onClose, func() { s.Close() })
229 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
230 cl.dialers = append(cl.dialers, s)
231 cl.listeners = append(cl.listeners, s)
232 go cl.acceptConnections(s)
238 for _, s := range sockets {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newAnacrolixDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
245 cl.onClose = append(cl.onClose, func() { ds.Close() })
253 func (cl *Client) AddDhtServer(d DhtServer) {
254 cl.dhtServers = append(cl.dhtServers, d)
257 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
258 // given address for any Torrent.
259 func (cl *Client) AddDialer(d Dialer) {
260 cl.dialers = append(cl.dialers, d)
263 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
265 func (cl *Client) AddListener(l Listener) {
266 cl.listeners = append(cl.listeners, l)
267 go cl.acceptConnections(l)
270 func (cl *Client) firewallCallback(net.Addr) bool {
272 block := !cl.wantConns()
275 torrent.Add("connections firewalled", 1)
277 torrent.Add("connections not firewalled", 1)
282 func (cl *Client) listenOnNetwork(n network) bool {
283 if n.Ipv4 && cl.config.DisableIPv4 {
286 if n.Ipv6 && cl.config.DisableIPv6 {
289 if n.Tcp && cl.config.DisableTCP {
292 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
298 func (cl *Client) listenNetworks() (ns []network) {
299 for _, n := range allPeerNetworks {
300 if cl.listenOnNetwork(n) {
307 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
308 cfg := dht.ServerConfig{
309 IPBlocklist: cl.ipBlockList,
311 OnAnnouncePeer: cl.onDHTAnnouncePeer,
312 PublicIP: func() net.IP {
313 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
314 return cl.config.PublicIp6
316 return cl.config.PublicIp4
318 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
319 ConnectionTracking: cl.config.ConnTracker,
320 OnQuery: cl.config.DHTOnQuery,
321 Logger: cl.logger.WithText(func(m log.Msg) string {
322 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
325 s, err = dht.NewServer(&cfg)
328 ts, err := s.Bootstrap()
330 cl.logger.Printf("error bootstrapping dht: %s", err)
332 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
338 func (cl *Client) Closed() <-chan struct{} {
344 func (cl *Client) eachDhtServer(f func(DhtServer)) {
345 for _, ds := range cl.dhtServers {
350 // Stops the client. All connections to peers are closed and all activity will
352 func (cl *Client) Close() {
356 for _, t := range cl.torrents {
359 for i := range cl.onClose {
360 cl.onClose[len(cl.onClose)-1-i]()
365 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
366 if cl.ipBlockList == nil {
369 return cl.ipBlockList.Lookup(ip)
372 func (cl *Client) ipIsBlocked(ip net.IP) bool {
373 _, blocked := cl.ipBlockRange(ip)
377 func (cl *Client) wantConns() bool {
378 for _, t := range cl.torrents {
386 func (cl *Client) waitAccept() {
388 if cl.closed.IsSet() {
398 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
399 func (cl *Client) rejectAccepted(conn net.Conn) error {
400 ra := conn.RemoteAddr()
401 if rip := addrIpOrNil(ra); rip != nil {
402 if cl.config.DisableIPv4Peers && rip.To4() != nil {
403 return errors.New("ipv4 peers disabled")
405 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
406 return errors.New("ipv4 disabled")
409 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
410 return errors.New("ipv6 disabled")
412 if cl.rateLimitAccept(rip) {
413 return errors.New("source IP accepted rate limited")
415 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
416 return errors.New("bad source addr")
422 func (cl *Client) acceptConnections(l net.Listener) {
424 conn, err := l.Accept()
425 torrent.Add("client listener accepts", 1)
426 conn = pproffd.WrapNetConn(conn)
428 closed := cl.closed.IsSet()
431 reject = cl.rejectAccepted(conn)
441 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
446 torrent.Add("rejected accepted connections", 1)
447 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
450 go cl.incomingConnection(conn)
452 log.Fmsg("accepted %q connection at %q from %q",
456 ).AddValue(debugLogValue).Log(cl.logger)
457 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
458 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
459 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
464 func (cl *Client) incomingConnection(nc net.Conn) {
466 if tc, ok := nc.(*net.TCPConn); ok {
469 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network())
470 c.Discovery = PeerSourceIncoming
471 cl.runReceivedConn(c)
474 // Returns a handle to the given torrent, if it's present in the client.
475 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
478 t, ok = cl.torrents[ih]
482 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
483 return cl.torrents[ih]
486 type dialResult struct {
491 func countDialResult(err error) {
493 torrent.Add("successful dials", 1)
495 torrent.Add("unsuccessful dials", 1)
499 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
500 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
501 if ret < minDialTimeout {
507 // Returns whether an address is known to connect to a client with our own ID.
508 func (cl *Client) dopplegangerAddr(addr string) bool {
509 _, ok := cl.dopplegangerAddrs[addr]
513 // Returns a connection over UTP or TCP, whichever is first to connect.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
516 t := perf.NewTimer(perf.CallerName(0))
519 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
521 t.Mark("returned conn over " + res.Network)
525 ctx, cancel := context.WithCancel(ctx)
526 // As soon as we return one connection, cancel the others.
529 resCh := make(chan dialResult, left)
533 cl.eachDialer(func(s Dialer) bool {
536 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
539 cl.dialFromSocket(ctx, s, addr),
540 s.LocalAddr().Network(),
547 // Wait for a successful connection.
549 defer perf.ScopeTimer()()
550 for ; left > 0 && res.Conn == nil; left-- {
554 // There are still incompleted dials.
556 for ; left > 0; left-- {
557 conn := (<-resCh).Conn
564 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
566 //if res.Conn != nil {
567 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
569 // cl.logger.Printf("failed to dial %s", addr)
574 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
575 network := s.LocalAddr().Network()
576 cte := cl.config.ConnTracker.Wait(
578 conntrack.Entry{network, s.LocalAddr().String(), addr},
579 "dial torrent client",
582 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
583 // which dial errors allow us to forget the connection tracking entry handle.
584 if ctx.Err() != nil {
590 c, err := s.Dial(ctx, addr)
591 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
592 // it now in case we close the connection forthwith.
593 if tc, ok := c.(*net.TCPConn); ok {
598 if err != nil && forgettableDialError(err) {
605 return closeWrapper{c, func() error {
612 func forgettableDialError(err error) bool {
613 return strings.Contains(err.Error(), "no suitable address found")
616 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
617 if _, ok := t.halfOpen[addr]; !ok {
618 panic("invariant broken")
620 delete(t.halfOpen, addr)
624 // Performs initiator handshakes and returns a connection. Returns nil
625 // *connection if no connection for valid reasons.
626 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr, network string) (c *PeerConn, err error) {
627 c = cl.newConnection(nc, true, remoteAddr, network)
628 c.headerEncrypted = encryptHeader
629 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
631 dl, ok := ctx.Deadline()
635 err = nc.SetDeadline(dl)
639 err = cl.initiateHandshakes(c, t)
643 // Returns nil connection and nil error if no connection could be established
644 // for valid reasons.
645 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
646 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
649 return t.dialTimeout()
652 dr := cl.dialFirst(dialCtx, addr.String())
655 if dialCtx.Err() != nil {
656 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
658 return nil, errors.New("dial failed")
660 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
667 // Returns nil connection and nil error if no connection could be established
668 // for valid reasons.
669 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
670 torrent.Add("establish outgoing connection", 1)
671 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
672 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
674 torrent.Add("initiated conn with preferred header obfuscation", 1)
677 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
678 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
679 // We should have just tried with the preferred header obfuscation. If it was required,
680 // there's nothing else to try.
683 // Try again with encryption if we didn't earlier, or without if we did.
684 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
686 torrent.Add("initiated conn with fallback header obfuscation", 1)
688 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
692 // Called to dial out and run a connection. The addr we're given is already
693 // considered half-open.
694 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
695 cl.dialRateLimiter.Wait(context.Background())
696 c, err := cl.establishOutgoingConn(t, addr)
699 // Don't release lock between here and addConnection, unless it's for
701 cl.noLongerHalfOpen(t, addr.String())
704 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
711 cl.runHandshookConn(c, t)
714 // The port number for incoming peer connections. 0 if the client isn't listening.
715 func (cl *Client) incomingPeerPort() int {
716 return cl.LocalPort()
719 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
720 if c.headerEncrypted {
723 rw, c.cryptoMethod, err = mse.InitiateHandshake(
730 cl.config.CryptoProvides,
734 return xerrors.Errorf("header obfuscation handshake: %w", err)
737 ih, err := cl.connBtHandshake(c, &t.infoHash)
739 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
741 if ih != t.infoHash {
742 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
747 // Calls f with any secret keys.
748 func (cl *Client) forSkeys(f func([]byte) bool) {
751 if false { // Emulate the bug from #114
753 for ih := range cl.torrents {
757 for range cl.torrents {
764 for ih := range cl.torrents {
771 // Do encryption and bittorrent handshakes as receiver.
772 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
773 defer perf.ScopeTimerErr(&err)()
775 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
777 if err == nil || err == mse.ErrNoSecretKeyMatch {
778 if c.headerEncrypted {
779 torrent.Add("handshakes received encrypted", 1)
781 torrent.Add("handshakes received unencrypted", 1)
784 torrent.Add("handshakes received with error while handling encryption", 1)
787 if err == mse.ErrNoSecretKeyMatch {
792 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
793 err = errors.New("connection not have required header obfuscation")
796 ih, err := cl.connBtHandshake(c, nil)
798 err = xerrors.Errorf("during bt handshake: %w", err)
807 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
808 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
813 c.PeerExtensionBytes = res.PeerExtensionBits
814 c.PeerID = res.PeerID
815 c.completedHandshake = time.Now()
819 func (cl *Client) runReceivedConn(c *PeerConn) {
820 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
824 t, err := cl.receiveHandshakes(c)
827 "error receiving handshakes on %v: %s", c, err,
831 "network", c.network,
833 torrent.Add("error receiving handshake", 1)
835 cl.onBadAccept(c.remoteAddr)
840 torrent.Add("received handshake for unloaded torrent", 1)
841 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
843 cl.onBadAccept(c.remoteAddr)
847 torrent.Add("received handshake for loaded torrent", 1)
850 cl.runHandshookConn(c, t)
853 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
855 if c.PeerID == cl.peerID {
858 addr := c.conn.RemoteAddr().String()
859 cl.dopplegangerAddrs[addr] = struct{}{}
861 // Because the remote address is not necessarily the same as its client's torrent listen
862 // address, we won't record the remote address as a doppleganger. Instead, the initiator
863 // can record *us* as the doppleganger.
867 c.conn.SetWriteDeadline(time.Time{})
868 c.r = deadlineReader{c.conn, c.r}
869 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
870 if connIsIpv6(c.conn) {
871 torrent.Add("completed handshake over ipv6", 1)
873 if err := t.addConnection(c); err != nil {
874 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
877 defer t.dropConnection(c)
878 go c.writer(time.Minute)
879 cl.sendInitialMessages(c, t)
880 err := c.mainReadLoop()
881 if err != nil && cl.config.Debug {
882 cl.logger.Printf("error during connection main read loop: %s", err)
886 // See the order given in Transmission's tr_peerMsgsNew.
887 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
888 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
889 conn.post(pp.Message{
891 ExtendedID: pp.HandshakeExtendedID,
892 ExtendedPayload: func() []byte {
893 msg := pp.ExtendedHandshakeMessage{
894 M: map[pp.ExtensionName]pp.ExtensionNumber{
895 pp.ExtensionNameMetadata: metadataExtendedId,
897 V: cl.config.ExtendedHandshakeClientVersion,
898 Reqq: 64, // TODO: Really?
899 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
900 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
901 Port: cl.incomingPeerPort(),
902 MetadataSize: torrent.metadataSize(),
903 // TODO: We can figured these out specific to the socket
905 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
906 Ipv6: cl.config.PublicIp6.To16(),
908 if !cl.config.DisablePEX {
909 msg.M[pp.ExtensionNamePex] = pexExtendedId
911 return bencode.MustMarshal(msg)
916 if conn.fastEnabled() {
917 if torrent.haveAllPieces() {
918 conn.post(pp.Message{Type: pp.HaveAll})
919 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
921 } else if !torrent.haveAnyPieces() {
922 conn.post(pp.Message{Type: pp.HaveNone})
923 conn.sentHaves.Clear()
929 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
930 conn.post(pp.Message{
937 func (cl *Client) sendInitialPEX(conn *PeerConn, t *Torrent) {
938 xid, ok := conn.PeerExtensionIDs[pp.ExtensionNamePex]
942 m, seq := t.pex.Genmsg(0)
945 cl.logger.Printf("no initial PEX this time")
946 // FIXME see how can we schedule another initial for later
949 conn.logger.Printf("sending initial PEX message: %v", m)
950 conn.post(m.Message(xid))
953 func (cl *Client) dhtPort() (ret uint16) {
954 cl.eachDhtServer(func(s DhtServer) {
955 ret = uint16(missinggo.AddrPort(s.Addr()))
960 func (cl *Client) haveDhtServer() (ret bool) {
961 cl.eachDhtServer(func(_ DhtServer) {
967 // Process incoming ut_metadata message.
968 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
970 err := bencode.Unmarshal(payload, &d)
971 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
972 } else if err != nil {
973 return fmt.Errorf("error unmarshalling bencode: %s", err)
975 msgType, ok := d["msg_type"]
977 return errors.New("missing msg_type field")
981 case pp.DataMetadataExtensionMsgType:
982 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
983 if !c.requestedMetadataPiece(piece) {
984 return fmt.Errorf("got unexpected piece %d", piece)
986 c.metadataRequests[piece] = false
987 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
988 if begin < 0 || begin >= len(payload) {
989 return fmt.Errorf("data has bad offset in payload: %d", begin)
991 t.saveMetadataPiece(piece, payload[begin:])
992 c.lastUsefulChunkReceived = time.Now()
993 return t.maybeCompleteMetadata()
994 case pp.RequestMetadataExtensionMsgType:
995 if !t.haveMetadataPiece(piece) {
996 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
999 start := (1 << 14) * piece
1000 c.logger.Printf("sending metadata piece %d", piece)
1001 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1003 case pp.RejectMetadataExtensionMsgType:
1006 return errors.New("unknown msg_type value")
1010 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1011 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1012 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1017 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1021 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1024 if _, ok := cl.ipBlockRange(ip); ok {
1027 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1033 // Return a Torrent ready for insertion into a Client.
1034 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1035 // use provided storage, if provided
1036 storageClient := cl.defaultStorage
1037 if specStorage != nil {
1038 storageClient = storage.NewClient(specStorage)
1044 peers: prioritizedPeers{
1046 getPrio: func(p Peer) peerPriority {
1047 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1050 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1052 halfOpen: make(map[string]Peer),
1053 pieceStateChanges: pubsub.NewPubSub(),
1055 storageOpener: storageClient,
1056 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1058 networkingEnabled: true,
1059 metadataChanged: sync.Cond{
1063 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1064 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1065 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1066 return fmt.Sprintf("%v: %s", t, m.Text())
1068 t.setChunkSize(defaultChunkSize)
1072 // A file-like handle to some torrent data resource.
1073 type Handle interface {
1080 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1081 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1084 // Adds a torrent by InfoHash with a custom Storage implementation.
1085 // If the torrent already exists then this Storage is ignored and the
1086 // existing torrent returned with `new` set to `false`
1087 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1090 t, ok := cl.torrents[infoHash]
1096 t = cl.newTorrent(infoHash, specStorage)
1097 cl.eachDhtServer(func(s DhtServer) {
1098 go t.dhtAnnouncer(s)
1100 cl.torrents[infoHash] = t
1101 cl.clearAcceptLimits()
1102 t.updateWantPeersEvent()
1103 // Tickle Client.waitAccept, new torrent may want conns.
1104 cl.event.Broadcast()
1108 // Add or merge a torrent spec. If the torrent is already present, the
1109 // trackers will be merged with the existing ones. If the Info isn't yet
1110 // known, it will be set. The display name is replaced if the new spec
1111 // provides one. Returns new if the torrent wasn't already in the client.
1112 // Note that any `Storage` defined on the spec will be ignored if the
1113 // torrent is already present (i.e. `new` return value is `true`)
1114 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1115 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1116 if spec.DisplayName != "" {
1117 t.SetDisplayName(spec.DisplayName)
1119 if spec.InfoBytes != nil {
1120 err = t.SetInfoBytes(spec.InfoBytes)
1127 if spec.ChunkSize != 0 {
1128 t.setChunkSize(pp.Integer(spec.ChunkSize))
1130 t.addTrackers(spec.Trackers)
1135 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1136 t, ok := cl.torrents[infoHash]
1138 err = fmt.Errorf("no such torrent")
1145 delete(cl.torrents, infoHash)
1149 func (cl *Client) allTorrentsCompleted() bool {
1150 for _, t := range cl.torrents {
1154 if !t.haveAllPieces() {
1161 // Returns true when all torrents are completely downloaded and false if the
1162 // client is stopped before that.
1163 func (cl *Client) WaitAll() bool {
1166 for !cl.allTorrentsCompleted() {
1167 if cl.closed.IsSet() {
1175 // Returns handles to all the torrents loaded in the Client.
1176 func (cl *Client) Torrents() []*Torrent {
1179 return cl.torrentsAsSlice()
1182 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1183 for _, t := range cl.torrents {
1184 ret = append(ret, t)
1189 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1190 spec, err := TorrentSpecFromMagnetURI(uri)
1194 T, _, err = cl.AddTorrentSpec(spec)
1198 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1199 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1201 slices.MakeInto(&ss, mi.Nodes)
1206 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1207 mi, err := metainfo.LoadFromFile(filename)
1211 return cl.AddTorrent(mi)
1214 func (cl *Client) DhtServers() []DhtServer {
1215 return cl.dhtServers
1218 func (cl *Client) AddDHTNodes(nodes []string) {
1219 for _, n := range nodes {
1220 hmp := missinggo.SplitHostMaybePort(n)
1221 ip := net.ParseIP(hmp.Host)
1223 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1226 ni := krpc.NodeInfo{
1227 Addr: krpc.NodeAddr{
1232 cl.eachDhtServer(func(s DhtServer) {
1238 func (cl *Client) banPeerIP(ip net.IP) {
1239 cl.logger.Printf("banning ip %v", ip)
1240 if cl.badPeerIPs == nil {
1241 cl.badPeerIPs = make(map[string]struct{})
1243 cl.badPeerIPs[ip.String()] = struct{}{}
1246 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network string) (c *PeerConn) {
1252 PeerMaxRequests: 250,
1253 writeBuffer: new(bytes.Buffer),
1254 remoteAddr: remoteAddr,
1257 c.logger = cl.logger.WithValues(c,
1258 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1259 ).WithText(func(m log.Msg) string {
1260 return fmt.Sprintf("%v: %s", c, m.Text())
1262 c.writerCond.L = cl.locker()
1263 c.setRW(connStatsReadWriter{nc, c})
1264 c.r = &rateLimitedReader{
1265 l: cl.config.DownloadRateLimiter,
1268 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1272 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1280 Addr: ipPortAddr{ip, port},
1281 Source: PeerSourceDhtAnnouncePeer,
1285 func firstNotNil(ips ...net.IP) net.IP {
1286 for _, ip := range ips {
1294 func (cl *Client) eachDialer(f func(Dialer) bool) {
1295 for _, s := range cl.dialers {
1302 func (cl *Client) eachListener(f func(Listener) bool) {
1303 for _, s := range cl.listeners {
1310 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1311 cl.eachListener(func(l Listener) bool {
1318 func (cl *Client) publicIp(peer net.IP) net.IP {
1319 // TODO: Use BEP 10 to determine how peers are seeing us.
1320 if peer.To4() != nil {
1322 cl.config.PublicIp4,
1323 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1328 cl.config.PublicIp6,
1329 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1333 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1336 func(l net.Listener) bool {
1337 return f(addrIpOrNil(l.Addr()))
1343 // Our IP as a peer should see it.
1344 func (cl *Client) publicAddr(peer net.IP) IpPort {
1345 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1348 // ListenAddrs addresses currently being listened to.
1349 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1352 cl.eachListener(func(l Listener) bool {
1353 ret = append(ret, l.Addr())
1359 func (cl *Client) onBadAccept(addr net.Addr) {
1360 ipa, ok := tryIpPortFromNetAddr(addr)
1364 ip := maskIpForAcceptLimiting(ipa.IP)
1365 if cl.acceptLimiter == nil {
1366 cl.acceptLimiter = make(map[ipStr]int)
1368 cl.acceptLimiter[ipStr(ip.String())]++
1371 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1372 if ip4 := ip.To4(); ip4 != nil {
1373 return ip4.Mask(net.CIDRMask(24, 32))
1378 func (cl *Client) clearAcceptLimits() {
1379 cl.acceptLimiter = nil
1382 func (cl *Client) acceptLimitClearer() {
1385 case <-cl.closed.LockedChan(cl.locker()):
1387 case <-time.After(15 * time.Minute):
1389 cl.clearAcceptLimits()
1395 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1396 if cl.config.DisableAcceptRateLimiting {
1399 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1402 func (cl *Client) rLock() {
1406 func (cl *Client) rUnlock() {
1410 func (cl *Client) lock() {
1414 func (cl *Client) unlock() {
1418 func (cl *Client) locker() *lockWithDeferreds {
1422 func (cl *Client) String() string {
1423 return fmt.Sprintf("<%[1]T %[1]p>", cl)