17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/davecgh/go-spew/spew"
27 "github.com/dustin/go-humanize"
28 "github.com/google/btree"
29 "golang.org/x/time/rate"
30 "golang.org/x/xerrors"
32 "github.com/anacrolix/missinggo/v2"
33 "github.com/anacrolix/missinggo/v2/conntrack"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/iplist"
37 "github.com/anacrolix/torrent/metainfo"
38 "github.com/anacrolix/torrent/mse"
39 pp "github.com/anacrolix/torrent/peer_protocol"
40 "github.com/anacrolix/torrent/storage"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
62 dhtServers []DhtServer
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock — confirm against callers.
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 // Returns the port number for the first listener that has one. No longer assumes that all port
95 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
97 func (cl *Client) LocalPort() (port int) {
98 cl.eachListener(func(l Listener) bool {
99 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes a status summary for s to w: the server ID in
// hex, followed by a spew dump of the stats snapshot taken from s.Stats().
105 func writeDhtServerStatus(w io.Writer, s DhtServer) {
106 dhtStats := s.Stats()
107 fmt.Fprintf(w, " ID: %x\n", s.ID())
108 spew.Fdump(w, dhtStats)
111 // Writes out a human readable status of the client, such as for writing to a
113 func (cl *Client) WriteStatus(_w io.Writer) {
116 w := bufio.NewWriter(_w)
118 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
119 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
120 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
121 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
122 cl.eachDhtServer(func(s DhtServer) {
123 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
124 writeDhtServerStatus(w, s)
126 spew.Fdump(w, &cl.stats)
127 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
129 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
130 return l.InfoHash().AsString() < r.InfoHash().AsString()
133 fmt.Fprint(w, "<unknown name>")
135 fmt.Fprint(w, t.name())
139 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
141 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached (via AddValue) to log messages in this
// file that are only of interest when debugging; debugLogFilter tests for it.
149 const debugLogValue = log.Debug
151 func (cl *Client) debugLogFilter(m log.Msg) bool {
155 return !m.HasValue(debugLogValue)
// initLogger builds cl.logger from the configured Logger, attaching cl as a
// value and installing cl.debugLogFilter as the message filter.
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey returns bytes 16 through 19 of the client's peer ID, decoded as
// a big-endian uint32 and converted to int32.
162 func (cl *Client) announceKey() int32 {
163 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
166 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
168 cfg = NewDefaultClientConfig()
178 dopplegangerAddrs: make(map[string]struct{}),
179 torrents: make(map[metainfo.Hash]*Torrent),
180 dialRateLimiter: rate.NewLimiter(10, 10),
182 go cl.acceptLimitClearer()
190 cl.extensionBytes = defaultPeerExtensionBytes()
191 cl.event.L = cl.locker()
192 storageImpl := cfg.DefaultStorage
193 if storageImpl == nil {
194 // We'd use mmap by default but HFS+ doesn't support sparse files.
195 storageImplCloser := storage.NewFile(cfg.DataDir)
196 cl.onClose = append(cl.onClose, func() {
197 if err := storageImplCloser.Close(); err != nil {
198 cl.logger.Printf("error closing default storage: %s", err)
201 storageImpl = storageImplCloser
203 cl.defaultStorage = storage.NewClient(storageImpl)
204 if cfg.IPBlocklist != nil {
205 cl.ipBlockList = cfg.IPBlocklist
208 if cfg.PeerID != "" {
209 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
211 o := copy(cl.peerID[:], cfg.Bep20)
212 _, err = rand.Read(cl.peerID[o:])
214 panic("error generating peer id")
218 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
226 for _, _s := range sockets {
227 s := _s // Copy the loop variable so each closure below captures its own socket.
228 cl.onClose = append(cl.onClose, func() { s.Close() })
229 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
230 cl.dialers = append(cl.dialers, s)
231 cl.listeners = append(cl.listeners, s)
232 go cl.acceptConnections(s)
238 for _, s := range sockets {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newAnacrolixDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
245 cl.onClose = append(cl.onClose, func() { ds.Close() })
// AddDhtServer appends d to the client's list of DHT servers.
// NOTE(review): this method takes no lock itself — confirm callers do not
// race with readers of cl.dhtServers.
253 func (cl *Client) AddDhtServer(d DhtServer) {
254 cl.dhtServers = append(cl.dhtServers, d)
257 // AddDialer adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
258 // given address for any Torrent.
// NOTE(review): no lock is taken here — confirm callers serialize access to
// cl.dialers.
259 func (cl *Client) AddDialer(d Dialer) {
260 cl.dialers = append(cl.dialers, d)
263 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
265 func (cl *Client) AddListener(l Listener) {
266 cl.listeners = append(cl.listeners, l)
267 go cl.acceptConnections(l)
270 func (cl *Client) firewallCallback(net.Addr) bool {
272 block := !cl.wantConns()
275 torrent.Add("connections firewalled", 1)
277 torrent.Add("connections not firewalled", 1)
282 func (cl *Client) listenOnNetwork(n network) bool {
283 if n.Ipv4 && cl.config.DisableIPv4 {
286 if n.Ipv6 && cl.config.DisableIPv6 {
289 if n.Tcp && cl.config.DisableTCP {
292 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
298 func (cl *Client) listenNetworks() (ns []network) {
299 for _, n := range allPeerNetworks {
300 if cl.listenOnNetwork(n) {
307 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
308 cfg := dht.ServerConfig{
309 IPBlocklist: cl.ipBlockList,
311 OnAnnouncePeer: cl.onDHTAnnouncePeer,
312 PublicIP: func() net.IP {
313 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
314 return cl.config.PublicIp6
316 return cl.config.PublicIp4
318 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
319 ConnectionTracking: cl.config.ConnTracker,
320 OnQuery: cl.config.DHTOnQuery,
321 Logger: cl.logger.WithText(func(m log.Msg) string {
322 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
325 s, err = dht.NewServer(&cfg)
328 ts, err := s.Bootstrap()
330 cl.logger.Printf("error bootstrapping dht: %s", err)
332 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
338 func (cl *Client) Closed() <-chan struct{} {
344 func (cl *Client) eachDhtServer(f func(DhtServer)) {
345 for _, ds := range cl.dhtServers {
350 // Stops the client. All connections to peers are closed and all activity will
352 func (cl *Client) Close() {
356 for _, t := range cl.torrents {
359 for i := range cl.onClose {
360 cl.onClose[len(cl.onClose)-1-i]()
365 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
366 if cl.ipBlockList == nil {
369 return cl.ipBlockList.Lookup(ip)
372 func (cl *Client) ipIsBlocked(ip net.IP) bool {
373 _, blocked := cl.ipBlockRange(ip)
377 func (cl *Client) wantConns() bool {
378 for _, t := range cl.torrents {
386 func (cl *Client) waitAccept() {
388 if cl.closed.IsSet() {
398 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
399 func (cl *Client) rejectAccepted(conn net.Conn) error {
400 ra := conn.RemoteAddr()
401 if rip := addrIpOrNil(ra); rip != nil {
402 if cl.config.DisableIPv4Peers && rip.To4() != nil {
403 return errors.New("ipv4 peers disabled")
405 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
406 return errors.New("ipv4 disabled")
409 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
410 return errors.New("ipv6 disabled")
412 if cl.rateLimitAccept(rip) {
413 return errors.New("source IP accepted rate limited")
415 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
416 return errors.New("bad source addr")
422 func (cl *Client) acceptConnections(l net.Listener) {
424 conn, err := l.Accept()
425 torrent.Add("client listener accepts", 1)
426 conn = pproffd.WrapNetConn(conn)
428 closed := cl.closed.IsSet()
431 reject = cl.rejectAccepted(conn)
441 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
446 torrent.Add("rejected accepted connections", 1)
447 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
450 go cl.incomingConnection(conn)
452 log.Fmsg("accepted %q connection at %q from %q",
456 ).AddValue(debugLogValue).Log(cl.logger)
457 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
458 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
459 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
464 func (cl *Client) incomingConnection(nc net.Conn) {
466 if tc, ok := nc.(*net.TCPConn); ok {
469 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network())
470 c.Discovery = PeerSourceIncoming
471 cl.runReceivedConn(c)
474 // Returns a handle to the given torrent, if it's present in the client.
475 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
478 t, ok = cl.torrents[ih]
// torrent returns the Torrent registered under ih, or nil if cl.torrents has
// no entry for it. Unlike the exported Torrent method it does not report
// presence separately.
482 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
483 return cl.torrents[ih]
486 type dialResult struct {
491 func countDialResult(err error) {
493 torrent.Add("successful dials", 1)
495 torrent.Add("unsuccessful dials", 1)
499 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
500 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
501 if ret < minDialTimeout {
507 // Returns whether an address is known to connect to a client with our own ID.
508 func (cl *Client) dopplegangerAddr(addr string) bool {
509 _, ok := cl.dopplegangerAddrs[addr]
513 // Returns a connection over UTP or TCP, whichever is first to connect.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
516 t := perf.NewTimer(perf.CallerName(0))
519 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
521 t.Mark("returned conn over " + res.Network)
525 ctx, cancel := context.WithCancel(ctx)
526 // As soon as we return one connection, cancel the others.
529 resCh := make(chan dialResult, left)
533 cl.eachDialer(func(s Dialer) bool {
536 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
539 cl.dialFromSocket(ctx, s, addr),
540 s.LocalAddr().Network(),
547 // Wait for a successful connection.
549 defer perf.ScopeTimer()()
550 for ; left > 0 && res.Conn == nil; left-- {
554 // There are still incomplete dials.
556 for ; left > 0; left-- {
557 conn := (<-resCh).Conn
564 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
566 //if res.Conn != nil {
567 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
569 // cl.logger.Printf("failed to dial %s", addr)
574 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
575 network := s.LocalAddr().Network()
576 cte := cl.config.ConnTracker.Wait(
578 conntrack.Entry{network, s.LocalAddr().String(), addr},
579 "dial torrent client",
582 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
583 // which dial errors allow us to forget the connection tracking entry handle.
584 if ctx.Err() != nil {
590 c, err := s.Dial(ctx, addr)
591 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
592 // it now in case we close the connection forthwith.
593 if tc, ok := c.(*net.TCPConn); ok {
598 if err != nil && forgettableDialError(err) {
605 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains
// "no suitable address found". err must be non-nil, since err.Error() is
// called unconditionally; the visible caller checks err != nil first.
612 func forgettableDialError(err error) bool {
613 return strings.Contains(err.Error(), "no suitable address found")
616 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
617 if _, ok := t.halfOpen[addr]; !ok {
618 panic("invariant broken")
620 delete(t.halfOpen, addr)
624 // Performs initiator handshakes and returns a connection. Returns nil
625 // *connection if no connection for valid reasons.
626 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr, network string) (c *PeerConn, err error) {
627 c = cl.newConnection(nc, true, remoteAddr, network)
628 c.headerEncrypted = encryptHeader
629 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
631 dl, ok := ctx.Deadline()
635 err = nc.SetDeadline(dl)
639 err = cl.initiateHandshakes(c, t)
643 // Returns nil connection and nil error if no connection could be established
644 // for valid reasons.
645 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
646 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
649 return t.dialTimeout()
652 dr := cl.dialFirst(dialCtx, addr.String())
655 if dialCtx.Err() != nil {
656 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
658 return nil, errors.New("dial failed")
660 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
667 // Returns nil connection and nil error if no connection could be established
668 // for valid reasons.
669 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
670 torrent.Add("establish outgoing connection", 1)
671 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
672 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
674 torrent.Add("initiated conn with preferred header obfuscation", 1)
677 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
678 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
679 // We should have just tried with the preferred header obfuscation. If it was required,
680 // there's nothing else to try.
683 // Try again with encryption if we didn't earlier, or without if we did.
684 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
686 torrent.Add("initiated conn with fallback header obfuscation", 1)
688 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
692 // Called to dial out and run a connection. The addr we're given is already
693 // considered half-open.
694 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
695 cl.dialRateLimiter.Wait(context.Background())
696 c, err := cl.establishOutgoingConn(t, addr)
699 // Don't release lock between here and addConnection, unless it's for
701 cl.noLongerHalfOpen(t, addr.String())
704 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
711 cl.runHandshookConn(c, t)
714 // The port number for incoming peer connections. 0 if the client isn't listening.
// This currently just delegates to LocalPort.
715 func (cl *Client) incomingPeerPort() int {
716 return cl.LocalPort()
719 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
720 if c.headerEncrypted {
723 rw, c.cryptoMethod, err = mse.InitiateHandshake(
730 cl.config.CryptoProvides,
734 return xerrors.Errorf("header obfuscation handshake: %w", err)
737 ih, err := cl.connBtHandshake(c, &t.infoHash)
739 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
741 if ih != t.infoHash {
742 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
747 // Calls f with any secret keys.
748 func (cl *Client) forSkeys(f func([]byte) bool) {
751 if false { // Emulate the bug from #114
753 for ih := range cl.torrents {
757 for range cl.torrents {
764 for ih := range cl.torrents {
771 // Do encryption and bittorrent handshakes as receiver.
772 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
773 defer perf.ScopeTimerErr(&err)()
775 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
777 if err == nil || err == mse.ErrNoSecretKeyMatch {
778 if c.headerEncrypted {
779 torrent.Add("handshakes received encrypted", 1)
781 torrent.Add("handshakes received unencrypted", 1)
784 torrent.Add("handshakes received with error while handling encryption", 1)
787 if err == mse.ErrNoSecretKeyMatch {
792 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
793 err = errors.New("connection not have required header obfuscation")
796 ih, err := cl.connBtHandshake(c, nil)
798 err = xerrors.Errorf("during bt handshake: %w", err)
807 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
808 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
813 c.PeerExtensionBytes = res.PeerExtensionBits
814 c.PeerID = res.PeerID
815 c.completedHandshake = time.Now()
819 func (cl *Client) runReceivedConn(c *PeerConn) {
820 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
824 t, err := cl.receiveHandshakes(c)
827 "error receiving handshakes on %v: %s", c, err,
831 "network", c.network,
833 torrent.Add("error receiving handshake", 1)
835 cl.onBadAccept(c.remoteAddr)
840 torrent.Add("received handshake for unloaded torrent", 1)
841 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
843 cl.onBadAccept(c.remoteAddr)
847 torrent.Add("received handshake for loaded torrent", 1)
850 cl.runHandshookConn(c, t)
853 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
855 if c.PeerID == cl.peerID {
858 addr := c.conn.RemoteAddr().String()
859 cl.dopplegangerAddrs[addr] = struct{}{}
861 // Because the remote address is not necessarily the same as its client's torrent listen
862 // address, we won't record the remote address as a doppleganger. Instead, the initiator
863 // can record *us* as the doppleganger.
867 c.conn.SetWriteDeadline(time.Time{})
868 c.r = deadlineReader{c.conn, c.r}
869 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
870 if connIsIpv6(c.conn) {
871 torrent.Add("completed handshake over ipv6", 1)
873 if err := t.addConnection(c); err != nil {
874 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
877 defer t.dropConnection(c)
878 go c.writer(time.Minute)
879 cl.sendInitialMessages(c, t)
880 err := c.mainReadLoop()
881 if err != nil && cl.config.Debug {
882 cl.logger.Printf("error during connection main read loop: %s", err)
886 // See the order given in Transmission's tr_peerMsgsNew.
887 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
888 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
889 conn.post(pp.Message{
891 ExtendedID: pp.HandshakeExtendedID,
892 ExtendedPayload: func() []byte {
893 msg := pp.ExtendedHandshakeMessage{
894 M: map[pp.ExtensionName]pp.ExtensionNumber{
895 pp.ExtensionNameMetadata: metadataExtendedId,
897 V: cl.config.ExtendedHandshakeClientVersion,
898 Reqq: 64, // TODO: Really?
899 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
900 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
901 Port: cl.incomingPeerPort(),
902 MetadataSize: torrent.metadataSize(),
903 // TODO: We can figure these out specific to the socket
905 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
906 Ipv6: cl.config.PublicIp6.To16(),
908 if !cl.config.DisablePEX {
909 msg.M[pp.ExtensionNamePex] = pexExtendedId
911 return bencode.MustMarshal(msg)
916 if conn.fastEnabled() {
917 if torrent.haveAllPieces() {
918 conn.post(pp.Message{Type: pp.HaveAll})
919 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
921 } else if !torrent.haveAnyPieces() {
922 conn.post(pp.Message{Type: pp.HaveNone})
923 conn.sentHaves.Clear()
929 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
930 conn.post(pp.Message{
937 func (cl *Client) dhtPort() (ret uint16) {
938 cl.eachDhtServer(func(s DhtServer) {
939 ret = uint16(missinggo.AddrPort(s.Addr()))
944 func (cl *Client) haveDhtServer() (ret bool) {
945 cl.eachDhtServer(func(_ DhtServer) {
951 // Process incoming ut_metadata message.
952 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
954 err := bencode.Unmarshal(payload, &d)
955 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
956 } else if err != nil {
957 return fmt.Errorf("error unmarshalling bencode: %s", err)
959 msgType, ok := d["msg_type"]
961 return errors.New("missing msg_type field")
965 case pp.DataMetadataExtensionMsgType:
966 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
967 if !c.requestedMetadataPiece(piece) {
968 return fmt.Errorf("got unexpected piece %d", piece)
970 c.metadataRequests[piece] = false
971 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
972 if begin < 0 || begin >= len(payload) {
973 return fmt.Errorf("data has bad offset in payload: %d", begin)
975 t.saveMetadataPiece(piece, payload[begin:])
976 c.lastUsefulChunkReceived = time.Now()
977 return t.maybeCompleteMetadata()
978 case pp.RequestMetadataExtensionMsgType:
979 if !t.haveMetadataPiece(piece) {
980 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
983 start := (1 << 14) * piece
984 c.logger.Printf("sending metadata piece %d", piece)
985 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
987 case pp.RejectMetadataExtensionMsgType:
990 return errors.New("unknown msg_type value")
994 func (cl *Client) badPeerAddr(addr net.Addr) bool {
995 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
996 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1001 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1005 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1008 if _, ok := cl.ipBlockRange(ip); ok {
1011 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1017 // Return a Torrent ready for insertion into a Client.
1018 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1019 // use provided storage, if provided
1020 storageClient := cl.defaultStorage
1021 if specStorage != nil {
1022 storageClient = storage.NewClient(specStorage)
1028 peers: prioritizedPeers{
1030 getPrio: func(p Peer) peerPriority {
1031 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1034 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1036 halfOpen: make(map[string]Peer),
1037 pieceStateChanges: pubsub.NewPubSub(),
1039 storageOpener: storageClient,
1040 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1042 networkingEnabled: true,
1043 metadataChanged: sync.Cond{
1047 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1048 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1049 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1050 return fmt.Sprintf("%v: %s", t, m.Text())
1052 t.setChunkSize(defaultChunkSize)
1056 // A file-like handle to some torrent data resource.
1057 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without a custom storage
// implementation: it passes nil storage to AddTorrentInfoHashWithStorage and
// returns that call's results unchanged.
1064 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1065 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1068 // Adds a torrent by InfoHash with a custom Storage implementation.
1069 // If the torrent already exists then this Storage is ignored and the
1070 // existing torrent returned with `new` set to `false`
1071 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1074 t, ok := cl.torrents[infoHash]
1080 t = cl.newTorrent(infoHash, specStorage)
1081 cl.eachDhtServer(func(s DhtServer) {
1082 go t.dhtAnnouncer(s)
1084 cl.torrents[infoHash] = t
1085 cl.clearAcceptLimits()
1086 t.updateWantPeersEvent()
1087 // Tickle Client.waitAccept, new torrent may want conns.
1088 cl.event.Broadcast()
1092 // Add or merge a torrent spec. If the torrent is already present, the
1093 // trackers will be merged with the existing ones. If the Info isn't yet
1094 // known, it will be set. The display name is replaced if the new spec
1095 // provides one. Returns new if the torrent wasn't already in the client.
1096 // Note that any `Storage` defined on the spec will be ignored if the
1097 // torrent is already present (i.e. `new` return value is `false`)
1098 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1099 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1100 if spec.DisplayName != "" {
1101 t.SetDisplayName(spec.DisplayName)
1103 if spec.InfoBytes != nil {
1104 err = t.SetInfoBytes(spec.InfoBytes)
1111 if spec.ChunkSize != 0 {
1112 t.setChunkSize(pp.Integer(spec.ChunkSize))
1114 t.addTrackers(spec.Trackers)
1119 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1120 t, ok := cl.torrents[infoHash]
1122 err = fmt.Errorf("no such torrent")
1129 delete(cl.torrents, infoHash)
1133 func (cl *Client) allTorrentsCompleted() bool {
1134 for _, t := range cl.torrents {
1138 if !t.haveAllPieces() {
1145 // Returns true when all torrents are completely downloaded and false if the
1146 // client is stopped before that.
1147 func (cl *Client) WaitAll() bool {
1150 for !cl.allTorrentsCompleted() {
1151 if cl.closed.IsSet() {
1159 // Returns handles to all the torrents loaded in the Client.
1160 func (cl *Client) Torrents() []*Torrent {
1163 return cl.torrentsAsSlice()
1166 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1167 for _, t := range cl.torrents {
1168 ret = append(ret, t)
1173 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1174 spec, err := TorrentSpecFromMagnetURI(uri)
1178 T, _, err = cl.AddTorrentSpec(spec)
1182 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1183 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1185 slices.MakeInto(&ss, mi.Nodes)
1190 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1191 mi, err := metainfo.LoadFromFile(filename)
1195 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The internal slice is returned
// directly rather than copied.
1198 func (cl *Client) DhtServers() []DhtServer {
1199 return cl.dhtServers
1202 func (cl *Client) AddDHTNodes(nodes []string) {
1203 for _, n := range nodes {
1204 hmp := missinggo.SplitHostMaybePort(n)
1205 ip := net.ParseIP(hmp.Host)
1207 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1210 ni := krpc.NodeInfo{
1211 Addr: krpc.NodeAddr{
1216 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip's string form as a key in
// cl.badPeerIPs.
1222 func (cl *Client) banPeerIP(ip net.IP) {
1223 cl.logger.Printf("banning ip %v", ip)
// The map is created lazily on first ban.
1224 if cl.badPeerIPs == nil {
1225 cl.badPeerIPs = make(map[string]struct{})
1227 cl.badPeerIPs[ip.String()] = struct{}{}
1230 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network string) (c *PeerConn) {
1236 PeerMaxRequests: 250,
1237 writeBuffer: new(bytes.Buffer),
1238 remoteAddr: remoteAddr,
1241 c.logger = cl.logger.WithValues(c,
1242 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1243 ).WithText(func(m log.Msg) string {
1244 return fmt.Sprintf("%v: %s", c, m.Text())
1246 c.writerCond.L = cl.locker()
1247 c.setRW(connStatsReadWriter{nc, c})
1248 c.r = &rateLimitedReader{
1249 l: cl.config.DownloadRateLimiter,
1252 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1256 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1264 Addr: ipPortAddr{ip, port},
1265 Source: PeerSourceDhtAnnouncePeer,
1269 func firstNotNil(ips ...net.IP) net.IP {
1270 for _, ip := range ips {
1278 func (cl *Client) eachDialer(f func(Dialer) bool) {
1279 for _, s := range cl.dialers {
1286 func (cl *Client) eachListener(f func(Listener) bool) {
1287 for _, s := range cl.listeners {
1294 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1295 cl.eachListener(func(l Listener) bool {
1302 func (cl *Client) publicIp(peer net.IP) net.IP {
1303 // TODO: Use BEP 10 to determine how peers are seeing us.
1304 if peer.To4() != nil {
1306 cl.config.PublicIp4,
1307 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1312 cl.config.PublicIp6,
1313 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1317 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1320 func(l net.Listener) bool {
1321 return f(addrIpOrNil(l.Addr()))
1327 // Our address as the given peer should see it: the IP from publicIp(peer) and the port from incomingPeerPort.
1328 func (cl *Client) publicAddr(peer net.IP) IpPort {
1329 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1332 // ListenAddrs returns the addresses currently being listened to.
1333 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1336 cl.eachListener(func(l Listener) bool {
1337 ret = append(ret, l.Addr())
1343 func (cl *Client) onBadAccept(addr net.Addr) {
1344 ipa, ok := tryIpPortFromNetAddr(addr)
1348 ip := maskIpForAcceptLimiting(ipa.IP)
1349 if cl.acceptLimiter == nil {
1350 cl.acceptLimiter = make(map[ipStr]int)
1352 cl.acceptLimiter[ipStr(ip.String())]++
1355 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1356 if ip4 := ip.To4(); ip4 != nil {
1357 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits forgets all recorded accept-limit counts by dropping the
// map; onBadAccept recreates it on demand.
1362 func (cl *Client) clearAcceptLimits() {
1363 cl.acceptLimiter = nil
1366 func (cl *Client) acceptLimitClearer() {
1369 case <-cl.closed.LockedChan(cl.locker()):
1371 case <-time.After(15 * time.Minute):
1373 cl.clearAcceptLimits()
1379 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1380 if cl.config.DisableAcceptRateLimiting {
1383 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1386 func (cl *Client) rLock() {
1390 func (cl *Client) rUnlock() {
1394 func (cl *Client) lock() {
1398 func (cl *Client) unlock() {
1402 func (cl *Client) locker() *lockWithDeferreds {
1406 func (cl *Client) String() string {
1407 return fmt.Sprintf("<%[1]T %[1]p>", cl)