17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/davecgh/go-spew/spew"
27 "github.com/dustin/go-humanize"
28 "github.com/google/btree"
29 "golang.org/x/time/rate"
30 "golang.org/x/xerrors"
32 "github.com/anacrolix/missinggo/v2"
33 "github.com/anacrolix/missinggo/v2/conntrack"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/iplist"
37 "github.com/anacrolix/torrent/metainfo"
38 "github.com/anacrolix/torrent/mse"
39 pp "github.com/anacrolix/torrent/peer_protocol"
40 "github.com/anacrolix/torrent/storage"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
62 dhtServers []DhtServer
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 // Returns the port number for the first listener that has one. No longer assumes that all port
95 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
97 func (cl *Client) LocalPort() (port int) {
98 cl.eachListener(func(l Listener) bool {
99 port = addrPortOrZero(l.Addr())
105 func writeDhtServerStatus(w io.Writer, s DhtServer) {
106 dhtStats := s.Stats()
107 fmt.Fprintf(w, " ID: %x\n", s.ID())
108 spew.Fdump(w, dhtStats)
111 // Writes out a human readable status of the client, such as for writing to a
113 func (cl *Client) WriteStatus(_w io.Writer) {
116 w := bufio.NewWriter(_w)
118 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
119 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
120 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
121 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
122 cl.eachDhtServer(func(s DhtServer) {
123 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
124 writeDhtServerStatus(w, s)
126 spew.Fdump(w, &cl.stats)
127 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
129 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
130 return l.InfoHash().AsString() < r.InfoHash().AsString()
133 fmt.Fprint(w, "<unknown name>")
135 fmt.Fprint(w, t.name())
139 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
141 w.WriteString("<missing metainfo>")
149 const debugLogValue = log.Debug
151 func (cl *Client) debugLogFilter(m log.Msg) bool {
155 return !m.HasValue(debugLogValue)
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
162 func (cl *Client) announceKey() int32 {
163 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
166 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
168 cfg = NewDefaultClientConfig()
178 dopplegangerAddrs: make(map[string]struct{}),
179 torrents: make(map[metainfo.Hash]*Torrent),
180 dialRateLimiter: rate.NewLimiter(10, 10),
182 go cl.acceptLimitClearer()
190 cl.extensionBytes = defaultPeerExtensionBytes()
191 cl.event.L = cl.locker()
192 storageImpl := cfg.DefaultStorage
193 if storageImpl == nil {
194 // We'd use mmap by default but HFS+ doesn't support sparse files.
195 storageImplCloser := storage.NewFile(cfg.DataDir)
196 cl.onClose = append(cl.onClose, func() {
197 if err := storageImplCloser.Close(); err != nil {
198 cl.logger.Printf("error closing default storage: %s", err)
201 storageImpl = storageImplCloser
203 cl.defaultStorage = storage.NewClient(storageImpl)
204 if cfg.IPBlocklist != nil {
205 cl.ipBlockList = cfg.IPBlocklist
208 if cfg.PeerID != "" {
209 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
211 o := copy(cl.peerID[:], cfg.Bep20)
212 _, err = rand.Read(cl.peerID[o:])
214 panic("error generating peer id")
218 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
226 for _, _s := range sockets {
227 s := _s // Go is fucking retarded.
228 cl.onClose = append(cl.onClose, func() { s.Close() })
229 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
230 cl.dialers = append(cl.dialers, s)
231 cl.listeners = append(cl.listeners, s)
232 go cl.acceptConnections(s)
238 for _, s := range sockets {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newAnacrolixDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
245 cl.onClose = append(cl.onClose, func() { ds.Close() })
253 func (cl *Client) AddDhtServer(d DhtServer) {
254 cl.dhtServers = append(cl.dhtServers, d)
257 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
258 // given address for any Torrent.
259 func (cl *Client) AddDialer(d Dialer) {
262 cl.dialers = append(cl.dialers, d)
263 for _, t := range cl.torrents {
268 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
270 func (cl *Client) AddListener(l Listener) {
271 cl.listeners = append(cl.listeners, l)
272 go cl.acceptConnections(l)
275 func (cl *Client) firewallCallback(net.Addr) bool {
277 block := !cl.wantConns()
280 torrent.Add("connections firewalled", 1)
282 torrent.Add("connections not firewalled", 1)
287 func (cl *Client) listenOnNetwork(n network) bool {
288 if n.Ipv4 && cl.config.DisableIPv4 {
291 if n.Ipv6 && cl.config.DisableIPv6 {
294 if n.Tcp && cl.config.DisableTCP {
297 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
303 func (cl *Client) listenNetworks() (ns []network) {
304 for _, n := range allPeerNetworks {
305 if cl.listenOnNetwork(n) {
312 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
313 cfg := dht.ServerConfig{
314 IPBlocklist: cl.ipBlockList,
316 OnAnnouncePeer: cl.onDHTAnnouncePeer,
317 PublicIP: func() net.IP {
318 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
319 return cl.config.PublicIp6
321 return cl.config.PublicIp4
323 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
324 ConnectionTracking: cl.config.ConnTracker,
325 OnQuery: cl.config.DHTOnQuery,
326 Logger: cl.logger.WithText(func(m log.Msg) string {
327 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
330 s, err = dht.NewServer(&cfg)
333 ts, err := s.Bootstrap()
335 cl.logger.Printf("error bootstrapping dht: %s", err)
337 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
343 func (cl *Client) Closed() <-chan struct{} {
349 func (cl *Client) eachDhtServer(f func(DhtServer)) {
350 for _, ds := range cl.dhtServers {
355 // Stops the client. All connections to peers are closed and all activity will
357 func (cl *Client) Close() {
361 for _, t := range cl.torrents {
364 for i := range cl.onClose {
365 cl.onClose[len(cl.onClose)-1-i]()
370 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
371 if cl.ipBlockList == nil {
374 return cl.ipBlockList.Lookup(ip)
377 func (cl *Client) ipIsBlocked(ip net.IP) bool {
378 _, blocked := cl.ipBlockRange(ip)
382 func (cl *Client) wantConns() bool {
383 for _, t := range cl.torrents {
391 func (cl *Client) waitAccept() {
393 if cl.closed.IsSet() {
403 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
404 func (cl *Client) rejectAccepted(conn net.Conn) error {
405 ra := conn.RemoteAddr()
406 if rip := addrIpOrNil(ra); rip != nil {
407 if cl.config.DisableIPv4Peers && rip.To4() != nil {
408 return errors.New("ipv4 peers disabled")
410 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411 return errors.New("ipv4 disabled")
414 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
415 return errors.New("ipv6 disabled")
417 if cl.rateLimitAccept(rip) {
418 return errors.New("source IP accepted rate limited")
420 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
421 return errors.New("bad source addr")
427 func (cl *Client) acceptConnections(l net.Listener) {
429 conn, err := l.Accept()
430 torrent.Add("client listener accepts", 1)
431 conn = pproffd.WrapNetConn(conn)
433 closed := cl.closed.IsSet()
436 reject = cl.rejectAccepted(conn)
446 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
451 torrent.Add("rejected accepted connections", 1)
452 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
455 go cl.incomingConnection(conn)
457 log.Fmsg("accepted %q connection at %q from %q",
461 ).AddValue(debugLogValue).Log(cl.logger)
462 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
463 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
464 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
469 func regularConnString(nc net.Conn) string {
470 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
473 func (cl *Client) incomingConnection(nc net.Conn) {
475 if tc, ok := nc.(*net.TCPConn); ok {
478 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
479 regularConnString(nc))
480 c.Discovery = PeerSourceIncoming
481 cl.runReceivedConn(c)
484 // Returns a handle to the given torrent, if it's present in the client.
485 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
488 t, ok = cl.torrents[ih]
492 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
493 return cl.torrents[ih]
496 type dialResult struct {
501 func countDialResult(err error) {
503 torrent.Add("successful dials", 1)
505 torrent.Add("unsuccessful dials", 1)
509 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
510 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
511 if ret < minDialTimeout {
517 // Returns whether an address is known to connect to a client with our own ID.
518 func (cl *Client) dopplegangerAddr(addr string) bool {
519 _, ok := cl.dopplegangerAddrs[addr]
523 // Returns a connection over UTP or TCP, whichever is first to connect.
524 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
526 t := perf.NewTimer(perf.CallerName(0))
529 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
531 t.Mark("returned conn over " + res.Network)
535 ctx, cancel := context.WithCancel(ctx)
536 // As soon as we return one connection, cancel the others.
539 resCh := make(chan dialResult, left)
543 cl.eachDialer(func(s Dialer) bool {
546 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
549 cl.dialFromSocket(ctx, s, addr),
550 s.LocalAddr().Network(),
557 // Wait for a successful connection.
559 defer perf.ScopeTimer()()
560 for ; left > 0 && res.Conn == nil; left-- {
564 // There are still incompleted dials.
566 for ; left > 0; left-- {
567 conn := (<-resCh).Conn
574 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
576 //if res.Conn != nil {
577 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
579 // cl.logger.Printf("failed to dial %s", addr)
584 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
585 network := s.LocalAddr().Network()
586 cte := cl.config.ConnTracker.Wait(
588 conntrack.Entry{network, s.LocalAddr().String(), addr},
589 "dial torrent client",
592 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
593 // which dial errors allow us to forget the connection tracking entry handle.
594 if ctx.Err() != nil {
600 c, err := s.Dial(ctx, addr)
601 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
602 // it now in case we close the connection forthwith.
603 if tc, ok := c.(*net.TCPConn); ok {
608 if err != nil && forgettableDialError(err) {
615 return closeWrapper{c, func() error {
622 func forgettableDialError(err error) bool {
623 return strings.Contains(err.Error(), "no suitable address found")
626 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
627 if _, ok := t.halfOpen[addr]; !ok {
628 panic("invariant broken")
630 delete(t.halfOpen, addr)
634 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
635 // for valid reasons.
636 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr,
637 network, connString string,
638 ) (c *PeerConn, err error) {
639 c = cl.newConnection(nc, true, remoteAddr, network, connString)
640 c.headerEncrypted = encryptHeader
641 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
643 dl, ok := ctx.Deadline()
647 err = nc.SetDeadline(dl)
651 err = cl.initiateHandshakes(c, t)
655 // Returns nil connection and nil error if no connection could be established for valid reasons.
656 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
657 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
660 return t.dialTimeout()
663 dr := cl.dialFirst(dialCtx, addr.String())
666 if dialCtx.Err() != nil {
667 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
669 return nil, errors.New("dial failed")
671 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
678 // Returns nil connection and nil error if no connection could be established
679 // for valid reasons.
680 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
681 torrent.Add("establish outgoing connection", 1)
682 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
683 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
685 torrent.Add("initiated conn with preferred header obfuscation", 1)
688 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
689 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
690 // We should have just tried with the preferred header obfuscation. If it was required,
691 // there's nothing else to try.
694 // Try again with encryption if we didn't earlier, or without if we did.
695 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
697 torrent.Add("initiated conn with fallback header obfuscation", 1)
699 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
703 // Called to dial out and run a connection. The addr we're given is already
704 // considered half-open.
705 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
706 cl.dialRateLimiter.Wait(context.Background())
707 c, err := cl.establishOutgoingConn(t, addr)
710 // Don't release lock between here and addConnection, unless it's for
712 cl.noLongerHalfOpen(t, addr.String())
715 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
722 cl.runHandshookConn(c, t)
725 // The port number for incoming peer connections. 0 if the client isn't listening.
726 func (cl *Client) incomingPeerPort() int {
727 return cl.LocalPort()
730 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
731 if c.headerEncrypted {
734 rw, c.cryptoMethod, err = mse.InitiateHandshake(
741 cl.config.CryptoProvides,
745 return xerrors.Errorf("header obfuscation handshake: %w", err)
748 ih, err := cl.connBtHandshake(c, &t.infoHash)
750 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
752 if ih != t.infoHash {
753 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
758 // Calls f with any secret keys.
759 func (cl *Client) forSkeys(f func([]byte) bool) {
762 if false { // Emulate the bug from #114
764 for ih := range cl.torrents {
768 for range cl.torrents {
775 for ih := range cl.torrents {
782 // Do encryption and bittorrent handshakes as receiver.
783 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
784 defer perf.ScopeTimerErr(&err)()
786 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
788 if err == nil || err == mse.ErrNoSecretKeyMatch {
789 if c.headerEncrypted {
790 torrent.Add("handshakes received encrypted", 1)
792 torrent.Add("handshakes received unencrypted", 1)
795 torrent.Add("handshakes received with error while handling encryption", 1)
798 if err == mse.ErrNoSecretKeyMatch {
803 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
804 err = errors.New("connection not have required header obfuscation")
807 ih, err := cl.connBtHandshake(c, nil)
809 err = xerrors.Errorf("during bt handshake: %w", err)
818 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
819 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
824 c.PeerExtensionBytes = res.PeerExtensionBits
825 c.PeerID = res.PeerID
826 c.completedHandshake = time.Now()
830 func (cl *Client) runReceivedConn(c *PeerConn) {
831 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
835 t, err := cl.receiveHandshakes(c)
838 "error receiving handshakes on %v: %s", c, err,
842 "network", c.network,
844 torrent.Add("error receiving handshake", 1)
846 cl.onBadAccept(c.remoteAddr)
851 torrent.Add("received handshake for unloaded torrent", 1)
852 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
854 cl.onBadAccept(c.remoteAddr)
858 torrent.Add("received handshake for loaded torrent", 1)
861 cl.runHandshookConn(c, t)
864 // Client lock must be held before entering this.
865 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
867 if c.PeerID == cl.peerID {
870 addr := c.conn.RemoteAddr().String()
871 cl.dopplegangerAddrs[addr] = struct{}{}
873 // Because the remote address is not necessarily the same as its client's torrent listen
874 // address, we won't record the remote address as a doppleganger. Instead, the initiator
875 // can record *us* as the doppleganger.
879 c.conn.SetWriteDeadline(time.Time{})
880 c.r = deadlineReader{c.conn, c.r}
881 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
882 if connIsIpv6(c.conn) {
883 torrent.Add("completed handshake over ipv6", 1)
885 if err := t.addConnection(c); err != nil {
886 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
889 defer t.dropConnection(c)
890 go c.writer(time.Minute)
891 cl.sendInitialMessages(c, t)
892 err := c.mainReadLoop()
893 if err != nil && cl.config.Debug {
894 cl.logger.Printf("error during connection main read loop: %s", err)
898 // See the order given in Transmission's tr_peerMsgsNew.
899 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
900 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
901 conn.post(pp.Message{
903 ExtendedID: pp.HandshakeExtendedID,
904 ExtendedPayload: func() []byte {
905 msg := pp.ExtendedHandshakeMessage{
906 M: map[pp.ExtensionName]pp.ExtensionNumber{
907 pp.ExtensionNameMetadata: metadataExtendedId,
909 V: cl.config.ExtendedHandshakeClientVersion,
910 Reqq: 64, // TODO: Really?
911 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
912 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
913 Port: cl.incomingPeerPort(),
914 MetadataSize: torrent.metadataSize(),
915 // TODO: We can figured these out specific to the socket
917 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
918 Ipv6: cl.config.PublicIp6.To16(),
920 if !cl.config.DisablePEX {
921 msg.M[pp.ExtensionNamePex] = pexExtendedId
923 return bencode.MustMarshal(msg)
928 if conn.fastEnabled() {
929 if torrent.haveAllPieces() {
930 conn.post(pp.Message{Type: pp.HaveAll})
931 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
933 } else if !torrent.haveAnyPieces() {
934 conn.post(pp.Message{Type: pp.HaveNone})
935 conn.sentHaves.Clear()
941 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
942 conn.post(pp.Message{
949 func (cl *Client) dhtPort() (ret uint16) {
950 cl.eachDhtServer(func(s DhtServer) {
951 ret = uint16(missinggo.AddrPort(s.Addr()))
956 func (cl *Client) haveDhtServer() (ret bool) {
957 cl.eachDhtServer(func(_ DhtServer) {
963 // Process incoming ut_metadata message.
964 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
966 err := bencode.Unmarshal(payload, &d)
967 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
968 } else if err != nil {
969 return fmt.Errorf("error unmarshalling bencode: %s", err)
971 msgType, ok := d["msg_type"]
973 return errors.New("missing msg_type field")
977 case pp.DataMetadataExtensionMsgType:
978 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
979 if !c.requestedMetadataPiece(piece) {
980 return fmt.Errorf("got unexpected piece %d", piece)
982 c.metadataRequests[piece] = false
983 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
984 if begin < 0 || begin >= len(payload) {
985 return fmt.Errorf("data has bad offset in payload: %d", begin)
987 t.saveMetadataPiece(piece, payload[begin:])
988 c.lastUsefulChunkReceived = time.Now()
989 return t.maybeCompleteMetadata()
990 case pp.RequestMetadataExtensionMsgType:
991 if !t.haveMetadataPiece(piece) {
992 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
995 start := (1 << 14) * piece
996 c.logger.Printf("sending metadata piece %d", piece)
997 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
999 case pp.RejectMetadataExtensionMsgType:
1002 return errors.New("unknown msg_type value")
1006 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1007 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1008 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1013 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1017 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1020 if _, ok := cl.ipBlockRange(ip); ok {
1023 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1029 // Return a Torrent ready for insertion into a Client.
1030 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1031 // use provided storage, if provided
1032 storageClient := cl.defaultStorage
1033 if specStorage != nil {
1034 storageClient = storage.NewClient(specStorage)
1040 peers: prioritizedPeers{
1042 getPrio: func(p Peer) peerPriority {
1043 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1046 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1048 halfOpen: make(map[string]Peer),
1049 pieceStateChanges: pubsub.NewPubSub(),
1051 storageOpener: storageClient,
1052 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1054 networkingEnabled: true,
1055 metadataChanged: sync.Cond{
1059 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1060 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1061 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1062 return fmt.Sprintf("%v: %s", t, m.Text())
1064 t.setChunkSize(defaultChunkSize)
1068 // A file-like handle to some torrent data resource.
1069 type Handle interface {
1076 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1077 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1080 // Adds a torrent by InfoHash with a custom Storage implementation.
1081 // If the torrent already exists then this Storage is ignored and the
1082 // existing torrent returned with `new` set to `false`
1083 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1086 t, ok := cl.torrents[infoHash]
1092 t = cl.newTorrent(infoHash, specStorage)
1093 cl.eachDhtServer(func(s DhtServer) {
1094 go t.dhtAnnouncer(s)
1096 cl.torrents[infoHash] = t
1097 cl.clearAcceptLimits()
1098 t.updateWantPeersEvent()
1099 // Tickle Client.waitAccept, new torrent may want conns.
1100 cl.event.Broadcast()
1104 // Add or merge a torrent spec. If the torrent is already present, the
1105 // trackers will be merged with the existing ones. If the Info isn't yet
1106 // known, it will be set. The display name is replaced if the new spec
1107 // provides one. Returns new if the torrent wasn't already in the client.
1108 // Note that any `Storage` defined on the spec will be ignored if the
1109 // torrent is already present (i.e. `new` return value is `true`)
1110 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1111 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1112 if spec.DisplayName != "" {
1113 t.SetDisplayName(spec.DisplayName)
1115 if spec.InfoBytes != nil {
1116 err = t.SetInfoBytes(spec.InfoBytes)
1123 if spec.ChunkSize != 0 {
1124 t.setChunkSize(pp.Integer(spec.ChunkSize))
1126 t.addTrackers(spec.Trackers)
1131 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1132 t, ok := cl.torrents[infoHash]
1134 err = fmt.Errorf("no such torrent")
1141 delete(cl.torrents, infoHash)
1145 func (cl *Client) allTorrentsCompleted() bool {
1146 for _, t := range cl.torrents {
1150 if !t.haveAllPieces() {
1157 // Returns true when all torrents are completely downloaded and false if the
1158 // client is stopped before that.
1159 func (cl *Client) WaitAll() bool {
1162 for !cl.allTorrentsCompleted() {
1163 if cl.closed.IsSet() {
1171 // Returns handles to all the torrents loaded in the Client.
1172 func (cl *Client) Torrents() []*Torrent {
1175 return cl.torrentsAsSlice()
1178 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1179 for _, t := range cl.torrents {
1180 ret = append(ret, t)
1185 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1186 spec, err := TorrentSpecFromMagnetURI(uri)
1190 T, _, err = cl.AddTorrentSpec(spec)
1194 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1195 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1197 slices.MakeInto(&ss, mi.Nodes)
1202 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1203 mi, err := metainfo.LoadFromFile(filename)
1207 return cl.AddTorrent(mi)
1210 func (cl *Client) DhtServers() []DhtServer {
1211 return cl.dhtServers
1214 func (cl *Client) AddDHTNodes(nodes []string) {
1215 for _, n := range nodes {
1216 hmp := missinggo.SplitHostMaybePort(n)
1217 ip := net.ParseIP(hmp.Host)
1219 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1222 ni := krpc.NodeInfo{
1223 Addr: krpc.NodeAddr{
1228 cl.eachDhtServer(func(s DhtServer) {
1234 func (cl *Client) banPeerIP(ip net.IP) {
1235 cl.logger.Printf("banning ip %v", ip)
1236 if cl.badPeerIPs == nil {
1237 cl.badPeerIPs = make(map[string]struct{})
1239 cl.badPeerIPs[ip.String()] = struct{}{}
1242 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1248 PeerMaxRequests: 250,
1249 writeBuffer: new(bytes.Buffer),
1250 remoteAddr: remoteAddr,
1252 connString: connString,
1254 c.logger = cl.logger.WithValues(c,
1255 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1256 ).WithText(func(m log.Msg) string {
1257 return fmt.Sprintf("%v: %s", c, m.Text())
1259 c.writerCond.L = cl.locker()
1260 c.setRW(connStatsReadWriter{nc, c})
1261 c.r = &rateLimitedReader{
1262 l: cl.config.DownloadRateLimiter,
1265 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1269 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1277 Addr: ipPortAddr{ip, port},
1278 Source: PeerSourceDhtAnnouncePeer,
1282 func firstNotNil(ips ...net.IP) net.IP {
1283 for _, ip := range ips {
1291 func (cl *Client) eachDialer(f func(Dialer) bool) {
1292 for _, s := range cl.dialers {
1299 func (cl *Client) eachListener(f func(Listener) bool) {
1300 for _, s := range cl.listeners {
1307 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1308 cl.eachListener(func(l Listener) bool {
1315 func (cl *Client) publicIp(peer net.IP) net.IP {
1316 // TODO: Use BEP 10 to determine how peers are seeing us.
1317 if peer.To4() != nil {
1319 cl.config.PublicIp4,
1320 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1325 cl.config.PublicIp6,
1326 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1330 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1331 l := cl.findListener(
1332 func(l net.Listener) bool {
1333 return f(addrIpOrNil(l.Addr()))
1339 return addrIpOrNil(l.Addr())
1342 // Our IP as a peer should see it.
1343 func (cl *Client) publicAddr(peer net.IP) IpPort {
1344 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1347 // ListenAddrs addresses currently being listened to.
1348 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1351 cl.eachListener(func(l Listener) bool {
1352 ret = append(ret, l.Addr())
1358 func (cl *Client) onBadAccept(addr net.Addr) {
1359 ipa, ok := tryIpPortFromNetAddr(addr)
1363 ip := maskIpForAcceptLimiting(ipa.IP)
1364 if cl.acceptLimiter == nil {
1365 cl.acceptLimiter = make(map[ipStr]int)
1367 cl.acceptLimiter[ipStr(ip.String())]++
1370 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1371 if ip4 := ip.To4(); ip4 != nil {
1372 return ip4.Mask(net.CIDRMask(24, 32))
1377 func (cl *Client) clearAcceptLimits() {
1378 cl.acceptLimiter = nil
1381 func (cl *Client) acceptLimitClearer() {
1384 case <-cl.closed.LockedChan(cl.locker()):
1386 case <-time.After(15 * time.Minute):
1388 cl.clearAcceptLimits()
1394 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1395 if cl.config.DisableAcceptRateLimiting {
1398 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1401 func (cl *Client) rLock() {
1405 func (cl *Client) rUnlock() {
1409 func (cl *Client) lock() {
1413 func (cl *Client) unlock() {
1417 func (cl *Client) locker() *lockWithDeferreds {
1421 func (cl *Client) String() string {
1422 return fmt.Sprintf("<%[1]T %[1]p>", cl)