18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
88 func (cl *Client) BadPeerIPs() []string {
91 return cl.badPeerIPsLocked()
94 func (cl *Client) badPeerIPsLocked() []string {
95 return slices.FromMapKeys(cl.badPeerIPs).([]string)
98 func (cl *Client) PeerID() PeerID {
102 // Returns the port number for the first listener that has one. No longer assumes that all port
103 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
105 func (cl *Client) LocalPort() (port int) {
106 cl.eachListener(func(l Listener) bool {
107 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes a short status report for the DHT server s to
// w: the server ID formatted as hex, followed by a spew dump of the value
// returned by s.Stats().
113 func writeDhtServerStatus(w io.Writer, s DhtServer) {
// Stats are fetched before anything is written to w.
114 dhtStats := s.Stats()
115 fmt.Fprintf(w, " ID: %x\n", s.ID())
116 spew.Fdump(w, dhtStats)
119 // Writes out a human readable status of the client, such as for writing to a
121 func (cl *Client) WriteStatus(_w io.Writer) {
124 w := bufio.NewWriter(_w)
126 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
127 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
128 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
129 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131 cl.eachDhtServer(func(s DhtServer) {
132 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133 writeDhtServerStatus(w, s)
135 spew.Fdump(w, &cl.stats)
136 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
138 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139 return l.InfoHash().AsString() < r.InfoHash().AsString()
142 fmt.Fprint(w, "<unknown name>")
144 fmt.Fprint(w, t.name())
150 "%f%% of %d bytes (%s)",
151 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
153 humanize.Bytes(uint64(*t.length)))
155 w.WriteString("<missing metainfo>")
163 func (cl *Client) initLogger() {
164 cl.logger = cl.config.Logger.WithValues(cl)
165 if !cl.config.Debug {
166 cl.logger = cl.logger.FilterLevel(log.Info)
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
186 dopplegangerAddrs: make(map[string]struct{}),
187 torrents: make(map[metainfo.Hash]*Torrent),
188 dialRateLimiter: rate.NewLimiter(10, 10),
190 cl.activeAnnounceLimiter.SlotsPerKey = 2
191 go cl.acceptLimitClearer()
199 cl.event.L = cl.locker()
200 storageImpl := cfg.DefaultStorage
201 if storageImpl == nil {
202 // We'd use mmap by default but HFS+ doesn't support sparse files.
203 storageImplCloser := storage.NewFile(cfg.DataDir)
204 cl.onClose = append(cl.onClose, func() {
205 if err := storageImplCloser.Close(); err != nil {
206 cl.logger.Printf("error closing default storage: %s", err)
209 storageImpl = storageImplCloser
211 cl.defaultStorage = storage.NewClient(storageImpl)
212 if cfg.IPBlocklist != nil {
213 cl.ipBlockList = cfg.IPBlocklist
216 if cfg.PeerID != "" {
217 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
219 o := copy(cl.peerID[:], cfg.Bep20)
220 _, err = rand.Read(cl.peerID[o:])
222 panic("error generating peer id")
226 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
234 for _, _s := range sockets {
235 s := _s // Copy the loop variable so each closure below captures its own socket.
236 cl.onClose = append(cl.onClose, func() { s.Close() })
237 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
238 cl.dialers = append(cl.dialers, s)
239 cl.listeners = append(cl.listeners, s)
240 go cl.acceptConnections(s)
246 for _, s := range sockets {
247 if pc, ok := s.(net.PacketConn); ok {
248 ds, err := cl.newAnacrolixDhtServer(pc)
252 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
253 cl.onClose = append(cl.onClose, func() { ds.Close() })
258 cl.websocketTrackers = websocketTrackers{
261 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
264 t, ok := cl.torrents[infoHash]
266 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
268 return t.announceRequest(event), nil
270 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
273 t, ok := cl.torrents[dcc.InfoHash]
275 cl.logger.WithDefaultLevel(log.Warning).Printf(
276 "got webrtc conn for unloaded torrent with infohash %x",
282 go t.onWebRtcConn(dc, dcc)
289 func (cl *Client) AddDhtServer(d DhtServer) {
290 cl.dhtServers = append(cl.dhtServers, d)
293 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
294 // given address for any Torrent.
295 func (cl *Client) AddDialer(d Dialer) {
298 cl.dialers = append(cl.dialers, d)
299 for _, t := range cl.torrents {
304 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
306 func (cl *Client) AddListener(l Listener) {
307 cl.listeners = append(cl.listeners, l)
308 go cl.acceptConnections(l)
311 func (cl *Client) firewallCallback(net.Addr) bool {
313 block := !cl.wantConns()
316 torrent.Add("connections firewalled", 1)
318 torrent.Add("connections not firewalled", 1)
323 func (cl *Client) listenOnNetwork(n network) bool {
324 if n.Ipv4 && cl.config.DisableIPv4 {
327 if n.Ipv6 && cl.config.DisableIPv6 {
330 if n.Tcp && cl.config.DisableTCP {
333 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
339 func (cl *Client) listenNetworks() (ns []network) {
340 for _, n := range allPeerNetworks {
341 if cl.listenOnNetwork(n) {
348 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
349 cfg := dht.ServerConfig{
350 IPBlocklist: cl.ipBlockList,
352 OnAnnouncePeer: cl.onDHTAnnouncePeer,
353 PublicIP: func() net.IP {
354 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
355 return cl.config.PublicIp6
357 return cl.config.PublicIp4
359 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
360 ConnectionTracking: cl.config.ConnTracker,
361 OnQuery: cl.config.DHTOnQuery,
362 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
364 s, err = dht.NewServer(&cfg)
367 ts, err := s.Bootstrap()
369 cl.logger.Printf("error bootstrapping dht: %s", err)
371 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
377 func (cl *Client) Closed() <-chan struct{} {
383 func (cl *Client) eachDhtServer(f func(DhtServer)) {
384 for _, ds := range cl.dhtServers {
389 // Stops the client. All connections to peers are closed and all activity will
391 func (cl *Client) Close() {
395 for _, t := range cl.torrents {
398 for i := range cl.onClose {
399 cl.onClose[len(cl.onClose)-1-i]()
404 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
405 if cl.ipBlockList == nil {
408 return cl.ipBlockList.Lookup(ip)
411 func (cl *Client) ipIsBlocked(ip net.IP) bool {
412 _, blocked := cl.ipBlockRange(ip)
416 func (cl *Client) wantConns() bool {
417 for _, t := range cl.torrents {
425 func (cl *Client) waitAccept() {
427 if cl.closed.IsSet() {
437 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
438 func (cl *Client) rejectAccepted(conn net.Conn) error {
439 ra := conn.RemoteAddr()
440 if rip := addrIpOrNil(ra); rip != nil {
441 if cl.config.DisableIPv4Peers && rip.To4() != nil {
442 return errors.New("ipv4 peers disabled")
444 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
445 return errors.New("ipv4 disabled")
448 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
449 return errors.New("ipv6 disabled")
451 if cl.rateLimitAccept(rip) {
452 return errors.New("source IP accepted rate limited")
454 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
455 return errors.New("bad source addr")
461 func (cl *Client) acceptConnections(l Listener) {
463 conn, err := l.Accept()
464 torrent.Add("client listener accepts", 1)
465 conn = pproffd.WrapNetConn(conn)
467 closed := cl.closed.IsSet()
470 reject = cl.rejectAccepted(conn)
480 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
485 torrent.Add("rejected accepted connections", 1)
486 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
489 go cl.incomingConnection(conn)
491 log.Fmsg("accepted %q connection at %q from %q",
495 ).SetLevel(log.Debug).Log(cl.logger)
496 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
497 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
498 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a
// PeerConn backed by a regular net.Conn: the connection's local address and
// remote address joined by a hyphen.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
508 func (cl *Client) incomingConnection(nc net.Conn) {
510 if tc, ok := nc.(*net.TCPConn); ok {
513 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
514 regularNetConnPeerConnConnString(nc))
516 c.Discovery = PeerSourceIncoming
517 cl.runReceivedConn(c)
520 // Returns a handle to the given torrent, if it's present in the client.
521 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
524 t, ok = cl.torrents[ih]
528 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
529 return cl.torrents[ih]
532 type dialResult struct {
537 func countDialResult(err error) {
539 torrent.Add("successful dials", 1)
541 torrent.Add("unsuccessful dials", 1)
545 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
546 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
547 if ret < minDialTimeout {
553 // Returns whether an address is known to connect to a client with our own ID.
554 func (cl *Client) dopplegangerAddr(addr string) bool {
555 _, ok := cl.dopplegangerAddrs[addr]
559 // Returns a connection over UTP or TCP, whichever is first to connect.
560 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
562 t := perf.NewTimer(perf.CallerName(0))
565 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
567 t.Mark("returned conn over " + res.Network)
571 ctx, cancel := context.WithCancel(ctx)
572 // As soon as we return one connection, cancel the others.
575 resCh := make(chan dialResult, left)
579 cl.eachDialer(func(s Dialer) bool {
582 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
585 cl.dialFromSocket(ctx, s, addr),
586 s.LocalAddr().Network(),
593 // Wait for a successful connection.
595 defer perf.ScopeTimer()()
596 for ; left > 0 && res.Conn == nil; left-- {
600 // There are still incomplete dials.
602 for ; left > 0; left-- {
603 conn := (<-resCh).Conn
610 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
612 //if res.Conn != nil {
613 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
615 // cl.logger.Printf("failed to dial %s", addr)
620 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
621 network := s.LocalAddr().Network()
622 cte := cl.config.ConnTracker.Wait(
624 conntrack.Entry{network, s.LocalAddr().String(), addr},
625 "dial torrent client",
628 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
629 // which dial errors allow us to forget the connection tracking entry handle.
630 if ctx.Err() != nil {
636 c, err := s.Dial(ctx, addr)
637 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
638 // it now in case we close the connection forthwith.
639 if tc, ok := c.(*net.TCPConn); ok {
644 if err != nil && forgettableDialError(err) {
651 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains
// "no suitable address found". The name suggests such dial failures allow the
// caller to forget its connection-tracking entry (TODO confirm at call site).
//
// A nil err is not a dial failure and yields false.
func forgettableDialError(err error) bool {
	if err == nil {
		// Guard: calling Error() on a nil error interface would panic.
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
662 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
663 if _, ok := t.halfOpen[addr]; !ok {
664 panic("invariant broken")
666 delete(t.halfOpen, addr)
668 for _, t := range cl.torrents {
673 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no connection
674 // could be established for valid reasons.
675 func (cl *Client) initiateProtocolHandshakes(
679 outgoing, encryptHeader bool,
680 remoteAddr PeerRemoteAddr,
681 network, connString string,
683 c *PeerConn, err error,
685 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
686 c.headerEncrypted = encryptHeader
687 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
689 dl, ok := ctx.Deadline()
693 err = nc.SetDeadline(dl)
697 err = cl.initiateHandshakes(c, t)
701 // Returns nil connection and nil error if no connection could be established for valid reasons.
702 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
703 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
706 return t.dialTimeout()
709 dr := cl.dialFirst(dialCtx, addr.String())
712 if dialCtx.Err() != nil {
713 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
715 return nil, errors.New("dial failed")
717 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
724 // Returns nil connection and nil error if no connection could be established
725 // for valid reasons.
726 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
727 torrent.Add("establish outgoing connection", 1)
728 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
729 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
731 torrent.Add("initiated conn with preferred header obfuscation", 1)
734 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
735 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
736 // We should have just tried with the preferred header obfuscation. If it was required,
737 // there's nothing else to try.
740 // Try again with encryption if we didn't earlier, or without if we did.
741 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
743 torrent.Add("initiated conn with fallback header obfuscation", 1)
745 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
749 // Called to dial out and run a connection. The addr we're given is already
750 // considered half-open.
751 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
752 cl.dialRateLimiter.Wait(context.Background())
753 c, err := cl.establishOutgoingConn(t, addr)
756 // Don't release lock between here and addConnection, unless it's for
758 cl.noLongerHalfOpen(t, addr.String())
761 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
768 t.runHandshookConnLoggingErr(c)
771 // The port number for incoming peer connections. 0 if the client isn't listening.
772 func (cl *Client) incomingPeerPort() int {
773 return cl.LocalPort()
776 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
777 if c.headerEncrypted {
780 rw, c.cryptoMethod, err = mse.InitiateHandshake(
787 cl.config.CryptoProvides,
791 return xerrors.Errorf("header obfuscation handshake: %w", err)
794 ih, err := cl.connBtHandshake(c, &t.infoHash)
796 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
798 if ih != t.infoHash {
799 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
804 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
805 // that won't also try to take the lock. This saves us copying all the infohashes every time.
806 func (cl *Client) forSkeys(f func([]byte) bool) {
809 if false { // Emulate the bug from #114
811 for ih := range cl.torrents {
815 for range cl.torrents {
822 for ih := range cl.torrents {
829 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
830 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
836 // Do encryption and bittorrent handshakes as receiver.
837 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
838 defer perf.ScopeTimerErr(&err)()
840 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
842 if err == nil || err == mse.ErrNoSecretKeyMatch {
843 if c.headerEncrypted {
844 torrent.Add("handshakes received encrypted", 1)
846 torrent.Add("handshakes received unencrypted", 1)
849 torrent.Add("handshakes received with error while handling encryption", 1)
852 if err == mse.ErrNoSecretKeyMatch {
857 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
858 err = errors.New("connection does not have required header obfuscation")
861 ih, err := cl.connBtHandshake(c, nil)
863 err = xerrors.Errorf("during bt handshake: %w", err)
872 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
873 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
878 c.PeerExtensionBytes = res.PeerExtensionBits
879 c.PeerID = res.PeerID
880 c.completedHandshake = time.Now()
881 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
887 func (cl *Client) runReceivedConn(c *PeerConn) {
888 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
892 t, err := cl.receiveHandshakes(c)
895 "error receiving handshakes on %v: %s", c, err,
896 ).SetLevel(log.Debug).
898 "network", c.network,
900 torrent.Add("error receiving handshake", 1)
902 cl.onBadAccept(c.RemoteAddr)
907 torrent.Add("received handshake for unloaded torrent", 1)
908 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
910 cl.onBadAccept(c.RemoteAddr)
914 torrent.Add("received handshake for loaded torrent", 1)
917 t.runHandshookConnLoggingErr(c)
920 // Client lock must be held before entering this.
921 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
923 if c.PeerID == cl.peerID {
926 addr := c.conn.RemoteAddr().String()
927 cl.dopplegangerAddrs[addr] = struct{}{}
929 // Because the remote address is not necessarily the same as its client's torrent listen
930 // address, we won't record the remote address as a doppleganger. Instead, the initiator
931 // can record *us* as the doppleganger.
933 return errors.New("local and remote peer ids are the same")
935 c.conn.SetWriteDeadline(time.Time{})
936 c.r = deadlineReader{c.conn, c.r}
937 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
938 if connIsIpv6(c.conn) {
939 torrent.Add("completed handshake over ipv6", 1)
941 if err := t.addConnection(c); err != nil {
942 return fmt.Errorf("adding connection: %w", err)
944 defer t.dropConnection(c)
945 go c.writer(time.Minute)
946 cl.sendInitialMessages(c, t)
947 err := c.mainReadLoop()
949 return fmt.Errorf("main read loop: %w", err)
954 // See the order given in Transmission's tr_peerMsgsNew.
955 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
956 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
957 conn.post(pp.Message{
959 ExtendedID: pp.HandshakeExtendedID,
960 ExtendedPayload: func() []byte {
961 msg := pp.ExtendedHandshakeMessage{
962 M: map[pp.ExtensionName]pp.ExtensionNumber{
963 pp.ExtensionNameMetadata: metadataExtendedId,
965 V: cl.config.ExtendedHandshakeClientVersion,
966 // If peer requests are buffered on read, this instructs the amount of memory
967 // that might be used to cache pending writes. Assuming 512KiB cached for
968 // sending, for 16KiB chunks.
970 YourIp: pp.CompactIp(conn.remoteIp()),
971 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
972 Port: cl.incomingPeerPort(),
973 MetadataSize: torrent.metadataSize(),
974 // TODO: We can figure these out specific to the socket
976 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
977 Ipv6: cl.config.PublicIp6.To16(),
979 if !cl.config.DisablePEX {
980 msg.M[pp.ExtensionNamePex] = pexExtendedId
982 return bencode.MustMarshal(msg)
987 if conn.fastEnabled() {
988 if torrent.haveAllPieces() {
989 conn.post(pp.Message{Type: pp.HaveAll})
990 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
992 } else if !torrent.haveAnyPieces() {
993 conn.post(pp.Message{Type: pp.HaveNone})
994 conn.sentHaves.Clear()
1000 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1001 conn.post(pp.Message{
1008 func (cl *Client) dhtPort() (ret uint16) {
1009 cl.eachDhtServer(func(s DhtServer) {
1010 ret = uint16(missinggo.AddrPort(s.Addr()))
1015 func (cl *Client) haveDhtServer() (ret bool) {
1016 cl.eachDhtServer(func(_ DhtServer) {
1022 // Process incoming ut_metadata message.
1023 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1024 var d map[string]int
1025 err := bencode.Unmarshal(payload, &d)
1026 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1027 } else if err != nil {
1028 return fmt.Errorf("error unmarshalling bencode: %s", err)
1030 msgType, ok := d["msg_type"]
1032 return errors.New("missing msg_type field")
1036 case pp.DataMetadataExtensionMsgType:
1037 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1038 if !c.requestedMetadataPiece(piece) {
1039 return fmt.Errorf("got unexpected piece %d", piece)
1041 c.metadataRequests[piece] = false
1042 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1043 if begin < 0 || begin >= len(payload) {
1044 return fmt.Errorf("data has bad offset in payload: %d", begin)
1046 t.saveMetadataPiece(piece, payload[begin:])
1047 c.lastUsefulChunkReceived = time.Now()
1048 return t.maybeCompleteMetadata()
1049 case pp.RequestMetadataExtensionMsgType:
1050 if !t.haveMetadataPiece(piece) {
1051 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1054 start := (1 << 14) * piece
1055 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1056 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1058 case pp.RejectMetadataExtensionMsgType:
1061 return errors.New("unknown msg_type value")
1065 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1066 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1067 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1072 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1076 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1079 if _, ok := cl.ipBlockRange(ip); ok {
1082 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1088 // Return a Torrent ready for insertion into a Client.
1089 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1090 // use provided storage, if provided
1091 storageClient := cl.defaultStorage
1092 if specStorage != nil {
1093 storageClient = storage.NewClient(specStorage)
1099 peers: prioritizedPeers{
1101 getPrio: func(p PeerInfo) peerPriority {
1103 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1106 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1108 halfOpen: make(map[string]PeerInfo),
1109 pieceStateChanges: pubsub.NewPubSub(),
1111 storageOpener: storageClient,
1112 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1114 networkingEnabled: true,
1115 metadataChanged: sync.Cond{
1118 webSeeds: make(map[string]*Peer),
1120 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1121 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1122 t.logger = cl.logger.WithContextValue(t)
1123 t.setChunkSize(defaultChunkSize)
1127 // A file-like handle to some torrent data resource.
1128 type Handle interface {
1135 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1136 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1139 // Adds a torrent by InfoHash with a custom Storage implementation.
1140 // If the torrent already exists then this Storage is ignored and the
1141 // existing torrent returned with `new` set to `false`
1142 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1145 t, ok := cl.torrents[infoHash]
1151 t = cl.newTorrent(infoHash, specStorage)
1152 cl.eachDhtServer(func(s DhtServer) {
1153 go t.dhtAnnouncer(s)
1155 cl.torrents[infoHash] = t
1156 cl.clearAcceptLimits()
1157 t.updateWantPeersEvent()
1158 // Tickle Client.waitAccept, new torrent may want conns.
1159 cl.event.Broadcast()
1163 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1164 // Torrent.MergeSpec.
1165 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1166 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1167 err = t.MergeSpec(spec)
1168 if err != nil && new {
// stringAddr is a net.Addr whose textual form is the underlying string and
// whose network name is empty.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1181 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1182 // spec.DisallowDataDownload/Upload will be read and applied
1183 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1184 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1185 if spec.DisplayName != "" {
1186 t.SetDisplayName(spec.DisplayName)
1188 if spec.InfoBytes != nil {
1189 err := t.SetInfoBytes(spec.InfoBytes)
1195 cl.AddDhtNodes(spec.DhtNodes)
1198 useTorrentSources(spec.Sources, t)
1199 for _, url := range spec.Webseeds {
1202 for _, peerAddr := range spec.PeerAddrs {
1204 Addr: stringAddr(peerAddr),
1205 Source: PeerSourceDirect,
1209 if spec.ChunkSize != 0 {
1210 t.setChunkSize(pp.Integer(spec.ChunkSize))
1212 t.addTrackers(spec.Trackers)
1214 t.dataDownloadDisallowed = spec.DisallowDataDownload
1215 t.dataUploadDisallowed = spec.DisallowDataUpload
1219 func useTorrentSources(sources []string, t *Torrent) {
1220 for _, s := range sources {
1222 err := useTorrentSource(s, t)
1224 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1226 t.logger.Printf("successfully used source %q", s)
1232 func useTorrentSource(source string, t *Torrent) error {
1233 req, err := http.NewRequest(http.MethodGet, source, nil)
1237 ctx, cancel := context.WithCancel(context.Background())
1247 req = req.WithContext(ctx)
1248 resp, err := http.DefaultClient.Do(req)
1252 mi, err := metainfo.Load(resp.Body)
1254 if ctx.Err() != nil {
1259 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1262 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1263 t, ok := cl.torrents[infoHash]
1265 err = fmt.Errorf("no such torrent")
1272 delete(cl.torrents, infoHash)
1276 func (cl *Client) allTorrentsCompleted() bool {
1277 for _, t := range cl.torrents {
1281 if !t.haveAllPieces() {
1288 // Returns true when all torrents are completely downloaded and false if the
1289 // client is stopped before that.
1290 func (cl *Client) WaitAll() bool {
1293 for !cl.allTorrentsCompleted() {
1294 if cl.closed.IsSet() {
1302 // Returns handles to all the torrents loaded in the Client.
1303 func (cl *Client) Torrents() []*Torrent {
1306 return cl.torrentsAsSlice()
1309 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1310 for _, t := range cl.torrents {
1311 ret = append(ret, t)
1316 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1317 spec, err := TorrentSpecFromMagnetUri(uri)
1321 T, _, err = cl.AddTorrentSpec(spec)
1325 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1326 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1330 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1331 mi, err := metainfo.LoadFromFile(filename)
1335 return cl.AddTorrent(mi)
1338 func (cl *Client) DhtServers() []DhtServer {
1339 return cl.dhtServers
1342 func (cl *Client) AddDhtNodes(nodes []string) {
1343 for _, n := range nodes {
1344 hmp := missinggo.SplitHostMaybePort(n)
1345 ip := net.ParseIP(hmp.Host)
1347 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1350 ni := krpc.NodeInfo{
1351 Addr: krpc.NodeAddr{
1356 cl.eachDhtServer(func(s DhtServer) {
1362 func (cl *Client) banPeerIP(ip net.IP) {
1363 cl.logger.Printf("banning ip %v", ip)
1364 if cl.badPeerIPs == nil {
1365 cl.badPeerIPs = make(map[string]struct{})
1367 cl.badPeerIPs[ip.String()] = struct{}{}
1370 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1376 PeerMaxRequests: 250,
1378 RemoteAddr: remoteAddr,
1381 connString: connString,
1383 writeBuffer: new(bytes.Buffer),
1384 callbacks: &cl.config.Callbacks,
1387 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1388 c.writerCond.L = cl.locker()
1389 c.setRW(connStatsReadWriter{nc, c})
1390 c.r = &rateLimitedReader{
1391 l: cl.config.DownloadRateLimiter,
1394 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1398 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1405 t.addPeers([]PeerInfo{{
1406 Addr: ipPortAddr{ip, port},
1407 Source: PeerSourceDhtAnnouncePeer,
1411 func firstNotNil(ips ...net.IP) net.IP {
1412 for _, ip := range ips {
1420 func (cl *Client) eachDialer(f func(Dialer) bool) {
1421 for _, s := range cl.dialers {
1428 func (cl *Client) eachListener(f func(Listener) bool) {
1429 for _, s := range cl.listeners {
1436 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1437 cl.eachListener(func(l Listener) bool {
1444 func (cl *Client) publicIp(peer net.IP) net.IP {
1445 // TODO: Use BEP 10 to determine how peers are seeing us.
1446 if peer.To4() != nil {
1448 cl.config.PublicIp4,
1449 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1454 cl.config.PublicIp6,
1455 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1459 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1460 l := cl.findListener(
1461 func(l Listener) bool {
1462 return f(addrIpOrNil(l.Addr()))
1468 return addrIpOrNil(l.Addr())
1471 // Our IP as a peer should see it.
1472 func (cl *Client) publicAddr(peer net.IP) IpPort {
1473 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1476 // ListenAddrs returns the addresses currently being listened to.
1477 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1480 cl.eachListener(func(l Listener) bool {
1481 ret = append(ret, l.Addr())
1487 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1488 ipa, ok := tryIpPortFromNetAddr(addr)
1492 ip := maskIpForAcceptLimiting(ipa.IP)
1493 if cl.acceptLimiter == nil {
1494 cl.acceptLimiter = make(map[ipStr]int)
1496 cl.acceptLimiter[ipStr(ip.String())]++
1499 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1500 if ip4 := ip.To4(); ip4 != nil {
1501 return ip4.Mask(net.CIDRMask(24, 32))
1506 func (cl *Client) clearAcceptLimits() {
1507 cl.acceptLimiter = nil
1510 func (cl *Client) acceptLimitClearer() {
1513 case <-cl.closed.LockedChan(cl.locker()):
1515 case <-time.After(15 * time.Minute):
1517 cl.clearAcceptLimits()
1523 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1524 if cl.config.DisableAcceptRateLimiting {
1527 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1530 func (cl *Client) rLock() {
1534 func (cl *Client) rUnlock() {
1538 func (cl *Client) lock() {
1542 func (cl *Client) unlock() {
1546 func (cl *Client) locker() *lockWithDeferreds {
1550 func (cl *Client) String() string {
1551 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1554 // Returns connection-level aggregate stats at the Client level. See the comment on
1555 // TorrentStats.ConnStats.
1556 func (cl *Client) ConnStats() ConnStats {
1557 return cl.stats.Copy()