20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 request_strategy "github.com/anacrolix/torrent/request-strategy"
31 "github.com/davecgh/go-spew/spew"
32 "github.com/dustin/go-humanize"
33 "github.com/google/btree"
34 "github.com/pion/datachannel"
35 "golang.org/x/time/rate"
37 "github.com/anacrolix/chansync"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/internal/limiter"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
46 "github.com/anacrolix/torrent/tracker"
47 "github.com/anacrolix/torrent/webtorrent"
50 // Clients contain zero or more Torrents. A Client manages a blocklist, the
51 // TCP/UDP protocol ports, and DHT as desired.
53 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
59 closed chansync.SetOnce
65 defaultStorage *storage.Client
69 dhtServers []DhtServer
70 ipBlockList iplist.Ranger
72 // Set of addresses that have our client ID. This intentionally will
73 // include ourselves if we end up trying to connect to our own address
74 // through legitimate channels.
75 dopplegangerAddrs map[string]struct{}
76 badPeerIPs map[string]struct{}
77 torrents map[InfoHash]*Torrent
78 pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
80 acceptLimiter map[ipStr]int
81 dialRateLimiter *rate.Limiter
84 websocketTrackers websocketTrackers
86 activeAnnounceLimiter limiter.Instance
87 webseedHttpClient *http.Client
92 func (cl *Client) BadPeerIPs() (ips []string) {
94 ips = cl.badPeerIPsLocked()
99 func (cl *Client) badPeerIPsLocked() (ips []string) {
100 ips = make([]string, len(cl.badPeerIPs))
102 for k := range cl.badPeerIPs {
109 func (cl *Client) PeerID() PeerID {
113 // Returns the port number for the first listener that has one. No longer assumes that all port
114 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
116 func (cl *Client) LocalPort() (port int) {
117 for i := 0; i < len(cl.listeners); i += 1 {
118 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
125 func writeDhtServerStatus(w io.Writer, s DhtServer) {
126 dhtStats := s.Stats()
127 fmt.Fprintf(w, " ID: %x\n", s.ID())
128 spew.Fdump(w, dhtStats)
131 // Writes out a human readable status of the client, such as for writing to a
133 func (cl *Client) WriteStatus(_w io.Writer) {
136 w := bufio.NewWriter(_w)
138 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
139 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
140 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
141 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
142 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
143 cl.eachDhtServer(func(s DhtServer) {
144 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
145 writeDhtServerStatus(w, s)
147 spew.Fdump(w, &cl.stats)
148 torrentsSlice := cl.torrentsAsSlice()
149 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
151 sort.Slice(torrentsSlice, func(l, r int) bool {
152 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
154 for _, t := range torrentsSlice {
156 fmt.Fprint(w, "<unknown name>")
158 fmt.Fprint(w, t.name())
164 "%f%% of %d bytes (%s)",
165 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
167 humanize.Bytes(uint64(*t.length)))
169 w.WriteString("<missing metainfo>")
177 // Filters things that are less than warning from UPnP discovery.
178 func upnpDiscoverLogFilter(m log.Msg) bool {
179 level, ok := m.GetLevel()
180 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
183 func (cl *Client) initLogger() {
184 logger := cl.config.Logger
187 if !cl.config.Debug {
188 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
191 cl.logger = logger.WithValues(cl)
194 func (cl *Client) announceKey() int32 {
195 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
198 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
199 func (cl *Client) init(cfg *ClientConfig) {
201 cl.dopplegangerAddrs = make(map[string]struct{})
202 cl.torrents = make(map[metainfo.Hash]*Torrent)
203 cl.dialRateLimiter = rate.NewLimiter(10, 10)
204 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
207 cl.webseedHttpClient = &http.Client{
208 Transport: &http.Transport{
209 Proxy: cfg.HTTPProxy,
215 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
217 cfg = NewDefaultClientConfig()
223 go cl.acceptLimitClearer()
232 storageImpl := cfg.DefaultStorage
233 if storageImpl == nil {
234 // We'd use mmap by default but HFS+ doesn't support sparse files.
235 storageImplCloser := storage.NewFile(cfg.DataDir)
236 cl.onClose = append(cl.onClose, func() {
237 if err := storageImplCloser.Close(); err != nil {
238 cl.logger.Printf("error closing default storage: %s", err)
241 storageImpl = storageImplCloser
243 cl.defaultStorage = storage.NewClient(storageImpl)
245 if cfg.PeerID != "" {
246 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
248 o := copy(cl.peerID[:], cfg.Bep20)
249 _, err = rand.Read(cl.peerID[o:])
251 panic("error generating peer id")
255 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
263 for _, _s := range sockets {
264 s := _s // Go is fucking retarded.
265 cl.onClose = append(cl.onClose, func() { go s.Close() })
266 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
267 cl.dialers = append(cl.dialers, s)
268 cl.listeners = append(cl.listeners, s)
269 if cl.config.AcceptPeerConnections {
270 go cl.acceptConnections(s)
277 for _, s := range sockets {
278 if pc, ok := s.(net.PacketConn); ok {
279 ds, err := cl.NewAnacrolixDhtServer(pc)
283 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
284 cl.onClose = append(cl.onClose, func() { ds.Close() })
289 cl.websocketTrackers = websocketTrackers{
292 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
295 t, ok := cl.torrents[infoHash]
297 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
299 return t.announceRequest(event), nil
301 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
304 t, ok := cl.torrents[dcc.InfoHash]
306 cl.logger.WithDefaultLevel(log.Warning).Printf(
307 "got webrtc conn for unloaded torrent with infohash %x",
313 go t.onWebRtcConn(dc, dcc)
320 func (cl *Client) AddDhtServer(d DhtServer) {
321 cl.dhtServers = append(cl.dhtServers, d)
324 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
325 // given address for any Torrent.
326 func (cl *Client) AddDialer(d Dialer) {
329 cl.dialers = append(cl.dialers, d)
330 for _, t := range cl.torrents {
335 func (cl *Client) Listeners() []Listener {
339 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
341 func (cl *Client) AddListener(l Listener) {
342 cl.listeners = append(cl.listeners, l)
343 if cl.config.AcceptPeerConnections {
344 go cl.acceptConnections(l)
348 func (cl *Client) firewallCallback(net.Addr) bool {
350 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
353 torrent.Add("connections firewalled", 1)
355 torrent.Add("connections not firewalled", 1)
360 func (cl *Client) listenOnNetwork(n network) bool {
361 if n.Ipv4 && cl.config.DisableIPv4 {
364 if n.Ipv6 && cl.config.DisableIPv6 {
367 if n.Tcp && cl.config.DisableTCP {
370 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
376 func (cl *Client) listenNetworks() (ns []network) {
377 for _, n := range allPeerNetworks {
378 if cl.listenOnNetwork(n) {
385 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
386 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
387 cfg := dht.ServerConfig{
388 IPBlocklist: cl.ipBlockList,
390 OnAnnouncePeer: cl.onDHTAnnouncePeer,
391 PublicIP: func() net.IP {
392 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
393 return cl.config.PublicIp6
395 return cl.config.PublicIp4
397 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
398 OnQuery: cl.config.DHTOnQuery,
399 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
401 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
404 s, err = dht.NewServer(&cfg)
407 ts, err := s.Bootstrap()
409 cl.logger.Printf("error bootstrapping dht: %s", err)
411 log.Fstr("%v completed bootstrap (%+v)", s, ts).AddValues(s, ts).Log(cl.logger)
417 func (cl *Client) Closed() events.Done {
418 return cl.closed.Done()
421 func (cl *Client) eachDhtServer(f func(DhtServer)) {
422 for _, ds := range cl.dhtServers {
427 // Stops the client. All connections to peers are closed and all activity will come to a halt.
428 func (cl *Client) Close() (errs []error) {
429 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
431 for _, t := range cl.torrents {
432 err := t.close(&closeGroup)
434 errs = append(errs, err)
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
443 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
447 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
448 if cl.ipBlockList == nil {
451 return cl.ipBlockList.Lookup(ip)
454 func (cl *Client) ipIsBlocked(ip net.IP) bool {
455 _, blocked := cl.ipBlockRange(ip)
459 func (cl *Client) wantConns() bool {
460 if cl.config.AlwaysWantConns {
463 for _, t := range cl.torrents {
471 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
472 func (cl *Client) rejectAccepted(conn net.Conn) error {
474 return errors.New("don't want conns right now")
476 ra := conn.RemoteAddr()
477 if rip := addrIpOrNil(ra); rip != nil {
478 if cl.config.DisableIPv4Peers && rip.To4() != nil {
479 return errors.New("ipv4 peers disabled")
481 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
482 return errors.New("ipv4 disabled")
484 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
485 return errors.New("ipv6 disabled")
487 if cl.rateLimitAccept(rip) {
488 return errors.New("source IP accepted rate limited")
490 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
491 return errors.New("bad source addr")
497 func (cl *Client) acceptConnections(l Listener) {
499 conn, err := l.Accept()
500 torrent.Add("client listener accepts", 1)
501 conn = pproffd.WrapNetConn(conn)
503 closed := cl.closed.IsSet()
505 if !closed && conn != nil {
506 reject = cl.rejectAccepted(conn)
516 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
521 torrent.Add("rejected accepted connections", 1)
522 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
525 go cl.incomingConnection(conn)
527 log.Fmsg("accepted %q connection at %q from %q",
531 ).SetLevel(log.Debug).Log(cl.logger)
532 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
533 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
534 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
539 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
540 func regularNetConnPeerConnConnString(nc net.Conn) string {
541 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
544 func (cl *Client) incomingConnection(nc net.Conn) {
546 if tc, ok := nc.(*net.TCPConn); ok {
549 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
550 regularNetConnPeerConnConnString(nc))
556 c.Discovery = PeerSourceIncoming
557 cl.runReceivedConn(c)
560 // Returns a handle to the given torrent, if it's present in the client.
561 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
564 t, ok = cl.torrents[ih]
568 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
569 return cl.torrents[ih]
572 type DialResult struct {
577 func countDialResult(err error) {
579 torrent.Add("successful dials", 1)
581 torrent.Add("unsuccessful dials", 1)
585 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
586 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
587 if ret < minDialTimeout {
593 // Returns whether an address is known to connect to a client with our own ID.
594 func (cl *Client) dopplegangerAddr(addr string) bool {
595 _, ok := cl.dopplegangerAddrs[addr]
599 // Returns a connection over UTP or TCP, whichever is first to connect.
600 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
601 return DialFirst(ctx, addr, cl.dialers)
604 // Returns a connection over UTP or TCP, whichever is first to connect.
605 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
607 t := perf.NewTimer(perf.CallerName(0))
610 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
612 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
616 ctx, cancel := context.WithCancel(ctx)
617 // As soon as we return one connection, cancel the others.
620 resCh := make(chan DialResult, left)
621 for _, _s := range dialers {
626 dialFromSocket(ctx, s, addr),
631 // Wait for a successful connection.
633 defer perf.ScopeTimer()()
634 for ; left > 0 && res.Conn == nil; left-- {
638 // There are still incompleted dials.
640 for ; left > 0; left-- {
641 conn := (<-resCh).Conn
648 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
653 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
654 c, err := s.Dial(ctx, addr)
655 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
656 // it now in case we close the connection forthwith.
657 if tc, ok := c.(*net.TCPConn); ok {
664 func forgettableDialError(err error) bool {
665 return strings.Contains(err.Error(), "no suitable address found")
668 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
669 if _, ok := t.halfOpen[addr]; !ok {
670 panic("invariant broken")
672 delete(t.halfOpen, addr)
674 for _, t := range cl.torrents {
679 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
680 // for valid reasons.
681 func (cl *Client) initiateProtocolHandshakes(
685 outgoing, encryptHeader bool,
686 remoteAddr PeerRemoteAddr,
687 network, connString string,
689 c *PeerConn, err error,
691 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
692 c.headerEncrypted = encryptHeader
693 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
695 dl, ok := ctx.Deadline()
699 err = nc.SetDeadline(dl)
703 err = cl.initiateHandshakes(c, t)
707 // Returns nil connection and nil error if no connection could be established for valid reasons.
708 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
709 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
712 return t.dialTimeout()
715 dr := cl.dialFirst(dialCtx, addr.String())
718 if dialCtx.Err() != nil {
719 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
721 return nil, errors.New("dial failed")
723 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
730 // Returns nil connection and nil error if no connection could be established
731 // for valid reasons.
732 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
733 torrent.Add("establish outgoing connection", 1)
734 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
735 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
737 torrent.Add("initiated conn with preferred header obfuscation", 1)
740 // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
741 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
742 // We should have just tried with the preferred header obfuscation. If it was required,
743 // there's nothing else to try.
746 // Try again with encryption if we didn't earlier, or without if we did.
747 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
749 torrent.Add("initiated conn with fallback header obfuscation", 1)
751 // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
755 // Called to dial out and run a connection. The addr we're given is already
756 // considered half-open.
757 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
758 cl.dialRateLimiter.Wait(context.Background())
759 c, err := cl.establishOutgoingConn(t, addr)
761 c.conn.SetWriteDeadline(time.Time{})
765 // Don't release lock between here and addPeerConn, unless it's for
767 cl.noLongerHalfOpen(t, addr.String())
770 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
777 t.runHandshookConnLoggingErr(c)
780 // The port number for incoming peer connections. 0 if the client isn't listening.
781 func (cl *Client) incomingPeerPort() int {
782 return cl.LocalPort()
785 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
786 if c.headerEncrypted {
789 rw, c.cryptoMethod, err = mse.InitiateHandshake(
796 cl.config.CryptoProvides,
800 return fmt.Errorf("header obfuscation handshake: %w", err)
803 ih, err := cl.connBtHandshake(c, &t.infoHash)
805 return fmt.Errorf("bittorrent protocol handshake: %w", err)
807 if ih != t.infoHash {
808 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
813 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
814 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
815 func (cl *Client) forSkeys(f func([]byte) bool) {
818 if false { // Emulate the bug from #114
820 for ih := range cl.torrents {
824 for range cl.torrents {
831 for ih := range cl.torrents {
838 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
839 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
845 // Do encryption and bittorrent handshakes as receiver.
846 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
847 defer perf.ScopeTimerErr(&err)()
849 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
851 if err == nil || err == mse.ErrNoSecretKeyMatch {
852 if c.headerEncrypted {
853 torrent.Add("handshakes received encrypted", 1)
855 torrent.Add("handshakes received unencrypted", 1)
858 torrent.Add("handshakes received with error while handling encryption", 1)
861 if err == mse.ErrNoSecretKeyMatch {
866 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
867 err = errors.New("connection does not have required header obfuscation")
870 ih, err := cl.connBtHandshake(c, nil)
872 return nil, fmt.Errorf("during bt handshake: %w", err)
880 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
884 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
885 &successfulPeerWireProtocolHandshakePeerReservedBytes)
888 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
889 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
893 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
895 c.PeerExtensionBytes = res.PeerExtensionBits
896 c.PeerID = res.PeerID
897 c.completedHandshake = time.Now()
898 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
904 func (cl *Client) runReceivedConn(c *PeerConn) {
905 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
909 t, err := cl.receiveHandshakes(c)
912 "error receiving handshakes on %v: %s", c, err,
913 ).SetLevel(log.Debug).
915 "network", c.Network,
917 torrent.Add("error receiving handshake", 1)
919 cl.onBadAccept(c.RemoteAddr)
924 torrent.Add("received handshake for unloaded torrent", 1)
925 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
927 cl.onBadAccept(c.RemoteAddr)
931 torrent.Add("received handshake for loaded torrent", 1)
932 c.conn.SetWriteDeadline(time.Time{})
935 t.runHandshookConnLoggingErr(c)
938 // Client lock must be held before entering this.
939 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
941 for i, b := range cl.config.MinPeerExtensions {
942 if c.PeerExtensionBytes[i]&b != b {
943 return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
946 if c.PeerID == cl.peerID {
949 addr := c.RemoteAddr.String()
950 cl.dopplegangerAddrs[addr] = struct{}{}
952 // Because the remote address is not necessarily the same as its client's torrent listen
953 // address, we won't record the remote address as a doppleganger. Instead, the initiator
954 // can record *us* as the doppleganger.
956 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
959 c.r = deadlineReader{c.conn, c.r}
960 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
961 if connIsIpv6(c.conn) {
962 torrent.Add("completed handshake over ipv6", 1)
964 if err := t.addPeerConn(c); err != nil {
965 return fmt.Errorf("adding connection: %w", err)
967 defer t.dropConnection(c)
969 cl.sendInitialMessages(c, t)
970 c.initUpdateRequestsTimer()
971 err := c.mainReadLoop()
973 return fmt.Errorf("main read loop: %w", err)
980 func (p *Peer) initUpdateRequestsTimer() {
982 if p.updateRequestsTimer != nil {
983 panic(p.updateRequestsTimer)
986 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
987 p.updateRequestsTimer.Stop()
990 func (c *Peer) updateRequestsTimerFunc() {
992 defer c.locker().Unlock()
993 if c.closed.IsSet() {
996 if c.needRequestUpdate != "" {
999 if c.isLowOnRequests() {
1000 // If there are no outstanding requests, then a request update should have already run.
1003 c.updateRequests("updateRequestsTimer")
1006 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
1007 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
1008 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
1009 const localClientReqq = 1 << 5
1011 // See the order given in Transmission's tr_peerMsgsNew.
1012 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1013 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1014 conn.write(pp.Message{
1016 ExtendedID: pp.HandshakeExtendedID,
1017 ExtendedPayload: func() []byte {
1018 msg := pp.ExtendedHandshakeMessage{
1019 M: map[pp.ExtensionName]pp.ExtensionNumber{
1020 pp.ExtensionNameMetadata: metadataExtendedId,
1022 V: cl.config.ExtendedHandshakeClientVersion,
1023 Reqq: localClientReqq,
1024 YourIp: pp.CompactIp(conn.remoteIp()),
1025 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1026 Port: cl.incomingPeerPort(),
1027 MetadataSize: torrent.metadataSize(),
1028 // TODO: We can figured these out specific to the socket
1030 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1031 Ipv6: cl.config.PublicIp6.To16(),
1033 if !cl.config.DisablePEX {
1034 msg.M[pp.ExtensionNamePex] = pexExtendedId
1036 return bencode.MustMarshal(msg)
1041 if conn.fastEnabled() {
1042 if torrent.haveAllPieces() {
1043 conn.write(pp.Message{Type: pp.HaveAll})
1044 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1046 } else if !torrent.haveAnyPieces() {
1047 conn.write(pp.Message{Type: pp.HaveNone})
1048 conn.sentHaves.Clear()
1054 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1055 conn.write(pp.Message{
1062 func (cl *Client) dhtPort() (ret uint16) {
1063 if len(cl.dhtServers) == 0 {
1066 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1069 func (cl *Client) haveDhtServer() bool {
1070 return len(cl.dhtServers) > 0
1073 // Process incoming ut_metadata message.
1074 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1075 var d pp.ExtendedMetadataRequestMsg
1076 err := bencode.Unmarshal(payload, &d)
1077 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1078 } else if err != nil {
1079 return fmt.Errorf("error unmarshalling bencode: %s", err)
1083 case pp.DataMetadataExtensionMsgType:
1084 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1085 if !c.requestedMetadataPiece(piece) {
1086 return fmt.Errorf("got unexpected piece %d", piece)
1088 c.metadataRequests[piece] = false
1089 begin := len(payload) - d.PieceSize()
1090 if begin < 0 || begin >= len(payload) {
1091 return fmt.Errorf("data has bad offset in payload: %d", begin)
1093 t.saveMetadataPiece(piece, payload[begin:])
1094 c.lastUsefulChunkReceived = time.Now()
1095 err = t.maybeCompleteMetadata()
1097 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1098 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1099 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1100 // log consumers can filter for this message.
1101 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1104 case pp.RequestMetadataExtensionMsgType:
1105 if !t.haveMetadataPiece(piece) {
1106 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1109 start := (1 << 14) * piece
1110 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1111 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1113 case pp.RejectMetadataExtensionMsgType:
1116 return errors.New("unknown msg_type value")
1120 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1121 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1122 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1127 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1131 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1134 if _, ok := cl.ipBlockRange(ip); ok {
1137 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1143 // Return a Torrent ready for insertion into a Client.
1144 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1145 return cl.newTorrentOpt(AddTorrentOpts{
1147 Storage: specStorage,
1151 // Return a Torrent ready for insertion into a Client.
1152 func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
1153 // use provided storage, if provided
1154 storageClient := cl.defaultStorage
1155 if opts.Storage != nil {
1156 storageClient = storage.NewClient(opts.Storage)
1161 infoHash: opts.InfoHash,
1162 peers: prioritizedPeers{
1164 getPrio: func(p PeerInfo) peerPriority {
1166 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1169 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1171 halfOpen: make(map[string]PeerInfo),
1172 pieceStateChanges: pubsub.NewPubSub(),
1174 storageOpener: storageClient,
1175 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1177 metadataChanged: sync.Cond{
1180 webSeeds: make(map[string]*Peer),
1181 gotMetainfoC: make(chan struct{}),
1183 t.networkingEnabled.Set()
1184 t.logger = cl.logger.WithContextValue(t)
1185 if opts.ChunkSize == 0 {
1186 opts.ChunkSize = defaultChunkSize
1188 t.setChunkSize(opts.ChunkSize)
1192 // A file-like handle to some torrent data resource.
1193 type Handle interface {
1200 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1201 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1204 // Adds a torrent by InfoHash with a custom Storage implementation.
1205 // If the torrent already exists then this Storage is ignored and the
1206 // existing torrent returned with `new` set to `false`
1207 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1210 t, ok := cl.torrents[infoHash]
1216 t = cl.newTorrent(infoHash, specStorage)
1217 cl.eachDhtServer(func(s DhtServer) {
1218 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1219 go t.dhtAnnouncer(s)
1222 cl.torrents[infoHash] = t
1223 cl.clearAcceptLimits()
1224 t.updateWantPeersEvent()
1225 // Tickle Client.waitAccept, new torrent may want conns.
1226 cl.event.Broadcast()
1230 // Adds a torrent by InfoHash with a custom Storage implementation.
1231 // If the torrent already exists then this Storage is ignored and the
1232 // existing torrent returned with `new` set to `false`
1233 func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
1234 infoHash := opts.InfoHash
1237 t, ok := cl.torrents[infoHash]
1243 t = cl.newTorrentOpt(opts)
1244 cl.eachDhtServer(func(s DhtServer) {
1245 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1246 go t.dhtAnnouncer(s)
1249 cl.torrents[infoHash] = t
1250 cl.clearAcceptLimits()
1251 t.updateWantPeersEvent()
1252 // Tickle Client.waitAccept, new torrent may want conns.
1253 cl.event.Broadcast()
1257 type AddTorrentOpts struct {
1259 Storage storage.ClientImpl
1260 ChunkSize pp.Integer
1263 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1264 // Torrent.MergeSpec.
1265 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1266 t, new = cl.AddTorrentOpt(AddTorrentOpts{
1267 InfoHash: spec.InfoHash,
1268 Storage: spec.Storage,
1269 ChunkSize: spec.ChunkSize,
1273 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1275 modSpec.ChunkSize = 0
1277 err = t.MergeSpec(&modSpec)
1278 if err != nil && new {
1284 type stringAddr string
1286 var _ net.Addr = stringAddr("")
1288 func (stringAddr) Network() string { return "" }
1289 func (me stringAddr) String() string { return string(me) }
1291 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1292 // spec.DisallowDataDownload/Upload will be read and applied
1293 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1294 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1295 if spec.DisplayName != "" {
1296 t.SetDisplayName(spec.DisplayName)
1298 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1299 if spec.InfoBytes != nil {
1300 err := t.SetInfoBytes(spec.InfoBytes)
1306 cl.AddDhtNodes(spec.DhtNodes)
1309 useTorrentSources(spec.Sources, t)
1310 for _, url := range spec.Webseeds {
1313 for _, peerAddr := range spec.PeerAddrs {
1315 Addr: stringAddr(peerAddr),
1316 Source: PeerSourceDirect,
1320 if spec.ChunkSize != 0 {
1321 panic("chunk size cannot be changed for existing Torrent")
1323 t.addTrackers(spec.Trackers)
1325 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1326 t.dataUploadDisallowed = spec.DisallowDataUpload
1330 func useTorrentSources(sources []string, t *Torrent) {
1331 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1332 ctx := context.Background()
1333 for i := 0; i < len(sources); i += 1 {
1336 if err := useTorrentSource(ctx, s, t); err != nil {
1337 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1339 t.logger.Printf("successfully used source %q", s)
1345 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1346 ctx, cancel := context.WithCancel(ctx)
1356 var req *http.Request
1357 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1360 var resp *http.Response
1361 if resp, err = http.DefaultClient.Do(req); err != nil {
1364 var mi metainfo.MetaInfo
1365 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1368 if ctx.Err() != nil {
1373 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1376 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1377 t, ok := cl.torrents[infoHash]
1379 err = fmt.Errorf("no such torrent")
1386 delete(cl.torrents, infoHash)
1390 func (cl *Client) allTorrentsCompleted() bool {
1391 for _, t := range cl.torrents {
1395 if !t.haveAllPieces() {
1402 // Returns true when all torrents are completely downloaded and false if the
1403 // client is stopped before that.
1404 func (cl *Client) WaitAll() bool {
1407 for !cl.allTorrentsCompleted() {
1408 if cl.closed.IsSet() {
1416 // Returns handles to all the torrents loaded in the Client.
1417 func (cl *Client) Torrents() []*Torrent {
1420 return cl.torrentsAsSlice()
1423 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1424 for _, t := range cl.torrents {
1425 ret = append(ret, t)
1430 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1431 spec, err := TorrentSpecFromMagnetUri(uri)
1435 T, _, err = cl.AddTorrentSpec(spec)
1439 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1440 ts, err := TorrentSpecFromMetaInfoErr(mi)
1444 T, _, err = cl.AddTorrentSpec(ts)
1448 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1449 mi, err := metainfo.LoadFromFile(filename)
1453 return cl.AddTorrent(mi)
1456 func (cl *Client) DhtServers() []DhtServer {
1457 return cl.dhtServers
1460 func (cl *Client) AddDhtNodes(nodes []string) {
1461 for _, n := range nodes {
1462 hmp := missinggo.SplitHostMaybePort(n)
1463 ip := net.ParseIP(hmp.Host)
1465 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1468 ni := krpc.NodeInfo{
1469 Addr: krpc.NodeAddr{
1474 cl.eachDhtServer(func(s DhtServer) {
1480 func (cl *Client) banPeerIP(ip net.IP) {
1481 cl.logger.Printf("banning ip %v", ip)
1482 if cl.badPeerIPs == nil {
1483 cl.badPeerIPs = make(map[string]struct{})
1485 cl.badPeerIPs[ip.String()] = struct{}{}
1488 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1497 PeerMaxRequests: 250,
1499 RemoteAddr: remoteAddr,
1501 callbacks: &cl.config.Callbacks,
1503 connString: connString,
1507 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1508 c.setRW(connStatsReadWriter{nc, c})
1509 c.r = &rateLimitedReader{
1510 l: cl.config.DownloadRateLimiter,
1513 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1514 for _, f := range cl.config.Callbacks.NewPeer {
1520 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1527 t.addPeers([]PeerInfo{{
1528 Addr: ipPortAddr{ip, port},
1529 Source: PeerSourceDhtAnnouncePeer,
1533 func firstNotNil(ips ...net.IP) net.IP {
1534 for _, ip := range ips {
1542 func (cl *Client) eachListener(f func(Listener) bool) {
1543 for _, s := range cl.listeners {
1550 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1551 for i := 0; i < len(cl.listeners); i += 1 {
1552 if ret = cl.listeners[i]; f(ret) {
1559 func (cl *Client) publicIp(peer net.IP) net.IP {
1560 // TODO: Use BEP 10 to determine how peers are seeing us.
1561 if peer.To4() != nil {
1563 cl.config.PublicIp4,
1564 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1569 cl.config.PublicIp6,
1570 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1574 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1575 l := cl.findListener(
1576 func(l Listener) bool {
1577 return f(addrIpOrNil(l.Addr()))
1583 return addrIpOrNil(l.Addr())
1586 // Our IP as a peer should see it.
1587 func (cl *Client) publicAddr(peer net.IP) IpPort {
1588 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1591 // ListenAddrs addresses currently being listened to.
1592 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1594 ret = make([]net.Addr, len(cl.listeners))
1595 for i := 0; i < len(cl.listeners); i += 1 {
1596 ret[i] = cl.listeners[i].Addr()
1602 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1603 ipa, ok := tryIpPortFromNetAddr(addr)
1607 ip := maskIpForAcceptLimiting(ipa.IP)
1608 if cl.acceptLimiter == nil {
1609 cl.acceptLimiter = make(map[ipStr]int)
1611 cl.acceptLimiter[ipStr(ip.String())]++
1614 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1615 if ip4 := ip.To4(); ip4 != nil {
1616 return ip4.Mask(net.CIDRMask(24, 32))
1621 func (cl *Client) clearAcceptLimits() {
1622 cl.acceptLimiter = nil
1625 func (cl *Client) acceptLimitClearer() {
1628 case <-cl.closed.Done():
1630 case <-time.After(15 * time.Minute):
1632 cl.clearAcceptLimits()
1638 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1639 if cl.config.DisableAcceptRateLimiting {
1642 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1645 func (cl *Client) rLock() {
1649 func (cl *Client) rUnlock() {
1653 func (cl *Client) lock() {
1657 func (cl *Client) unlock() {
1661 func (cl *Client) locker() *lockWithDeferreds {
1665 func (cl *Client) String() string {
1666 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1669 // Returns connection-level aggregate stats at the Client level. See the comment on
1670 // TorrentStats.ConnStats.
1671 func (cl *Client) ConnStats() ConnStats {
1672 return cl.stats.Copy()