18 "github.com/anacrolix/chansync/events"
19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/v2"
25 "github.com/anacrolix/missinggo/v2/bitmap"
26 "github.com/anacrolix/missinggo/v2/pproffd"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
34 "github.com/anacrolix/chansync"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/internal/limiter"
38 "github.com/anacrolix/torrent/iplist"
39 "github.com/anacrolix/torrent/metainfo"
40 "github.com/anacrolix/torrent/mse"
41 pp "github.com/anacrolix/torrent/peer_protocol"
42 "github.com/anacrolix/torrent/storage"
43 "github.com/anacrolix/torrent/tracker"
44 "github.com/anacrolix/torrent/webtorrent"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
56 closed chansync.SetOnce
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
82 activeAnnounceLimiter limiter.Instance
84 updateRequests chansync.BroadcastCond
89 func (cl *Client) BadPeerIPs() (ips []string) {
91 ips = cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() (ips []string) {
97 ips = make([]string, len(cl.badPeerIPs))
99 for k := range cl.badPeerIPs {
106 func (cl *Client) PeerID() PeerID {
110 // Returns the port number for the first listener that has one. No longer assumes that all port
111 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
113 func (cl *Client) LocalPort() (port int) {
114 for i := 0; i < len(cl.listeners); i += 1 {
115 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
122 func writeDhtServerStatus(w io.Writer, s DhtServer) {
123 dhtStats := s.Stats()
124 fmt.Fprintf(w, " ID: %x\n", s.ID())
125 spew.Fdump(w, dhtStats)
128 // Writes out a human readable status of the client, such as for writing to a
130 func (cl *Client) WriteStatus(_w io.Writer) {
133 w := bufio.NewWriter(_w)
135 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
136 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
137 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
138 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
139 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
140 cl.eachDhtServer(func(s DhtServer) {
141 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
142 writeDhtServerStatus(w, s)
144 spew.Fdump(w, &cl.stats)
145 torrentsSlice := cl.torrentsAsSlice()
146 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
148 sort.Slice(torrentsSlice, func(l, r int) bool {
149 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
151 for _, t := range torrentsSlice {
153 fmt.Fprint(w, "<unknown name>")
155 fmt.Fprint(w, t.name())
161 "%f%% of %d bytes (%s)",
162 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
164 humanize.Bytes(uint64(*t.length)))
166 w.WriteString("<missing metainfo>")
174 // Filters things that are less than warning from UPnP discovery.
175 func upnpDiscoverLogFilter(m log.Msg) bool {
176 level, ok := m.GetLevel()
177 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
180 func (cl *Client) initLogger() {
181 logger := cl.config.Logger
184 if !cl.config.Debug {
185 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
188 cl.logger = logger.WithValues(cl)
191 func (cl *Client) announceKey() int32 {
192 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
195 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
196 func (cl *Client) init(cfg *ClientConfig) {
198 cl.dopplegangerAddrs = make(map[string]struct{})
199 cl.torrents = make(map[metainfo.Hash]*Torrent)
200 cl.dialRateLimiter = rate.NewLimiter(10, 10)
201 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 cl.event.L = cl.locker()
204 cl.ipBlockList = cfg.IPBlocklist
207 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
209 cfg = NewDefaultClientConfig()
215 go cl.acceptLimitClearer()
224 storageImpl := cfg.DefaultStorage
225 if storageImpl == nil {
226 // We'd use mmap by default but HFS+ doesn't support sparse files.
227 storageImplCloser := storage.NewFile(cfg.DataDir)
228 cl.onClose = append(cl.onClose, func() {
229 if err := storageImplCloser.Close(); err != nil {
230 cl.logger.Printf("error closing default storage: %s", err)
233 storageImpl = storageImplCloser
235 cl.defaultStorage = storage.NewClient(storageImpl)
237 if cfg.PeerID != "" {
238 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
240 o := copy(cl.peerID[:], cfg.Bep20)
241 _, err = rand.Read(cl.peerID[o:])
243 panic("error generating peer id")
247 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
255 for _, _s := range sockets {
256 s := _s // Copy the loop variable so each closure below captures its own socket (needed before Go 1.22).
257 cl.onClose = append(cl.onClose, func() { s.Close() })
258 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
259 cl.dialers = append(cl.dialers, s)
260 cl.listeners = append(cl.listeners, s)
261 if cl.config.AcceptPeerConnections {
262 go cl.acceptConnections(s)
269 for _, s := range sockets {
270 if pc, ok := s.(net.PacketConn); ok {
271 ds, err := cl.NewAnacrolixDhtServer(pc)
275 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
276 cl.onClose = append(cl.onClose, func() { ds.Close() })
281 cl.websocketTrackers = websocketTrackers{
284 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
287 t, ok := cl.torrents[infoHash]
289 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
291 return t.announceRequest(event), nil
293 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
296 t, ok := cl.torrents[dcc.InfoHash]
298 cl.logger.WithDefaultLevel(log.Warning).Printf(
299 "got webrtc conn for unloaded torrent with infohash %x",
305 go t.onWebRtcConn(dc, dcc)
312 func (cl *Client) AddDhtServer(d DhtServer) {
313 cl.dhtServers = append(cl.dhtServers, d)
316 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
317 // given address for any Torrent.
318 func (cl *Client) AddDialer(d Dialer) {
321 cl.dialers = append(cl.dialers, d)
322 for _, t := range cl.torrents {
327 func (cl *Client) Listeners() []Listener {
331 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
333 func (cl *Client) AddListener(l Listener) {
334 cl.listeners = append(cl.listeners, l)
335 if cl.config.AcceptPeerConnections {
336 go cl.acceptConnections(l)
340 func (cl *Client) firewallCallback(net.Addr) bool {
342 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
345 torrent.Add("connections firewalled", 1)
347 torrent.Add("connections not firewalled", 1)
352 func (cl *Client) listenOnNetwork(n network) bool {
353 if n.Ipv4 && cl.config.DisableIPv4 {
356 if n.Ipv6 && cl.config.DisableIPv6 {
359 if n.Tcp && cl.config.DisableTCP {
362 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
368 func (cl *Client) listenNetworks() (ns []network) {
369 for _, n := range allPeerNetworks {
370 if cl.listenOnNetwork(n) {
377 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
378 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
379 cfg := dht.ServerConfig{
380 IPBlocklist: cl.ipBlockList,
382 OnAnnouncePeer: cl.onDHTAnnouncePeer,
383 PublicIP: func() net.IP {
384 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
385 return cl.config.PublicIp6
387 return cl.config.PublicIp4
389 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
390 OnQuery: cl.config.DHTOnQuery,
391 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
393 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
396 s, err = dht.NewServer(&cfg)
399 ts, err := s.Bootstrap()
401 cl.logger.Printf("error bootstrapping dht: %s", err)
403 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
409 func (cl *Client) Closed() events.Done {
410 return cl.closed.Done()
413 func (cl *Client) eachDhtServer(f func(DhtServer)) {
414 for _, ds := range cl.dhtServers {
419 // Stops the client. All connections to peers are closed and all activity will
421 func (cl *Client) Close() (errs []error) {
423 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
426 for _, t := range cl.torrents {
427 err := t.close(&closeGroup)
429 errs = append(errs, err)
433 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
435 for i := range cl.onClose {
436 cl.onClose[len(cl.onClose)-1-i]()
442 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
443 if cl.ipBlockList == nil {
446 return cl.ipBlockList.Lookup(ip)
449 func (cl *Client) ipIsBlocked(ip net.IP) bool {
450 _, blocked := cl.ipBlockRange(ip)
454 func (cl *Client) wantConns() bool {
455 if cl.config.AlwaysWantConns {
458 for _, t := range cl.torrents {
466 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
467 func (cl *Client) rejectAccepted(conn net.Conn) error {
469 return errors.New("don't want conns right now")
471 ra := conn.RemoteAddr()
472 if rip := addrIpOrNil(ra); rip != nil {
473 if cl.config.DisableIPv4Peers && rip.To4() != nil {
474 return errors.New("ipv4 peers disabled")
476 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
477 return errors.New("ipv4 disabled")
480 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
481 return errors.New("ipv6 disabled")
483 if cl.rateLimitAccept(rip) {
484 return errors.New("source IP accepted rate limited")
486 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
487 return errors.New("bad source addr")
493 func (cl *Client) acceptConnections(l Listener) {
495 conn, err := l.Accept()
496 torrent.Add("client listener accepts", 1)
497 conn = pproffd.WrapNetConn(conn)
499 closed := cl.closed.IsSet()
502 reject = cl.rejectAccepted(conn)
512 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
517 torrent.Add("rejected accepted connections", 1)
518 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
521 go cl.incomingConnection(conn)
523 log.Fmsg("accepted %q connection at %q from %q",
527 ).SetLevel(log.Debug).Log(cl.logger)
528 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
529 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
530 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a regular
// net.Conn PeerConn: the local address and the remote address joined by "-".
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
540 func (cl *Client) incomingConnection(nc net.Conn) {
542 if tc, ok := nc.(*net.TCPConn); ok {
545 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
546 regularNetConnPeerConnConnString(nc))
552 c.Discovery = PeerSourceIncoming
553 cl.runReceivedConn(c)
556 // Returns a handle to the given torrent, if it's present in the client.
557 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
560 t, ok = cl.torrents[ih]
564 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
565 return cl.torrents[ih]
568 type DialResult struct {
573 func countDialResult(err error) {
575 torrent.Add("successful dials", 1)
577 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the dial timeout down as the number of pending
// peers grows relative to the half-open limit, never going below
// minDialTimeout.
//
// The timeout is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), then clamped to at least minDialTimeout.
//
// A non-positive halfOpenLimit, or inputs that make the divisor less than one,
// would previously divide by zero (or yield a negative duration). In those
// cases the timeout is left at max before clamping.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit > 0 {
		// factor is 1 while pendingPeers < halfOpenLimit, so the full timeout
		// applies until the backlog exceeds the half-open limit.
		if factor := (pendingPeers + halfOpenLimit) / halfOpenLimit; factor > 1 {
			ret = max / time.Duration(factor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
589 // Returns whether an address is known to connect to a client with our own ID.
590 func (cl *Client) dopplegangerAddr(addr string) bool {
591 _, ok := cl.dopplegangerAddrs[addr]
595 // Returns a connection over UTP or TCP, whichever is first to connect.
596 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
597 return DialFirst(ctx, addr, cl.dialers)
600 // Returns a connection over UTP or TCP, whichever is first to connect.
601 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
603 t := perf.NewTimer(perf.CallerName(0))
606 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
608 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
612 ctx, cancel := context.WithCancel(ctx)
613 // As soon as we return one connection, cancel the others.
616 resCh := make(chan DialResult, left)
617 for _, _s := range dialers {
622 dialFromSocket(ctx, s, addr),
627 // Wait for a successful connection.
629 defer perf.ScopeTimer()()
630 for ; left > 0 && res.Conn == nil; left-- {
634 // There are still incomplete dials.
636 for ; left > 0; left-- {
637 conn := (<-resCh).Conn
644 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
649 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
650 c, err := s.Dial(ctx, addr)
651 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
652 // it now in case we close the connection forthwith.
653 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found".
func forgettableDialError(err error) bool {
	const marker = "no suitable address found"
	msg := err.Error()
	return strings.Index(msg, marker) >= 0
}
664 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
665 if _, ok := t.halfOpen[addr]; !ok {
666 panic("invariant broken")
668 delete(t.halfOpen, addr)
670 for _, t := range cl.torrents {
675 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no
676 // connection was established for valid reasons.
677 func (cl *Client) initiateProtocolHandshakes(
681 outgoing, encryptHeader bool,
682 remoteAddr PeerRemoteAddr,
683 network, connString string,
685 c *PeerConn, err error,
687 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
688 c.headerEncrypted = encryptHeader
689 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
691 dl, ok := ctx.Deadline()
695 err = nc.SetDeadline(dl)
699 err = cl.initiateHandshakes(c, t)
703 // Returns nil connection and nil error if no connection could be established for valid reasons.
704 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
705 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
708 return t.dialTimeout()
711 dr := cl.dialFirst(dialCtx, addr.String())
714 if dialCtx.Err() != nil {
715 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
717 return nil, errors.New("dial failed")
719 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
726 // Returns nil connection and nil error if no connection could be established
727 // for valid reasons.
728 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
729 torrent.Add("establish outgoing connection", 1)
730 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
731 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
733 torrent.Add("initiated conn with preferred header obfuscation", 1)
736 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
737 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
738 // We should have just tried with the preferred header obfuscation. If it was required,
739 // there's nothing else to try.
742 // Try again with encryption if we didn't earlier, or without if we did.
743 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
745 torrent.Add("initiated conn with fallback header obfuscation", 1)
747 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
751 // Called to dial out and run a connection. The addr we're given is already
752 // considered half-open.
753 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
754 cl.dialRateLimiter.Wait(context.Background())
755 c, err := cl.establishOutgoingConn(t, addr)
758 // Don't release lock between here and addPeerConn, unless it's for
760 cl.noLongerHalfOpen(t, addr.String())
763 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
770 t.runHandshookConnLoggingErr(c)
773 // The port number for incoming peer connections. 0 if the client isn't listening.
774 func (cl *Client) incomingPeerPort() int {
775 return cl.LocalPort()
778 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
779 if c.headerEncrypted {
782 rw, c.cryptoMethod, err = mse.InitiateHandshake(
789 cl.config.CryptoProvides,
793 return fmt.Errorf("header obfuscation handshake: %w", err)
796 ih, err := cl.connBtHandshake(c, &t.infoHash)
798 return fmt.Errorf("bittorrent protocol handshake: %w", err)
800 if ih != t.infoHash {
801 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
806 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
807 // that won't also try to take the lock. This saves us copying all the infohashes every time.
808 func (cl *Client) forSkeys(f func([]byte) bool) {
811 if false { // Emulate the bug from #114
813 for ih := range cl.torrents {
817 for range cl.torrents {
824 for ih := range cl.torrents {
831 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
832 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
838 // Do encryption and bittorrent handshakes as receiver.
839 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
840 defer perf.ScopeTimerErr(&err)()
842 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
844 if err == nil || err == mse.ErrNoSecretKeyMatch {
845 if c.headerEncrypted {
846 torrent.Add("handshakes received encrypted", 1)
848 torrent.Add("handshakes received unencrypted", 1)
851 torrent.Add("handshakes received with error while handling encryption", 1)
854 if err == mse.ErrNoSecretKeyMatch {
859 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
860 err = errors.New("connection does not have required header obfuscation")
863 ih, err := cl.connBtHandshake(c, nil)
865 return nil, fmt.Errorf("during bt handshake: %w", err)
873 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
874 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
879 c.PeerExtensionBytes = res.PeerExtensionBits
880 c.PeerID = res.PeerID
881 c.completedHandshake = time.Now()
882 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
888 func (cl *Client) runReceivedConn(c *PeerConn) {
889 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
893 t, err := cl.receiveHandshakes(c)
896 "error receiving handshakes on %v: %s", c, err,
897 ).SetLevel(log.Debug).
899 "network", c.Network,
901 torrent.Add("error receiving handshake", 1)
903 cl.onBadAccept(c.RemoteAddr)
908 torrent.Add("received handshake for unloaded torrent", 1)
909 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
911 cl.onBadAccept(c.RemoteAddr)
915 torrent.Add("received handshake for loaded torrent", 1)
918 t.runHandshookConnLoggingErr(c)
921 // Client lock must be held before entering this.
922 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
924 if c.PeerID == cl.peerID {
927 addr := c.conn.RemoteAddr().String()
928 cl.dopplegangerAddrs[addr] = struct{}{}
930 // Because the remote address is not necessarily the same as its client's torrent listen
931 // address, we won't record the remote address as a doppleganger. Instead, the initiator
932 // can record *us* as the doppleganger.
934 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
937 c.conn.SetWriteDeadline(time.Time{})
938 c.r = deadlineReader{c.conn, c.r}
939 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
940 if connIsIpv6(c.conn) {
941 torrent.Add("completed handshake over ipv6", 1)
943 if err := t.addPeerConn(c); err != nil {
944 return fmt.Errorf("adding connection: %w", err)
946 defer t.dropConnection(c)
948 cl.sendInitialMessages(c, t)
949 err := c.mainReadLoop()
951 return fmt.Errorf("main read loop: %w", err)
956 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
957 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
958 // (1<<19) cached for sending, for 16KiB (1<<14) chunks, that is 1<<5 requests.
959 const localClientReqq = 1 << 5
961 // See the order given in Transmission's tr_peerMsgsNew.
962 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
963 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
964 conn.write(pp.Message{
966 ExtendedID: pp.HandshakeExtendedID,
967 ExtendedPayload: func() []byte {
968 msg := pp.ExtendedHandshakeMessage{
969 M: map[pp.ExtensionName]pp.ExtensionNumber{
970 pp.ExtensionNameMetadata: metadataExtendedId,
972 V: cl.config.ExtendedHandshakeClientVersion,
973 Reqq: localClientReqq,
974 YourIp: pp.CompactIp(conn.remoteIp()),
975 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
976 Port: cl.incomingPeerPort(),
977 MetadataSize: torrent.metadataSize(),
978 // TODO: We can figure these out specific to the socket
980 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
981 Ipv6: cl.config.PublicIp6.To16(),
983 if !cl.config.DisablePEX {
984 msg.M[pp.ExtensionNamePex] = pexExtendedId
986 return bencode.MustMarshal(msg)
991 if conn.fastEnabled() {
992 if torrent.haveAllPieces() {
993 conn.write(pp.Message{Type: pp.HaveAll})
994 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
996 } else if !torrent.haveAnyPieces() {
997 conn.write(pp.Message{Type: pp.HaveNone})
998 conn.sentHaves.Clear()
1004 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1005 conn.write(pp.Message{
1012 func (cl *Client) dhtPort() (ret uint16) {
1013 if len(cl.dhtServers) == 0 {
1016 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1019 func (cl *Client) haveDhtServer() bool {
1020 return len(cl.dhtServers) > 0
1023 // Process incoming ut_metadata message.
1024 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1025 var d pp.ExtendedMetadataRequestMsg
1026 err := bencode.Unmarshal(payload, &d)
1027 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1028 } else if err != nil {
1029 return fmt.Errorf("error unmarshalling bencode: %s", err)
1033 case pp.DataMetadataExtensionMsgType:
1034 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1035 if !c.requestedMetadataPiece(piece) {
1036 return fmt.Errorf("got unexpected piece %d", piece)
1038 c.metadataRequests[piece] = false
1039 begin := len(payload) - d.PieceSize()
1040 if begin < 0 || begin >= len(payload) {
1041 return fmt.Errorf("data has bad offset in payload: %d", begin)
1043 t.saveMetadataPiece(piece, payload[begin:])
1044 c.lastUsefulChunkReceived = time.Now()
1045 err = t.maybeCompleteMetadata()
1047 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1048 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1049 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1050 // log consumers can filter for this message.
1051 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1054 case pp.RequestMetadataExtensionMsgType:
1055 if !t.haveMetadataPiece(piece) {
1056 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1059 start := (1 << 14) * piece
1060 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1061 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1063 case pp.RejectMetadataExtensionMsgType:
1066 return errors.New("unknown msg_type value")
1070 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1071 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1072 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1077 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1081 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1084 if _, ok := cl.ipBlockRange(ip); ok {
1087 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1093 // Return a Torrent ready for insertion into a Client.
1094 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1095 return cl.newTorrentOpt(addTorrentOpts{
1097 Storage: specStorage,
1101 // Return a Torrent ready for insertion into a Client.
1102 func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) {
1103 // use provided storage, if provided
1104 storageClient := cl.defaultStorage
1105 if opts.Storage != nil {
1106 storageClient = storage.NewClient(opts.Storage)
1111 infoHash: opts.InfoHash,
1112 peers: prioritizedPeers{
1114 getPrio: func(p PeerInfo) peerPriority {
1116 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1119 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1121 halfOpen: make(map[string]PeerInfo),
1122 pieceStateChanges: pubsub.NewPubSub(),
1124 storageOpener: storageClient,
1125 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1127 metadataChanged: sync.Cond{
1130 webSeeds: make(map[string]*Peer),
1131 gotMetainfoC: make(chan struct{}),
1133 t.networkingEnabled.Set()
1134 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1135 t.logger = cl.logger.WithContextValue(t)
1136 if opts.ChunkSize == 0 {
1137 opts.ChunkSize = defaultChunkSize
1139 t.setChunkSize(opts.ChunkSize)
1143 // A file-like handle to some torrent data resource.
1144 type Handle interface {
1151 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1152 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1155 // Adds a torrent by InfoHash with a custom Storage implementation.
1156 // If the torrent already exists then this Storage is ignored and the
1157 // existing torrent returned with `new` set to `false`
1158 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1161 t, ok := cl.torrents[infoHash]
1167 t = cl.newTorrent(infoHash, specStorage)
1168 cl.eachDhtServer(func(s DhtServer) {
1169 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1170 go t.dhtAnnouncer(s)
1173 cl.torrents[infoHash] = t
1174 cl.clearAcceptLimits()
1175 t.updateWantPeersEvent()
1176 // Tickle Client.waitAccept, new torrent may want conns.
1177 cl.event.Broadcast()
1181 // Adds a torrent described by opts (InfoHash, and optionally Storage and ChunkSize).
1182 // If the torrent already exists then this Storage is ignored and the
1183 // existing torrent returned with `new` set to `false`
1184 func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) {
1185 infoHash := opts.InfoHash
1188 t, ok := cl.torrents[infoHash]
1194 t = cl.newTorrentOpt(opts)
1195 cl.eachDhtServer(func(s DhtServer) {
1196 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1197 go t.dhtAnnouncer(s)
1200 cl.torrents[infoHash] = t
1201 cl.clearAcceptLimits()
1202 t.updateWantPeersEvent()
1203 // Tickle Client.waitAccept, new torrent may want conns.
1204 cl.event.Broadcast()
1208 type addTorrentOpts struct {
1210 Storage storage.ClientImpl
1211 ChunkSize pp.Integer
1214 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1215 // Torrent.MergeSpec.
1216 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1217 t, new = cl.AddTorrentOpt(addTorrentOpts{
1218 InfoHash: spec.InfoHash,
1219 Storage: spec.Storage,
1220 ChunkSize: spec.ChunkSize,
1224 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1226 modSpec.ChunkSize = 0
1228 err = t.MergeSpec(&modSpec)
1229 if err != nil && new {
// stringAddr is a net.Addr whose address is an arbitrary string and whose
// network name is empty.
type stringAddr string

// Compile-time check that stringAddr implements net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1242 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1243 // spec.DisallowDataDownload/Upload will be read and applied
1244 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1245 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1246 if spec.DisplayName != "" {
1247 t.SetDisplayName(spec.DisplayName)
1249 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1250 if spec.InfoBytes != nil {
1251 err := t.SetInfoBytes(spec.InfoBytes)
1257 cl.AddDhtNodes(spec.DhtNodes)
1260 useTorrentSources(spec.Sources, t)
1261 for _, url := range spec.Webseeds {
1264 for _, peerAddr := range spec.PeerAddrs {
1266 Addr: stringAddr(peerAddr),
1267 Source: PeerSourceDirect,
1271 if spec.ChunkSize != 0 {
1272 panic("chunk size cannot be changed for existing Torrent")
1274 t.addTrackers(spec.Trackers)
1276 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1277 t.dataUploadDisallowed = spec.DisallowDataUpload
1281 func useTorrentSources(sources []string, t *Torrent) {
1282 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1283 ctx := context.Background()
1284 for i := 0; i < len(sources); i += 1 {
1287 if err := useTorrentSource(ctx, s, t); err != nil {
1288 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1290 t.logger.Printf("successfully used source %q", s)
1296 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1297 ctx, cancel := context.WithCancel(ctx)
1307 var req *http.Request
1308 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1311 var resp *http.Response
1312 if resp, err = http.DefaultClient.Do(req); err != nil {
1315 var mi metainfo.MetaInfo
1316 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1319 if ctx.Err() != nil {
1324 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1327 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1328 t, ok := cl.torrents[infoHash]
1330 err = fmt.Errorf("no such torrent")
1337 delete(cl.torrents, infoHash)
1341 func (cl *Client) allTorrentsCompleted() bool {
1342 for _, t := range cl.torrents {
1346 if !t.haveAllPieces() {
// WaitAll loops for as long as allTorrentsCompleted reports false, checking cl.closed on
// every pass.
1353 // Returns true when all torrents are completely downloaded and false if the
1354 // client is stopped before that.
// NOTE(review): locking and the blocking wait inside the loop are elided in this listing.
1355 func (cl *Client) WaitAll() bool {
1358 for !cl.allTorrentsCompleted() {
1359 if cl.closed.IsSet() {
// Torrents is the exported accessor for the Client's torrents; the slice itself is built by
// torrentsAsSlice.
1367 // Returns handles to all the torrents loaded in the Client.
// NOTE(review): two lines before the return are elided in this listing (presumably locking).
1368 func (cl *Client) Torrents() []*Torrent {
1371 return cl.torrentsAsSlice()
1374 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1375 for _, t := range cl.torrents {
1376 ret = append(ret, t)
1381 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1382 spec, err := TorrentSpecFromMagnetUri(uri)
1386 T, _, err = cl.AddTorrentSpec(spec)
1390 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1391 ts, err := TorrentSpecFromMetaInfoErr(mi)
1395 T, _, err = cl.AddTorrentSpec(ts)
1399 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1400 mi, err := metainfo.LoadFromFile(filename)
1404 return cl.AddTorrent(mi)
1407 func (cl *Client) DhtServers() []DhtServer {
1408 return cl.dhtServers
// AddDhtNodes processes each entry of nodes: it splits the entry into host and optional port,
// parses the host with net.ParseIP, logs entries whose host does not parse, and builds a
// krpc.NodeInfo that is passed to a callback run over every DHT server via eachDhtServer.
// Hosts are parsed as IP literals only; no DNS resolution happens in the visible code.
// NOTE(review): the NodeInfo fields and the callback body are elided in this listing.
1411 func (cl *Client) AddDhtNodes(nodes []string) {
1412 for _, n := range nodes {
1413 hmp := missinggo.SplitHostMaybePort(n)
1414 ip := net.ParseIP(hmp.Host)
1416 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1419 ni := krpc.NodeInfo{
1420 Addr: krpc.NodeAddr{
1425 cl.eachDhtServer(func(s DhtServer) {
1431 func (cl *Client) banPeerIP(ip net.IP) {
1432 cl.logger.Printf("banning ip %v", ip)
1433 if cl.badPeerIPs == nil {
1434 cl.badPeerIPs = make(map[string]struct{})
1436 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection builds the PeerConn c for the connection nc. The visible setup gives it a
// PeerMaxRequests of 250, the supplied remoteAddr and connString, the client's callbacks, a
// logger defaulting to Warning level, a stats-recording read/writer wrapped around nc, and a
// reader limited by cl.config.DownloadRateLimiter. It then logs the remote address, network
// and direction at Debug level and iterates over the configured NewPeer callbacks.
// NOTE(review): several struct fields and the callback invocation are elided in this listing.
1439 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1448 PeerMaxRequests: 250,
1450 RemoteAddr: remoteAddr,
1452 callbacks: &cl.config.Callbacks,
1454 connString: connString,
1458 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1459 c.setRW(connStatsReadWriter{nc, c})
1460 c.r = &rateLimitedReader{
1461 l: cl.config.DownloadRateLimiter,
1464 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1465 for _, f := range cl.config.Callbacks.NewPeer {
// onDHTAnnouncePeer adds the announcing ip and port to a torrent t as a single PeerInfo whose
// source is PeerSourceDhtAnnouncePeer.
// NOTE(review): how t is obtained (presumably looked up by ih) and how portOk is used are
// elided in this listing.
1471 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1478 t.addPeers([]PeerInfo{{
1479 Addr: ipPortAddr{ip, port},
1480 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil iterates over the given ips in argument order.
// NOTE(review): the loop body and the fall-through return are elided in this listing; going by
// the name it presumably returns the first non-nil IP — confirm against the full file.
1484 func firstNotNil(ips ...net.IP) net.IP {
1485 for _, ip := range ips {
// eachListener ranges over cl.listeners; f receives a Listener and returns a bool.
// NOTE(review): the loop body is elided in this listing, so the meaning of f's result
// (presumably "keep iterating") must be confirmed against the full file.
1493 func (cl *Client) eachListener(f func(Listener) bool) {
1494 for _, s := range cl.listeners {
// findListener scans cl.listeners in index order, assigning each listener to the named
// result ret and testing it with f.
// NOTE(review): the lines after the test are elided in this listing; confirm what is returned
// when no listener satisfies f, since ret still holds the last listener after the loop.
1501 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1502 for i := 0; i < len(cl.listeners); i += 1 {
1503 if ret = cl.listeners[i]; f(ret) {
// publicIp selects candidate addresses by the address family of peer. When peer.To4() is
// non-nil the candidates are cl.config.PublicIp4 and a listener IP for which To4() is non-nil;
// otherwise they are cl.config.PublicIp6 and a listener IP for which To4() is nil.
// NOTE(review): the call that combines the candidates is elided in this listing.
1510 func (cl *Client) publicIp(peer net.IP) net.IP {
1511 // TODO: Use BEP 10 to determine how peers are seeing us.
1512 if peer.To4() != nil {
1514 cl.config.PublicIp4,
1515 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1520 cl.config.PublicIp6,
1521 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp asks findListener for a listener whose address IP (extracted with
// addrIpOrNil) satisfies f, and returns that listener's address IP.
// NOTE(review): the lines between the search and the final return are elided in this listing
// (presumably a nil-listener guard) — confirm.
1525 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1526 l := cl.findListener(
1527 func(l Listener) bool {
1528 return f(addrIpOrNil(l.Addr()))
1534 return addrIpOrNil(l.Addr())
1537 // Our IP as a peer should see it.
1538 func (cl *Client) publicAddr(peer net.IP) IpPort {
1539 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1542 // ListenAddrs returns the addresses currently being listened on, one per entry of cl.listeners and in the same order.
// NOTE(review): lines around the loop are elided in this listing (presumably locking).
1543 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1545 ret = make([]net.Addr, len(cl.listeners))
1546 for i := 0; i < len(cl.listeners); i += 1 {
1547 ret[i] = cl.listeners[i].Addr()
// onBadAccept records a bad accept from addr. It extracts the IP with tryIpPortFromNetAddr,
// masks it with maskIpForAcceptLimiting, creates cl.acceptLimiter on first use, and
// increments the counter stored under the masked IP's string form.
// NOTE(review): the handling of a failed extraction (ok == false) is elided in this listing.
1553 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1554 ipa, ok := tryIpPortFromNetAddr(addr)
1558 ip := maskIpForAcceptLimiting(ipa.IP)
1559 if cl.acceptLimiter == nil {
1560 cl.acceptLimiter = make(map[ipStr]int)
1562 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces ip to the key used for accept limiting. IPv4 addresses
// (including IPv4-mapped ones) are masked to their /24 network; any other address is
// returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	v4 := ip.To4()
	if v4 == nil {
		return ip
	}
	return v4.Mask(net.CIDRMask(24, 32))
}
1572 func (cl *Client) clearAcceptLimits() {
1573 cl.acceptLimiter = nil
// acceptLimitClearer selects between the client's closed signal and a 15-minute timer, and
// calls clearAcceptLimits.
// NOTE(review): the enclosing loop, the select header and any locking around the clear are
// elided in this listing — confirm against the full file.
1576 func (cl *Client) acceptLimitClearer() {
1579 case <-cl.closed.Done():
1581 case <-time.After(15 * time.Minute):
1583 cl.clearAcceptLimits()
// rateLimitAccept reports whether the masked form of ip (see maskIpForAcceptLimiting) has a
// positive count in cl.acceptLimiter. cl.config.DisableAcceptRateLimiting is checked first.
// NOTE(review): the value returned when limiting is disabled is elided in this listing
// (presumably false) — confirm.
1589 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1590 if cl.config.DisableAcceptRateLimiting {
1593 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
// NOTE(review): only the signatures of the following locking helpers are visible in this
// listing; their bodies are elided, so their exact behaviour must be confirmed against the
// full file.
1596 func (cl *Client) rLock() {
1600 func (cl *Client) rUnlock() {
1604 func (cl *Client) lock() {
1608 func (cl *Client) unlock() {
// locker returns a *lockWithDeferreds; what it refers to is not visible in this listing.
1612 func (cl *Client) locker() *lockWithDeferreds {
1616 func (cl *Client) String() string {
1617 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1620 // ConnStats returns connection-level aggregate stats at the Client level. See the comment on
1621 // TorrentStats.ConnStats.
// The value handed back is the result of cl.stats.Copy().
1622 func (cl *Client) ConnStats() ConnStats {
1623 return cl.stats.Copy()