18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
88 func (cl *Client) BadPeerIPs() []string {
91 return cl.badPeerIPsLocked()
94 func (cl *Client) badPeerIPsLocked() []string {
95 return slices.FromMapKeys(cl.badPeerIPs).([]string)
98 func (cl *Client) PeerID() PeerID {
102 // Returns the port number for the first listener that has one. No longer assumes that all port
103 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
105 func (cl *Client) LocalPort() (port int) {
106 cl.eachListener(func(l Listener) bool {
107 port = addrPortOrZero(l.Addr())
113 func writeDhtServerStatus(w io.Writer, s DhtServer) {
114 dhtStats := s.Stats()
115 fmt.Fprintf(w, " ID: %x\n", s.ID())
116 spew.Fdump(w, dhtStats)
119 // Writes out a human readable status of the client, such as for writing to a
121 func (cl *Client) WriteStatus(_w io.Writer) {
124 w := bufio.NewWriter(_w)
126 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
127 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
128 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
129 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131 cl.eachDhtServer(func(s DhtServer) {
132 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133 writeDhtServerStatus(w, s)
135 spew.Fdump(w, &cl.stats)
136 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
138 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139 return l.InfoHash().AsString() < r.InfoHash().AsString()
142 fmt.Fprint(w, "<unknown name>")
144 fmt.Fprint(w, t.name())
150 "%f%% of %d bytes (%s)",
151 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
153 humanize.Bytes(uint64(*t.length)))
155 w.WriteString("<missing metainfo>")
163 func (cl *Client) initLogger() {
164 cl.logger = cl.config.Logger.WithValues(cl)
165 if !cl.config.Debug {
166 cl.logger = cl.logger.FilterLevel(log.Info)
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
186 dopplegangerAddrs: make(map[string]struct{}),
187 torrents: make(map[metainfo.Hash]*Torrent),
188 dialRateLimiter: rate.NewLimiter(10, 10),
190 cl.activeAnnounceLimiter.SlotsPerKey = 2
191 go cl.acceptLimitClearer()
199 cl.event.L = cl.locker()
200 storageImpl := cfg.DefaultStorage
201 if storageImpl == nil {
202 // We'd use mmap by default but HFS+ doesn't support sparse files.
203 storageImplCloser := storage.NewFile(cfg.DataDir)
204 cl.onClose = append(cl.onClose, func() {
205 if err := storageImplCloser.Close(); err != nil {
206 cl.logger.Printf("error closing default storage: %s", err)
209 storageImpl = storageImplCloser
211 cl.defaultStorage = storage.NewClient(storageImpl)
212 if cfg.IPBlocklist != nil {
213 cl.ipBlockList = cfg.IPBlocklist
216 if cfg.PeerID != "" {
217 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
219 o := copy(cl.peerID[:], cfg.Bep20)
220 _, err = rand.Read(cl.peerID[o:])
222 panic("error generating peer id")
226 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
234 for _, _s := range sockets {
235 s := _s // Go is fucking retarded.
236 cl.onClose = append(cl.onClose, func() { s.Close() })
237 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
238 cl.dialers = append(cl.dialers, s)
239 cl.listeners = append(cl.listeners, s)
240 go cl.acceptConnections(s)
246 for _, s := range sockets {
247 if pc, ok := s.(net.PacketConn); ok {
248 ds, err := cl.newAnacrolixDhtServer(pc)
252 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
253 cl.onClose = append(cl.onClose, func() { ds.Close() })
258 cl.websocketTrackers = websocketTrackers{
261 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
264 t, ok := cl.torrents[infoHash]
266 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
268 return t.announceRequest(event), nil
270 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
273 t, ok := cl.torrents[dcc.InfoHash]
275 cl.logger.WithDefaultLevel(log.Warning).Printf(
276 "got webrtc conn for unloaded torrent with infohash %x",
282 go t.onWebRtcConn(dc, dcc)
289 func (cl *Client) AddDhtServer(d DhtServer) {
290 cl.dhtServers = append(cl.dhtServers, d)
293 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
294 // given address for any Torrent.
295 func (cl *Client) AddDialer(d Dialer) {
298 cl.dialers = append(cl.dialers, d)
299 for _, t := range cl.torrents {
304 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
306 func (cl *Client) AddListener(l Listener) {
307 cl.listeners = append(cl.listeners, l)
308 go cl.acceptConnections(l)
311 func (cl *Client) firewallCallback(net.Addr) bool {
313 block := !cl.wantConns()
316 torrent.Add("connections firewalled", 1)
318 torrent.Add("connections not firewalled", 1)
323 func (cl *Client) listenOnNetwork(n network) bool {
324 if n.Ipv4 && cl.config.DisableIPv4 {
327 if n.Ipv6 && cl.config.DisableIPv6 {
330 if n.Tcp && cl.config.DisableTCP {
333 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
339 func (cl *Client) listenNetworks() (ns []network) {
340 for _, n := range allPeerNetworks {
341 if cl.listenOnNetwork(n) {
348 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
349 cfg := dht.ServerConfig{
350 IPBlocklist: cl.ipBlockList,
352 OnAnnouncePeer: cl.onDHTAnnouncePeer,
353 PublicIP: func() net.IP {
354 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
355 return cl.config.PublicIp6
357 return cl.config.PublicIp4
359 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
360 ConnectionTracking: cl.config.ConnTracker,
361 OnQuery: cl.config.DHTOnQuery,
362 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
364 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
367 s, err = dht.NewServer(&cfg)
370 ts, err := s.Bootstrap()
372 cl.logger.Printf("error bootstrapping dht: %s", err)
374 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
380 func (cl *Client) Closed() <-chan struct{} {
386 func (cl *Client) eachDhtServer(f func(DhtServer)) {
387 for _, ds := range cl.dhtServers {
392 // Stops the client. All connections to peers are closed and all activity will
394 func (cl *Client) Close() {
398 for _, t := range cl.torrents {
401 for i := range cl.onClose {
402 cl.onClose[len(cl.onClose)-1-i]()
407 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
408 if cl.ipBlockList == nil {
411 return cl.ipBlockList.Lookup(ip)
414 func (cl *Client) ipIsBlocked(ip net.IP) bool {
415 _, blocked := cl.ipBlockRange(ip)
419 func (cl *Client) wantConns() bool {
420 for _, t := range cl.torrents {
428 func (cl *Client) waitAccept() {
430 if cl.closed.IsSet() {
440 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
441 func (cl *Client) rejectAccepted(conn net.Conn) error {
442 ra := conn.RemoteAddr()
443 if rip := addrIpOrNil(ra); rip != nil {
444 if cl.config.DisableIPv4Peers && rip.To4() != nil {
445 return errors.New("ipv4 peers disabled")
447 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
448 return errors.New("ipv4 disabled")
451 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
452 return errors.New("ipv6 disabled")
454 if cl.rateLimitAccept(rip) {
455 return errors.New("source IP accepted rate limited")
457 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
458 return errors.New("bad source addr")
464 func (cl *Client) acceptConnections(l Listener) {
466 conn, err := l.Accept()
467 torrent.Add("client listener accepts", 1)
468 conn = pproffd.WrapNetConn(conn)
470 closed := cl.closed.IsSet()
473 reject = cl.rejectAccepted(conn)
483 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
488 torrent.Add("rejected accepted connections", 1)
489 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
492 go cl.incomingConnection(conn)
494 log.Fmsg("accepted %q connection at %q from %q",
498 ).SetLevel(log.Debug).Log(cl.logger)
499 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
500 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
501 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
506 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
507 func regularNetConnPeerConnConnString(nc net.Conn) string {
508 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
511 func (cl *Client) incomingConnection(nc net.Conn) {
513 if tc, ok := nc.(*net.TCPConn); ok {
516 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
517 regularNetConnPeerConnConnString(nc))
519 c.Discovery = PeerSourceIncoming
520 cl.runReceivedConn(c)
523 // Returns a handle to the given torrent, if it's present in the client.
524 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
527 t, ok = cl.torrents[ih]
531 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
532 return cl.torrents[ih]
535 type dialResult struct {
540 func countDialResult(err error) {
542 torrent.Add("successful dials", 1)
544 torrent.Add("unsuccessful dials", 1)
548 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
549 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
550 if ret < minDialTimeout {
556 // Returns whether an address is known to connect to a client with our own ID.
557 func (cl *Client) dopplegangerAddr(addr string) bool {
558 _, ok := cl.dopplegangerAddrs[addr]
562 // Returns a connection over UTP or TCP, whichever is first to connect.
563 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
565 t := perf.NewTimer(perf.CallerName(0))
568 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
570 t.Mark("returned conn over " + res.Network)
574 ctx, cancel := context.WithCancel(ctx)
575 // As soon as we return one connection, cancel the others.
578 resCh := make(chan dialResult, left)
582 cl.eachDialer(func(s Dialer) bool {
585 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
588 cl.dialFromSocket(ctx, s, addr),
589 s.LocalAddr().Network(),
596 // Wait for a successful connection.
598 defer perf.ScopeTimer()()
599 for ; left > 0 && res.Conn == nil; left-- {
603 // There are still incompleted dials.
605 for ; left > 0; left-- {
606 conn := (<-resCh).Conn
613 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
615 //if res.Conn != nil {
616 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
618 // cl.logger.Printf("failed to dial %s", addr)
623 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
624 network := s.LocalAddr().Network()
625 cte := cl.config.ConnTracker.Wait(
627 conntrack.Entry{network, s.LocalAddr().String(), addr},
628 "dial torrent client",
631 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
632 // which dial errors allow us to forget the connection tracking entry handle.
633 if ctx.Err() != nil {
639 c, err := s.Dial(ctx, addr)
640 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
641 // it now in case we close the connection forthwith.
642 if tc, ok := c.(*net.TCPConn); ok {
647 if err != nil && forgettableDialError(err) {
654 return closeWrapper{c, func() error {
661 func forgettableDialError(err error) bool {
662 return strings.Contains(err.Error(), "no suitable address found")
665 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
666 if _, ok := t.halfOpen[addr]; !ok {
667 panic("invariant broken")
669 delete(t.halfOpen, addr)
671 for _, t := range cl.torrents {
676 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
677 // for valid reasons.
678 func (cl *Client) initiateProtocolHandshakes(
682 outgoing, encryptHeader bool,
683 remoteAddr PeerRemoteAddr,
684 network, connString string,
686 c *PeerConn, err error,
688 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
689 c.headerEncrypted = encryptHeader
690 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
692 dl, ok := ctx.Deadline()
696 err = nc.SetDeadline(dl)
700 err = cl.initiateHandshakes(c, t)
704 // Returns nil connection and nil error if no connection could be established for valid reasons.
705 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
706 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
709 return t.dialTimeout()
712 dr := cl.dialFirst(dialCtx, addr.String())
715 if dialCtx.Err() != nil {
716 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
718 return nil, errors.New("dial failed")
720 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
727 // Returns nil connection and nil error if no connection could be established
728 // for valid reasons.
729 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
730 torrent.Add("establish outgoing connection", 1)
731 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
732 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with preferred header obfuscation", 1)
737 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
738 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
739 // We should have just tried with the preferred header obfuscation. If it was required,
740 // there's nothing else to try.
743 // Try again with encryption if we didn't earlier, or without if we did.
744 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
746 torrent.Add("initiated conn with fallback header obfuscation", 1)
748 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
752 // Called to dial out and run a connection. The addr we're given is already
753 // considered half-open.
754 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
755 cl.dialRateLimiter.Wait(context.Background())
756 c, err := cl.establishOutgoingConn(t, addr)
759 // Don't release lock between here and addConnection, unless it's for
761 cl.noLongerHalfOpen(t, addr.String())
764 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
771 t.runHandshookConnLoggingErr(c)
774 // The port number for incoming peer connections. 0 if the client isn't listening.
775 func (cl *Client) incomingPeerPort() int {
776 return cl.LocalPort()
779 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
780 if c.headerEncrypted {
783 rw, c.cryptoMethod, err = mse.InitiateHandshake(
790 cl.config.CryptoProvides,
794 return xerrors.Errorf("header obfuscation handshake: %w", err)
797 ih, err := cl.connBtHandshake(c, &t.infoHash)
799 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
801 if ih != t.infoHash {
802 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
807 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
808 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
809 func (cl *Client) forSkeys(f func([]byte) bool) {
812 if false { // Emulate the bug from #114
814 for ih := range cl.torrents {
818 for range cl.torrents {
825 for ih := range cl.torrents {
832 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
833 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
839 // Do encryption and bittorrent handshakes as receiver.
840 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
841 defer perf.ScopeTimerErr(&err)()
843 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
845 if err == nil || err == mse.ErrNoSecretKeyMatch {
846 if c.headerEncrypted {
847 torrent.Add("handshakes received encrypted", 1)
849 torrent.Add("handshakes received unencrypted", 1)
852 torrent.Add("handshakes received with error while handling encryption", 1)
855 if err == mse.ErrNoSecretKeyMatch {
860 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
861 err = errors.New("connection does not have required header obfuscation")
864 ih, err := cl.connBtHandshake(c, nil)
866 err = xerrors.Errorf("during bt handshake: %w", err)
875 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
876 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
881 c.PeerExtensionBytes = res.PeerExtensionBits
882 c.PeerID = res.PeerID
883 c.completedHandshake = time.Now()
884 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
890 func (cl *Client) runReceivedConn(c *PeerConn) {
891 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
895 t, err := cl.receiveHandshakes(c)
898 "error receiving handshakes on %v: %s", c, err,
899 ).SetLevel(log.Debug).
901 "network", c.Network,
903 torrent.Add("error receiving handshake", 1)
905 cl.onBadAccept(c.RemoteAddr)
910 torrent.Add("received handshake for unloaded torrent", 1)
911 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
913 cl.onBadAccept(c.RemoteAddr)
917 torrent.Add("received handshake for loaded torrent", 1)
920 t.runHandshookConnLoggingErr(c)
923 // Client lock must be held before entering this.
924 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
926 if c.PeerID == cl.peerID {
929 addr := c.conn.RemoteAddr().String()
930 cl.dopplegangerAddrs[addr] = struct{}{}
932 // Because the remote address is not necessarily the same as its client's torrent listen
933 // address, we won't record the remote address as a doppleganger. Instead, the initiator
934 // can record *us* as the doppleganger.
936 return errors.New("local and remote peer ids are the same")
938 c.conn.SetWriteDeadline(time.Time{})
939 c.r = deadlineReader{c.conn, c.r}
940 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
941 if connIsIpv6(c.conn) {
942 torrent.Add("completed handshake over ipv6", 1)
944 if err := t.addConnection(c); err != nil {
945 return fmt.Errorf("adding connection: %w", err)
947 defer t.dropConnection(c)
948 go c.writer(time.Minute)
949 cl.sendInitialMessages(c, t)
950 err := c.mainReadLoop()
952 return fmt.Errorf("main read loop: %w", err)
957 // See the order given in Transmission's tr_peerMsgsNew.
958 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
959 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
960 conn.post(pp.Message{
962 ExtendedID: pp.HandshakeExtendedID,
963 ExtendedPayload: func() []byte {
964 msg := pp.ExtendedHandshakeMessage{
965 M: map[pp.ExtensionName]pp.ExtensionNumber{
966 pp.ExtensionNameMetadata: metadataExtendedId,
968 V: cl.config.ExtendedHandshakeClientVersion,
969 // If peer requests are buffered on read, this instructs the amount of memory
970 // that might be used to cache pending writes. Assuming 512KiB cached for
971 // sending, for 16KiB chunks.
973 YourIp: pp.CompactIp(conn.remoteIp()),
974 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
975 Port: cl.incomingPeerPort(),
976 MetadataSize: torrent.metadataSize(),
977 // TODO: We can figured these out specific to the socket
979 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
980 Ipv6: cl.config.PublicIp6.To16(),
982 if !cl.config.DisablePEX {
983 msg.M[pp.ExtensionNamePex] = pexExtendedId
985 return bencode.MustMarshal(msg)
990 if conn.fastEnabled() {
991 if torrent.haveAllPieces() {
992 conn.post(pp.Message{Type: pp.HaveAll})
993 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
995 } else if !torrent.haveAnyPieces() {
996 conn.post(pp.Message{Type: pp.HaveNone})
997 conn.sentHaves.Clear()
1003 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1004 conn.post(pp.Message{
1011 func (cl *Client) dhtPort() (ret uint16) {
1012 cl.eachDhtServer(func(s DhtServer) {
1013 ret = uint16(missinggo.AddrPort(s.Addr()))
1018 func (cl *Client) haveDhtServer() (ret bool) {
1019 cl.eachDhtServer(func(_ DhtServer) {
1025 // Process incoming ut_metadata message.
1026 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1027 var d map[string]int
1028 err := bencode.Unmarshal(payload, &d)
1029 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1030 } else if err != nil {
1031 return fmt.Errorf("error unmarshalling bencode: %s", err)
1033 msgType, ok := d["msg_type"]
1035 return errors.New("missing msg_type field")
1039 case pp.DataMetadataExtensionMsgType:
1040 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1041 if !c.requestedMetadataPiece(piece) {
1042 return fmt.Errorf("got unexpected piece %d", piece)
1044 c.metadataRequests[piece] = false
1045 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1046 if begin < 0 || begin >= len(payload) {
1047 return fmt.Errorf("data has bad offset in payload: %d", begin)
1049 t.saveMetadataPiece(piece, payload[begin:])
1050 c.lastUsefulChunkReceived = time.Now()
1051 return t.maybeCompleteMetadata()
1052 case pp.RequestMetadataExtensionMsgType:
1053 if !t.haveMetadataPiece(piece) {
1054 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1057 start := (1 << 14) * piece
1058 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1059 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1061 case pp.RejectMetadataExtensionMsgType:
1064 return errors.New("unknown msg_type value")
1068 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1069 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1070 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1075 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1079 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1082 if _, ok := cl.ipBlockRange(ip); ok {
1085 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1091 // Return a Torrent ready for insertion into a Client.
1092 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1093 // use provided storage, if provided
1094 storageClient := cl.defaultStorage
1095 if specStorage != nil {
1096 storageClient = storage.NewClient(specStorage)
1102 peers: prioritizedPeers{
1104 getPrio: func(p PeerInfo) peerPriority {
1106 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1109 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1111 halfOpen: make(map[string]PeerInfo),
1112 pieceStateChanges: pubsub.NewPubSub(),
1114 storageOpener: storageClient,
1115 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1117 networkingEnabled: true,
1118 metadataChanged: sync.Cond{
1121 webSeeds: make(map[string]*Peer),
1123 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1124 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1125 t.logger = cl.logger.WithContextValue(t)
1126 t.setChunkSize(defaultChunkSize)
1130 // A file-like handle to some torrent data resource.
1131 type Handle interface {
1138 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1139 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1142 // Adds a torrent by InfoHash with a custom Storage implementation.
1143 // If the torrent already exists then this Storage is ignored and the
1144 // existing torrent returned with `new` set to `false`
1145 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1148 t, ok := cl.torrents[infoHash]
1154 t = cl.newTorrent(infoHash, specStorage)
1155 cl.eachDhtServer(func(s DhtServer) {
1156 go t.dhtAnnouncer(s)
1158 cl.torrents[infoHash] = t
1159 cl.clearAcceptLimits()
1160 t.updateWantPeersEvent()
1161 // Tickle Client.waitAccept, new torrent may want conns.
1162 cl.event.Broadcast()
1166 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1167 // Torrent.MergeSpec.
1168 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1169 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1170 err = t.MergeSpec(spec)
1171 if err != nil && new {
1177 type stringAddr string
1179 var _ net.Addr = stringAddr("")
1181 func (stringAddr) Network() string { return "" }
1182 func (me stringAddr) String() string { return string(me) }
1184 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1185 // spec.DisallowDataDownload/Upload will be read and applied
1186 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1187 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1188 if spec.DisplayName != "" {
1189 t.SetDisplayName(spec.DisplayName)
1191 if spec.InfoBytes != nil {
1192 err := t.SetInfoBytes(spec.InfoBytes)
1198 cl.AddDhtNodes(spec.DhtNodes)
1201 useTorrentSources(spec.Sources, t)
1202 for _, url := range spec.Webseeds {
1205 for _, peerAddr := range spec.PeerAddrs {
1207 Addr: stringAddr(peerAddr),
1208 Source: PeerSourceDirect,
1212 if spec.ChunkSize != 0 {
1213 t.setChunkSize(pp.Integer(spec.ChunkSize))
1215 t.addTrackers(spec.Trackers)
1217 t.dataDownloadDisallowed = spec.DisallowDataDownload
1218 t.dataUploadDisallowed = spec.DisallowDataUpload
1222 func useTorrentSources(sources []string, t *Torrent) {
1223 for _, s := range sources {
1225 err := useTorrentSource(s, t)
1227 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1229 t.logger.Printf("successfully used source %q", s)
1235 func useTorrentSource(source string, t *Torrent) error {
1236 req, err := http.NewRequest(http.MethodGet, source, nil)
1240 ctx, cancel := context.WithCancel(context.Background())
1250 req = req.WithContext(ctx)
1251 resp, err := http.DefaultClient.Do(req)
1255 mi, err := metainfo.Load(resp.Body)
1257 if ctx.Err() != nil {
1262 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1265 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1266 t, ok := cl.torrents[infoHash]
1268 err = fmt.Errorf("no such torrent")
1275 delete(cl.torrents, infoHash)
1279 func (cl *Client) allTorrentsCompleted() bool {
1280 for _, t := range cl.torrents {
1284 if !t.haveAllPieces() {
1291 // Returns true when all torrents are completely downloaded and false if the
1292 // client is stopped before that.
1293 func (cl *Client) WaitAll() bool {
1296 for !cl.allTorrentsCompleted() {
1297 if cl.closed.IsSet() {
1305 // Returns handles to all the torrents loaded in the Client.
1306 func (cl *Client) Torrents() []*Torrent {
1309 return cl.torrentsAsSlice()
1312 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1313 for _, t := range cl.torrents {
1314 ret = append(ret, t)
1319 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1320 spec, err := TorrentSpecFromMagnetUri(uri)
1324 T, _, err = cl.AddTorrentSpec(spec)
1328 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1329 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1333 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1334 mi, err := metainfo.LoadFromFile(filename)
1338 return cl.AddTorrent(mi)
1341 func (cl *Client) DhtServers() []DhtServer {
1342 return cl.dhtServers
1345 func (cl *Client) AddDhtNodes(nodes []string) {
1346 for _, n := range nodes {
1347 hmp := missinggo.SplitHostMaybePort(n)
1348 ip := net.ParseIP(hmp.Host)
1350 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1353 ni := krpc.NodeInfo{
1354 Addr: krpc.NodeAddr{
1359 cl.eachDhtServer(func(s DhtServer) {
1365 func (cl *Client) banPeerIP(ip net.IP) {
1366 cl.logger.Printf("banning ip %v", ip)
1367 if cl.badPeerIPs == nil {
1368 cl.badPeerIPs = make(map[string]struct{})
1370 cl.badPeerIPs[ip.String()] = struct{}{}
1373 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1382 PeerMaxRequests: 250,
1384 RemoteAddr: remoteAddr,
1386 callbacks: &cl.config.Callbacks,
1388 connString: connString,
1390 writeBuffer: new(bytes.Buffer),
1393 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1394 c.writerCond.L = cl.locker()
1395 c.setRW(connStatsReadWriter{nc, c})
1396 c.r = &rateLimitedReader{
1397 l: cl.config.DownloadRateLimiter,
1400 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1401 for _, f := range cl.config.Callbacks.NewPeer {
1407 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1414 t.addPeers([]PeerInfo{{
1415 Addr: ipPortAddr{ip, port},
1416 Source: PeerSourceDhtAnnouncePeer,
1420 func firstNotNil(ips ...net.IP) net.IP {
1421 for _, ip := range ips {
1429 func (cl *Client) eachDialer(f func(Dialer) bool) {
1430 for _, s := range cl.dialers {
1437 func (cl *Client) eachListener(f func(Listener) bool) {
1438 for _, s := range cl.listeners {
1445 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1446 cl.eachListener(func(l Listener) bool {
1453 func (cl *Client) publicIp(peer net.IP) net.IP {
1454 // TODO: Use BEP 10 to determine how peers are seeing us.
1455 if peer.To4() != nil {
1457 cl.config.PublicIp4,
1458 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1463 cl.config.PublicIp6,
1464 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1468 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1469 l := cl.findListener(
1470 func(l Listener) bool {
1471 return f(addrIpOrNil(l.Addr()))
1477 return addrIpOrNil(l.Addr())
1480 // Our IP as a peer should see it.
1481 func (cl *Client) publicAddr(peer net.IP) IpPort {
1482 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1485 // ListenAddrs addresses currently being listened to.
1486 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1489 cl.eachListener(func(l Listener) bool {
1490 ret = append(ret, l.Addr())
1496 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1497 ipa, ok := tryIpPortFromNetAddr(addr)
1501 ip := maskIpForAcceptLimiting(ipa.IP)
1502 if cl.acceptLimiter == nil {
1503 cl.acceptLimiter = make(map[ipStr]int)
1505 cl.acceptLimiter[ipStr(ip.String())]++
1508 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1509 if ip4 := ip.To4(); ip4 != nil {
1510 return ip4.Mask(net.CIDRMask(24, 32))
1515 func (cl *Client) clearAcceptLimits() {
1516 cl.acceptLimiter = nil
1519 func (cl *Client) acceptLimitClearer() {
1522 case <-cl.closed.LockedChan(cl.locker()):
1524 case <-time.After(15 * time.Minute):
1526 cl.clearAcceptLimits()
1532 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1533 if cl.config.DisableAcceptRateLimiting {
1536 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1539 func (cl *Client) rLock() {
1543 func (cl *Client) rUnlock() {
1547 func (cl *Client) lock() {
1551 func (cl *Client) unlock() {
1555 func (cl *Client) locker() *lockWithDeferreds {
1559 func (cl *Client) String() string {
1560 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1563 // Returns connection-level aggregate stats at the Client level. See the comment on
1564 // TorrentStats.ConnStats.
1565 func (cl *Client) ConnStats() ConnStats {
1566 return cl.stats.Copy()