18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
33 "github.com/anacrolix/chansync"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/limiter"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
42 "github.com/anacrolix/torrent/tracker"
43 "github.com/anacrolix/torrent/webtorrent"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
// Set once; Closed exposes its Done value, and acceptLimitClearer and WaitAll
// consult it.
55 closed chansync.SetOnce
// Storage client used by newTorrent when no per-torrent storage is supplied.
61 defaultStorage *storage.Client
// DHT servers known to the client; appended to by NewClient and AddDhtServer.
65 dhtServers []DhtServer
// IP blocklist copied from ClientConfig.IPBlocklist in init. May be nil, which
// ipBlockRange checks for before calling Lookup.
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
// Banned peer IPs keyed by ip.String(). Allocated lazily by banPeerIP.
72 badPeerIPs map[string]struct{}
// Torrents loaded in the client, keyed by infohash.
73 torrents map[InfoHash]*Torrent
// Count of bad accepts per masked source IP (see onBadAccept). Reset to nil by
// clearAcceptLimits.
75 acceptLimiter map[ipStr]int
// Throttles outgoing dials; outgoingConnection waits on it before dialing.
76 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
// init sets SlotsPerKey to 2 on this limiter.
81 activeAnnounceLimiter limiter.Instance
83 updateRequests chansync.BroadcastCond
// BadPeerIPs returns the slice produced by badPeerIPsLocked.
// NOTE(review): only the call itself is visible in this view; presumably the
// client lock is taken around it — confirm.
88 func (cl *Client) BadPeerIPs() (ips []string) {
90 ips = cl.badPeerIPsLocked()
// badPeerIPsLocked collects the keys of cl.badPeerIPs into a slice allocated
// with one slot per map entry. Map iteration order is unspecified, so the order
// of the result is too.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock — confirm.
95 func (cl *Client) badPeerIPsLocked() (ips []string) {
96 ips = make([]string, len(cl.badPeerIPs))
98 for k := range cl.badPeerIPs {
105 func (cl *Client) PeerID() PeerID {
109 // Returns the port number for the first listener that has one. No longer assumes that all port
110 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
112 func (cl *Client) LocalPort() (port int) {
113 for i := 0; i < len(cl.listeners); i += 1 {
114 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
// writeDhtServerStatus writes a status section for one DHT server to w: its ID
// formatted as hex, followed by a spew dump of the value returned by s.Stats().
121 func writeDhtServerStatus(w io.Writer, s DhtServer) {
// Stats are fetched before anything is written.
122 dhtStats := s.Stats()
123 fmt.Fprintf(w, " ID: %x\n", s.ID())
124 spew.Fdump(w, dhtStats)
127 // Writes out a human readable status of the client, such as for writing to a
129 func (cl *Client) WriteStatus(_w io.Writer) {
132 w := bufio.NewWriter(_w)
134 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
135 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
136 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
137 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
138 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
139 cl.eachDhtServer(func(s DhtServer) {
140 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
141 writeDhtServerStatus(w, s)
143 spew.Fdump(w, &cl.stats)
144 torrentsSlice := cl.torrentsAsSlice()
145 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
147 sort.Slice(torrentsSlice, func(l, r int) bool {
148 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
150 for _, t := range torrentsSlice {
152 fmt.Fprint(w, "<unknown name>")
154 fmt.Fprint(w, t.name())
160 "%f%% of %d bytes (%s)",
161 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
163 humanize.Bytes(uint64(*t.length)))
165 w.WriteString("<missing metainfo>")
173 // Filters things that are less than warning from UPnP discovery.
// Messages that do not carry UpnpDiscoverLogTag always yield true. Tagged
// messages yield true only when they have a level and that level is not less
// than log.Warning. initLogger installs this via WithFilter.
174 func upnpDiscoverLogFilter(m log.Msg) bool {
175 level, ok := m.GetLevel()
176 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
179 func (cl *Client) initLogger() {
180 logger := cl.config.Logger
183 if !cl.config.Debug {
184 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
187 cl.logger = logger.WithValues(cl)
// announceKey derives a 32-bit key from bytes 16 through 19 of the client's
// peer ID, read as a big-endian unsigned integer and converted to int32.
// WriteStatus prints it as "Announce key".
190 func (cl *Client) announceKey() int32 {
191 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
194 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
// The statements visible here allocate the doppleganger and torrent maps, set
// up the dial rate limiter and announce limiter, bind the event condition to the
// client's locker, and copy the IP blocklist from cfg.
195 func (cl *Client) init(cfg *ClientConfig) {
197 cl.dopplegangerAddrs = make(map[string]struct{})
198 cl.torrents = make(map[metainfo.Hash]*Torrent)
// rate.NewLimiter(10, 10): 10 events per second with a burst of 10.
199 cl.dialRateLimiter = rate.NewLimiter(10, 10)
200 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 cl.event.L = cl.locker()
203 cl.ipBlockList = cfg.IPBlocklist
// NewClient constructs a Client from cfg. The steps visible in this view are:
// starting the accept-limit clearer goroutine, setting up default storage,
// filling in the peer ID, opening the listen sockets, starting a DHT server on
// each socket that is a net.PacketConn, and wiring the websocket tracker
// callbacks. Cleanup functions are accumulated in cl.onClose along the way.
// NOTE(review): several statements (including error handling) are not visible
// here, so this summary is not exhaustive.
206 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// NOTE(review): presumably only applied when the caller passed a nil cfg — the
// guarding condition is not visible; confirm.
208 cfg = NewDefaultClientConfig()
214 go cl.acceptLimitClearer()
223 storageImpl := cfg.DefaultStorage
224 if storageImpl == nil {
225 // We'd use mmap by default but HFS+ doesn't support sparse files.
226 storageImplCloser := storage.NewFile(cfg.DataDir)
// The client created this storage, so it is also responsible for closing it.
227 cl.onClose = append(cl.onClose, func() {
228 if err := storageImplCloser.Close(); err != nil {
229 cl.logger.Printf("error closing default storage: %s", err)
232 storageImpl = storageImplCloser
234 cl.defaultStorage = storage.NewClient(storageImpl)
// An explicitly configured peer ID is copied verbatim.
236 if cfg.PeerID != "" {
237 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
// Otherwise the ID starts with cfg.Bep20 and the remaining bytes come from
// rand.Read.
239 o := copy(cl.peerID[:], cfg.Bep20)
240 _, err = rand.Read(cl.peerID[o:])
242 panic("error generating peer id")
246 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
254 for _, _s := range sockets {
255 s := _s // Copy the loop variable so each closure below captures its own socket.
256 cl.onClose = append(cl.onClose, func() { s.Close() })
257 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
// A peer-enabled socket serves as both a dialer and a listener.
258 cl.dialers = append(cl.dialers, s)
259 cl.listeners = append(cl.listeners, s)
260 if cl.config.AcceptPeerConnections {
261 go cl.acceptConnections(s)
268 for _, s := range sockets {
// Only packet-oriented sockets can host a DHT server.
269 if pc, ok := s.(net.PacketConn); ok {
270 ds, err := cl.NewAnacrolixDhtServer(pc)
274 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
275 cl.onClose = append(cl.onClose, func() { ds.Close() })
280 cl.websocketTrackers = websocketTrackers{
// Builds the announce request for a torrent looked up by infohash; fails for
// torrents the client does not track.
283 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
286 t, ok := cl.torrents[infoHash]
288 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
290 return t.announceRequest(event), nil
// Hands a new WebRTC data channel to the torrent named by dcc.InfoHash.
292 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
295 t, ok := cl.torrents[dcc.InfoHash]
297 cl.logger.WithDefaultLevel(log.Warning).Printf(
298 "got webrtc conn for unloaded torrent with infohash %x",
304 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the client's list of DHT servers.
// NOTE(review): no locking is visible around the append — confirm that callers
// serialise access to cl.dhtServers.
313 func (cl *Client) AddDhtServer(d DhtServer) {
314 cl.dhtServers = append(cl.dhtServers, d)
317 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
318 // given address for any Torrent.
319 func (cl *Client) AddDialer(d Dialer) {
322 cl.dialers = append(cl.dialers, d)
323 for _, t := range cl.torrents {
328 func (cl *Client) Listeners() []Listener {
332 // Registers a Listener and, when config.AcceptPeerConnections is set, starts Accepting on it in a
// new goroutine. You must Close Listeners provided this way yourself: unlike the
// sockets opened by NewClient, nothing is added to cl.onClose for them here.
334 func (cl *Client) AddListener(l Listener) {
335 cl.listeners = append(cl.listeners, l)
336 if cl.config.AcceptPeerConnections {
337 go cl.acceptConnections(l)
341 func (cl *Client) firewallCallback(net.Addr) bool {
343 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
346 torrent.Add("connections firewalled", 1)
348 torrent.Add("connections not firewalled", 1)
353 func (cl *Client) listenOnNetwork(n network) bool {
354 if n.Ipv4 && cl.config.DisableIPv4 {
357 if n.Ipv6 && cl.config.DisableIPv6 {
360 if n.Tcp && cl.config.DisableTCP {
363 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
369 func (cl *Client) listenNetworks() (ns []network) {
370 for _, n := range allPeerNetworks {
371 if cl.listenOnNetwork(n) {
378 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
379 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
380 cfg := dht.ServerConfig{
381 IPBlocklist: cl.ipBlockList,
383 OnAnnouncePeer: cl.onDHTAnnouncePeer,
384 PublicIP: func() net.IP {
385 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
386 return cl.config.PublicIp6
388 return cl.config.PublicIp4
390 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
391 OnQuery: cl.config.DHTOnQuery,
392 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
394 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
397 s, err = dht.NewServer(&cfg)
400 ts, err := s.Bootstrap()
402 cl.logger.Printf("error bootstrapping dht: %s", err)
404 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
// Closed returns the Done value of the client's closed SetOnce.
// NOTE(review): presumably this becomes signalled once Close has run — the code
// that sets it is not visible here; confirm.
410 func (cl *Client) Closed() chansync.Done {
411 return cl.closed.Done()
414 func (cl *Client) eachDhtServer(f func(DhtServer)) {
415 for _, ds := range cl.dhtServers {
420 // Stops the client. All connections to peers are closed and all activity will
422 func (cl *Client) Close() {
424 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
427 for _, t := range cl.torrents {
431 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
433 for i := range cl.onClose {
434 cl.onClose[len(cl.onClose)-1-i]()
// ipBlockRange looks ip up in the client's IP blocklist and returns the matching
// range together with whether ip is blocked. A nil blocklist is checked for
// first, so Lookup is never called on it.
439 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
440 if cl.ipBlockList == nil {
443 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked asks ipBlockRange about ip, keeping only the blocked flag and
// discarding the matching range.
446 func (cl *Client) ipIsBlocked(ip net.IP) bool {
447 _, blocked := cl.ipBlockRange(ip)
451 func (cl *Client) wantConns() bool {
452 for _, t := range cl.torrents {
460 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
461 func (cl *Client) rejectAccepted(conn net.Conn) error {
463 return errors.New("don't want conns right now")
465 ra := conn.RemoteAddr()
466 if rip := addrIpOrNil(ra); rip != nil {
467 if cl.config.DisableIPv4Peers && rip.To4() != nil {
468 return errors.New("ipv4 peers disabled")
470 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
471 return errors.New("ipv4 disabled")
474 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
475 return errors.New("ipv6 disabled")
477 if cl.rateLimitAccept(rip) {
478 return errors.New("source IP accepted rate limited")
480 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
481 return errors.New("bad source addr")
487 func (cl *Client) acceptConnections(l Listener) {
489 conn, err := l.Accept()
490 torrent.Add("client listener accepts", 1)
491 conn = pproffd.WrapNetConn(conn)
493 closed := cl.closed.IsSet()
496 reject = cl.rejectAccepted(conn)
506 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
511 torrent.Add("rejected accepted connections", 1)
512 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
515 go cl.incomingConnection(conn)
517 log.Fmsg("accepted %q connection at %q from %q",
521 ).SetLevel(log.Debug).Log(cl.logger)
522 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
523 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
524 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a PeerConn
// backed by an ordinary net.Conn: the local address, a hyphen, then the remote
// address.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
534 func (cl *Client) incomingConnection(nc net.Conn) {
536 if tc, ok := nc.(*net.TCPConn); ok {
539 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
540 regularNetConnPeerConnConnString(nc))
546 c.Discovery = PeerSourceIncoming
547 cl.runReceivedConn(c)
550 // Returns a handle to the given torrent, if it's present in the client.
551 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
554 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored under ih in cl.torrents, or nil when there
// is no such entry. It is a plain map lookup with no locking of its own.
558 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
559 return cl.torrents[ih]
562 type DialResult struct {
567 func countDialResult(err error) {
569 torrent.Add("successful dials", 1)
571 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the maximum dial timeout down as the number of
// pending peers grows relative to the half-open connection limit, and never
// returns less than minDialTimeout.
//
// The divisor is (pendingPeers+halfOpenLimit)/halfOpenLimit using integer
// division, so the timeout only starts shrinking once pendingPeers reaches
// halfOpenLimit. A non-positive halfOpenLimit, or any input that would make the
// divisor fall below one, is treated as "no reduction" instead of dividing by
// zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	divisor := 1
	if halfOpenLimit > 0 {
		if d := (pendingPeers + halfOpenLimit) / halfOpenLimit; d > 1 {
			divisor = d
		}
	}
	ret = max / time.Duration(divisor)
	// Clamp so a large backlog can't drive the timeout below the floor.
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
583 // Returns whether an address is known to connect to a client with our own ID.
// addr is matched as an exact string key in cl.dopplegangerAddrs, which
// runHandshookConn populates with the remote address string of a connection
// whose peer ID equals ours.
584 func (cl *Client) dopplegangerAddr(addr string) bool {
585 _, ok := cl.dopplegangerAddrs[addr]
589 // Dials addr by delegating to DialFirst with every dialer registered on the
// client (cl.dialers) and returns its result.
590 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
591 return DialFirst(ctx, addr, cl.dialers)
594 // Returns a connection over UTP or TCP, whichever is first to connect.
595 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
597 t := perf.NewTimer(perf.CallerName(0))
600 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
602 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
606 ctx, cancel := context.WithCancel(ctx)
607 // As soon as we return one connection, cancel the others.
610 resCh := make(chan DialResult, left)
611 for _, _s := range dialers {
616 dialFromSocket(ctx, s, addr),
621 // Wait for a successful connection.
623 defer perf.ScopeTimer()()
624 for ; left > 0 && res.Conn == nil; left-- {
628 // There are still incomplete dials.
630 for ; left > 0; left-- {
631 conn := (<-resCh).Conn
638 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
643 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
644 c, err := s.Dial(ctx, addr)
645 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
646 // it now in case we close the connection forthwith.
647 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err's message contains
// "no suitable address found". A nil error is never forgettable; it is checked
// explicitly so that calling this with nil cannot panic.
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
658 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
659 if _, ok := t.halfOpen[addr]; !ok {
660 panic("invariant broken")
662 delete(t.halfOpen, addr)
664 for _, t := range cl.torrents {
669 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
670 // for valid reasons.
671 func (cl *Client) initiateProtocolHandshakes(
675 outgoing, encryptHeader bool,
676 remoteAddr PeerRemoteAddr,
677 network, connString string,
679 c *PeerConn, err error,
681 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
682 c.headerEncrypted = encryptHeader
683 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
685 dl, ok := ctx.Deadline()
689 err = nc.SetDeadline(dl)
693 err = cl.initiateHandshakes(c, t)
697 // Returns nil connection and nil error if no connection could be established for valid reasons.
698 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
699 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
702 return t.dialTimeout()
705 dr := cl.dialFirst(dialCtx, addr.String())
708 if dialCtx.Err() != nil {
709 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
711 return nil, errors.New("dial failed")
713 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
720 // Returns nil connection and nil error if no connection could be established
721 // for valid reasons.
722 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
723 torrent.Add("establish outgoing connection", 1)
724 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
725 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
727 torrent.Add("initiated conn with preferred header obfuscation", 1)
730 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
731 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
732 // We should have just tried with the preferred header obfuscation. If it was required,
733 // there's nothing else to try.
736 // Try again with encryption if we didn't earlier, or without if we did.
737 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
739 torrent.Add("initiated conn with fallback header obfuscation", 1)
741 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
745 // Called to dial out and run a connection. The addr we're given is already
746 // considered half-open.
747 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
748 cl.dialRateLimiter.Wait(context.Background())
749 c, err := cl.establishOutgoingConn(t, addr)
752 // Don't release lock between here and addPeerConn, unless it's for
754 cl.noLongerHalfOpen(t, addr.String())
757 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
764 t.runHandshookConnLoggingErr(c)
767 // The port number for incoming peer connections. 0 if the client isn't listening.
// This is simply the value of LocalPort.
768 func (cl *Client) incomingPeerPort() int {
769 return cl.LocalPort()
772 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
773 if c.headerEncrypted {
776 rw, c.cryptoMethod, err = mse.InitiateHandshake(
783 cl.config.CryptoProvides,
787 return fmt.Errorf("header obfuscation handshake: %w", err)
790 ih, err := cl.connBtHandshake(c, &t.infoHash)
792 return fmt.Errorf("bittorrent protocol handshake: %w", err)
794 if ih != t.infoHash {
795 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
800 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
801 // that won't also try to take the lock. This saves us copying all the infohashes every time.
802 func (cl *Client) forSkeys(f func([]byte) bool) {
805 if false { // Emulate the bug from #114
807 for ih := range cl.torrents {
811 for range cl.torrents {
818 for ih := range cl.torrents {
825 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
826 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
832 // Do encryption and bittorrent handshakes as receiver.
833 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
834 defer perf.ScopeTimerErr(&err)()
836 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
838 if err == nil || err == mse.ErrNoSecretKeyMatch {
839 if c.headerEncrypted {
840 torrent.Add("handshakes received encrypted", 1)
842 torrent.Add("handshakes received unencrypted", 1)
845 torrent.Add("handshakes received with error while handling encryption", 1)
848 if err == mse.ErrNoSecretKeyMatch {
853 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
854 err = errors.New("connection does not have required header obfuscation")
857 ih, err := cl.connBtHandshake(c, nil)
859 return nil, fmt.Errorf("during bt handshake: %w", err)
867 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
868 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
873 c.PeerExtensionBytes = res.PeerExtensionBits
874 c.PeerID = res.PeerID
875 c.completedHandshake = time.Now()
876 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
882 func (cl *Client) runReceivedConn(c *PeerConn) {
883 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
887 t, err := cl.receiveHandshakes(c)
890 "error receiving handshakes on %v: %s", c, err,
891 ).SetLevel(log.Debug).
893 "network", c.Network,
895 torrent.Add("error receiving handshake", 1)
897 cl.onBadAccept(c.RemoteAddr)
902 torrent.Add("received handshake for unloaded torrent", 1)
903 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
905 cl.onBadAccept(c.RemoteAddr)
909 torrent.Add("received handshake for loaded torrent", 1)
912 t.runHandshookConnLoggingErr(c)
915 // Client lock must be held before entering this.
916 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
918 if c.PeerID == cl.peerID {
921 addr := c.conn.RemoteAddr().String()
922 cl.dopplegangerAddrs[addr] = struct{}{}
924 // Because the remote address is not necessarily the same as its client's torrent listen
925 // address, we won't record the remote address as a doppleganger. Instead, the initiator
926 // can record *us* as the doppleganger.
928 return errors.New("local and remote peer ids are the same")
930 c.conn.SetWriteDeadline(time.Time{})
931 c.r = deadlineReader{c.conn, c.r}
932 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
933 if connIsIpv6(c.conn) {
934 torrent.Add("completed handshake over ipv6", 1)
936 if err := t.addPeerConn(c); err != nil {
937 return fmt.Errorf("adding connection: %w", err)
939 defer t.dropConnection(c)
941 cl.sendInitialMessages(c, t)
942 err := c.mainReadLoop()
944 return fmt.Errorf("main read loop: %w", err)
949 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
950 // bounds the amount of memory that might be used to cache pending writes. Assuming 512KiB
951 // (1<<19) cached for sending, for 16KiB (1<<14) chunks, that is (1<<19)/(1<<14) = 1<<5 requests.
// sendInitialMessages sends this value as Reqq in the extended handshake.
952 const localClientReqq = 1 << 5
954 // See the order given in Transmission's tr_peerMsgsNew.
955 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
956 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
957 conn.write(pp.Message{
959 ExtendedID: pp.HandshakeExtendedID,
960 ExtendedPayload: func() []byte {
961 msg := pp.ExtendedHandshakeMessage{
962 M: map[pp.ExtensionName]pp.ExtensionNumber{
963 pp.ExtensionNameMetadata: metadataExtendedId,
965 V: cl.config.ExtendedHandshakeClientVersion,
966 Reqq: localClientReqq,
967 YourIp: pp.CompactIp(conn.remoteIp()),
968 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
969 Port: cl.incomingPeerPort(),
970 MetadataSize: torrent.metadataSize(),
971 // TODO: We can figure these out specific to the socket
973 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
974 Ipv6: cl.config.PublicIp6.To16(),
976 if !cl.config.DisablePEX {
977 msg.M[pp.ExtensionNamePex] = pexExtendedId
979 return bencode.MustMarshal(msg)
984 if conn.fastEnabled() {
985 if torrent.haveAllPieces() {
986 conn.write(pp.Message{Type: pp.HaveAll})
987 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
989 } else if !torrent.haveAnyPieces() {
990 conn.write(pp.Message{Type: pp.HaveNone})
991 conn.sentHaves.Clear()
997 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
998 conn.write(pp.Message{
// dhtPort returns the port of the last entry in cl.dhtServers, i.e. the most
// recently appended DHT server, truncated to uint16. The empty-list case is
// handled separately before indexing.
1005 func (cl *Client) dhtPort() (ret uint16) {
1006 if len(cl.dhtServers) == 0 {
1009 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
// haveDhtServer reports whether at least one DHT server is registered on the
// client.
1012 func (cl *Client) haveDhtServer() bool {
1013 return len(cl.dhtServers) > 0
1016 // Process incoming ut_metadata message.
1017 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1018 var d pp.ExtendedMetadataRequestMsg
1019 err := bencode.Unmarshal(payload, &d)
1020 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1021 } else if err != nil {
1022 return fmt.Errorf("error unmarshalling bencode: %s", err)
1026 case pp.DataMetadataExtensionMsgType:
1027 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1028 if !c.requestedMetadataPiece(piece) {
1029 return fmt.Errorf("got unexpected piece %d", piece)
1031 c.metadataRequests[piece] = false
1032 begin := len(payload) - d.PieceSize()
1033 if begin < 0 || begin >= len(payload) {
1034 return fmt.Errorf("data has bad offset in payload: %d", begin)
1036 t.saveMetadataPiece(piece, payload[begin:])
1037 c.lastUsefulChunkReceived = time.Now()
1038 err = t.maybeCompleteMetadata()
1040 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1041 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1042 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1043 // log consumers can filter for this message.
1044 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1047 case pp.RequestMetadataExtensionMsgType:
1048 if !t.haveMetadataPiece(piece) {
1049 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1052 start := (1 << 14) * piece
1053 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1054 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1056 case pp.RejectMetadataExtensionMsgType:
1059 return errors.New("unknown msg_type value")
1063 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1064 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1065 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1070 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1074 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1077 if _, ok := cl.ipBlockRange(ip); ok {
1080 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1086 // Return a Torrent ready for insertion into a Client.
1087 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1088 // use provided storage, if provided
1089 storageClient := cl.defaultStorage
1090 if specStorage != nil {
1091 storageClient = storage.NewClient(specStorage)
1097 peers: prioritizedPeers{
1099 getPrio: func(p PeerInfo) peerPriority {
1101 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1104 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1106 halfOpen: make(map[string]PeerInfo),
1107 pieceStateChanges: pubsub.NewPubSub(),
1109 storageOpener: storageClient,
1110 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1112 metadataChanged: sync.Cond{
1115 webSeeds: make(map[string]*Peer),
1116 gotMetainfoC: make(chan struct{}),
1118 t.networkingEnabled.Set()
1119 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1120 t.logger = cl.logger.WithContextValue(t)
1121 t.setChunkSize(defaultChunkSize)
1125 // A file-like handle to some torrent data resource.
1126 type Handle interface {
// AddTorrentInfoHash adds a torrent by infohash without custom storage. It
// delegates to AddTorrentInfoHashWithStorage with a nil storage implementation,
// in which case newTorrent uses the client's default storage.
1133 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1134 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1137 // Adds a torrent by InfoHash with a custom Storage implementation.
1138 // If the torrent already exists then this Storage is ignored and the
1139 // existing torrent returned with `new` set to `false`
1140 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1143 t, ok := cl.torrents[infoHash]
1149 t = cl.newTorrent(infoHash, specStorage)
1150 cl.eachDhtServer(func(s DhtServer) {
1151 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1152 go t.dhtAnnouncer(s)
1155 cl.torrents[infoHash] = t
1156 cl.clearAcceptLimits()
1157 t.updateWantPeersEvent()
1158 // Tickle Client.waitAccept, new torrent may want conns.
1159 cl.event.Broadcast()
1163 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1164 // Torrent.MergeSpec.
1165 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1166 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1167 err = t.MergeSpec(spec)
1168 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string. MergeSpec uses it to wrap
// the textual peer addresses of a TorrentSpec.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network always reports the empty string: the network is not known.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1181 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1182 // spec.DisallowDataDownload/Upload will be read and applied
1183 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1184 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1185 if spec.DisplayName != "" {
1186 t.SetDisplayName(spec.DisplayName)
1188 if spec.InfoBytes != nil {
1189 err := t.SetInfoBytes(spec.InfoBytes)
1195 cl.AddDhtNodes(spec.DhtNodes)
1198 useTorrentSources(spec.Sources, t)
1199 for _, url := range spec.Webseeds {
1202 for _, peerAddr := range spec.PeerAddrs {
1204 Addr: stringAddr(peerAddr),
1205 Source: PeerSourceDirect,
1209 if spec.ChunkSize != 0 {
1210 t.setChunkSize(pp.Integer(spec.ChunkSize))
1212 t.addTrackers(spec.Trackers)
1214 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1215 t.dataUploadDisallowed = spec.DisallowDataUpload
1219 func useTorrentSources(sources []string, t *Torrent) {
1220 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1221 ctx := context.Background()
1222 for i := 0; i < len(sources); i += 1 {
1225 if err := useTorrentSource(ctx, s, t); err != nil {
1226 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1228 t.logger.Printf("successfully used source %q", s)
1234 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1235 ctx, cancel := context.WithCancel(ctx)
1245 var req *http.Request
1246 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1249 var resp *http.Response
1250 if resp, err = http.DefaultClient.Do(req); err != nil {
1253 var mi metainfo.MetaInfo
1254 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1257 if ctx.Err() != nil {
1262 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1265 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1266 t, ok := cl.torrents[infoHash]
1268 err = fmt.Errorf("no such torrent")
1275 delete(cl.torrents, infoHash)
1279 func (cl *Client) allTorrentsCompleted() bool {
1280 for _, t := range cl.torrents {
1284 if !t.haveAllPieces() {
1291 // Returns true when all torrents are completely downloaded and false if the
1292 // client is stopped before that.
1293 func (cl *Client) WaitAll() bool {
1296 for !cl.allTorrentsCompleted() {
1297 if cl.closed.IsSet() {
1305 // Returns handles to all the torrents loaded in the Client.
1306 func (cl *Client) Torrents() []*Torrent {
1309 return cl.torrentsAsSlice()
1312 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1313 for _, t := range cl.torrents {
1314 ret = append(ret, t)
1319 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1320 spec, err := TorrentSpecFromMagnetUri(uri)
1324 T, _, err = cl.AddTorrentSpec(spec)
1328 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1329 ts, err := TorrentSpecFromMetaInfoErr(mi)
1333 T, _, err = cl.AddTorrentSpec(ts)
1337 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1338 mi, err := metainfo.LoadFromFile(filename)
1342 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT server slice. The slice itself is
// returned, not a copy, so it shares its backing array with the client.
1345 func (cl *Client) DhtServers() []DhtServer {
1346 return cl.dhtServers
1349 func (cl *Client) AddDhtNodes(nodes []string) {
1350 for _, n := range nodes {
1351 hmp := missinggo.SplitHostMaybePort(n)
1352 ip := net.ParseIP(hmp.Host)
1354 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1357 ni := krpc.NodeInfo{
1358 Addr: krpc.NodeAddr{
1363 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip in cl.badPeerIPs under its string form.
// badPeerIPPort later consults that set.
1369 func (cl *Client) banPeerIP(ip net.IP) {
1370 cl.logger.Printf("banning ip %v", ip)
// The set is allocated lazily on the first ban.
1371 if cl.badPeerIPs == nil {
1372 cl.badPeerIPs = make(map[string]struct{})
1374 cl.badPeerIPs[ip.String()] = struct{}{}
1377 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1386 PeerMaxRequests: 250,
1388 RemoteAddr: remoteAddr,
1390 callbacks: &cl.config.Callbacks,
1392 connString: connString,
1396 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1397 c.setRW(connStatsReadWriter{nc, c})
1398 c.r = &rateLimitedReader{
1399 l: cl.config.DownloadRateLimiter,
1402 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1403 for _, f := range cl.config.Callbacks.NewPeer {
1409 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1416 t.addPeers([]PeerInfo{{
1417 Addr: ipPortAddr{ip, port},
1418 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first non-nil IP among its arguments, in argument
// order, or nil when every argument is nil or none are given.
func firstNotNil(ips ...net.IP) net.IP {
	for i := range ips {
		if candidate := ips[i]; candidate != nil {
			return candidate
		}
	}
	return nil
}
1431 func (cl *Client) eachListener(f func(Listener) bool) {
1432 for _, s := range cl.listeners {
1439 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1440 for i := 0; i < len(cl.listeners); i += 1 {
1441 if ret = cl.listeners[i]; f(ret) {
1448 func (cl *Client) publicIp(peer net.IP) net.IP {
1449 // TODO: Use BEP 10 to determine how peers are seeing us.
1450 if peer.To4() != nil {
1452 cl.config.PublicIp4,
1453 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1458 cl.config.PublicIp6,
1459 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1463 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1464 l := cl.findListener(
1465 func(l Listener) bool {
1466 return f(addrIpOrNil(l.Addr()))
1472 return addrIpOrNil(l.Addr())
1475 // Our address as the given peer should see it: the IP chosen by publicIp for that peer,
// paired with the incoming peer port truncated to uint16.
1476 func (cl *Client) publicAddr(peer net.IP) IpPort {
1477 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1480 // ListenAddrs addresses currently being listened to.
1481 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1483 ret = make([]net.Addr, len(cl.listeners))
1484 for i := 0; i < len(cl.listeners); i += 1 {
1485 ret[i] = cl.listeners[i].Addr()
// onBadAccept records a bad accepted connection against the source address. The
// IP is masked with maskIpForAcceptLimiting before counting, and
// rateLimitAccept later checks the resulting counter.
1491 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1492 ipa, ok := tryIpPortFromNetAddr(addr)
1496 ip := maskIpForAcceptLimiting(ipa.IP)
// The counter map is allocated lazily; clearAcceptLimits sets it back to nil.
1497 if cl.acceptLimiter == nil {
1498 cl.acceptLimiter = make(map[ipStr]int)
1500 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IP to the key used for accept rate
// limiting. IPv4 addresses (including IPv4-mapped ones) are masked to their /24
// network; any other address is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	v4 := ip.To4()
	if v4 == nil {
		return ip
	}
	return v4.Mask(net.CIDRMask(24, 32))
}
// clearAcceptLimits forgets all recorded bad accepts by dropping the counter
// map. onBadAccept reallocates it on demand.
1510 func (cl *Client) clearAcceptLimits() {
1511 cl.acceptLimiter = nil
1514 func (cl *Client) acceptLimitClearer() {
1517 case <-cl.closed.Done():
1519 case <-time.After(15 * time.Minute):
1521 cl.clearAcceptLimits()
// rateLimitAccept reports whether a connection from ip should be rate limited:
// true when the masked form of ip has at least one bad accept recorded by
// onBadAccept. config.DisableAcceptRateLimiting is checked first.
1527 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1528 if cl.config.DisableAcceptRateLimiting {
// Indexing a nil map is safe and yields zero, so a cleared limiter limits nothing.
1531 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1534 func (cl *Client) rLock() {
1538 func (cl *Client) rUnlock() {
1542 func (cl *Client) lock() {
1546 func (cl *Client) unlock() {
1550 func (cl *Client) locker() *lockWithDeferreds {
// String formats the client as its Go type followed by its pointer address,
// wrapped in angle brackets. Both verbs use explicit argument index 1, so cl is
// formatted twice.
1554 func (cl *Client) String() string {
1555 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1558 // Returns connection-level aggregate stats at the Client level. See the comment on
1559 // TorrentStats.ConnStats.
// The value returned is produced by cl.stats.Copy().
1560 func (cl *Client) ConnStats() ConnStats {
1561 return cl.stats.Copy()