18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
33 "github.com/anacrolix/chansync"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/limiter"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
42 "github.com/anacrolix/torrent/tracker"
43 "github.com/anacrolix/torrent/webtorrent"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
55 closed chansync.SetOnce
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
81 activeAnnounceLimiter limiter.Instance
83 updateRequests chansync.BroadcastCond
88 func (cl *Client) BadPeerIPs() (ips []string) {
90 ips = cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() (ips []string) {
96 ips = make([]string, len(cl.badPeerIPs))
98 for k := range cl.badPeerIPs {
105 func (cl *Client) PeerID() PeerID {
109 // Returns the port number for the first listener that has one. No longer assumes that all port
110 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
112 func (cl *Client) LocalPort() (port int) {
113 cl.eachListener(func(l Listener) bool {
114 port = addrPortOrZero(l.Addr())
120 func writeDhtServerStatus(w io.Writer, s DhtServer) {
121 dhtStats := s.Stats()
122 fmt.Fprintf(w, " ID: %x\n", s.ID())
123 spew.Fdump(w, dhtStats)
126 // Writes out a human readable status of the client, such as for writing to a
128 func (cl *Client) WriteStatus(_w io.Writer) {
131 w := bufio.NewWriter(_w)
133 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
134 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
135 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
136 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
137 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
138 cl.eachDhtServer(func(s DhtServer) {
139 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
140 writeDhtServerStatus(w, s)
142 spew.Fdump(w, &cl.stats)
143 torrentsSlice := cl.torrentsAsSlice()
144 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
146 sort.Slice(torrentsSlice, func(l, r int) bool {
147 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
149 for _, t := range torrentsSlice {
151 fmt.Fprint(w, "<unknown name>")
153 fmt.Fprint(w, t.name())
159 "%f%% of %d bytes (%s)",
160 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
162 humanize.Bytes(uint64(*t.length)))
164 w.WriteString("<missing metainfo>")
172 // Filters things that are less than warning from UPnP discovery.
173 func upnpDiscoverLogFilter(m log.Msg) bool {
174 level, ok := m.GetLevel()
175 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
178 func (cl *Client) initLogger() {
179 logger := cl.config.Logger
182 if !cl.config.Debug {
183 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
186 cl.logger = logger.WithValues(cl)
189 func (cl *Client) announceKey() int32 {
190 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
193 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
195 cfg = NewDefaultClientConfig()
205 dopplegangerAddrs: make(map[string]struct{}),
206 torrents: make(map[metainfo.Hash]*Torrent),
207 dialRateLimiter: rate.NewLimiter(10, 10),
209 cl.activeAnnounceLimiter.SlotsPerKey = 2
210 go cl.acceptLimitClearer()
218 cl.event.L = cl.locker()
219 storageImpl := cfg.DefaultStorage
220 if storageImpl == nil {
221 // We'd use mmap by default but HFS+ doesn't support sparse files.
222 storageImplCloser := storage.NewFile(cfg.DataDir)
223 cl.onClose = append(cl.onClose, func() {
224 if err := storageImplCloser.Close(); err != nil {
225 cl.logger.Printf("error closing default storage: %s", err)
228 storageImpl = storageImplCloser
230 cl.defaultStorage = storage.NewClient(storageImpl)
231 if cfg.IPBlocklist != nil {
232 cl.ipBlockList = cfg.IPBlocklist
235 if cfg.PeerID != "" {
236 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
238 o := copy(cl.peerID[:], cfg.Bep20)
239 _, err = rand.Read(cl.peerID[o:])
241 panic("error generating peer id")
245 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
253 for _, _s := range sockets {
254 s := _s // Go is fucking retarded.
255 cl.onClose = append(cl.onClose, func() { s.Close() })
256 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
257 cl.dialers = append(cl.dialers, s)
258 cl.listeners = append(cl.listeners, s)
259 if cl.config.AcceptPeerConnections {
260 go cl.acceptConnections(s)
267 for _, s := range sockets {
268 if pc, ok := s.(net.PacketConn); ok {
269 ds, err := cl.NewAnacrolixDhtServer(pc)
273 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
274 cl.onClose = append(cl.onClose, func() { ds.Close() })
279 cl.websocketTrackers = websocketTrackers{
282 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
285 t, ok := cl.torrents[infoHash]
287 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
289 return t.announceRequest(event), nil
291 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
294 t, ok := cl.torrents[dcc.InfoHash]
296 cl.logger.WithDefaultLevel(log.Warning).Printf(
297 "got webrtc conn for unloaded torrent with infohash %x",
303 go t.onWebRtcConn(dc, dcc)
312 func (cl *Client) AddDhtServer(d DhtServer) {
313 cl.dhtServers = append(cl.dhtServers, d)
316 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
317 // given address for any Torrent.
318 func (cl *Client) AddDialer(d Dialer) {
321 cl.dialers = append(cl.dialers, d)
322 for _, t := range cl.torrents {
327 func (cl *Client) Listeners() []Listener {
331 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
333 func (cl *Client) AddListener(l Listener) {
334 cl.listeners = append(cl.listeners, l)
335 if cl.config.AcceptPeerConnections {
336 go cl.acceptConnections(l)
340 func (cl *Client) firewallCallback(net.Addr) bool {
342 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
345 torrent.Add("connections firewalled", 1)
347 torrent.Add("connections not firewalled", 1)
352 func (cl *Client) listenOnNetwork(n network) bool {
353 if n.Ipv4 && cl.config.DisableIPv4 {
356 if n.Ipv6 && cl.config.DisableIPv6 {
359 if n.Tcp && cl.config.DisableTCP {
362 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
368 func (cl *Client) listenNetworks() (ns []network) {
369 for _, n := range allPeerNetworks {
370 if cl.listenOnNetwork(n) {
377 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
378 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
379 cfg := dht.ServerConfig{
380 IPBlocklist: cl.ipBlockList,
382 OnAnnouncePeer: cl.onDHTAnnouncePeer,
383 PublicIP: func() net.IP {
384 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
385 return cl.config.PublicIp6
387 return cl.config.PublicIp4
389 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
390 OnQuery: cl.config.DHTOnQuery,
391 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
393 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
396 s, err = dht.NewServer(&cfg)
399 ts, err := s.Bootstrap()
401 cl.logger.Printf("error bootstrapping dht: %s", err)
403 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
409 func (cl *Client) Closed() chansync.Done {
410 return cl.closed.Done()
413 func (cl *Client) eachDhtServer(f func(DhtServer)) {
414 for _, ds := range cl.dhtServers {
419 // Stops the client. All connections to peers are closed and all activity will
421 func (cl *Client) Close() {
422 var closeGroup sync.WaitGroup // WaitGroup for any concurrent cleanup to complete before returning.
423 defer closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
427 for _, t := range cl.torrents {
430 for i := range cl.onClose {
431 cl.onClose[len(cl.onClose)-1-i]()
436 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
437 if cl.ipBlockList == nil {
440 return cl.ipBlockList.Lookup(ip)
443 func (cl *Client) ipIsBlocked(ip net.IP) bool {
444 _, blocked := cl.ipBlockRange(ip)
448 func (cl *Client) wantConns() bool {
449 for _, t := range cl.torrents {
457 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
458 func (cl *Client) rejectAccepted(conn net.Conn) error {
460 return errors.New("don't want conns right now")
462 ra := conn.RemoteAddr()
463 if rip := addrIpOrNil(ra); rip != nil {
464 if cl.config.DisableIPv4Peers && rip.To4() != nil {
465 return errors.New("ipv4 peers disabled")
467 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
468 return errors.New("ipv4 disabled")
471 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
472 return errors.New("ipv6 disabled")
474 if cl.rateLimitAccept(rip) {
475 return errors.New("source IP accepted rate limited")
477 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
478 return errors.New("bad source addr")
484 func (cl *Client) acceptConnections(l Listener) {
486 conn, err := l.Accept()
487 torrent.Add("client listener accepts", 1)
488 conn = pproffd.WrapNetConn(conn)
490 closed := cl.closed.IsSet()
493 reject = cl.rejectAccepted(conn)
503 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
508 torrent.Add("rejected accepted connections", 1)
509 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
512 go cl.incomingConnection(conn)
514 log.Fmsg("accepted %q connection at %q from %q",
518 ).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
520 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
521 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString creates the PeerConn.connString for a
// regular net.Conn PeerConn. The result is the connection's local address and
// remote address joined by a hyphen.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
}
531 func (cl *Client) incomingConnection(nc net.Conn) {
533 if tc, ok := nc.(*net.TCPConn); ok {
536 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
537 regularNetConnPeerConnConnString(nc))
543 c.Discovery = PeerSourceIncoming
544 cl.runReceivedConn(c)
547 // Returns a handle to the given torrent, if it's present in the client.
548 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
551 t, ok = cl.torrents[ih]
555 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
556 return cl.torrents[ih]
559 type DialResult struct {
564 func countDialResult(err error) {
566 torrent.Add("successful dials", 1)
568 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shortens the dial timeout as the number of pending peers
// grows. max is divided by (pendingPeers+halfOpenLimit)/halfOpenLimit (integer
// division), and the result is never allowed to drop below minDialTimeout.
//
// A zero divisor (halfOpenLimit == 0, or a quotient that truncates to zero)
// would make the division panic, so in that case no reduction is applied.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	var divisor int
	if halfOpenLimit != 0 {
		divisor = (pendingPeers + halfOpenLimit) / halfOpenLimit
	}
	if divisor == 0 {
		// Avoid an integer divide-by-zero panic; fall back to the full timeout.
		divisor = 1
	}
	ret = max / time.Duration(divisor)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
580 // Returns whether an address is known to connect to a client with our own ID.
581 func (cl *Client) dopplegangerAddr(addr string) bool {
582 _, ok := cl.dopplegangerAddrs[addr]
586 // Returns a connection over UTP or TCP, whichever is first to connect.
587 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
588 return DialFirst(ctx, addr, cl.dialers)
591 // Returns a connection over UTP or TCP, whichever is first to connect.
592 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
594 t := perf.NewTimer(perf.CallerName(0))
597 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
599 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
603 ctx, cancel := context.WithCancel(ctx)
604 // As soon as we return one connection, cancel the others.
607 resCh := make(chan DialResult, left)
608 for _, _s := range dialers {
613 dialFromSocket(ctx, s, addr),
618 // Wait for a successful connection.
620 defer perf.ScopeTimer()()
621 for ; left > 0 && res.Conn == nil; left-- {
625 // There are still incomplete dials.
627 for ; left > 0; left-- {
628 conn := (<-resCh).Conn
635 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
640 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
641 c, err := s.Dial(ctx, addr)
642 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
643 // it now in case we close the connection forthwith.
644 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err is a dial error whose message
// contains "no suitable address found". A nil error is never forgettable.
func forgettableDialError(err error) bool {
	if err == nil {
		// Calling Error on a nil error would panic.
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
655 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
656 if _, ok := t.halfOpen[addr]; !ok {
657 panic("invariant broken")
659 delete(t.halfOpen, addr)
661 for _, t := range cl.torrents {
666 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
667 // for valid reasons.
668 func (cl *Client) initiateProtocolHandshakes(
672 outgoing, encryptHeader bool,
673 remoteAddr PeerRemoteAddr,
674 network, connString string,
676 c *PeerConn, err error,
678 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
679 c.headerEncrypted = encryptHeader
680 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
682 dl, ok := ctx.Deadline()
686 err = nc.SetDeadline(dl)
690 err = cl.initiateHandshakes(c, t)
694 // Returns nil connection and nil error if no connection could be established for valid reasons.
695 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
696 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
699 return t.dialTimeout()
702 dr := cl.dialFirst(dialCtx, addr.String())
705 if dialCtx.Err() != nil {
706 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
708 return nil, errors.New("dial failed")
710 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
717 // Returns nil connection and nil error if no connection could be established
718 // for valid reasons.
719 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
720 torrent.Add("establish outgoing connection", 1)
721 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
722 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
724 torrent.Add("initiated conn with preferred header obfuscation", 1)
727 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
728 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
729 // We should have just tried with the preferred header obfuscation. If it was required,
730 // there's nothing else to try.
733 // Try again with encryption if we didn't earlier, or without if we did.
734 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
736 torrent.Add("initiated conn with fallback header obfuscation", 1)
738 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
742 // Called to dial out and run a connection. The addr we're given is already
743 // considered half-open.
744 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
745 cl.dialRateLimiter.Wait(context.Background())
746 c, err := cl.establishOutgoingConn(t, addr)
749 // Don't release lock between here and addPeerConn, unless it's for
751 cl.noLongerHalfOpen(t, addr.String())
754 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
761 t.runHandshookConnLoggingErr(c)
764 // The port number for incoming peer connections. 0 if the client isn't listening.
765 func (cl *Client) incomingPeerPort() int {
766 return cl.LocalPort()
769 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
770 if c.headerEncrypted {
773 rw, c.cryptoMethod, err = mse.InitiateHandshake(
780 cl.config.CryptoProvides,
784 return fmt.Errorf("header obfuscation handshake: %w", err)
787 ih, err := cl.connBtHandshake(c, &t.infoHash)
789 return fmt.Errorf("bittorrent protocol handshake: %w", err)
791 if ih != t.infoHash {
792 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
797 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
798 // that won't also try to take the lock. This saves us copying all the infohashes every time.
799 func (cl *Client) forSkeys(f func([]byte) bool) {
802 if false { // Emulate the bug from #114
804 for ih := range cl.torrents {
808 for range cl.torrents {
815 for ih := range cl.torrents {
822 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
823 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
829 // Do encryption and bittorrent handshakes as receiver.
830 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
831 defer perf.ScopeTimerErr(&err)()
833 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
835 if err == nil || err == mse.ErrNoSecretKeyMatch {
836 if c.headerEncrypted {
837 torrent.Add("handshakes received encrypted", 1)
839 torrent.Add("handshakes received unencrypted", 1)
842 torrent.Add("handshakes received with error while handling encryption", 1)
845 if err == mse.ErrNoSecretKeyMatch {
850 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
851 err = errors.New("connection does not have required header obfuscation")
854 ih, err := cl.connBtHandshake(c, nil)
856 return nil, fmt.Errorf("during bt handshake: %w", err)
864 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
865 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
870 c.PeerExtensionBytes = res.PeerExtensionBits
871 c.PeerID = res.PeerID
872 c.completedHandshake = time.Now()
873 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
879 func (cl *Client) runReceivedConn(c *PeerConn) {
880 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
884 t, err := cl.receiveHandshakes(c)
887 "error receiving handshakes on %v: %s", c, err,
888 ).SetLevel(log.Debug).
890 "network", c.Network,
892 torrent.Add("error receiving handshake", 1)
894 cl.onBadAccept(c.RemoteAddr)
899 torrent.Add("received handshake for unloaded torrent", 1)
900 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
902 cl.onBadAccept(c.RemoteAddr)
906 torrent.Add("received handshake for loaded torrent", 1)
909 t.runHandshookConnLoggingErr(c)
912 // Client lock must be held before entering this.
913 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
915 if c.PeerID == cl.peerID {
918 addr := c.conn.RemoteAddr().String()
919 cl.dopplegangerAddrs[addr] = struct{}{}
921 // Because the remote address is not necessarily the same as its client's torrent listen
922 // address, we won't record the remote address as a doppleganger. Instead, the initiator
923 // can record *us* as the doppleganger.
925 return errors.New("local and remote peer ids are the same")
927 c.conn.SetWriteDeadline(time.Time{})
928 c.r = deadlineReader{c.conn, c.r}
929 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
930 if connIsIpv6(c.conn) {
931 torrent.Add("completed handshake over ipv6", 1)
933 if err := t.addPeerConn(c); err != nil {
934 return fmt.Errorf("adding connection: %w", err)
936 defer t.dropConnection(c)
938 cl.sendInitialMessages(c, t)
939 err := c.mainReadLoop()
941 return fmt.Errorf("main read loop: %w", err)
946 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
947 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
948 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
949 const localClientReqq = 1 << 5
951 // See the order given in Transmission's tr_peerMsgsNew.
952 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
953 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
954 conn.write(pp.Message{
956 ExtendedID: pp.HandshakeExtendedID,
957 ExtendedPayload: func() []byte {
958 msg := pp.ExtendedHandshakeMessage{
959 M: map[pp.ExtensionName]pp.ExtensionNumber{
960 pp.ExtensionNameMetadata: metadataExtendedId,
962 V: cl.config.ExtendedHandshakeClientVersion,
963 Reqq: localClientReqq,
964 YourIp: pp.CompactIp(conn.remoteIp()),
965 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
966 Port: cl.incomingPeerPort(),
967 MetadataSize: torrent.metadataSize(),
968 // TODO: We can figure these out specific to the socket
970 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
971 Ipv6: cl.config.PublicIp6.To16(),
973 if !cl.config.DisablePEX {
974 msg.M[pp.ExtensionNamePex] = pexExtendedId
976 return bencode.MustMarshal(msg)
981 if conn.fastEnabled() {
982 if torrent.haveAllPieces() {
983 conn.write(pp.Message{Type: pp.HaveAll})
984 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
986 } else if !torrent.haveAnyPieces() {
987 conn.write(pp.Message{Type: pp.HaveNone})
988 conn.sentHaves.Clear()
994 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
995 conn.write(pp.Message{
1002 func (cl *Client) dhtPort() (ret uint16) {
1003 if len(cl.dhtServers) == 0 {
1006 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1009 func (cl *Client) haveDhtServer() bool {
1010 return len(cl.dhtServers) > 0
1013 // Process incoming ut_metadata message.
1014 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1015 var d pp.ExtendedMetadataRequestMsg
1016 err := bencode.Unmarshal(payload, &d)
1017 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1018 } else if err != nil {
1019 return fmt.Errorf("error unmarshalling bencode: %s", err)
1023 case pp.DataMetadataExtensionMsgType:
1024 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1025 if !c.requestedMetadataPiece(piece) {
1026 return fmt.Errorf("got unexpected piece %d", piece)
1028 c.metadataRequests[piece] = false
1029 begin := len(payload) - d.PieceSize()
1030 if begin < 0 || begin >= len(payload) {
1031 return fmt.Errorf("data has bad offset in payload: %d", begin)
1033 t.saveMetadataPiece(piece, payload[begin:])
1034 c.lastUsefulChunkReceived = time.Now()
1035 err = t.maybeCompleteMetadata()
1037 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1038 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1039 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1040 // log consumers can filter for this message.
1041 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1044 case pp.RequestMetadataExtensionMsgType:
1045 if !t.haveMetadataPiece(piece) {
1046 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1049 start := (1 << 14) * piece
1050 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1051 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1053 case pp.RejectMetadataExtensionMsgType:
1056 return errors.New("unknown msg_type value")
1060 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1061 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1062 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1067 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1071 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1074 if _, ok := cl.ipBlockRange(ip); ok {
1077 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1083 // Return a Torrent ready for insertion into a Client.
1084 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1085 // use provided storage, if provided
1086 storageClient := cl.defaultStorage
1087 if specStorage != nil {
1088 storageClient = storage.NewClient(specStorage)
1094 peers: prioritizedPeers{
1096 getPrio: func(p PeerInfo) peerPriority {
1098 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1101 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1103 halfOpen: make(map[string]PeerInfo),
1104 pieceStateChanges: pubsub.NewPubSub(),
1106 storageOpener: storageClient,
1107 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1109 metadataChanged: sync.Cond{
1112 webSeeds: make(map[string]*Peer),
1114 t.networkingEnabled.Set()
1115 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1116 t.logger = cl.logger.WithContextValue(t)
1117 t.setChunkSize(defaultChunkSize)
1121 // A file-like handle to some torrent data resource.
1122 type Handle interface {
1129 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1130 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1133 // Adds a torrent by InfoHash with a custom Storage implementation.
1134 // If the torrent already exists then this Storage is ignored and the
1135 // existing torrent returned with `new` set to `false`
1136 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1139 t, ok := cl.torrents[infoHash]
1145 t = cl.newTorrent(infoHash, specStorage)
1146 cl.eachDhtServer(func(s DhtServer) {
1147 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1148 go t.dhtAnnouncer(s)
1151 cl.torrents[infoHash] = t
1152 cl.clearAcceptLimits()
1153 t.updateWantPeersEvent()
1154 // Tickle Client.waitAccept, new torrent may want conns.
1155 cl.event.Broadcast()
1159 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1160 // Torrent.MergeSpec.
1161 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1162 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1163 err = t.MergeSpec(spec)
1164 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string. Its network name is empty
// and its String form is the string itself.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network returns the empty string; a stringAddr carries no network name.
func (stringAddr) Network() string { return "" }

// String returns the underlying address string unchanged.
func (a stringAddr) String() string { return string(a) }
1177 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1178 // spec.DisallowDataDownload/Upload will be read and applied
1179 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1180 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1181 if spec.DisplayName != "" {
1182 t.SetDisplayName(spec.DisplayName)
1184 if spec.InfoBytes != nil {
1185 err := t.SetInfoBytes(spec.InfoBytes)
1191 cl.AddDhtNodes(spec.DhtNodes)
1194 useTorrentSources(spec.Sources, t)
1195 for _, url := range spec.Webseeds {
1198 for _, peerAddr := range spec.PeerAddrs {
1200 Addr: stringAddr(peerAddr),
1201 Source: PeerSourceDirect,
1205 if spec.ChunkSize != 0 {
1206 t.setChunkSize(pp.Integer(spec.ChunkSize))
1208 t.addTrackers(spec.Trackers)
1210 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1211 t.dataUploadDisallowed = spec.DisallowDataUpload
1215 func useTorrentSources(sources []string, t *Torrent) {
1216 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1217 ctx := context.Background()
1218 for i := 0; i < len(sources); i += 1 {
1221 if err := useTorrentSource(ctx, s, t); err != nil {
1222 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1224 t.logger.Printf("successfully used source %q", s)
1230 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1231 ctx, cancel := context.WithCancel(ctx)
1241 var req *http.Request
1242 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1245 var resp *http.Response
1246 if resp, err = http.DefaultClient.Do(req); err != nil {
1249 var mi metainfo.MetaInfo
1250 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1253 if ctx.Err() != nil {
1258 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1261 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1262 t, ok := cl.torrents[infoHash]
1264 err = fmt.Errorf("no such torrent")
1267 var wg sync.WaitGroup
1273 delete(cl.torrents, infoHash)
1277 func (cl *Client) allTorrentsCompleted() bool {
1278 for _, t := range cl.torrents {
1282 if !t.haveAllPieces() {
1289 // Returns true when all torrents are completely downloaded and false if the
1290 // client is stopped before that.
1291 func (cl *Client) WaitAll() bool {
1294 for !cl.allTorrentsCompleted() {
1295 if cl.closed.IsSet() {
1303 // Returns handles to all the torrents loaded in the Client.
1304 func (cl *Client) Torrents() []*Torrent {
1307 return cl.torrentsAsSlice()
1310 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1311 for _, t := range cl.torrents {
1312 ret = append(ret, t)
1317 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1318 spec, err := TorrentSpecFromMagnetUri(uri)
1322 T, _, err = cl.AddTorrentSpec(spec)
1326 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1327 ts, err := TorrentSpecFromMetaInfoErr(mi)
1331 T, _, err = cl.AddTorrentSpec(ts)
1335 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1336 mi, err := metainfo.LoadFromFile(filename)
1340 return cl.AddTorrent(mi)
1343 func (cl *Client) DhtServers() []DhtServer {
1344 return cl.dhtServers
1347 func (cl *Client) AddDhtNodes(nodes []string) {
1348 for _, n := range nodes {
1349 hmp := missinggo.SplitHostMaybePort(n)
1350 ip := net.ParseIP(hmp.Host)
1352 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1355 ni := krpc.NodeInfo{
1356 Addr: krpc.NodeAddr{
1361 cl.eachDhtServer(func(s DhtServer) {
1367 func (cl *Client) banPeerIP(ip net.IP) {
1368 cl.logger.Printf("banning ip %v", ip)
1369 if cl.badPeerIPs == nil {
1370 cl.badPeerIPs = make(map[string]struct{})
1372 cl.badPeerIPs[ip.String()] = struct{}{}
1375 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1384 PeerMaxRequests: 250,
1386 RemoteAddr: remoteAddr,
1388 callbacks: &cl.config.Callbacks,
1390 connString: connString,
1394 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1395 c.setRW(connStatsReadWriter{nc, c})
1396 c.r = &rateLimitedReader{
1397 l: cl.config.DownloadRateLimiter,
1400 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1401 for _, f := range cl.config.Callbacks.NewPeer {
1407 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1414 t.addPeers([]PeerInfo{{
1415 Addr: ipPortAddr{ip, port},
1416 Source: PeerSourceDhtAnnouncePeer,
1420 func firstNotNil(ips ...net.IP) net.IP {
1421 for _, ip := range ips {
1429 func (cl *Client) eachListener(f func(Listener) bool) {
1430 for _, s := range cl.listeners {
1437 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1438 cl.eachListener(func(l Listener) bool {
1445 func (cl *Client) publicIp(peer net.IP) net.IP {
1446 // TODO: Use BEP 10 to determine how peers are seeing us.
1447 if peer.To4() != nil {
1449 cl.config.PublicIp4,
1450 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1455 cl.config.PublicIp6,
1456 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1460 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1461 l := cl.findListener(
1462 func(l Listener) bool {
1463 return f(addrIpOrNil(l.Addr()))
1469 return addrIpOrNil(l.Addr())
1472 // Our IP as a peer should see it.
1473 func (cl *Client) publicAddr(peer net.IP) IpPort {
1474 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1477 // ListenAddrs addresses currently being listened to.
1478 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1481 cl.eachListener(func(l Listener) bool {
1482 ret = append(ret, l.Addr())
1488 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1489 ipa, ok := tryIpPortFromNetAddr(addr)
1493 ip := maskIpForAcceptLimiting(ipa.IP)
1494 if cl.acceptLimiter == nil {
1495 cl.acceptLimiter = make(map[ipStr]int)
1497 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting returns the key IP used for accept rate limiting.
// IPv4 addresses are masked to their /24 network, so hosts in the same /24
// share a key. Any other value, including nil, is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	if ip4 := ip.To4(); ip4 != nil {
		return ip4.Mask(net.CIDRMask(24, 32))
	}
	return ip
}
1507 func (cl *Client) clearAcceptLimits() {
1508 cl.acceptLimiter = nil
1511 func (cl *Client) acceptLimitClearer() {
1514 case <-cl.closed.Done():
1516 case <-time.After(15 * time.Minute):
1518 cl.clearAcceptLimits()
1524 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1525 if cl.config.DisableAcceptRateLimiting {
1528 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1531 func (cl *Client) rLock() {
1535 func (cl *Client) rUnlock() {
1539 func (cl *Client) lock() {
1543 func (cl *Client) unlock() {
1547 func (cl *Client) locker() *lockWithDeferreds {
1551 func (cl *Client) String() string {
1552 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1555 // Returns connection-level aggregate stats at the Client level. See the comment on
1556 // TorrentStats.ConnStats.
1557 func (cl *Client) ConnStats() ConnStats {
1558 return cl.stats.Copy()