17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/davecgh/go-spew/spew"
27 "github.com/dustin/go-humanize"
28 "github.com/google/btree"
29 "golang.org/x/time/rate"
30 "golang.org/x/xerrors"
32 "github.com/anacrolix/missinggo/v2"
33 "github.com/anacrolix/missinggo/v2/conntrack"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/iplist"
37 "github.com/anacrolix/torrent/metainfo"
38 "github.com/anacrolix/torrent/mse"
39 pp "github.com/anacrolix/torrent/peer_protocol"
40 "github.com/anacrolix/torrent/storage"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
// Checked with closed.IsSet() by the accept and wait loops to detect shutdown.
52 closed missinggo.Event
// Storage used by newTorrent when a torrent is added without its own storage.
58 defaultStorage *storage.Client
// DHT servers created in NewClient or registered through AddDhtServer.
62 dhtServers []DhtServer
// Optional blocklist copied from the config's IPBlocklist; consulted by ipBlockRange.
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
// IPs banned via banPeerIP, keyed by ip.String(). Allocated lazily on first ban.
71 badPeerIPs map[string]struct{}
// Torrents loaded in the client, keyed by infohash.
72 torrents map[InfoHash]*Torrent
// Number of bad accepts per masked source IP (see onBadAccept); reset by clearAcceptLimits.
74 acceptLimiter map[ipStr]int
// Throttles outgoing dials; outgoingConnection waits on it before dialing.
75 dialRateLimiter *rate.Limiter
// BadPeerIPs returns the IPs currently banned by the client, in string form.
// NOTE(review): the two lines elided before the return presumably take and
// release the client lock around badPeerIPsLocked — confirm.
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// The name suggests the caller must already hold the client lock — confirm.
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 // Returns the port number for the first listener that has one. No longer assumes that all port
95 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
97 func (cl *Client) LocalPort() (port int) {
// Visit listeners in registration order; the callback's bool result (elided
// here) presumably stops the walk once a non-zero port is found — confirm.
98 cl.eachListener(func(l Listener) bool {
99 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes the DHT server's ID in hex to w, followed by a
// spew dump of the value returned by its Stats method.
105 func writeDhtServerStatus(w io.Writer, s DhtServer) {
106 dhtStats := s.Stats()
107 fmt.Fprintf(w, " ID: %x\n", s.ID())
108 spew.Fdump(w, dhtStats)
111 // Writes out a human readable status of the client, such as for writing to a
113 func (cl *Client) WriteStatus(_w io.Writer) {
// All output goes through a buffered writer wrapped around the caller's writer.
// NOTE(review): the matching Flush is not visible in this listing — confirm.
116 w := bufio.NewWriter(_w)
118 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
119 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
120 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
121 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
// One section per DHT server: its network and address, then its ID and stats.
122 cl.eachDhtServer(func(s DhtServer) {
123 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
124 writeDhtServerStatus(w, s)
126 spew.Fdump(w, &cl.stats)
127 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// torrentsAsSlice ranges over a map, so sort by infohash string to get a
// deterministic listing order.
129 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
130 return l.InfoHash().AsString() < r.InfoHash().AsString()
133 fmt.Fprint(w, "<unknown name>")
135 fmt.Fprint(w, t.name())
// Percentage complete is 100 * (1 - missing/total), printed with the raw and
// humanized total length.
139 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
141 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to log messages that are debug-only;
// debugLogFilter tests for it.
149 const debugLogValue = log.Debug
// debugLogFilter is installed on the client logger by initLogger. Its final
// return yields false for messages that carry debugLogValue.
// NOTE(review): lines elided before the return may short-circuit (for example
// when debugging is enabled) — confirm against the full source.
151 func (cl *Client) debugLogFilter(m log.Msg) bool {
155 return !m.HasValue(debugLogValue)
// initLogger derives cl.logger from the configured Logger, attaching the
// client itself as a log value and installing debugLogFilter.
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey derives a 32-bit key from bytes 16 through 19 of the peer ID,
// read big-endian and converted to int32.
162 func (cl *Client) announceKey() int32 {
163 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient builds a Client from cfg. The visible steps are: pick a config,
// initialise maps and the dial rate limiter, choose default storage, copy the
// IP blocklist, establish the peer ID, open listening sockets via listenAll,
// and create a DHT server for each socket that is a net.PacketConn.
// NOTE(review): many short lines (conditions, error returns, closing braces)
// are elided from this listing, so guards around these steps are not visible.
166 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// Fallback configuration; presumably only applied when cfg is nil — confirm.
168 cfg = NewDefaultClientConfig()
178 dopplegangerAddrs: make(map[string]struct{}),
179 torrents: make(map[metainfo.Hash]*Torrent),
// Limit of 10 events per second with a burst of 10.
180 dialRateLimiter: rate.NewLimiter(10, 10),
182 go cl.acceptLimitClearer()
190 cl.extensionBytes = defaultPeerExtensionBytes()
191 cl.event.L = cl.locker()
192 storageImpl := cfg.DefaultStorage
193 if storageImpl == nil {
194 // We'd use mmap by default but HFS+ doesn't support sparse files.
195 storageImplCloser := storage.NewFile(cfg.DataDir)
// The client owns this storage, so schedule it to be closed with the client.
196 cl.onClose = append(cl.onClose, func() {
197 if err := storageImplCloser.Close(); err != nil {
198 cl.logger.Printf("error closing default storage: %s", err)
201 storageImpl = storageImplCloser
203 cl.defaultStorage = storage.NewClient(storageImpl)
204 if cfg.IPBlocklist != nil {
205 cl.ipBlockList = cfg.IPBlocklist
// An explicitly configured peer ID is copied verbatim.
208 if cfg.PeerID != "" {
209 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
// Otherwise the peer ID is the Bep20 prefix followed by random bytes.
211 o := copy(cl.peerID[:], cfg.Bep20)
212 _, err = rand.Read(cl.peerID[o:])
214 panic("error generating peer id")
218 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
226 for _, _s := range sockets {
227 s := _s // Copy the loop variable so each closure and goroutine below captures its own socket.
228 cl.onClose = append(cl.onClose, func() { s.Close() })
// Sockets on peer-enabled networks act as both dialers and listeners.
229 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
230 cl.dialers = append(cl.dialers, s)
231 cl.listeners = append(cl.listeners, s)
232 go cl.acceptConnections(s)
238 for _, s := range sockets {
// Only packet-oriented sockets can host a DHT server.
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newAnacrolixDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
245 cl.onClose = append(cl.onClose, func() { ds.Close() })
// AddDhtServer appends d to the client's list of DHT servers.
253 func (cl *Client) AddDhtServer(d DhtServer) {
254 cl.dhtServers = append(cl.dhtServers, d)
257 // AddDialer adds a Dialer for outgoing connections. All Dialers are used when attempting to
258 // connect to a given address for any Torrent.
259 func (cl *Client) AddDialer(d Dialer) {
260 cl.dialers = append(cl.dialers, d)
263 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
265 func (cl *Client) AddListener(l Listener) {
266 cl.listeners = append(cl.listeners, l)
// Accepting runs in its own goroutine for this listener.
267 go cl.acceptConnections(l)
// firewallCallback is handed to listenAll by NewClient. It computes block as
// the negation of wantConns and records the outcome in one of two counters.
// NOTE(review): the branch condition and return are elided; presumably block
// is returned — confirm.
270 func (cl *Client) firewallCallback(net.Addr) bool {
272 block := !cl.wantConns()
275 torrent.Add("connections firewalled", 1)
277 torrent.Add("connections not firewalled", 1)
// listenOnNetwork decides whether the client should listen on network n by
// checking n's properties against the config's disable flags.
// NOTE(review): the return statements are elided; each matching condition
// presumably returns false — confirm.
282 func (cl *Client) listenOnNetwork(n network) bool {
283 if n.Ipv4 && cl.config.DisableIPv4 {
286 if n.Ipv6 && cl.config.DisableIPv6 {
289 if n.Tcp && cl.config.DisableTCP {
// A UDP socket serves both uTP and DHT, so this matches only when both are off.
292 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
// listenNetworks filters allPeerNetworks through listenOnNetwork.
// NOTE(review): the append into ns and the return are elided from this view.
298 func (cl *Client) listenNetworks() (ns []network) {
299 for _, n := range allPeerNetworks {
300 if cl.listenOnNetwork(n) {
// newAnacrolixDhtServer creates a dht.Server on conn, configured from the
// client's blocklist, config and logger, and then bootstraps it.
// NOTE(review): whether Bootstrap runs inline or in a goroutine is not visible
// in this listing.
307 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
308 cfg := dht.ServerConfig{
309 IPBlocklist: cl.ipBlockList,
311 OnAnnouncePeer: cl.onDHTAnnouncePeer,
// Use the configured IPv6 address for IPv6 sockets when one is set, and the
// configured IPv4 address otherwise.
312 PublicIP: func() net.IP {
313 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
314 return cl.config.PublicIp6
316 return cl.config.PublicIp4
318 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
319 ConnectionTracking: cl.config.ConnTracker,
320 OnQuery: cl.config.DHTOnQuery,
// Prefix every message with the local address of this server's socket.
321 Logger: cl.logger.WithText(func(m log.Msg) string {
322 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
325 s, err = dht.NewServer(&cfg)
328 ts, err := s.Bootstrap()
330 cl.logger.Printf("error bootstrapping dht: %s", err)
332 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
338 func (cl *Client) Closed() <-chan struct{} {
// eachDhtServer iterates over cl.dhtServers in order; the loop body (elided
// here) presumably calls f with each server — confirm.
344 func (cl *Client) eachDhtServer(f func(DhtServer)) {
345 for _, ds := range cl.dhtServers {
350 // Stops the client. All connections to peers are closed and all activity will
352 func (cl *Client) Close() {
// Visit every torrent; the per-torrent action is elided from this listing.
356 for _, t := range cl.torrents {
// Run the registered cleanup callbacks in reverse registration order.
359 for i := range cl.onClose {
360 cl.onClose[len(cl.onClose)-1-i]()
// ipBlockRange looks ip up in the client's blocklist and returns the matching
// range and whether one was found. With no blocklist configured the lookup is
// skipped (the early return is elided from this listing).
365 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
366 if cl.ipBlockList == nil {
369 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked obtains the blocked flag for ip from ipBlockRange, discarding
// the range; the return of that flag is elided from this listing.
372 func (cl *Client) ipIsBlocked(ip net.IP) bool {
373 _, blocked := cl.ipBlockRange(ip)
// wantConns iterates over the client's torrents to decide whether connections
// are wanted. NOTE(review): the per-torrent test and the return values are
// elided; presumably it reports true if any torrent wants connections — confirm.
377 func (cl *Client) wantConns() bool {
378 for _, t := range cl.torrents {
// waitAccept checks whether the client has been closed.
// NOTE(review): the surrounding loop and wait are elided; the comment in
// AddTorrentInfoHashWithStorage indicates cl.event broadcasts wake it — confirm.
386 func (cl *Client) waitAccept() {
388 if cl.closed.IsSet() {
// rejectAccepted returns a non-nil error describing why an accepted connection
// should be dropped. The checks below only run when an IP can be extracted
// from the remote address.
398 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
399 func (cl *Client) rejectAccepted(conn net.Conn) error {
400 ra := conn.RemoteAddr()
401 if rip := addrIpOrNil(ra); rip != nil {
// To4 also matches IPv4 addresses stored in 16-byte form.
402 if cl.config.DisableIPv4Peers && rip.To4() != nil {
403 return errors.New("ipv4 peers disabled")
// This check only matches the 4-byte representation.
405 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
406 return errors.New("ipv4 disabled")
// A 16-byte address that has no IPv4 form is treated as IPv6.
409 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
410 return errors.New("ipv6 disabled")
412 if cl.rateLimitAccept(rip) {
413 return errors.New("source IP accepted rate limited")
415 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
416 return errors.New("bad source addr")
// acceptConnections accepts connections from l, filters them through
// rejectAccepted, and hands accepted ones to incomingConnection in a new
// goroutine. Each outcome is counted and logged at debug level.
// NOTE(review): the enclosing loop, locking and branch structure are elided.
422 func (cl *Client) acceptConnections(l net.Listener) {
424 conn, err := l.Accept()
425 torrent.Add("client listener accepts", 1)
// Wrapped before the error check; see pproffd for what the wrapper tracks.
426 conn = pproffd.WrapNetConn(conn)
428 closed := cl.closed.IsSet()
431 reject = cl.rejectAccepted(conn)
441 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
446 torrent.Add("rejected accepted connections", 1)
447 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
450 go cl.incomingConnection(conn)
452 log.Fmsg("accepted %q connection at %q from %q",
456 ).AddValue(debugLogValue).Log(cl.logger)
// Counters keyed by remote IP length, remote network and listener network.
457 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
458 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
459 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularConnString formats a connection as "<local addr>-<remote addr>".
464 func regularConnString(nc net.Conn) string {
465 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
// incomingConnection wraps an accepted net.Conn in a non-outgoing PeerConn
// tagged with PeerSourceIncoming and runs the receiving-side handshakes on it.
468 func (cl *Client) incomingConnection(nc net.Conn) {
// TCP-specific setup; the statement inside this branch is elided.
470 if tc, ok := nc.(*net.TCPConn); ok {
473 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
474 regularConnString(nc))
475 c.Discovery = PeerSourceIncoming
476 cl.runReceivedConn(c)
479 // Torrent returns a handle to the given torrent, if it's present in the client.
// ok reports whether ih was found in cl.torrents.
480 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
483 t, ok = cl.torrents[ih]
// torrent returns the torrent registered under ih, or nil if there is none.
487 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
488 return cl.torrents[ih]
491 type dialResult struct {
// countDialResult increments either the "successful dials" or the
// "unsuccessful dials" counter; the condition on err is elided, presumably
// err == nil selects the former — confirm.
496 func countDialResult(err error) {
498 torrent.Add("successful dials", 1)
500 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the maximum dial timeout down as the backlog of
// pending peers grows relative to the half-open connection limit, so that a
// long queue of peers is worked through faster.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), and is never lower than minDialTimeout. When
// halfOpenLimit is zero, or the divisor truncates to zero, no reduction is
// applied: the original expression would panic with a division by zero there.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		// Guard the divisor as well: it can truncate to zero for negative inputs.
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
			ret /= time.Duration(divisor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
512 // Returns whether an address is known to connect to a client with our own ID.
// Membership is tested against cl.dopplegangerAddrs, which runHandshookConn
// populates.
513 func (cl *Client) dopplegangerAddr(addr string) bool {
514 _, ok := cl.dopplegangerAddrs[addr]
518 // Returns a connection over UTP or TCP, whichever is first to connect.
// Every registered Dialer is tried via dialFromSocket and results are gathered
// on a channel; the first result holding a connection is returned.
519 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
// Timing marks record whether a connection was returned and over which network.
521 t := perf.NewTimer(perf.CallerName(0))
524 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
526 t.Mark("returned conn over " + res.Network)
530 ctx, cancel := context.WithCancel(ctx)
531 // As soon as we return one connection, cancel the others.
// The channel is buffered with capacity left.
// NOTE(review): left is assigned in lines elided from this listing.
534 resCh := make(chan dialResult, left)
538 cl.eachDialer(func(s Dialer) bool {
541 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
544 cl.dialFromSocket(ctx, s, addr),
545 s.LocalAddr().Network(),
552 // Wait for a successful connection.
554 defer perf.ScopeTimer()()
555 for ; left > 0 && res.Conn == nil; left-- {
559 // There are still incomplete dials.
// Receive the remaining results; what is done with each conn is elided,
// presumably it is closed if non-nil — confirm.
561 for ; left > 0; left-- {
562 conn := (<-resCh).Conn
569 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
571 //if res.Conn != nil {
572 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
574 // cl.logger.Printf("failed to dial %s", addr)
// dialFromSocket dials addr through s under a connection-tracking entry
// obtained from the config's ConnTracker, and returns the resulting connection
// wrapped in a closeWrapper.
// NOTE(review): the nil-return paths and the handling of the tracking entry
// are elided from this listing.
579 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
580 network := s.LocalAddr().Network()
581 cte := cl.config.ConnTracker.Wait(
583 conntrack.Entry{network, s.LocalAddr().String(), addr},
584 "dial torrent client",
587 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
588 // which dial errors allow us to forget the connection tracking entry handle.
589 if ctx.Err() != nil {
595 c, err := s.Dial(ctx, addr)
596 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
597 // it now in case we close the connection forthwith.
598 if tc, ok := c.(*net.TCPConn); ok {
// See forgettableDialError for which errors qualify.
603 if err != nil && forgettableDialError(err) {
610 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains
// "no suitable address found". err must be non-nil, since err.Error() is
// called unconditionally.
617 func forgettableDialError(err error) bool {
618 return strings.Contains(err.Error(), "no suitable address found")
// noLongerHalfOpen removes addr from t's half-open set. It panics if addr is
// not currently recorded there, treating that as a broken invariant.
621 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
622 if _, ok := t.halfOpen[addr]; !ok {
623 panic("invariant broken")
625 delete(t.halfOpen, addr)
629 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
630 // for valid reasons.
631 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr,
632 network, connString string,
633 ) (c *PeerConn, err error) {
// The second argument marks the connection as outgoing.
634 c = cl.newConnection(nc, true, remoteAddr, network, connString)
635 c.headerEncrypted = encryptHeader
// Bound the handshakes by the configured timeout and apply the resulting
// deadline to the underlying connection.
636 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
638 dl, ok := ctx.Deadline()
642 err = nc.SetDeadline(dl)
646 err = cl.initiateHandshakes(c, t)
650 // Returns nil connection and nil error if no connection could be established for valid reasons.
// The address is dialed with dialFirst under a timeout derived from the
// torrent, then initiator handshakes are run on the resulting connection.
651 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
652 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
655 return t.dialTimeout()
658 dr := cl.dialFirst(dialCtx, addr.String())
// Distinguish a dial that ran out of time from one that simply failed.
661 if dialCtx.Err() != nil {
662 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
664 return nil, errors.New("dial failed")
// Handshakes get a fresh background context rather than the dial context.
666 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
673 // Returns nil connection and nil error if no connection could be established
674 // for valid reasons.
// The first attempt uses the preferred header obfuscation setting; unless that
// setting is required, a second attempt is made with the opposite setting.
675 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
676 torrent.Add("establish outgoing connection", 1)
677 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
678 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
680 torrent.Add("initiated conn with preferred header obfuscation", 1)
683 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
684 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
685 // We should have just tried with the preferred header obfuscation. If it was required,
686 // there's nothing else to try.
689 // Try again with encryption if we didn't earlier, or without if we did.
690 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
692 torrent.Add("initiated conn with fallback header obfuscation", 1)
694 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
698 // Called to dial out and run a connection. The addr we're given is already
699 // considered half-open.
700 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
// Block until the client-wide dial rate limiter admits this dial; the error
// returned by Wait is not captured on this line.
701 cl.dialRateLimiter.Wait(context.Background())
702 c, err := cl.establishOutgoingConn(t, addr)
705 // Don't release lock between here and addConnection, unless it's for
// The dial attempt has finished, so the address leaves the half-open set.
707 cl.noLongerHalfOpen(t, addr.String())
710 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
717 cl.runHandshookConn(c, t)
720 // The port number for incoming peer connections. 0 if the client isn't listening.
// This simply delegates to LocalPort.
721 func (cl *Client) incomingPeerPort() int {
722 return cl.LocalPort()
// initiateHandshakes performs the initiator side of the handshakes on c for
// torrent t: an MSE header-obfuscation handshake when c.headerEncrypted is
// set, then the BitTorrent handshake, whose returned infohash must equal t's.
725 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
726 if c.headerEncrypted {
729 rw, c.cryptoMethod, err = mse.InitiateHandshake(
736 cl.config.CryptoProvides,
740 return xerrors.Errorf("header obfuscation handshake: %w", err)
743 ih, err := cl.connBtHandshake(c, &t.infoHash)
745 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
// The peer answered for a different torrent than the one requested.
747 if ih != t.infoHash {
748 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
753 // Calls f with any secret keys.
// NOTE(review): loop bodies are elided from this listing. The final visible
// loop ranges over the infohashes in cl.torrents, which presumably serve as
// the secret keys — confirm.
754 func (cl *Client) forSkeys(f func([]byte) bool) {
// Dead branch (constant false condition) kept for reproducing issue #114.
757 if false { // Emulate the bug from #114
759 for ih := range cl.torrents {
763 for range cl.torrents {
770 for ih := range cl.torrents {
777 // Do encryption and bittorrent handshakes as receiver.
// NOTE(review): the lookup of the torrent from the received infohash and the
// return statements are elided from this listing.
778 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
779 defer perf.ScopeTimerErr(&err)()
781 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
// A missing secret-key match is counted alongside success here, split into
// encrypted and unencrypted handshakes.
783 if err == nil || err == mse.ErrNoSecretKeyMatch {
784 if c.headerEncrypted {
785 torrent.Add("handshakes received encrypted", 1)
787 torrent.Add("handshakes received unencrypted", 1)
790 torrent.Add("handshakes received with error while handling encryption", 1)
793 if err == mse.ErrNoSecretKeyMatch {
// Enforce the obfuscation policy when the preferred setting is mandatory.
798 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
799 err = errors.New("connection not have required header obfuscation")
// A nil infohash is passed because the receiver learns it from the peer.
802 ih, err := cl.connBtHandshake(c, nil)
804 err = xerrors.Errorf("during bt handshake: %w", err)
// connBtHandshake runs the BitTorrent protocol handshake on c using our peer
// ID and extension bytes. Callers in this file pass the torrent's infohash
// when initiating and nil when receiving. It then records the peer's extension
// bits, peer ID and the completion time on c.
// NOTE(review): error handling and the assignment of ret are elided.
813 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
814 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
819 c.PeerExtensionBytes = res.PeerExtensionBits
820 c.PeerID = res.PeerID
821 c.completedHandshake = time.Now()
// runReceivedConn drives an incoming connection: it sets a handshake deadline,
// performs the receiving-side handshakes, and runs the connection if the peer
// asked for a torrent this client has loaded. Handshake errors and requests
// for unloaded torrents are counted and reported to onBadAccept.
825 func (cl *Client) runReceivedConn(c *PeerConn) {
826 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
830 t, err := cl.receiveHandshakes(c)
833 "error receiving handshakes on %v: %s", c, err,
837 "network", c.network,
839 torrent.Add("error receiving handshake", 1)
841 cl.onBadAccept(c.remoteAddr)
846 torrent.Add("received handshake for unloaded torrent", 1)
847 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
849 cl.onBadAccept(c.remoteAddr)
853 torrent.Add("received handshake for loaded torrent", 1)
856 cl.runHandshookConn(c, t)
859 // Client lock must be held before entering this.
// runHandshookConn takes a connection that has completed its handshakes, adds
// it to torrent t, starts its writer, sends the initial messages and then runs
// the read loop until it returns.
860 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
// The peer presented our own ID, so we have connected to ourselves.
862 if c.PeerID == cl.peerID {
865 addr := c.conn.RemoteAddr().String()
866 cl.dopplegangerAddrs[addr] = struct{}{}
868 // Because the remote address is not necessarily the same as its client's torrent listen
869 // address, we won't record the remote address as a doppleganger. Instead, the initiator
870 // can record *us* as the doppleganger.
// A zero time clears the write deadline.
874 c.conn.SetWriteDeadline(time.Time{})
875 c.r = deadlineReader{c.conn, c.r}
876 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
877 if connIsIpv6(c.conn) {
878 torrent.Add("completed handshake over ipv6", 1)
880 if err := t.addConnection(c); err != nil {
881 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
// From here on the connection belongs to t and is dropped on return.
884 defer t.dropConnection(c)
885 go c.writer(time.Minute)
886 cl.sendInitialMessages(c, t)
887 err := c.mainReadLoop()
// Read-loop errors are only logged when the Debug config flag is set.
888 if err != nil && cl.config.Debug {
889 cl.logger.Printf("error during connection main read loop: %s", err)
893 // See the order given in Transmission's tr_peerMsgsNew.
// sendInitialMessages posts up to three kinds of message on a fresh
// connection: the extended handshake, a HaveAll/HaveNone when the fast
// extension is enabled, and a DHT message when both sides support DHT.
894 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
// The extended handshake is sent only if both sides advertise support.
895 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
896 conn.post(pp.Message{
898 ExtendedID: pp.HandshakeExtendedID,
899 ExtendedPayload: func() []byte {
900 msg := pp.ExtendedHandshakeMessage{
901 M: map[pp.ExtensionName]pp.ExtensionNumber{
902 pp.ExtensionNameMetadata: metadataExtendedId,
904 V: cl.config.ExtendedHandshakeClientVersion,
905 Reqq: 64, // TODO: Really?
906 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
// Advertised when obfuscation is preferred, or when it is not mandatory
// to use the preferred (non-obfuscated) mode.
907 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
908 Port: cl.incomingPeerPort(),
909 MetadataSize: torrent.metadataSize(),
910 // TODO: We can figure these out specific to the socket
912 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
913 Ipv6: cl.config.PublicIp6.To16(),
// PEX is advertised unless disabled in the config.
915 if !cl.config.DisablePEX {
916 msg.M[pp.ExtensionNamePex] = pexExtendedId
918 return bencode.MustMarshal(msg)
// With the fast extension, report complete or empty piece sets with a single
// message and keep sentHaves in step with what was announced.
923 if conn.fastEnabled() {
924 if torrent.haveAllPieces() {
925 conn.post(pp.Message{Type: pp.HaveAll})
926 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
928 } else if !torrent.haveAnyPieces() {
929 conn.post(pp.Message{Type: pp.HaveNone})
930 conn.sentHaves.Clear()
// The message fields are elided; presumably this announces our DHT port.
936 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
937 conn.post(pp.Message{
// dhtPort returns a DHT server's port as uint16. Because ret is assigned on
// every callback invocation, the last server visited determines the result
// when there are several; it is zero when there are none.
944 func (cl *Client) dhtPort() (ret uint16) {
945 cl.eachDhtServer(func(s DhtServer) {
946 ret = uint16(missinggo.AddrPort(s.Addr()))
// haveDhtServer reports whether the client has at least one DHT server; the
// callback body (elided) presumably sets ret to true — confirm.
951 func (cl *Client) haveDhtServer() (ret bool) {
952 cl.eachDhtServer(func(_ DhtServer) {
958 // Process incoming ut_metadata message.
// The payload is a bencoded dict with a "msg_type" field, optionally followed
// by raw piece data. Data, request and reject message types are handled.
// NOTE(review): the declaration of d and the extraction of piece are elided.
959 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
961 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dict are expected (piece data), so that specific
// error is tolerated.
962 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
963 } else if err != nil {
964 return fmt.Errorf("error unmarshalling bencode: %s", err)
966 msgType, ok := d["msg_type"]
968 return errors.New("missing msg_type field")
972 case pp.DataMetadataExtensionMsgType:
973 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
// Only accept pieces that were actually requested on this connection.
974 if !c.requestedMetadataPiece(piece) {
975 return fmt.Errorf("got unexpected piece %d", piece)
977 c.metadataRequests[piece] = false
// The piece data sits at the end of the payload, so its start offset is the
// payload length minus the expected piece size.
978 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
979 if begin < 0 || begin >= len(payload) {
980 return fmt.Errorf("data has bad offset in payload: %d", begin)
982 t.saveMetadataPiece(piece, payload[begin:])
983 c.lastUsefulChunkReceived = time.Now()
984 return t.maybeCompleteMetadata()
985 case pp.RequestMetadataExtensionMsgType:
// Reject requests for pieces we do not have.
986 if !t.haveMetadataPiece(piece) {
987 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Pieces start at multiples of 1<<14 (16 KiB) within the metadata bytes.
990 start := (1 << 14) * piece
991 c.logger.Printf("sending metadata piece %d", piece)
992 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
994 case pp.RejectMetadataExtensionMsgType:
997 return errors.New("unknown msg_type value")
// badPeerAddr applies badPeerIPPort to addr when an IP and port can be
// extracted from it. The result for other address types is elided from this
// listing.
1001 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1002 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1003 return cl.badPeerIPPort(ipa.IP, ipa.Port)
// badPeerIPPort checks an IP and port against three sources: the set of
// doppleganger addresses (as "host:port"), the IP blocklist, and the banned
// IP set. NOTE(review): the returns are elided; presumably any match yields
// true — confirm.
1008 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1012 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1015 if _, ok := cl.ipBlockRange(ip); ok {
1018 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1024 // Return a Torrent ready for insertion into a Client.
// The torrent is not added to cl.torrents here; callers do that themselves.
1025 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1026 // use provided storage, if provided
1027 storageClient := cl.defaultStorage
1028 if specStorage != nil {
1029 storageClient = storage.NewClient(specStorage)
1035 peers: prioritizedPeers{
// Peer priority is computed from our public address as seen by that peer and
// the peer's own address; errors from the computation are ignored.
1037 getPrio: func(p Peer) peerPriority {
1038 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
// Pre-size the connection set to twice the configured per-torrent limit.
1041 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1043 halfOpen: make(map[string]Peer),
1044 pieceStateChanges: pubsub.NewPubSub(),
1046 storageOpener: storageClient,
1047 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1049 networkingEnabled: true,
1050 metadataChanged: sync.Cond{
1054 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1055 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
// Prefix the torrent's log messages with the torrent itself.
1056 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1057 return fmt.Sprintf("%v: %s", t, m.Text())
1059 t.setChunkSize(defaultChunkSize)
1063 // A file-like handle to some torrent data resource.
1064 type Handle interface {
// AddTorrentInfoHash adds a torrent by infohash using the client's default
// storage. It is AddTorrentInfoHashWithStorage with a nil storage.
1071 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1072 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1075 // Adds a torrent by InfoHash with a custom Storage implementation.
1076 // If the torrent already exists then this Storage is ignored and the
1077 // existing torrent returned with `new` set to `false`
1078 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1081 t, ok := cl.torrents[infoHash]
1087 t = cl.newTorrent(infoHash, specStorage)
// Start one DHT announcer goroutine per DHT server for the new torrent.
1088 cl.eachDhtServer(func(s DhtServer) {
1089 go t.dhtAnnouncer(s)
1091 cl.torrents[infoHash] = t
// Forget recorded bad accepts now that a new torrent has been added.
1092 cl.clearAcceptLimits()
1093 t.updateWantPeersEvent()
1094 // Tickle Client.waitAccept, new torrent may want conns.
1095 cl.event.Broadcast()
1099 // Add or merge a torrent spec. If the torrent is already present, the
1100 // trackers will be merged with the existing ones. If the Info isn't yet
1101 // known, it will be set. The display name is replaced if the new spec
1102 // provides one. Returns new if the torrent wasn't already in the client.
1103 // Note that any `Storage` defined on the spec will be ignored if the
1104 // torrent is already present (i.e. `new` return value is `false`)
1105 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1106 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1107 if spec.DisplayName != "" {
1108 t.SetDisplayName(spec.DisplayName)
1110 if spec.InfoBytes != nil {
1111 err = t.SetInfoBytes(spec.InfoBytes)
// A zero ChunkSize leaves the torrent's current chunk size unchanged.
1118 if spec.ChunkSize != 0 {
1119 t.setChunkSize(pp.Integer(spec.ChunkSize))
1121 t.addTrackers(spec.Trackers)
// dropTorrent removes the torrent with the given infohash from the client and
// sets a "no such torrent" error when it is not present.
// NOTE(review): the steps between the lookup and the delete are elided.
1126 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1127 t, ok := cl.torrents[infoHash]
1129 err = fmt.Errorf("no such torrent")
1136 delete(cl.torrents, infoHash)
// allTorrentsCompleted checks every torrent for completeness using
// haveAllPieces. NOTE(review): an earlier per-torrent check and the returns
// are elided; presumably any incomplete torrent yields false — confirm.
1140 func (cl *Client) allTorrentsCompleted() bool {
1141 for _, t := range cl.torrents {
1145 if !t.haveAllPieces() {
1152 // Returns true when all torrents are completely downloaded and false if the
1153 // client is stopped before that.
1154 func (cl *Client) WaitAll() bool {
// Re-check completion in a loop, giving up once the client has been closed.
// The wait between iterations is elided from this listing.
1157 for !cl.allTorrentsCompleted() {
1158 if cl.closed.IsSet() {
1166 // Torrents returns handles to all the torrents loaded in the Client.
// The slice comes from torrentsAsSlice, which ranges over a map, so its order
// is not defined.
1167 func (cl *Client) Torrents() []*Torrent {
1170 return cl.torrentsAsSlice()
// torrentsAsSlice collects the values of cl.torrents into a new slice. The
// order follows map iteration and is therefore unspecified.
1173 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1174 for _, t := range cl.torrents {
1175 ret = append(ret, t)
// AddMagnet parses a magnet URI into a TorrentSpec and adds it with
// AddTorrentSpec. The handling of a parse error is elided from this listing.
1180 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1181 spec, err := TorrentSpecFromMagnetURI(uri)
1185 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds the torrent described by mi via AddTorrentSpec.
// NOTE(review): mi.Nodes is converted into ss (declared in an elided line);
// presumably the result is passed to AddDHTNodes — confirm.
1189 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1190 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1192 slices.MakeInto(&ss, mi.Nodes)
// AddTorrentFromFile loads metainfo from the named file and adds it with
// AddTorrent. The handling of a load error is elided from this listing.
1197 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1198 mi, err := metainfo.LoadFromFile(filename)
1202 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The internal slice is returned
// directly rather than a copy, so callers should not modify it.
1205 func (cl *Client) DhtServers() []DhtServer {
1206 return cl.dhtServers
// AddDHTNodes parses each "host[:port]" string and offers the resulting node
// to every DHT server. Hosts that do not parse as an IP are logged.
// NOTE(review): the skip after the log line and the call made on each DHT
// server are elided from this listing.
1209 func (cl *Client) AddDHTNodes(nodes []string) {
1210 for _, n := range nodes {
1211 hmp := missinggo.SplitHostMaybePort(n)
// net.ParseIP does not resolve host names; it only parses IP literals.
1212 ip := net.ParseIP(hmp.Host)
1214 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1217 ni := krpc.NodeInfo{
1218 Addr: krpc.NodeAddr{
1223 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip in cl.badPeerIPs, keyed by its string
// form. The map is created on first use.
1229 func (cl *Client) banPeerIP(ip net.IP) {
1230 cl.logger.Printf("banning ip %v", ip)
1231 if cl.badPeerIPs == nil {
1232 cl.badPeerIPs = make(map[string]struct{})
1234 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds a PeerConn around nc, wiring up its logger, writer
// condition, stats-counting read/writer and rate-limited reader.
// NOTE(review): several struct fields of the literal are elided.
1237 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1243 PeerMaxRequests: 250,
1244 writeBuffer: new(bytes.Buffer),
1245 remoteAddr: remoteAddr,
1247 connString: connString,
1249 c.logger = cl.logger.WithValues(c,
1250 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1251 ).WithText(func(m log.Msg) string {
1252 return fmt.Sprintf("%v: %s", c, m.Text())
// The writer condition shares the client's lock.
1254 c.writerCond.L = cl.locker()
1255 c.setRW(connStatsReadWriter{nc, c})
// Reads are throttled by the configured download rate limiter.
1256 c.r = &rateLimitedReader{
1257 l: cl.config.DownloadRateLimiter,
1260 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// onDHTAnnouncePeer is the OnAnnouncePeer callback installed on the DHT
// servers created by this client. It builds a peer from ip and port with
// source PeerSourceDhtAnnouncePeer.
// NOTE(review): the torrent lookup and the call receiving the peer are elided.
1264 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1272 Addr: ipPortAddr{ip, port},
1273 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil scans ips in order; per its name and its use in publicIp it
// presumably returns the first non-nil entry, or nil if there is none (the
// loop body is elided) — confirm.
1277 func firstNotNil(ips ...net.IP) net.IP {
1278 for _, ip := range ips {
// eachDialer iterates over cl.dialers in order. The loop body is elided;
// presumably f is called per dialer and a false result stops early — confirm.
1286 func (cl *Client) eachDialer(f func(Dialer) bool) {
1287 for _, s := range cl.dialers {
// eachListener iterates over cl.listeners in order. The loop body is elided;
// presumably f is called per listener and a false result stops early — confirm.
1294 func (cl *Client) eachListener(f func(Listener) bool) {
1295 for _, s := range cl.listeners {
// findListener searches the client's listeners using predicate f, built on
// eachListener. NOTE(review): the callback body is elided; presumably the
// first listener for which f returns true is returned, else nil — confirm.
1302 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1303 cl.eachListener(func(l Listener) bool {
// publicIp picks the IP we should present to the given peer, matching the
// peer's address family. For each family the candidates are the configured
// public IP followed by the IP of a listener in that family.
// NOTE(review): the call wrapping these candidates is elided; presumably
// firstNotNil — confirm.
1310 func (cl *Client) publicIp(peer net.IP) net.IP {
1311 // TODO: Use BEP 10 to determine how peers are seeing us.
1312 if peer.To4() != nil {
1314 cl.config.PublicIp4,
1315 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1320 cl.config.PublicIp6,
1321 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp looks for a listener whose address IP satisfies f. The
// predicate receives addrIpOrNil of each listener address, so it may be
// called with a nil IP. How the result is derived from the matching listener
// is elided from this listing.
1325 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1328 func(l net.Listener) bool {
1329 return f(addrIpOrNil(l.Addr()))
1335 // Our IP and port as the given peer should see them.
// The port is the incoming peer port converted to uint16.
1336 func (cl *Client) publicAddr(peer net.IP) IpPort {
1337 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1340 // ListenAddrs returns the addresses currently being listened on.
// One address is collected per registered listener, in registration order.
1341 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1344 cl.eachListener(func(l Listener) bool {
1345 ret = append(ret, l.Addr())
// onBadAccept records a bad incoming connection from addr by incrementing the
// counter for its masked IP in cl.acceptLimiter, creating the map if needed.
// Addresses without an extractable IP and port are presumably ignored (the
// check on ok is elided) — confirm.
1351 func (cl *Client) onBadAccept(addr net.Addr) {
1352 ipa, ok := tryIpPortFromNetAddr(addr)
1356 ip := maskIpForAcceptLimiting(ipa.IP)
1357 if cl.acceptLimiter == nil {
1358 cl.acceptLimiter = make(map[ipStr]int)
1360 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address to its /24 network, so that
// accept limiting applies to the whole block. The result for addresses with no
// IPv4 form is elided from this listing.
1363 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1364 if ip4 := ip.To4(); ip4 != nil {
1365 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all recorded bad-accept counts by dropping the
// map; onBadAccept recreates it on demand.
1370 func (cl *Client) clearAcceptLimits() {
1371 cl.acceptLimiter = nil
// acceptLimitClearer runs as a goroutine started by NewClient. It waits on
// either the client's closed channel or a 15 minute timer, and calls
// clearAcceptLimits. NOTE(review): the enclosing loop, the select and the
// locking are elided; presumably the closed case returns — confirm.
1374 func (cl *Client) acceptLimitClearer() {
1377 case <-cl.closed.LockedChan(cl.locker()):
1379 case <-time.After(15 * time.Minute):
1381 cl.clearAcceptLimits()
// rateLimitAccept reports whether a connection from ip should be refused
// because its masked IP has at least one recorded bad accept. Reading a nil
// acceptLimiter map is safe and yields zero. The result when rate limiting is
// disabled is elided from this listing, presumably false — confirm.
1387 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1388 if cl.config.DisableAcceptRateLimiting {
1391 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1394 func (cl *Client) rLock() {
1398 func (cl *Client) rUnlock() {
1402 func (cl *Client) lock() {
1406 func (cl *Client) unlock() {
1410 func (cl *Client) locker() *lockWithDeferreds {
// String identifies the client by its Go type and pointer value, formatted as
// "<type pointer>".
1414 func (cl *Client) String() string {
1415 return fmt.Sprintf("<%[1]T %[1]p>", cl)