17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2"
26 "github.com/anacrolix/missinggo/v2/conntrack"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/torrent/bencode"
35 "github.com/anacrolix/torrent/iplist"
36 "github.com/anacrolix/torrent/metainfo"
37 "github.com/anacrolix/torrent/mse"
38 pp "github.com/anacrolix/torrent/peer_protocol"
39 "github.com/anacrolix/torrent/storage"
42 // Clients contain zero or more Torrents. A Client manages a blocklist, the
43 // TCP/UDP protocol ports, and DHT as desired.
45 // An aggregate of stats over all connections. First in struct to ensure
46 // 64-bit alignment of fields. See #262.
51 closed missinggo.Event
57 defaultStorage *storage.Client
61 dhtServers []DhtServer
62 ipBlockList iplist.Ranger
63 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
64 extensionBytes pp.PeerExtensionBits
66 // Set of addresses that have our client ID. This intentionally will
67 // include ourselves if we end up trying to connect to our own address
68 // through legitimate channels.
69 dopplegangerAddrs map[string]struct{}
70 badPeerIPs map[string]struct{}
71 torrents map[InfoHash]*Torrent
73 acceptLimiter map[ipStr]int
74 dialRateLimiter *rate.Limiter
79 func (cl *Client) BadPeerIPs() []string {
82 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// slices.FromMapKeys hands back an untyped value, hence the type assertion.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock; no locking happens in this body — confirm against callers.
85 func (cl *Client) badPeerIPsLocked() []string {
86 return slices.FromMapKeys(cl.badPeerIPs).([]string)
89 func (cl *Client) PeerID() PeerID {
93 // Returns the port number for the first listener that has one. No longer assumes that all port
94 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
// found.
96 func (cl *Client) LocalPort() (port int) {
97 cl.eachListener(func(l Listener) bool {
98 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes a short report for a single DHT server to w:
// the server's ID formatted as hex, followed by a spew dump of the value
// returned by s.Stats().
104 func writeDhtServerStatus(w io.Writer, s DhtServer) {
// Stats are fetched before anything is written.
105 dhtStats := s.Stats()
106 fmt.Fprintf(w, " ID: %x\n", s.ID())
107 spew.Fdump(w, dhtStats)
110 // Writes out a human readable status of the client, such as for writing to a
112 func (cl *Client) WriteStatus(_w io.Writer) {
115 w := bufio.NewWriter(_w)
117 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
118 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
119 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
120 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
121 cl.eachDhtServer(func(s DhtServer) {
122 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
123 writeDhtServerStatus(w, s)
125 spew.Fdump(w, &cl.stats)
126 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
128 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
129 return l.InfoHash().AsString() < r.InfoHash().AsString()
132 fmt.Fprint(w, "<unknown name>")
134 fmt.Fprint(w, t.name())
138 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
140 w.WriteString("<missing metainfo>")
// debugLogValue is the marker value attached to log messages (via AddValue)
// that should be treated as debug output; debugLogFilter checks messages for
// its presence.
148 const debugLogValue = log.Debug
150 func (cl *Client) debugLogFilter(m log.Msg) bool {
154 return !m.HasValue(debugLogValue)
// initLogger derives cl.logger from the logger supplied in the client config,
// attaching the client itself as a log value and installing debugLogFilter as
// the message filter.
157 func (cl *Client) initLogger() {
158 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey derives a 32-bit key from the client's peer ID: bytes 16 through
// 19 are read as a big-endian uint32 and reinterpreted as an int32, so the
// result can be negative.
161 func (cl *Client) announceKey() int32 {
162 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient constructs a Client from cfg. Judging by the lines visible here it:
//   - initializes the doppleganger and torrent maps and a dial rate limiter
//     (rate.NewLimiter(10, 10)), and starts the accept-limit clearer goroutine;
//   - sets up default storage, falling back to file storage under cfg.DataDir
//     when cfg.DefaultStorage is nil (registering a close hook for it);
//   - adopts cfg.IPBlocklist when one is provided;
//   - fills in the peer ID, either from cfg.PeerID or from the cfg.Bep20 prefix
//     followed by random bytes;
//   - opens the listening sockets, and starts a DHT server on every socket
//     that is also a net.PacketConn.
//
// Cleanup actions are accumulated in cl.onClose as resources are created.
// NOTE(review): several statements of this function are not visible in this
// view (error handling in particular) — confirm details against the full file.
165 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
167 cfg = NewDefaultClientConfig()
177 dopplegangerAddrs: make(map[string]struct{}),
178 torrents: make(map[metainfo.Hash]*Torrent),
179 dialRateLimiter: rate.NewLimiter(10, 10),
181 go cl.acceptLimitClearer()
189 cl.extensionBytes = defaultPeerExtensionBytes()
// The client's condition variable shares the client lock.
190 cl.event.L = cl.locker()
191 storageImpl := cfg.DefaultStorage
192 if storageImpl == nil {
193 // We'd use mmap but HFS+ doesn't support sparse files.
194 storageImpl = storage.NewFile(cfg.DataDir)
// Only storage created here gets a close hook registered.
195 cl.onClose = append(cl.onClose, func() {
196 if err := storageImpl.Close(); err != nil {
197 cl.logger.Printf("error closing default storage: %s", err)
201 cl.defaultStorage = storage.NewClient(storageImpl)
202 if cfg.IPBlocklist != nil {
203 cl.ipBlockList = cfg.IPBlocklist
206 if cfg.PeerID != "" {
207 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
// o is the number of prefix bytes copied from cfg.Bep20; the remainder of the
// peer ID is filled with random bytes.
209 o := copy(cl.peerID[:], cfg.Bep20)
210 _, err = rand.Read(cl.peerID[o:])
212 panic("error generating peer id")
216 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
224 for _, _s := range sockets {
225 s := _s // Per-iteration copy: the closure and goroutine below must not share the loop variable (pre-Go 1.22 semantics).
226 cl.onClose = append(cl.onClose, func() { s.Close() })
// Sockets on peer-enabled networks serve as both dialer and listener.
227 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
228 cl.dialers = append(cl.dialers, s)
229 cl.listeners = append(cl.listeners, s)
230 go cl.acceptConnections(s)
236 for _, s := range sockets {
// Only packet-oriented sockets can host a DHT server.
237 if pc, ok := s.(net.PacketConn); ok {
238 ds, err := cl.newAnacrolixDhtServer(pc)
242 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
243 cl.onClose = append(cl.onClose, func() { ds.Close() })
// AddDhtServer appends d to the client's list of DHT servers, so that it is
// visited by eachDhtServer and returned from DhtServers.
// NOTE(review): no lock is taken in this body — confirm whether callers are
// expected to synchronize.
251 func (cl *Client) AddDhtServer(d DhtServer) {
252 cl.dhtServers = append(cl.dhtServers, d)
255 // AddDialer adds a Dialer for outgoing connections. All Dialers are used when attempting to
256 // connect to a given address for any Torrent.
// The dialer is only appended to cl.dialers; nothing is started and no
// listener is registered here.
257 func (cl *Client) AddDialer(d Dialer) {
258 cl.dialers = append(cl.dialers, d)
261 // AddListener registers a Listener and starts accepting connections on it in a new
// goroutine (cl.acceptConnections). No close hook is added to cl.onClose here,
// so the caller remains responsible for closing a Listener provided this way.
263 func (cl *Client) AddListener(l Listener) {
264 cl.listeners = append(cl.listeners, l)
265 go cl.acceptConnections(l)
268 func (cl *Client) firewallCallback(net.Addr) bool {
270 block := !cl.wantConns()
273 torrent.Add("connections firewalled", 1)
275 torrent.Add("connections not firewalled", 1)
280 func (cl *Client) listenOnNetwork(n network) bool {
281 if n.Ipv4 && cl.config.DisableIPv4 {
284 if n.Ipv6 && cl.config.DisableIPv6 {
287 if n.Tcp && cl.config.DisableTCP {
290 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
296 func (cl *Client) listenNetworks() (ns []network) {
297 for _, n := range allPeerNetworks {
298 if cl.listenOnNetwork(n) {
305 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306 cfg := dht.ServerConfig{
307 IPBlocklist: cl.ipBlockList,
309 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310 PublicIP: func() net.IP {
311 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312 return cl.config.PublicIp6
314 return cl.config.PublicIp4
316 StartingNodes: cl.config.DhtStartingNodes,
317 ConnectionTracking: cl.config.ConnTracker,
318 OnQuery: cl.config.DHTOnQuery,
319 Logger: cl.logger.WithValues("dht", conn.LocalAddr().String()),
321 s, err = dht.NewServer(&cfg)
324 ts, err := s.Bootstrap()
326 cl.logger.Printf("error bootstrapping dht: %s", err)
328 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
334 func (cl *Client) Closed() <-chan struct{} {
340 func (cl *Client) eachDhtServer(f func(DhtServer)) {
341 for _, ds := range cl.dhtServers {
346 // Stops the client. All connections to peers are closed and all activity will
348 func (cl *Client) Close() {
352 for _, t := range cl.torrents {
355 for i := range cl.onClose {
356 cl.onClose[len(cl.onClose)-1-i]()
361 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
362 if cl.ipBlockList == nil {
365 return cl.ipBlockList.Lookup(ip)
368 func (cl *Client) ipIsBlocked(ip net.IP) bool {
369 _, blocked := cl.ipBlockRange(ip)
373 func (cl *Client) wantConns() bool {
374 for _, t := range cl.torrents {
382 func (cl *Client) waitAccept() {
384 if cl.closed.IsSet() {
394 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
395 func (cl *Client) rejectAccepted(conn net.Conn) error {
396 ra := conn.RemoteAddr()
397 if rip := addrIpOrNil(ra); rip != nil {
398 if cl.config.DisableIPv4Peers && rip.To4() != nil {
399 return errors.New("ipv4 peers disabled")
401 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
402 return errors.New("ipv4 disabled")
405 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
406 return errors.New("ipv6 disabled")
408 if cl.rateLimitAccept(rip) {
409 return errors.New("source IP accepted rate limited")
411 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
412 return errors.New("bad source addr")
418 func (cl *Client) acceptConnections(l net.Listener) {
420 conn, err := l.Accept()
421 torrent.Add("client listener accepts", 1)
422 conn = pproffd.WrapNetConn(conn)
424 closed := cl.closed.IsSet()
427 reject = cl.rejectAccepted(conn)
437 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
442 torrent.Add("rejected accepted connections", 1)
443 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
446 go cl.incomingConnection(conn)
448 log.Fmsg("accepted %q connection at %q from %q",
452 ).AddValue(debugLogValue).Log(cl.logger)
453 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
454 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
455 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
460 func (cl *Client) incomingConnection(nc net.Conn) {
462 if tc, ok := nc.(*net.TCPConn); ok {
465 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network())
466 c.Discovery = PeerSourceIncoming
467 cl.runReceivedConn(c)
470 // Returns a handle to the given torrent, if it's present in the client.
471 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
474 t, ok = cl.torrents[ih]
// torrent looks up ih in cl.torrents and returns the result directly; a
// missing entry yields a nil *Torrent. It takes no lock itself.
478 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
479 return cl.torrents[ih]
482 type dialResult struct {
487 func countDialResult(err error) {
489 torrent.Add("successful dials", 1)
491 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shortens the dial timeout as the backlog of peers waiting
// to be dialed grows, so that a long queue is worked through faster.
//
// The nominal timeout max is divided by the number of "rounds" needed to dial
// pendingPeers peers halfOpenLimit at a time, i.e.
// (pendingPeers+halfOpenLimit)/halfOpenLimit using integer division. The
// result is never allowed to drop below minDialTimeout.
//
// A zero divisor (halfOpenLimit == 0, or a quotient of zero) is treated as a
// single round instead of triggering an integer divide-by-zero panic, so the
// nominal timeout is used in that case.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	rounds := 1
	if halfOpenLimit != 0 {
		rounds = (pendingPeers + halfOpenLimit) / halfOpenLimit
	}
	if rounds == 0 {
		// Guard the division below; only reachable with nonsensical inputs.
		rounds = 1
	}
	ret = max / time.Duration(rounds)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
503 // Returns whether an address is known to connect to a client with our own ID.
504 func (cl *Client) dopplegangerAddr(addr string) bool {
505 _, ok := cl.dopplegangerAddrs[addr]
509 // Returns a connection over UTP or TCP, whichever is first to connect.
510 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
512 t := perf.NewTimer(perf.CallerName(0))
515 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
517 t.Mark("returned conn over " + res.Network)
521 ctx, cancel := context.WithCancel(ctx)
522 // As soon as we return one connection, cancel the others.
525 resCh := make(chan dialResult, left)
529 cl.eachDialer(func(s Dialer) bool {
532 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
535 cl.dialFromSocket(ctx, s, addr),
536 s.LocalAddr().Network(),
543 // Wait for a successful connection.
545 defer perf.ScopeTimer()()
546 for ; left > 0 && res.Conn == nil; left-- {
550 // There are still incomplete dials.
552 for ; left > 0; left-- {
553 conn := (<-resCh).Conn
560 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
562 //if res.Conn != nil {
563 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
565 // cl.logger.Printf("failed to dial %s", addr)
570 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
571 network := s.LocalAddr().Network()
572 cte := cl.config.ConnTracker.Wait(
574 conntrack.Entry{network, s.LocalAddr().String(), addr},
575 "dial torrent client",
578 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
579 // which dial errors allow us to forget the connection tracking entry handle.
580 if ctx.Err() != nil {
586 c, err := s.Dial(ctx, addr)
587 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
588 // it now in case we close the connection forthwith.
589 if tc, ok := c.(*net.TCPConn); ok {
594 if err != nil && forgettableDialError(err) {
601 return closeWrapper{c, func() error {
// forgettableDialError reports whether err is a dial failure identified by the
// message "no suitable address found". dialFromSocket consults it when
// deciding how to treat its connection-tracking entry after a failed dial.
//
// The check is a substring match on the error text. A nil error is never
// forgettable; it is handled explicitly so the function cannot panic on
// err.Error().
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
612 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
613 if _, ok := t.halfOpen[addr]; !ok {
614 panic("invariant broken")
616 delete(t.halfOpen, addr)
620 // Performs initiator handshakes and returns a connection. Returns nil
621 // *connection if no connection for valid reasons.
622 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr, network string) (c *PeerConn, err error) {
623 c = cl.newConnection(nc, true, remoteAddr, network)
624 c.headerEncrypted = encryptHeader
625 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
627 dl, ok := ctx.Deadline()
631 err = nc.SetDeadline(dl)
635 err = cl.initiateHandshakes(c, t)
639 // Returns nil connection and nil error if no connection could be established
640 // for valid reasons.
641 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
642 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
645 return t.dialTimeout()
648 dr := cl.dialFirst(dialCtx, addr.String())
651 if dialCtx.Err() != nil {
652 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
654 return nil, errors.New("dial failed")
656 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
663 // Returns nil connection and nil error if no connection could be established
664 // for valid reasons.
665 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
666 torrent.Add("establish outgoing connection", 1)
667 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
668 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
670 torrent.Add("initiated conn with preferred header obfuscation", 1)
673 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
674 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
675 // We should have just tried with the preferred header obfuscation. If it was required,
676 // there's nothing else to try.
679 // Try again with encryption if we didn't earlier, or without if we did.
680 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
682 torrent.Add("initiated conn with fallback header obfuscation", 1)
684 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
688 // Called to dial out and run a connection. The addr we're given is already
689 // considered half-open.
690 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
691 cl.dialRateLimiter.Wait(context.Background())
692 c, err := cl.establishOutgoingConn(t, addr)
695 // Don't release lock between here and addConnection, unless it's for
697 cl.noLongerHalfOpen(t, addr.String())
700 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
707 cl.runHandshookConn(c, t)
710 // The port number for incoming peer connections. 0 if the client isn't listening.
// Currently this simply delegates to LocalPort.
711 func (cl *Client) incomingPeerPort() int {
712 return cl.LocalPort()
715 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
716 if c.headerEncrypted {
719 rw, c.cryptoMethod, err = mse.InitiateHandshake(
726 cl.config.CryptoProvides,
730 return xerrors.Errorf("header obfuscation handshake: %w", err)
733 ih, err := cl.connBtHandshake(c, &t.infoHash)
735 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
737 if ih != t.infoHash {
738 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
743 // Calls f with any secret keys.
744 func (cl *Client) forSkeys(f func([]byte) bool) {
747 if false { // Emulate the bug from #114
749 for ih := range cl.torrents {
753 for range cl.torrents {
760 for ih := range cl.torrents {
767 // Do encryption and bittorrent handshakes as receiver.
768 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
769 defer perf.ScopeTimerErr(&err)()
771 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
773 if err == nil || err == mse.ErrNoSecretKeyMatch {
774 if c.headerEncrypted {
775 torrent.Add("handshakes received encrypted", 1)
777 torrent.Add("handshakes received unencrypted", 1)
780 torrent.Add("handshakes received with error while handling encryption", 1)
783 if err == mse.ErrNoSecretKeyMatch {
788 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
789 err = errors.New("connection not have required header obfuscation")
792 ih, err := cl.connBtHandshake(c, nil)
794 err = xerrors.Errorf("during bt handshake: %w", err)
803 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
804 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
809 c.PeerExtensionBytes = res.PeerExtensionBits
810 c.PeerID = res.PeerID
811 c.completedHandshake = time.Now()
815 func (cl *Client) runReceivedConn(c *PeerConn) {
816 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
820 t, err := cl.receiveHandshakes(c)
823 "error receiving handshakes: %s", err,
827 "network", c.network,
829 torrent.Add("error receiving handshake", 1)
831 cl.onBadAccept(c.remoteAddr)
836 torrent.Add("received handshake for unloaded torrent", 1)
837 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
839 cl.onBadAccept(c.remoteAddr)
843 torrent.Add("received handshake for loaded torrent", 1)
846 cl.runHandshookConn(c, t)
849 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
851 if c.PeerID == cl.peerID {
854 addr := c.conn.RemoteAddr().String()
855 cl.dopplegangerAddrs[addr] = struct{}{}
857 // Because the remote address is not necessarily the same as its client's torrent listen
858 // address, we won't record the remote address as a doppleganger. Instead, the initiator
859 // can record *us* as the doppleganger.
863 c.conn.SetWriteDeadline(time.Time{})
864 c.r = deadlineReader{c.conn, c.r}
865 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
866 if connIsIpv6(c.conn) {
867 torrent.Add("completed handshake over ipv6", 1)
869 if err := t.addConnection(c); err != nil {
870 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
873 defer t.dropConnection(c)
874 go c.writer(time.Minute)
875 cl.sendInitialMessages(c, t)
876 err := c.mainReadLoop()
877 if err != nil && cl.config.Debug {
878 cl.logger.Printf("error during connection main read loop: %s", err)
882 // See the order given in Transmission's tr_peerMsgsNew.
883 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
884 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
885 conn.post(pp.Message{
887 ExtendedID: pp.HandshakeExtendedID,
888 ExtendedPayload: func() []byte {
889 msg := pp.ExtendedHandshakeMessage{
890 M: map[pp.ExtensionName]pp.ExtensionNumber{
891 pp.ExtensionNameMetadata: metadataExtendedId,
893 V: cl.config.ExtendedHandshakeClientVersion,
894 Reqq: 64, // TODO: Really?
895 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
896 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
897 Port: cl.incomingPeerPort(),
898 MetadataSize: torrent.metadataSize(),
899 // TODO: We can figure these out specific to the socket
901 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
902 Ipv6: cl.config.PublicIp6.To16(),
904 if !cl.config.DisablePEX {
905 msg.M[pp.ExtensionNamePex] = pexExtendedId
907 return bencode.MustMarshal(msg)
912 if conn.fastEnabled() {
913 if torrent.haveAllPieces() {
914 conn.post(pp.Message{Type: pp.HaveAll})
915 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
917 } else if !torrent.haveAnyPieces() {
918 conn.post(pp.Message{Type: pp.HaveNone})
919 conn.sentHaves.Clear()
925 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
926 conn.post(pp.Message{
933 func (cl *Client) dhtPort() (ret uint16) {
934 cl.eachDhtServer(func(s DhtServer) {
935 ret = uint16(missinggo.AddrPort(s.Addr()))
940 func (cl *Client) haveDhtServer() (ret bool) {
941 cl.eachDhtServer(func(_ DhtServer) {
947 // Process incoming ut_metadata message.
948 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
950 err := bencode.Unmarshal(payload, &d)
951 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
952 } else if err != nil {
953 return fmt.Errorf("error unmarshalling bencode: %s", err)
955 msgType, ok := d["msg_type"]
957 return errors.New("missing msg_type field")
961 case pp.DataMetadataExtensionMsgType:
962 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
963 if !c.requestedMetadataPiece(piece) {
964 return fmt.Errorf("got unexpected piece %d", piece)
966 c.metadataRequests[piece] = false
967 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
968 if begin < 0 || begin >= len(payload) {
969 return fmt.Errorf("data has bad offset in payload: %d", begin)
971 t.saveMetadataPiece(piece, payload[begin:])
972 c.lastUsefulChunkReceived = time.Now()
973 return t.maybeCompleteMetadata()
974 case pp.RequestMetadataExtensionMsgType:
975 if !t.haveMetadataPiece(piece) {
976 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
979 start := (1 << 14) * piece
980 c.logger.Printf("sending metadata piece %d", piece)
981 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
983 case pp.RejectMetadataExtensionMsgType:
986 return errors.New("unknown msg_type value")
990 func (cl *Client) badPeerAddr(addr net.Addr) bool {
991 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
992 return cl.badPeerIPPort(ipa.IP, ipa.Port)
997 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1001 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1004 if _, ok := cl.ipBlockRange(ip); ok {
1007 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1013 // Return a Torrent ready for insertion into a Client.
1014 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1015 // use provided storage, if provided
1016 storageClient := cl.defaultStorage
1017 if specStorage != nil {
1018 storageClient = storage.NewClient(specStorage)
1024 peers: prioritizedPeers{
1026 getPrio: func(p Peer) peerPriority {
1027 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1030 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1032 halfOpen: make(map[string]Peer),
1033 pieceStateChanges: pubsub.NewPubSub(),
1035 storageOpener: storageClient,
1036 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1038 networkingEnabled: true,
1039 metadataChanged: sync.Cond{
1043 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1044 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1045 return fmt.Sprintf("%v: %s", t, m.Text())
1047 t.setChunkSize(defaultChunkSize)
1051 // A file-like handle to some torrent data resource.
1052 type Handle interface {
// AddTorrentInfoHash adds a torrent identified only by its info hash. It
// delegates to AddTorrentInfoHashWithStorage with a nil storage, which makes
// newTorrent fall back to the client's default storage. The results are those
// of AddTorrentInfoHashWithStorage: the torrent, and whether it was newly added.
1059 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1060 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1063 // Adds a torrent by InfoHash with a custom Storage implementation.
1064 // If the torrent already exists then this Storage is ignored and the
1065 // existing torrent returned with `new` set to `false`
1066 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1069 t, ok := cl.torrents[infoHash]
1075 t = cl.newTorrent(infoHash, specStorage)
1076 cl.eachDhtServer(func(s DhtServer) {
1077 go t.dhtAnnouncer(s)
1079 cl.torrents[infoHash] = t
1080 cl.clearAcceptLimits()
1081 t.updateWantPeersEvent()
1082 // Tickle Client.waitAccept, new torrent may want conns.
1083 cl.event.Broadcast()
1087 // Add or merge a torrent spec. If the torrent is already present, the
1088 // trackers will be merged with the existing ones. If the Info isn't yet
1089 // known, it will be set. The display name is replaced if the new spec
1090 // provides one. Returns new if the torrent wasn't already in the client.
1091 // Note that any `Storage` defined on the spec will be ignored if the
1092 // torrent is already present (i.e. the `new` return value is `false`).
1093 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1094 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1095 if spec.DisplayName != "" {
1096 t.SetDisplayName(spec.DisplayName)
1098 if spec.InfoBytes != nil {
1099 err = t.SetInfoBytes(spec.InfoBytes)
1106 if spec.ChunkSize != 0 {
1107 t.setChunkSize(pp.Integer(spec.ChunkSize))
1109 t.addTrackers(spec.Trackers)
1114 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1115 t, ok := cl.torrents[infoHash]
1117 err = fmt.Errorf("no such torrent")
1124 delete(cl.torrents, infoHash)
1128 func (cl *Client) allTorrentsCompleted() bool {
1129 for _, t := range cl.torrents {
1133 if !t.haveAllPieces() {
1140 // Returns true when all torrents are completely downloaded and false if the
1141 // client is stopped before that.
1142 func (cl *Client) WaitAll() bool {
1145 for !cl.allTorrentsCompleted() {
1146 if cl.closed.IsSet() {
1154 // Returns handles to all the torrents loaded in the Client.
1155 func (cl *Client) Torrents() []*Torrent {
1158 return cl.torrentsAsSlice()
1161 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1162 for _, t := range cl.torrents {
1163 ret = append(ret, t)
1168 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1169 spec, err := TorrentSpecFromMagnetURI(uri)
1173 T, _, err = cl.AddTorrentSpec(spec)
1177 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1178 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1180 slices.MakeInto(&ss, mi.Nodes)
1185 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1186 mi, err := metainfo.LoadFromFile(filename)
1190 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The internal slice is returned
// as-is rather than copied, so it shares its backing array with the client's
// own list.
1193 func (cl *Client) DhtServers() []DhtServer {
1194 return cl.dhtServers
1197 func (cl *Client) AddDHTNodes(nodes []string) {
1198 for _, n := range nodes {
1199 hmp := missinggo.SplitHostMaybePort(n)
1200 ip := net.ParseIP(hmp.Host)
1202 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1205 ni := krpc.NodeInfo{
1206 Addr: krpc.NodeAddr{
1211 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP records ip as a bad peer: it logs the ban, lazily allocates the
// badPeerIPs set if it is still nil, and stores the IP keyed by its string
// form. The set is consulted by badPeerIPPort.
1217 func (cl *Client) banPeerIP(ip net.IP) {
1218 cl.logger.Printf("banning ip %v", ip)
// Writing to a nil map would panic, so allocate on first use.
1219 if cl.badPeerIPs == nil {
1220 cl.badPeerIPs = make(map[string]struct{})
1222 cl.badPeerIPs[ip.String()] = struct{}{}
1225 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network string) (c *PeerConn) {
1231 PeerMaxRequests: 250,
1232 writeBuffer: new(bytes.Buffer),
1233 remoteAddr: remoteAddr,
1236 c.logger = cl.logger.WithValues(c,
1237 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1238 ).WithText(func(m log.Msg) string {
1239 return fmt.Sprintf("%v: %s", c, m.Text())
1241 c.writerCond.L = cl.locker()
1242 c.setRW(connStatsReadWriter{nc, c})
1243 c.r = &rateLimitedReader{
1244 l: cl.config.DownloadRateLimiter,
1247 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1251 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1259 Addr: ipPortAddr{ip, port},
1260 Source: PeerSourceDhtAnnouncePeer,
1264 func firstNotNil(ips ...net.IP) net.IP {
1265 for _, ip := range ips {
1273 func (cl *Client) eachDialer(f func(Dialer) bool) {
1274 for _, s := range cl.dialers {
1281 func (cl *Client) eachListener(f func(Listener) bool) {
1282 for _, s := range cl.listeners {
1289 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1290 cl.eachListener(func(l Listener) bool {
1297 func (cl *Client) publicIp(peer net.IP) net.IP {
1298 // TODO: Use BEP 10 to determine how peers are seeing us.
1299 if peer.To4() != nil {
1301 cl.config.PublicIp4,
1302 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1307 cl.config.PublicIp6,
1308 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1312 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1315 func(l net.Listener) bool {
1316 return f(addrIpOrNil(l.Addr()))
1322 // publicAddr returns our address as the given peer should see it: the IP chosen by
// publicIp for that peer, paired with the incoming peer port (converted to
// uint16).
1323 func (cl *Client) publicAddr(peer net.IP) IpPort {
1324 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1327 // ListenAddrs addresses currently being listened to.
1328 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1331 cl.eachListener(func(l Listener) bool {
1332 ret = append(ret, l.Addr())
1338 func (cl *Client) onBadAccept(addr net.Addr) {
1339 ipa, ok := tryIpPortFromNetAddr(addr)
1343 ip := maskIpForAcceptLimiting(ipa.IP)
1344 if cl.acceptLimiter == nil {
1345 cl.acceptLimiter = make(map[ipStr]int)
1347 cl.acceptLimiter[ipStr(ip.String())]++
1350 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1351 if ip4 := ip.To4(); ip4 != nil {
1352 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits forgets all per-IP bad-accept counts by dropping the map.
// onBadAccept re-allocates it on demand, and lookups on the nil map in
// rateLimitAccept simply yield zero.
1357 func (cl *Client) clearAcceptLimits() {
1358 cl.acceptLimiter = nil
1361 func (cl *Client) acceptLimitClearer() {
1364 case <-cl.closed.LockedChan(cl.locker()):
1366 case <-time.After(15 * time.Minute):
1368 cl.clearAcceptLimits()
1374 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1375 if cl.config.DisableAcceptRateLimiting {
1378 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1381 func (cl *Client) rLock() {
1385 func (cl *Client) rUnlock() {
1389 func (cl *Client) lock() {
1393 func (cl *Client) unlock() {
// locker returns a sync.Locker view of the client, implemented by a
// clientLocker wrapping cl. It is installed as the Locker of condition
// variables that must share the client lock (e.g. cl.event.L in NewClient and
// c.writerCond.L in newConnection).
1397 func (cl *Client) locker() sync.Locker {
1398 return clientLocker{cl}
// String implements fmt.Stringer. The explicit argument index [1] reuses cl
// for both verbs, printing its type (%T) and pointer value (%p) inside angle
// brackets.
1401 func (cl *Client) String() string {
1402 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1405 type clientLocker struct {
1409 func (cl clientLocker) Lock() {
1413 func (cl clientLocker) Unlock() {