19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo/bitmap"
23 "github.com/anacrolix/missinggo/perf"
24 "github.com/anacrolix/missinggo/pproffd"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/slices"
27 "github.com/anacrolix/missinggo/v2"
28 "github.com/anacrolix/missinggo/v2/conntrack"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
// NOTE(review): the struct header and several fields are not visible in this
// listing; only the fields shown below are annotated.
47 // An aggregate of stats over all connections. First in struct to ensure
48 // 64-bit alignment of fields. See #262.
// Checked with IsSet() by waitAccept, acceptConnections and WaitAll.
53 closed missinggo.Event
// Storage used by newTorrent when no per-torrent storage is supplied.
59 defaultStorage *storage.Client
// Appended to by NewClient: one DHT server per listener that is a net.PacketConn.
62 dhtServers []*dht.Server
// Consulted by ipBlockRange; a nil value is handled there explicitly.
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
// Keyed by ip.String(); allocated lazily by banPeerIP.
71 badPeerIPs map[string]struct{}
// Loaded torrents keyed by infohash.
72 torrents map[InfoHash]*Torrent
// Count of bad accepts per masked source IP (see onBadAccept); reset to nil by
// clearAcceptLimits.
74 acceptLimiter map[ipStr]int
// Waited on by outgoingConnection before each outgoing dial.
75 dialRateLimiter *rate.Limiter
// BadPeerIPs returns the banned peer IPs as produced by badPeerIPsLocked.
// NOTE(review): the lines between the signature and the return are not visible
// in this listing — presumably they acquire the client lock; confirm.
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock — confirm.
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
// LocalPort visits every listener, reading its port from the listener address,
// and panics with "mismatched ports" when a listener's port differs from the
// one already recorded in port.
// NOTE(review): the first branch of the conditional and the return are not
// visible in this listing.
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
// writeDhtServerStatus writes a tab-indented summary of the DHT server s to w:
// node counts, the server ID in hex, successful outbound announce_peer queries
// and outstanding transactions. All figures come from s.Stats() and s.ID().
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111 dhtStats := s.Stats()
// The figure labelled "banned" is dhtStats.BadNodes.
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
// NOTE(review): the rest of the sentence above, the locking and the closing
// statements of this function are not visible in this listing.
120 func (cl *Client) WriteStatus(_w io.Writer) {
// All output goes through a buffered writer wrapped around the caller's writer.
// The flush is not visible here — confirm it happens before returning.
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
// One section per DHT server, headed by its network and address.
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
// Dump the client's aggregate stats structure.
133 spew.Fdump(w, &cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// Torrents are listed in ascending order of their infohash string.
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
// Completion percentage is 100*(1 - missing/total), followed by the byte
// length and a humanized total size.
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached (via AddValue) to debug messages in this
// file; debugLogFilter tests for it with HasValue.
156 const debugLogValue = log.Debug
// debugLogFilter is installed on the client logger by initLogger. The visible
// return passes only messages that do not carry debugLogValue.
// NOTE(review): an earlier branch is not visible in this listing — presumably
// it lets everything through when debugging is enabled; confirm.
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
162 return !m.HasValue(debugLogValue)
// initLogger derives cl.logger from the configured logger, attaching the client
// as a log value and installing debugLogFilter as the message filter.
165 func (cl *Client) initLogger() {
166 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey returns bytes 16 through 19 of the peer ID, read as a big-endian
// uint32 and converted to int32.
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient builds a Client from cfg: default storage, peer ID, optional HTTP
// proxy, listeners, accept loops and DHT servers are set up here.
// NOTE(review): many lines (guards, error checks, returns, cleanup) are not
// visible in this listing; the comments below describe only visible statements.
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// Fall back to the default configuration — presumably only when cfg is nil
// (the guard is not visible; confirm).
175 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
// Token-bucket limiter for outgoing dials: rate 10 per second, burst 10.
187 dialRateLimiter: rate.NewLimiter(10, 10),
// Background goroutine that resets the accept limiter every 15 minutes until
// the client is closed.
189 go cl.acceptLimitClearer()
197 cl.extensionBytes = defaultPeerExtensionBytes()
198 cl.event.L = cl.locker()
199 storageImpl := cfg.DefaultStorage
// No storage configured: use file storage under cfg.DataDir and register a
// close hook that logs any error from closing it.
200 if storageImpl == nil {
201 // We'd use mmap but HFS+ doesn't support sparse files.
202 storageImpl = storage.NewFile(cfg.DataDir)
203 cl.onClose = append(cl.onClose, func() {
204 if err := storageImpl.Close(); err != nil {
205 cl.logger.Printf("error closing default storage: %s", err)
209 cl.defaultStorage = storage.NewClient(storageImpl)
210 if cfg.IPBlocklist != nil {
211 cl.ipBlockList = cfg.IPBlocklist
// A configured peer ID is copied verbatim; otherwise the Bep20 prefix is copied
// in and the remaining bytes are filled from rand.Read.
214 if cfg.PeerID != "" {
215 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
217 o := copy(cl.peerID[:], cfg.Bep20)
218 _, err = rand.Read(cl.peerID[o:])
220 panic("error generating peer id")
// Derive the HTTP proxy function from ProxyURL when no explicit HTTPProxy is
// set. No handling of a ProxyURL parse failure is visible here.
224 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
230 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
// Accept peer connections on every listener whose network is enabled for peers.
237 for _, s := range cl.conns {
238 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239 go cl.acceptConnections(s)
// Start a DHT server on each listener that is also a net.PacketConn.
245 for _, s := range cl.conns {
246 if pc, ok := s.(net.PacketConn); ok {
247 ds, err := cl.newDhtServer(pc)
251 cl.dhtServers = append(cl.dhtServers, ds)
// firewallCallback is passed to listenAll by NewClient. It computes block as
// "no torrent currently wants connections" and bumps one of two counters.
// NOTE(review): the branch structure and the return are not visible in this
// listing — presumably it returns block; confirm.
259 func (cl *Client) firewallCallback(net.Addr) bool {
261 block := !cl.wantConns()
264 torrent.Add("connections firewalled", 1)
266 torrent.Add("connections not firewalled", 1)
// enabledPeerNetworks walks allPeerNetworks and tests each with
// peerNetworkEnabled against the client config.
// NOTE(review): the append and return are not visible in this listing —
// presumably the enabled networks are collected into ns.
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272 for _, n := range allPeerNetworks {
273 if peerNetworkEnabled(n, cl.config) {
// listenOnNetwork tests network n against the client's disable flags.
// NOTE(review): the bodies of the conditions and the final return are not
// visible in this listing — presumably each match returns false and the
// fall-through returns true; confirm.
280 func (cl *Client) listenOnNetwork(n network) bool {
281 if n.Ipv4 && cl.config.DisableIPv4 {
284 if n.Ipv6 && cl.config.DisableIPv6 {
287 if n.Tcp && cl.config.DisableTCP {
// UDP only matches when both uTP and DHT are disabled, since either one needs
// the UDP socket.
290 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
// listenNetworks walks allPeerNetworks and tests each with listenOnNetwork.
// NOTE(review): the append and return are not visible in this listing —
// presumably the matching networks are collected into ns.
296 func (cl *Client) listenNetworks() (ns []network) {
297 for _, n := range allPeerNetworks {
298 if cl.listenOnNetwork(n) {
// newDhtServer creates a DHT server configured from the client's settings and
// bootstraps it, logging either the bootstrap error or its completion.
// NOTE(review): some config fields, the error check after NewServer and the
// context in which Bootstrap runs are not visible in this listing.
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306 cfg := dht.ServerConfig{
307 IPBlocklist: cl.ipBlockList,
309 OnAnnouncePeer: cl.onDHTAnnouncePeer,
// Public IP: the configured IPv6 address when conn is IPv6 and one is set,
// otherwise the configured IPv4 address.
310 PublicIP: func() net.IP {
311 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312 return cl.config.PublicIp6
314 return cl.config.PublicIp4
316 StartingNodes: cl.config.DhtStartingNodes,
317 ConnectionTracking: cl.config.ConnTracker,
318 OnQuery: cl.config.DHTOnQuery,
// Log messages from this server are tagged with "dht" and the local address.
319 Logger: cl.logger.WithValues("dht", conn.LocalAddr().String()),
321 s, err = dht.NewServer(&cfg)
324 ts, err := s.Bootstrap()
326 cl.logger.Printf("error bootstrapping dht: %s", err)
328 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
334 func (cl *Client) Closed() <-chan struct{} {
// eachDhtServer ranges over cl.dhtServers.
// NOTE(review): the loop body is not visible in this listing — presumably it
// calls f with each server.
340 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
341 for _, ds := range cl.dhtServers {
// closeSockets visits every listener via eachListener.
// NOTE(review): the callback body is not visible in this listing — presumably
// it closes each socket; confirm.
346 func (cl *Client) closeSockets() {
347 cl.eachListener(func(l socket) bool {
354 // Stops the client. All connections to peers are closed and all activity will
// NOTE(review): the rest of the sentence above and several statements of this
// function (locking, loop bodies) are not visible in this listing.
356 func (cl *Client) Close() {
// Close every DHT server.
360 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
// Visit every loaded torrent (body not visible — presumably closes it).
362 for _, t := range cl.torrents {
// Visit the registered close hooks (body not visible — presumably calls each).
365 for _, f := range cl.onClose {
// ipBlockRange looks ip up in the client's blocklist, returning the matching
// range and whether one was found. A nil blocklist is handled separately —
// that branch's body is not visible here, presumably returning the zero
// results.
371 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
372 if cl.ipBlockList == nil {
375 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked obtains the blocked flag for ip from ipBlockRange, discarding the
// range. NOTE(review): the return is not visible — presumably returns blocked.
378 func (cl *Client) ipIsBlocked(ip net.IP) bool {
379 _, blocked := cl.ipBlockRange(ip)
// wantConns ranges over the loaded torrents; firewallCallback blocks incoming
// connections when it returns false.
// NOTE(review): the loop body is not visible in this listing — presumably it
// returns true as soon as any torrent wants connections; confirm.
383 func (cl *Client) wantConns() bool {
384 for _, t := range cl.torrents {
// waitAccept checks whether the client has been closed.
// NOTE(review): the surrounding loop and wait are not visible in this listing —
// presumably it blocks until a torrent wants connections or the client closes.
392 func (cl *Client) waitAccept() {
394 if cl.closed.IsSet() {
// rejectAccepted decides whether a freshly accepted connection should be
// refused, returning an error naming the first rule that matched. Checks run
// in this order: IPv4 peers disabled, IPv4 disabled, IPv6 disabled, accept
// rate limiting for the source IP, then the bad source address check.
// NOTE(review): the final return is not visible in this listing — presumably
// nil when no rule matches.
404 func (cl *Client) rejectAccepted(conn net.Conn) error {
405 ra := conn.RemoteAddr()
406 rip := missinggo.AddrIP(ra)
407 if cl.config.DisableIPv4Peers && rip.To4() != nil {
408 return errors.New("ipv4 peers disabled")
// Here IPv4 is recognised by a 4-byte IP slice rather than by To4().
410 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411 return errors.New("ipv4 disabled")
// A 16-byte IP that has no IPv4 form is treated as IPv6.
414 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
415 return errors.New("ipv6 disabled")
417 if cl.rateLimitAccept(rip) {
418 return errors.New("source IP accepted rate limited")
420 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
421 return errors.New("bad source addr")
// acceptConnections accepts connections from l, counts each accept, and hands
// connections to incomingConnection on a new goroutine. Accept errors and
// rejections are logged with debugLogValue attached.
// NOTE(review): the loop header, locking and branch conditions are not visible
// in this listing.
426 func (cl *Client) acceptConnections(l net.Listener) {
428 conn, err := l.Accept()
429 torrent.Add("client listener accepts", 1)
430 conn = pproffd.WrapNetConn(conn)
432 closed := cl.closed.IsSet()
435 reject = cl.rejectAccepted(conn)
445 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
450 torrent.Add("rejected accepted connections", 1)
451 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
454 go cl.incomingConnection(conn)
456 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
// Counters keyed by remote IP byte length, remote network and listener network.
457 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
458 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
459 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// incomingConnection wraps an accepted net.Conn in a non-outgoing connection,
// marks its discovery source as incoming and runs the receive-side handshakes.
// NOTE(review): the body of the *net.TCPConn branch is not visible in this
// listing.
464 func (cl *Client) incomingConnection(nc net.Conn) {
466 if tc, ok := nc.(*net.TCPConn); ok {
469 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
470 c.Discovery = peerSourceIncoming
471 cl.runReceivedConn(c)
474 // Returns a handle to the given torrent, if it's present in the client.
// ok reports whether ih was found in cl.torrents.
// NOTE(review): the locking and return are not visible in this listing.
475 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
478 t, ok = cl.torrents[ih]
// torrent returns the torrent stored under ih in cl.torrents, or nil when the
// infohash is not loaded. Unlike Torrent it performs only the map lookup.
482 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
483 return cl.torrents[ih]
486 type dialResult struct {
// countDialResult bumps either the "successful dials" or the "unsuccessful
// dials" counter.
// NOTE(review): the condition is not visible in this listing — presumably
// err == nil selects the successful counter.
491 func countDialResult(err error) {
493 torrent.Add("successful dials", 1)
495 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout returns the dial timeout to use given how many peers are
// waiting to be dialed. The timeout starts at max and is divided by
// (pendingPeers+halfOpenLimit)/halfOpenLimit (integer division), i.e. roughly
// the number of rounds of half-open connections needed to work through the
// pending peers. The result is never allowed to drop below minDialTimeout.
//
// A zero halfOpenLimit, or a quotient that truncates to zero, would make one of
// the divisions panic with an integer divide-by-zero. In those cases the
// divisor is treated as 1, so no reduction is applied. All other inputs behave
// exactly as before.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	divisor := 1
	if halfOpenLimit != 0 {
		divisor = (pendingPeers + halfOpenLimit) / halfOpenLimit
	}
	if divisor == 0 {
		divisor = 1
	}
	ret = max / time.Duration(divisor)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return
}
507 // Returns whether an address is known to connect to a client with our own ID.
// The lookup key is the address string as stored in cl.dopplegangerAddrs.
// NOTE(review): the return is not visible in this listing — presumably ok.
508 func (cl *Client) dopplegangerAddr(addr string) bool {
509 _, ok := cl.dopplegangerAddrs[addr]
513 // Returns a connection over UTP or TCP, whichever is first to connect.
// NOTE(review): several statements (goroutine launches, channel sends/receives,
// deferred calls, the return) are not visible in this listing; the comments
// below describe only what is shown.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
// Performance timer marked with how the dial ended (no conn, or conn over a network).
516 t := perf.NewTimer(perf.CallerName(0))
519 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
521 t.Mark("returned conn over " + res.Network)
525 ctx, cancel := context.WithCancel(ctx)
526 // As soon as we return one connection, cancel the others.
// Results channel buffered to the value of left (its initial value is not visible here).
529 resCh := make(chan dialResult, left)
// A dial is attempted from each listener whose network is enabled for peers.
533 cl.eachListener(func(s socket) bool {
535 network := s.Addr().Network()
536 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
540 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
543 cl.dialFromSocket(ctx, s, addr),
551 // Wait for a successful connection.
553 defer perf.ScopeTimer()()
554 for ; left > 0 && res.Conn == nil; left-- {
558 // There are still incomplete dials.
// Drain the remaining results (handling of each conn is not visible here).
560 for ; left > 0; left-- {
561 conn := (<-resCh).Conn
568 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
570 //if res.Conn != nil {
571 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
573 // cl.logger.Printf("failed to dial %s", addr)
// dialFromSocket dials addr from socket s under connection tracking. The
// tracking entry is keyed by network, local address and remote address.
// NOTE(review): the handling of the tracking entry (cte) in each branch and the
// returns are not visible in this listing.
578 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
579 network := s.Addr().Network()
580 cte := cl.config.ConnTracker.Wait(
582 conntrack.Entry{network, s.Addr().String(), addr},
583 "dial torrent client",
586 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
587 // which dial errors allow us to forget the connection tracking entry handle.
588 if ctx.Err() != nil {
594 c, err := s.dial(ctx, addr)
595 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
596 // it now in case we close the connection forthwith.
597 if tc, ok := c.(*net.TCPConn); ok {
// Dial errors recognised by forgettableDialError get separate treatment (body not visible).
602 if err != nil && forgettableDialError(err) {
// The returned conn is wrapped so that extra work runs on Close (body not visible).
609 return closeWrapper{c, func() error {
// forgettableDialError reports whether err is a dial error whose message
// contains "no suitable address found". dialFromSocket uses it to single out
// such errors. A nil err is never forgettable; without this guard, calling
// err.Error() on a nil error would panic.
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
// noLongerHalfOpen removes addr from t.halfOpen. It panics with "invariant
// broken" if addr was not recorded as half-open.
// NOTE(review): any statements after the delete are not visible in this listing.
620 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
621 if _, ok := t.halfOpen[addr]; !ok {
622 panic("invariant broken")
624 delete(t.halfOpen, addr)
628 // Performs initiator handshakes and returns a connection. Returns nil
629 // *connection if no connection for valid reasons.
// NOTE(review): error checks, the deferred cancel and the returns are not
// visible in this listing.
630 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
631 c = cl.newConnection(nc, true, remoteAddr, network)
632 c.headerEncrypted = encryptHeader
// Bound the handshakes by the configured timeout and apply the resulting
// deadline to the underlying conn.
633 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
635 dl, ok := ctx.Deadline()
639 err = nc.SetDeadline(dl)
643 err = cl.initiateHandshakes(c, t)
647 // Returns nil connection and nil error if no connection could be established
648 // for valid reasons.
// NOTE(review): locking inside the timeout closure, the nil-conn check and the
// handling after the handshake are not visible in this listing.
649 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
// The dial is bounded by the torrent's dial timeout.
650 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
653 return t.dialTimeout()
656 dr := cl.dialFirst(dialCtx, addr.String())
// A failed dial reports the context error when the dial context ended,
// otherwise a generic "dial failed".
659 if dialCtx.Err() != nil {
660 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
662 return nil, errors.New("dial failed")
// The handshake runs under a fresh background context, not the dial context.
664 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
671 // Returns nil connection and nil error if no connection could be established
672 // for valid reasons.
// The preferred header obfuscation setting is tried first; the opposite setting
// is tried afterwards.
// NOTE(review): the conditions guarding each counter and the returns are not
// visible in this listing.
673 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
674 torrent.Add("establish outgoing connection", 1)
675 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
676 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
678 torrent.Add("initiated conn with preferred header obfuscation", 1)
681 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
682 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
683 // We should have just tried with the preferred header obfuscation. If it was required,
684 // there's nothing else to try.
687 // Try again with encryption if we didn't earlier, or without if we did.
688 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
690 torrent.Add("initiated conn with fallback header obfuscation", 1)
692 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
696 // Called to dial out and run a connection. The addr we're given is already
697 // considered half-open.
// NOTE(review): locking, error/nil checks and assignments using ps and trusted
// are not visible in this listing.
698 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
// Block until the client-wide dial rate limiter permits another dial; the
// returned error is not used.
699 cl.dialRateLimiter.Wait(context.Background())
700 c, err := cl.establishOutgoingConn(t, addr)
703 // Don't release lock between here and addConnection, unless it's for
705 cl.noLongerHalfOpen(t, addr.String())
708 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
715 cl.runHandshookConn(c, t)
718 // The port number for incoming peer connections. 0 if the client isn't
// NOTE(review): the rest of the sentence above is not visible in this listing.
// The value is whatever LocalPort reports.
720 func (cl *Client) incomingPeerPort() int {
721 return cl.LocalPort()
// initiateHandshakes performs the initiator side of the handshakes on c for
// torrent t: first the MSE header obfuscation handshake when c.headerEncrypted
// is set, then the BitTorrent handshake. The infohash the peer answers with
// must equal t.infoHash.
// NOTE(review): the mse.InitiateHandshake arguments other than CryptoProvides,
// the error checks and the final return are not visible in this listing.
724 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
725 if c.headerEncrypted {
728 rw, c.cryptoMethod, err = mse.InitiateHandshake(
735 cl.config.CryptoProvides,
739 return xerrors.Errorf("header obfuscation handshake: %w", err)
742 ih, err := cl.connBtHandshake(c, &t.infoHash)
744 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
746 if ih != t.infoHash {
747 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
752 // Calls f with any secret keys.
// NOTE(review): the loop bodies are not visible in this listing — presumably
// the keys are derived from the infohashes in cl.torrents; confirm.
753 func (cl *Client) forSkeys(f func([]byte) bool) {
// This branch is compiled in but never taken (constant false condition).
756 if false { // Emulate the bug from #114
758 for ih := range cl.torrents {
762 for range cl.torrents {
769 for ih := range cl.torrents {
776 // Do encryption and bittorrent handshakes as receiver.
// NOTE(review): several branches, the torrent lookup and the returns are not
// visible in this listing.
777 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
778 defer perf.ScopeTimerErr(&err)()
780 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
// Success and "no secret key match" are both counted by whether the header was
// encrypted; any other error gets its own counter.
782 if err == nil || err == mse.ErrNoSecretKeyMatch {
783 if c.headerEncrypted {
784 torrent.Add("handshakes received encrypted", 1)
786 torrent.Add("handshakes received unencrypted", 1)
789 torrent.Add("handshakes received with error while handling encryption", 1)
792 if err == mse.ErrNoSecretKeyMatch {
// When the preferred obfuscation mode is mandatory, a connection using the
// other mode is an error.
797 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
798 err = errors.New("connection not have required header obfuscation")
// As receiver no infohash is supplied to the BitTorrent handshake.
801 ih, err := cl.connBtHandshake(c, nil)
803 err = xerrors.Errorf("during bt handshake: %w", err)
// connBtHandshake runs the BitTorrent handshake over c using our peer ID and
// extension bytes, then records the peer's extension bits, its peer ID and the
// completion time on c.
// NOTE(review): the error check and the assignment of ret are not visible in
// this listing.
812 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
813 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
818 c.PeerExtensionBytes = res.PeerExtensionBits
819 c.PeerID = res.PeerID
820 c.completedHandshake = time.Now()
// runReceivedConn handles an incoming connection: it sets a deadline of
// HandshakesTimeout from now, performs the receive-side handshakes, and on
// success runs the connection via runHandshookConn. Handshake errors and
// handshakes for unloaded torrents are counted and reported to onBadAccept.
// NOTE(review): the branch conditions, locking and returns are not visible in
// this listing.
824 func (cl *Client) runReceivedConn(c *connection) {
825 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
829 t, err := cl.receiveHandshakes(c)
832 "error receiving handshakes: %s", err,
836 "network", c.network,
838 torrent.Add("error receiving handshake", 1)
840 cl.onBadAccept(c.remoteAddr)
845 torrent.Add("received handshake for unloaded torrent", 1)
846 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
848 cl.onBadAccept(c.remoteAddr)
852 torrent.Add("received handshake for loaded torrent", 1)
855 cl.runHandshookConn(c, t)
// runHandshookConn runs a connection whose handshakes have completed: it adds
// the connection to torrent t, starts its writer, sends the initial messages
// and runs the main read loop, dropping the connection from t on return.
// NOTE(review): locking, the outgoing/incoming distinction inside the
// doppleganger branch and the early returns are not visible in this listing.
858 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
// The peer presented our own peer ID.
860 if c.PeerID == cl.peerID {
863 addr := c.conn.RemoteAddr().String()
864 cl.dopplegangerAddrs[addr] = struct{}{}
866 // Because the remote address is not necessarily the same as its
867 // client's torrent listen address, we won't record the remote address
868 // as a doppleganger. Instead, the initiator can record *us* as the
// Clear the write deadline now that the handshakes are done.
873 c.conn.SetWriteDeadline(time.Time{})
874 c.r = deadlineReader{c.conn, c.r}
875 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
876 if connIsIpv6(c.conn) {
877 torrent.Add("completed handshake over ipv6", 1)
879 if err := t.addConnection(c); err != nil {
880 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
883 defer t.dropConnection(c)
884 go c.writer(time.Minute)
885 cl.sendInitialMessages(c, t)
886 err := c.mainReadLoop()
// Read loop errors are only logged when the Debug config flag is set.
887 if err != nil && cl.config.Debug {
888 cl.logger.Printf("error during connection main read loop: %s", err)
892 // See the order given in Transmission's tr_peerMsgsNew.
// Posts up to three kinds of message on conn: the extended handshake (when
// both sides support the extension protocol), HaveAll/HaveNone (when the fast
// extension is enabled), and a further message when both sides support DHT
// and the client has a DHT server.
// NOTE(review): message type fields, the bitfield branch and the contents of
// the DHT message are not visible in this listing.
893 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
894 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
895 conn.Post(pp.Message{
897 ExtendedID: pp.HandshakeExtendedID,
// The payload is the bencoded extended handshake message built below.
898 ExtendedPayload: func() []byte {
899 msg := pp.ExtendedHandshakeMessage{
900 M: map[pp.ExtensionName]pp.ExtensionNumber{
901 pp.ExtensionNameMetadata: metadataExtendedId,
903 V: cl.config.ExtendedHandshakeClientVersion,
904 Reqq: 64, // TODO: Really?
905 YourIp: pp.CompactIp(conn.remoteAddr.IP),
// Encryption is advertised unless unobfuscated headers are both preferred and required.
906 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
907 Port: cl.incomingPeerPort(),
908 MetadataSize: torrent.metadataSize(),
909 // TODO: We can figure these out specific to the socket
911 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
912 Ipv6: cl.config.PublicIp6.To16(),
// PEX is advertised unless disabled in the config.
914 if !cl.config.DisablePEX {
915 msg.M[pp.ExtensionNamePex] = pexExtendedId
917 return bencode.MustMarshal(msg)
922 if conn.fastEnabled() {
// Keep sentHaves in step with what the HaveAll/HaveNone message tells the peer.
923 if torrent.haveAllPieces() {
924 conn.Post(pp.Message{Type: pp.HaveAll})
925 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
927 } else if !torrent.haveAnyPieces() {
928 conn.Post(pp.Message{Type: pp.HaveNone})
929 conn.sentHaves.Clear()
935 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
936 conn.Post(pp.Message{
// dhtPort returns the port of a DHT server's address as a uint16. Because ret
// is overwritten on every callback, the last server visited wins; with no DHT
// servers ret keeps its zero value.
// NOTE(review): the return is not visible in this listing.
943 func (cl *Client) dhtPort() (ret uint16) {
944 cl.eachDhtServer(func(s *dht.Server) {
945 ret = uint16(missinggo.AddrPort(s.Addr()))
// haveDhtServer visits the DHT servers via eachDhtServer.
// NOTE(review): the callback body is not visible in this listing — presumably
// it sets ret to true, making the result "at least one DHT server exists".
950 func (cl *Client) haveDhtServer() (ret bool) {
951 cl.eachDhtServer(func(_ *dht.Server) {
957 // Process incoming ut_metadata message.
// The payload is a bencoded dictionary, followed by raw piece data for data
// messages. Behaviour depends on the "msg_type" field.
// NOTE(review): the declaration of d, the extraction of piece, the switch
// header and several returns are not visible in this listing.
958 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
960 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dictionary are tolerated (empty branch); any other
// unmarshal error is returned.
961 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
962 } else if err != nil {
963 return fmt.Errorf("error unmarshalling bencode: %s", err)
965 msgType, ok := d["msg_type"]
967 return errors.New("missing msg_type field")
971 case pp.DataMetadataExtensionMsgType:
972 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
// Only pieces we asked this connection for are accepted.
973 if !c.requestedMetadataPiece(piece) {
974 return fmt.Errorf("got unexpected piece %d", piece)
976 c.metadataRequests[piece] = false
// The piece data sits at the end of the payload; begin is where it starts.
977 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
978 if begin < 0 || begin >= len(payload) {
979 return fmt.Errorf("data has bad offset in payload: %d", begin)
981 t.saveMetadataPiece(piece, payload[begin:])
982 c.lastUsefulChunkReceived = time.Now()
983 return t.maybeCompleteMetadata()
984 case pp.RequestMetadataExtensionMsgType:
// Reject requests for metadata pieces we do not have.
985 if !t.haveMetadataPiece(piece) {
986 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Metadata pieces are 16 KiB (1<<14 bytes) apart.
989 start := (1 << 14) * piece
990 c.logger.Printf("sending metadata piece %d", piece)
991 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
993 case pp.RejectMetadataExtensionMsgType:
996 return errors.New("unknown msg_type value")
// badPeerIPPort tests an address against three lists in order: known
// doppleganger addresses (keyed by "host:port"), the IP blocklist, and the
// banned peer IPs (keyed by ip.String()).
// NOTE(review): statements before the first check and all returns are not
// visible in this listing — presumably each match returns true.
1000 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1004 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1007 if _, ok := cl.ipBlockRange(ip); ok {
1010 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1016 // Return a Torrent ready for insertion into a Client.
// NOTE(review): the start of the struct literal, several fields and the return
// are not visible in this listing.
1017 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1018 // use provided storage, if provided
1019 storageClient := cl.defaultStorage
1020 if specStorage != nil {
1021 storageClient = storage.NewClient(specStorage)
1027 peers: prioritizedPeers{
// Peer priority is computed from our public address as seen by the peer's IP
// and the peer's own address.
1029 getPrio: func(p Peer) peerPriority {
1030 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
// The conns map is pre-sized for twice the established-connection limit.
1033 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1035 halfOpen: make(map[string]Peer),
1036 pieceStateChanges: pubsub.NewPubSub(),
1038 storageOpener: storageClient,
1039 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1041 networkingEnabled: true,
1042 requestStrategy: cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()),
1043 metadataChanged: sync.Cond{
// The torrent's logger prefixes every message with the torrent itself.
1047 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1048 return fmt.Sprintf("%v: %s", t, m.Text())
1050 t.setChunkSize(defaultChunkSize)
1054 // A file-like handle to some torrent data resource.
1055 type Handle interface {
// AddTorrentInfoHash adds a torrent by infohash using the client's default
// storage. It is AddTorrentInfoHashWithStorage with a nil storage argument.
1062 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1063 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1066 // Adds a torrent by InfoHash with a custom Storage implementation.
1067 // If the torrent already exists then this Storage is ignored and the
1068 // existing torrent returned with `new` set to `false`
// NOTE(review): locking, the already-present branch and the final return are
// not visible in this listing.
1069 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1072 t, ok := cl.torrents[infoHash]
1078 t = cl.newTorrent(infoHash, specStorage)
// Start one DHT announcer goroutine per DHT server for the new torrent.
1079 cl.eachDhtServer(func(s *dht.Server) {
1080 go t.dhtAnnouncer(s)
1082 cl.torrents[infoHash] = t
// Reset the accept limiter now that a torrent has been added.
1083 cl.clearAcceptLimits()
1084 t.updateWantPeersEvent()
1085 // Tickle Client.waitAccept, new torrent may want conns.
1086 cl.event.Broadcast()
1090 // Add or merge a torrent spec. If the torrent is already present, the
1091 // trackers will be merged with the existing ones. If the Info isn't yet
1092 // known, it will be set. The display name is replaced if the new spec
1093 // provides one. Returns new if the torrent wasn't already in the client.
1094 // Note that any `Storage` defined on the spec will be ignored if the
1095 // torrent is already present (i.e. `new` return value is `false`)
// NOTE(review): the error check after SetInfoBytes, locking and the return are
// not visible in this listing.
1096 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1097 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1098 if spec.DisplayName != "" {
1099 t.SetDisplayName(spec.DisplayName)
1101 if spec.InfoBytes != nil {
1102 err = t.SetInfoBytes(spec.InfoBytes)
// A zero ChunkSize leaves the torrent's current chunk size untouched.
1109 if spec.ChunkSize != 0 {
1110 t.setChunkSize(pp.Integer(spec.ChunkSize))
1112 t.addTrackers(spec.Trackers)
// dropTorrent removes the torrent stored under infoHash from cl.torrents. A
// missing torrent yields the error "no such torrent".
// NOTE(review): the not-found guard and the steps between the lookup and the
// delete are not visible in this listing — presumably the torrent is closed.
1117 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1118 t, ok := cl.torrents[infoHash]
1120 err = fmt.Errorf("no such torrent")
1127 delete(cl.torrents, infoHash)
// allTorrentsCompleted checks every loaded torrent with haveAllPieces.
// NOTE(review): an earlier check inside the loop and the returns are not
// visible in this listing — presumably false is returned for the first
// torrent that is not complete, true otherwise.
1131 func (cl *Client) allTorrentsCompleted() bool {
1132 for _, t := range cl.torrents {
1136 if !t.haveAllPieces() {
1143 // Returns true when all torrents are completely downloaded and false if the
1144 // client is stopped before that.
// Loops while allTorrentsCompleted is false, checking for client closure on
// each pass.
// NOTE(review): locking, the wait inside the loop and the returns are not
// visible in this listing.
1145 func (cl *Client) WaitAll() bool {
1148 for !cl.allTorrentsCompleted() {
1149 if cl.closed.IsSet() {
1157 // Returns handles to all the torrents loaded in the Client.
// The slice is built fresh by torrentsAsSlice on each call.
// NOTE(review): the locking lines are not visible in this listing.
1158 func (cl *Client) Torrents() []*Torrent {
1161 return cl.torrentsAsSlice()
// torrentsAsSlice appends every torrent in cl.torrents to ret. The order
// follows Go map iteration and is therefore unspecified.
// NOTE(review): the return is not visible in this listing.
1164 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1165 for _, t := range cl.torrents {
1166 ret = append(ret, t)
// AddMagnet parses uri into a torrent spec and adds it with AddTorrentSpec.
// NOTE(review): the error check after parsing and the return are not visible
// in this listing.
1171 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1172 spec, err := TorrentSpecFromMagnetURI(uri)
1176 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds the torrent described by mi via AddTorrentSpec, and converts
// mi.Nodes into ss.
// NOTE(review): the declaration and use of ss and the return are not visible in
// this listing — presumably the nodes are passed on to AddDHTNodes.
1180 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1181 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1183 slices.MakeInto(&ss, mi.Nodes)
// AddTorrentFromFile loads metainfo from filename and adds it with AddTorrent.
// NOTE(review): the error check after loading is not visible in this listing.
1188 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1189 mi, err := metainfo.LoadFromFile(filename)
1193 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The returned slice is
// cl.dhtServers itself, not a copy.
1196 func (cl *Client) DhtServers() []*dht.Server {
1197 return cl.dhtServers
// AddDHTNodes processes each "host[:port]" string in nodes: the host must parse
// as an IP, otherwise the node is logged and not added. A node is built for
// each accepted entry and every DHT server is visited with it.
// NOTE(review): the IP nil check, the NodeAddr fields and the callback body
// are not visible in this listing.
1200 func (cl *Client) AddDHTNodes(nodes []string) {
1201 for _, n := range nodes {
1202 hmp := missinggo.SplitHostMaybePort(n)
1203 ip := net.ParseIP(hmp.Host)
1205 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1208 ni := krpc.NodeInfo{
1209 Addr: krpc.NodeAddr{
1214 cl.eachDhtServer(func(s *dht.Server) {
// banPeerIP logs the ban and records ip, keyed by ip.String(), in
// cl.badPeerIPs. The map is allocated on first use.
1220 func (cl *Client) banPeerIP(ip net.IP) {
1221 cl.logger.Printf("banning ip %v", ip)
1222 if cl.badPeerIPs == nil {
1223 cl.badPeerIPs = make(map[string]struct{})
1225 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds a connection around nc with its logger, writer
// condition, stats-counting read/writer and rate-limited reader, then logs the
// initialization.
// NOTE(review): the start of the struct literal, some of its fields and the
// return are not visible in this listing.
1228 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1234 PeerMaxRequests: 250,
1235 writeBuffer: new(bytes.Buffer),
1236 remoteAddr: remoteAddr,
// The connection's logger prefixes every message with the connection itself.
1239 c.logger = cl.logger.WithValues(c,
1240 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1241 ).WithText(func(m log.Msg) string {
1242 return fmt.Sprintf("%v: %s", c, m.Text())
// The writer condition shares the client's lock.
1244 c.writerCond.L = cl.locker()
1245 c.setRW(connStatsReadWriter{nc, c})
// Reads are throttled by the configured download rate limiter.
1246 c.r = &rateLimitedReader{
1247 l: cl.config.DownloadRateLimiter,
1250 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// onDHTAnnouncePeer is installed as the OnAnnouncePeer hook of each DHT server
// (see newDhtServer). The only visible statement tags something with the
// source peerSourceDhtAnnouncePeer.
// NOTE(review): the rest of the body is not visible in this listing —
// presumably the announcing peer is added to the matching torrent; confirm.
1254 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1264 Source: peerSourceDhtAnnouncePeer,
// firstNotNil ranges over the given IPs in order.
// NOTE(review): the loop body and the fall-through return are not visible in
// this listing — presumably the first non-nil IP is returned, or nil if there
// is none.
1268 func firstNotNil(ips ...net.IP) net.IP {
1269 for _, ip := range ips {
// eachListener ranges over cl.conns.
// NOTE(review): the loop body is not visible in this listing — presumably f is
// called with each socket and iteration stops when f returns false; confirm.
1277 func (cl *Client) eachListener(f func(socket) bool) {
1278 for _, s := range cl.conns {
// findListener visits the listeners via eachListener.
// NOTE(review): the callback body is not visible in this listing — presumably
// ret is set to the first listener for which f returns true, and stays nil
// when none matches; confirm.
1285 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1286 cl.eachListener(func(l socket) bool {
// publicIp chooses the IP to present to peer. For an IPv4 peer the candidates
// are the configured PublicIp4 and an IPv4 listener IP; otherwise the
// configured PublicIp6 and a listener IP that has no IPv4 form.
// NOTE(review): the call wrapping each candidate list is not visible in this
// listing — presumably firstNotNil.
1293 func (cl *Client) publicIp(peer net.IP) net.IP {
1294 // TODO: Use BEP 10 to determine how peers are seeing us.
1295 if peer.To4() != nil {
1297 cl.config.PublicIp4,
1298 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1303 cl.config.PublicIp6,
1304 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp searches for a listener whose address IP satisfies f and
// extracts an IP from the result.
// NOTE(review): the closing part of the return expression is not visible in
// this listing, so how the found listener becomes an address (and what happens
// when none is found) cannot be confirmed from here.
1308 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1309 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1310 return f(missinggo.AddrIP(l.Addr()))
1314 // Our IP as a peer should see it.
// The address pairs publicIp(peer) with the incoming peer port, which is
// truncated to uint16.
1315 func (cl *Client) publicAddr(peer net.IP) IpPort {
1316 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1319 // ListenAddrs addresses currently being listened to.
// One address is collected per listener, in eachListener order.
// NOTE(review): locking, the callback's return value and the final return are
// not visible in this listing.
1320 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1323 cl.eachListener(func(l socket) bool {
1324 ret = append(ret, l.Addr())
// onBadAccept records a bad accept from addr by incrementing the accept
// limiter count for its masked IP (see maskIpForAcceptLimiting). The limiter
// map is allocated on first use.
1330 func (cl *Client) onBadAccept(addr IpPort) {
1331 ip := maskIpForAcceptLimiting(addr.IP)
1332 if cl.acceptLimiter == nil {
1333 cl.acceptLimiter = make(map[ipStr]int)
1335 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address to its /24 network, so accept
// limiting applies to the whole /24.
// NOTE(review): the fall-through return for addresses with no IPv4 form is not
// visible in this listing — presumably ip is returned unchanged.
1338 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1339 if ip4 := ip.To4(); ip4 != nil {
1340 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all accept limiter counts by setting the map to
// nil; onBadAccept reallocates it on demand.
1345 func (cl *Client) clearAcceptLimits() {
1346 cl.acceptLimiter = nil
// acceptLimitClearer is started as a goroutine by NewClient. It waits on either
// client closure or a 15 minute timer, and clears the accept limits.
// NOTE(review): the loop, select header, locking and the return on closure are
// not visible in this listing.
1349 func (cl *Client) acceptLimitClearer() {
1352 case <-cl.closed.LockedChan(cl.locker()):
1354 case <-time.After(15 * time.Minute):
1356 cl.clearAcceptLimits()
// rateLimitAccept reports whether accepts from ip should be refused: true when
// the masked IP has at least one recorded bad accept. Reading a nil limiter map
// yields 0, so nothing is limited after clearAcceptLimits.
// NOTE(review): the body of the DisableAcceptRateLimiting branch is not visible
// in this listing — presumably it returns false.
1362 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1363 if cl.config.DisableAcceptRateLimiting {
1366 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1369 func (cl *Client) rLock() {
1373 func (cl *Client) rUnlock() {
1377 func (cl *Client) lock() {
1381 func (cl *Client) unlock() {
// locker returns a sync.Locker view of the client (a clientLocker wrapping cl).
// NewClient and newConnection use it for cl.event.L and c.writerCond.L.
1385 func (cl *Client) locker() sync.Locker {
1386 return clientLocker{cl}
// String formats the client as "<TYPE POINTER>", using the %T and %p verbs on
// the receiver.
1389 func (cl *Client) String() string {
1390 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1393 type clientLocker struct {
1397 func (cl clientLocker) Lock() {
1401 func (cl clientLocker) Unlock() {