20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 updateRequests chansync.BroadcastCond
91 func (cl *Client) BadPeerIPs() (ips []string) {
93 ips = cl.badPeerIPsLocked()
98 func (cl *Client) badPeerIPsLocked() (ips []string) {
99 ips = make([]string, len(cl.badPeerIPs))
101 for k := range cl.badPeerIPs {
108 func (cl *Client) PeerID() PeerID {
112 // Returns the port number for the first listener that has one. No longer assumes that all port
113 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 func (cl *Client) LocalPort() (port int) {
116 for i := 0; i < len(cl.listeners); i += 1 {
117 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
124 func writeDhtServerStatus(w io.Writer, s DhtServer) {
125 dhtStats := s.Stats()
126 fmt.Fprintf(w, " ID: %x\n", s.ID())
127 spew.Fdump(w, dhtStats)
130 // Writes out a human readable status of the client, such as for writing to a
132 func (cl *Client) WriteStatus(_w io.Writer) {
135 w := bufio.NewWriter(_w)
137 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
138 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
139 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
140 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
141 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
142 cl.eachDhtServer(func(s DhtServer) {
143 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
144 writeDhtServerStatus(w, s)
146 spew.Fdump(w, &cl.stats)
147 torrentsSlice := cl.torrentsAsSlice()
148 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150 sort.Slice(torrentsSlice, func(l, r int) bool {
151 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153 for _, t := range torrentsSlice {
155 fmt.Fprint(w, "<unknown name>")
157 fmt.Fprint(w, t.name())
163 "%f%% of %d bytes (%s)",
164 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166 humanize.Bytes(uint64(*t.length)))
168 w.WriteString("<missing metainfo>")
176 // Filters things that are less than warning from UPnP discovery.
177 func upnpDiscoverLogFilter(m log.Msg) bool {
178 level, ok := m.GetLevel()
179 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
182 func (cl *Client) initLogger() {
183 logger := cl.config.Logger
186 if !cl.config.Debug {
187 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
190 cl.logger = logger.WithValues(cl)
193 func (cl *Client) announceKey() int32 {
194 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
197 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
198 func (cl *Client) init(cfg *ClientConfig) {
200 cl.dopplegangerAddrs = make(map[string]struct{})
201 cl.torrents = make(map[metainfo.Hash]*Torrent)
202 cl.dialRateLimiter = rate.NewLimiter(10, 10)
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
211 cfg = NewDefaultClientConfig()
217 go cl.acceptLimitClearer()
226 storageImpl := cfg.DefaultStorage
227 if storageImpl == nil {
228 // We'd use mmap by default but HFS+ doesn't support sparse files.
229 storageImplCloser := storage.NewFile(cfg.DataDir)
230 cl.onClose = append(cl.onClose, func() {
231 if err := storageImplCloser.Close(); err != nil {
232 cl.logger.Printf("error closing default storage: %s", err)
235 storageImpl = storageImplCloser
237 cl.defaultStorage = storage.NewClient(storageImpl)
239 if cfg.PeerID != "" {
240 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
242 o := copy(cl.peerID[:], cfg.Bep20)
243 _, err = rand.Read(cl.peerID[o:])
245 panic("error generating peer id")
249 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
257 for _, _s := range sockets {
258 s := _s // Go is fucking retarded.
259 cl.onClose = append(cl.onClose, func() { s.Close() })
260 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261 cl.dialers = append(cl.dialers, s)
262 cl.listeners = append(cl.listeners, s)
263 if cl.config.AcceptPeerConnections {
264 go cl.acceptConnections(s)
271 for _, s := range sockets {
272 if pc, ok := s.(net.PacketConn); ok {
273 ds, err := cl.NewAnacrolixDhtServer(pc)
277 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278 cl.onClose = append(cl.onClose, func() { ds.Close() })
283 cl.websocketTrackers = websocketTrackers{
286 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
289 t, ok := cl.torrents[infoHash]
291 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
293 return t.announceRequest(event), nil
295 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
298 t, ok := cl.torrents[dcc.InfoHash]
300 cl.logger.WithDefaultLevel(log.Warning).Printf(
301 "got webrtc conn for unloaded torrent with infohash %x",
307 go t.onWebRtcConn(dc, dcc)
314 func (cl *Client) AddDhtServer(d DhtServer) {
315 cl.dhtServers = append(cl.dhtServers, d)
318 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
319 // given address for any Torrent.
320 func (cl *Client) AddDialer(d Dialer) {
323 cl.dialers = append(cl.dialers, d)
324 for _, t := range cl.torrents {
329 func (cl *Client) Listeners() []Listener {
333 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
335 func (cl *Client) AddListener(l Listener) {
336 cl.listeners = append(cl.listeners, l)
337 if cl.config.AcceptPeerConnections {
338 go cl.acceptConnections(l)
342 func (cl *Client) firewallCallback(net.Addr) bool {
344 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
347 torrent.Add("connections firewalled", 1)
349 torrent.Add("connections not firewalled", 1)
354 func (cl *Client) listenOnNetwork(n network) bool {
355 if n.Ipv4 && cl.config.DisableIPv4 {
358 if n.Ipv6 && cl.config.DisableIPv6 {
361 if n.Tcp && cl.config.DisableTCP {
364 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
370 func (cl *Client) listenNetworks() (ns []network) {
371 for _, n := range allPeerNetworks {
372 if cl.listenOnNetwork(n) {
379 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
380 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
381 cfg := dht.ServerConfig{
382 IPBlocklist: cl.ipBlockList,
384 OnAnnouncePeer: cl.onDHTAnnouncePeer,
385 PublicIP: func() net.IP {
386 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
387 return cl.config.PublicIp6
389 return cl.config.PublicIp4
391 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
392 OnQuery: cl.config.DHTOnQuery,
393 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
398 s, err = dht.NewServer(&cfg)
401 ts, err := s.Bootstrap()
403 cl.logger.Printf("error bootstrapping dht: %s", err)
405 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
411 func (cl *Client) Closed() events.Done {
412 return cl.closed.Done()
415 func (cl *Client) eachDhtServer(f func(DhtServer)) {
416 for _, ds := range cl.dhtServers {
421 // Stops the client. All connections to peers are closed and all activity will
423 func (cl *Client) Close() (errs []error) {
425 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
428 for _, t := range cl.torrents {
429 err := t.close(&closeGroup)
431 errs = append(errs, err)
435 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
444 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
445 if cl.ipBlockList == nil {
448 return cl.ipBlockList.Lookup(ip)
451 func (cl *Client) ipIsBlocked(ip net.IP) bool {
452 _, blocked := cl.ipBlockRange(ip)
456 func (cl *Client) wantConns() bool {
457 if cl.config.AlwaysWantConns {
460 for _, t := range cl.torrents {
468 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
469 func (cl *Client) rejectAccepted(conn net.Conn) error {
471 return errors.New("don't want conns right now")
473 ra := conn.RemoteAddr()
474 if rip := addrIpOrNil(ra); rip != nil {
475 if cl.config.DisableIPv4Peers && rip.To4() != nil {
476 return errors.New("ipv4 peers disabled")
478 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
479 return errors.New("ipv4 disabled")
482 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
483 return errors.New("ipv6 disabled")
485 if cl.rateLimitAccept(rip) {
486 return errors.New("source IP accepted rate limited")
488 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
489 return errors.New("bad source addr")
495 func (cl *Client) acceptConnections(l Listener) {
497 conn, err := l.Accept()
498 torrent.Add("client listener accepts", 1)
499 conn = pproffd.WrapNetConn(conn)
501 closed := cl.closed.IsSet()
504 reject = cl.rejectAccepted(conn)
514 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add("rejected accepted connections", 1)
520 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
523 go cl.incomingConnection(conn)
525 log.Fmsg("accepted %q connection at %q from %q",
529 ).SetLevel(log.Debug).Log(cl.logger)
530 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
531 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
532 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
537 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
538 func regularNetConnPeerConnConnString(nc net.Conn) string {
539 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
542 func (cl *Client) incomingConnection(nc net.Conn) {
544 if tc, ok := nc.(*net.TCPConn); ok {
547 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
548 regularNetConnPeerConnConnString(nc))
554 c.Discovery = PeerSourceIncoming
555 cl.runReceivedConn(c)
558 // Returns a handle to the given torrent, if it's present in the client.
559 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
562 t, ok = cl.torrents[ih]
566 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
567 return cl.torrents[ih]
570 type DialResult struct {
575 func countDialResult(err error) {
577 torrent.Add("successful dials", 1)
579 torrent.Add("unsuccessful dials", 1)
583 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
584 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
585 if ret < minDialTimeout {
591 // Returns whether an address is known to connect to a client with our own ID.
592 func (cl *Client) dopplegangerAddr(addr string) bool {
593 _, ok := cl.dopplegangerAddrs[addr]
597 // Returns a connection over UTP or TCP, whichever is first to connect.
598 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
599 return DialFirst(ctx, addr, cl.dialers)
602 // Returns a connection over UTP or TCP, whichever is first to connect.
603 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
605 t := perf.NewTimer(perf.CallerName(0))
608 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
614 ctx, cancel := context.WithCancel(ctx)
615 // As soon as we return one connection, cancel the others.
618 resCh := make(chan DialResult, left)
619 for _, _s := range dialers {
624 dialFromSocket(ctx, s, addr),
629 // Wait for a successful connection.
631 defer perf.ScopeTimer()()
632 for ; left > 0 && res.Conn == nil; left-- {
636 // There are still incompleted dials.
638 for ; left > 0; left-- {
639 conn := (<-resCh).Conn
646 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
651 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
662 func forgettableDialError(err error) bool {
663 return strings.Contains(err.Error(), "no suitable address found")
666 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
667 if _, ok := t.halfOpen[addr]; !ok {
668 panic("invariant broken")
670 delete(t.halfOpen, addr)
672 for _, t := range cl.torrents {
677 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
678 // for valid reasons.
679 func (cl *Client) initiateProtocolHandshakes(
683 outgoing, encryptHeader bool,
684 remoteAddr PeerRemoteAddr,
685 network, connString string,
687 c *PeerConn, err error,
689 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
690 c.headerEncrypted = encryptHeader
691 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693 dl, ok := ctx.Deadline()
697 err = nc.SetDeadline(dl)
701 err = cl.initiateHandshakes(c, t)
705 // Returns nil connection and nil error if no connection could be established for valid reasons.
706 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
707 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
710 return t.dialTimeout()
713 dr := cl.dialFirst(dialCtx, addr.String())
716 if dialCtx.Err() != nil {
717 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719 return nil, errors.New("dial failed")
721 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
728 // Returns nil connection and nil error if no connection could be established
729 // for valid reasons.
730 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
731 torrent.Add("establish outgoing connection", 1)
732 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
733 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with preferred header obfuscation", 1)
738 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
739 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
740 // We should have just tried with the preferred header obfuscation. If it was required,
741 // there's nothing else to try.
744 // Try again with encryption if we didn't earlier, or without if we did.
745 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with fallback header obfuscation", 1)
749 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
753 // Called to dial out and run a connection. The addr we're given is already
754 // considered half-open.
755 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
756 cl.dialRateLimiter.Wait(context.Background())
757 c, err := cl.establishOutgoingConn(t, addr)
760 // Don't release lock between here and addPeerConn, unless it's for
762 cl.noLongerHalfOpen(t, addr.String())
765 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
772 t.runHandshookConnLoggingErr(c)
775 // The port number for incoming peer connections. 0 if the client isn't listening.
776 func (cl *Client) incomingPeerPort() int {
777 return cl.LocalPort()
780 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
781 if c.headerEncrypted {
784 rw, c.cryptoMethod, err = mse.InitiateHandshake(
791 cl.config.CryptoProvides,
795 return fmt.Errorf("header obfuscation handshake: %w", err)
798 ih, err := cl.connBtHandshake(c, &t.infoHash)
800 return fmt.Errorf("bittorrent protocol handshake: %w", err)
802 if ih != t.infoHash {
803 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
808 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
809 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
810 func (cl *Client) forSkeys(f func([]byte) bool) {
813 if false { // Emulate the bug from #114
815 for ih := range cl.torrents {
819 for range cl.torrents {
826 for ih := range cl.torrents {
833 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
834 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
840 // Do encryption and bittorrent handshakes as receiver.
841 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
842 defer perf.ScopeTimerErr(&err)()
844 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
846 if err == nil || err == mse.ErrNoSecretKeyMatch {
847 if c.headerEncrypted {
848 torrent.Add("handshakes received encrypted", 1)
850 torrent.Add("handshakes received unencrypted", 1)
853 torrent.Add("handshakes received with error while handling encryption", 1)
856 if err == mse.ErrNoSecretKeyMatch {
861 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
862 err = errors.New("connection does not have required header obfuscation")
865 ih, err := cl.connBtHandshake(c, nil)
867 return nil, fmt.Errorf("during bt handshake: %w", err)
875 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
879 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
880 &successfulPeerWireProtocolHandshakePeerReservedBytes)
883 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
884 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
888 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
890 c.PeerExtensionBytes = res.PeerExtensionBits
891 c.PeerID = res.PeerID
892 c.completedHandshake = time.Now()
893 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
899 func (cl *Client) runReceivedConn(c *PeerConn) {
900 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
904 t, err := cl.receiveHandshakes(c)
907 "error receiving handshakes on %v: %s", c, err,
908 ).SetLevel(log.Debug).
910 "network", c.Network,
912 torrent.Add("error receiving handshake", 1)
914 cl.onBadAccept(c.RemoteAddr)
919 torrent.Add("received handshake for unloaded torrent", 1)
920 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
922 cl.onBadAccept(c.RemoteAddr)
926 torrent.Add("received handshake for loaded torrent", 1)
929 t.runHandshookConnLoggingErr(c)
932 // Client lock must be held before entering this.
933 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
935 if c.PeerID == cl.peerID {
938 addr := c.conn.RemoteAddr().String()
939 cl.dopplegangerAddrs[addr] = struct{}{}
941 // Because the remote address is not necessarily the same as its client's torrent listen
942 // address, we won't record the remote address as a doppleganger. Instead, the initiator
943 // can record *us* as the doppleganger.
945 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
948 c.conn.SetWriteDeadline(time.Time{})
949 c.r = deadlineReader{c.conn, c.r}
950 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
951 if connIsIpv6(c.conn) {
952 torrent.Add("completed handshake over ipv6", 1)
954 if err := t.addPeerConn(c); err != nil {
955 return fmt.Errorf("adding connection: %w", err)
957 defer t.dropConnection(c)
959 cl.sendInitialMessages(c, t)
960 c.initUpdateRequestsTimer()
961 err := c.mainReadLoop()
963 return fmt.Errorf("main read loop: %w", err)
970 func (p *Peer) initUpdateRequestsTimer() {
972 if p.updateRequestsTimer != nil {
973 panic(p.updateRequestsTimer)
976 p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
977 p.updateRequestsTimer.Stop()
980 func (c *Peer) updateRequestsTimerFunc() {
982 defer c.locker().Unlock()
983 if c.needRequestUpdate != "" {
986 if c.actualRequestState.Requests.IsEmpty() {
987 // If there are no outstanding requests, then a request update should have already run.
990 c.updateRequests("updateRequestsTimer")
993 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
994 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
995 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
996 const localClientReqq = 1 << 5
998 // See the order given in Transmission's tr_peerMsgsNew.
999 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
1000 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
1001 conn.write(pp.Message{
1003 ExtendedID: pp.HandshakeExtendedID,
1004 ExtendedPayload: func() []byte {
1005 msg := pp.ExtendedHandshakeMessage{
1006 M: map[pp.ExtensionName]pp.ExtensionNumber{
1007 pp.ExtensionNameMetadata: metadataExtendedId,
1009 V: cl.config.ExtendedHandshakeClientVersion,
1010 Reqq: localClientReqq,
1011 YourIp: pp.CompactIp(conn.remoteIp()),
1012 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1013 Port: cl.incomingPeerPort(),
1014 MetadataSize: torrent.metadataSize(),
1015 // TODO: We can figured these out specific to the socket
1017 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1018 Ipv6: cl.config.PublicIp6.To16(),
1020 if !cl.config.DisablePEX {
1021 msg.M[pp.ExtensionNamePex] = pexExtendedId
1023 return bencode.MustMarshal(msg)
1028 if conn.fastEnabled() {
1029 if torrent.haveAllPieces() {
1030 conn.write(pp.Message{Type: pp.HaveAll})
1031 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1033 } else if !torrent.haveAnyPieces() {
1034 conn.write(pp.Message{Type: pp.HaveNone})
1035 conn.sentHaves.Clear()
1041 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1042 conn.write(pp.Message{
1049 func (cl *Client) dhtPort() (ret uint16) {
1050 if len(cl.dhtServers) == 0 {
1053 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1056 func (cl *Client) haveDhtServer() bool {
1057 return len(cl.dhtServers) > 0
1060 // Process incoming ut_metadata message.
1061 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1062 var d pp.ExtendedMetadataRequestMsg
1063 err := bencode.Unmarshal(payload, &d)
1064 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1065 } else if err != nil {
1066 return fmt.Errorf("error unmarshalling bencode: %s", err)
1070 case pp.DataMetadataExtensionMsgType:
1071 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1072 if !c.requestedMetadataPiece(piece) {
1073 return fmt.Errorf("got unexpected piece %d", piece)
1075 c.metadataRequests[piece] = false
1076 begin := len(payload) - d.PieceSize()
1077 if begin < 0 || begin >= len(payload) {
1078 return fmt.Errorf("data has bad offset in payload: %d", begin)
1080 t.saveMetadataPiece(piece, payload[begin:])
1081 c.lastUsefulChunkReceived = time.Now()
1082 err = t.maybeCompleteMetadata()
1084 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1085 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1086 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1087 // log consumers can filter for this message.
1088 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1091 case pp.RequestMetadataExtensionMsgType:
1092 if !t.haveMetadataPiece(piece) {
1093 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1096 start := (1 << 14) * piece
1097 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1098 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1100 case pp.RejectMetadataExtensionMsgType:
1103 return errors.New("unknown msg_type value")
1107 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1108 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1109 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1114 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1118 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1121 if _, ok := cl.ipBlockRange(ip); ok {
1124 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1130 // Return a Torrent ready for insertion into a Client.
1131 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1132 return cl.newTorrentOpt(addTorrentOpts{
1134 Storage: specStorage,
1138 // Return a Torrent ready for insertion into a Client.
1139 func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) {
1140 // use provided storage, if provided
1141 storageClient := cl.defaultStorage
1142 if opts.Storage != nil {
1143 storageClient = storage.NewClient(opts.Storage)
1148 infoHash: opts.InfoHash,
1149 peers: prioritizedPeers{
1151 getPrio: func(p PeerInfo) peerPriority {
1153 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1156 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1158 halfOpen: make(map[string]PeerInfo),
1159 pieceStateChanges: pubsub.NewPubSub(),
1161 storageOpener: storageClient,
1162 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1164 metadataChanged: sync.Cond{
1167 webSeeds: make(map[string]*Peer),
1168 gotMetainfoC: make(chan struct{}),
1170 t.networkingEnabled.Set()
1171 t.logger = cl.logger.WithContextValue(t)
1172 if opts.ChunkSize == 0 {
1173 opts.ChunkSize = defaultChunkSize
1175 t.setChunkSize(opts.ChunkSize)
1179 // A file-like handle to some torrent data resource.
1180 type Handle interface {
1187 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1188 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1191 // Adds a torrent by InfoHash with a custom Storage implementation.
1192 // If the torrent already exists then this Storage is ignored and the
1193 // existing torrent returned with `new` set to `false`
1194 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1197 t, ok := cl.torrents[infoHash]
1203 t = cl.newTorrent(infoHash, specStorage)
1204 cl.eachDhtServer(func(s DhtServer) {
1205 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1206 go t.dhtAnnouncer(s)
1209 cl.torrents[infoHash] = t
1210 cl.clearAcceptLimits()
1211 t.updateWantPeersEvent()
1212 // Tickle Client.waitAccept, new torrent may want conns.
1213 cl.event.Broadcast()
1217 // Adds a torrent by InfoHash with a custom Storage implementation.
1218 // If the torrent already exists then this Storage is ignored and the
1219 // existing torrent returned with `new` set to `false`
1220 func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) {
1221 infoHash := opts.InfoHash
1224 t, ok := cl.torrents[infoHash]
1230 t = cl.newTorrentOpt(opts)
1231 cl.eachDhtServer(func(s DhtServer) {
1232 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1233 go t.dhtAnnouncer(s)
1236 cl.torrents[infoHash] = t
1237 cl.clearAcceptLimits()
1238 t.updateWantPeersEvent()
1239 // Tickle Client.waitAccept, new torrent may want conns.
1240 cl.event.Broadcast()
1244 type addTorrentOpts struct {
1246 Storage storage.ClientImpl
1247 ChunkSize pp.Integer
1250 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1251 // Torrent.MergeSpec.
1252 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1253 t, new = cl.AddTorrentOpt(addTorrentOpts{
1254 InfoHash: spec.InfoHash,
1255 Storage: spec.Storage,
1256 ChunkSize: spec.ChunkSize,
1260 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1262 modSpec.ChunkSize = 0
1264 err = t.MergeSpec(&modSpec)
1265 if err != nil && new {
1271 type stringAddr string
1273 var _ net.Addr = stringAddr("")
1275 func (stringAddr) Network() string { return "" }
1276 func (me stringAddr) String() string { return string(me) }
1278 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1279 // spec.DisallowDataDownload/Upload will be read and applied
1280 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1281 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1282 if spec.DisplayName != "" {
1283 t.SetDisplayName(spec.DisplayName)
1285 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1286 if spec.InfoBytes != nil {
1287 err := t.SetInfoBytes(spec.InfoBytes)
1293 cl.AddDhtNodes(spec.DhtNodes)
1296 useTorrentSources(spec.Sources, t)
1297 for _, url := range spec.Webseeds {
1300 for _, peerAddr := range spec.PeerAddrs {
1302 Addr: stringAddr(peerAddr),
1303 Source: PeerSourceDirect,
1307 if spec.ChunkSize != 0 {
1308 panic("chunk size cannot be changed for existing Torrent")
1310 t.addTrackers(spec.Trackers)
1312 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1313 t.dataUploadDisallowed = spec.DisallowDataUpload
1317 func useTorrentSources(sources []string, t *Torrent) {
1318 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1319 ctx := context.Background()
1320 for i := 0; i < len(sources); i += 1 {
1323 if err := useTorrentSource(ctx, s, t); err != nil {
1324 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1326 t.logger.Printf("successfully used source %q", s)
1332 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1333 ctx, cancel := context.WithCancel(ctx)
1343 var req *http.Request
1344 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1347 var resp *http.Response
1348 if resp, err = http.DefaultClient.Do(req); err != nil {
1351 var mi metainfo.MetaInfo
1352 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1355 if ctx.Err() != nil {
1360 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1363 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1364 t, ok := cl.torrents[infoHash]
1366 err = fmt.Errorf("no such torrent")
1373 delete(cl.torrents, infoHash)
1377 func (cl *Client) allTorrentsCompleted() bool {
1378 for _, t := range cl.torrents {
1382 if !t.haveAllPieces() {
1389 // Returns true when all torrents are completely downloaded and false if the
1390 // client is stopped before that.
1391 func (cl *Client) WaitAll() bool {
1394 for !cl.allTorrentsCompleted() {
1395 if cl.closed.IsSet() {
1403 // Returns handles to all the torrents loaded in the Client.
1404 func (cl *Client) Torrents() []*Torrent {
1407 return cl.torrentsAsSlice()
1410 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1411 for _, t := range cl.torrents {
1412 ret = append(ret, t)
1417 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1418 spec, err := TorrentSpecFromMagnetUri(uri)
1422 T, _, err = cl.AddTorrentSpec(spec)
1426 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1427 ts, err := TorrentSpecFromMetaInfoErr(mi)
1431 T, _, err = cl.AddTorrentSpec(ts)
1435 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1436 mi, err := metainfo.LoadFromFile(filename)
1440 return cl.AddTorrent(mi)
1443 func (cl *Client) DhtServers() []DhtServer {
1444 return cl.dhtServers
1447 func (cl *Client) AddDhtNodes(nodes []string) {
1448 for _, n := range nodes {
1449 hmp := missinggo.SplitHostMaybePort(n)
1450 ip := net.ParseIP(hmp.Host)
1452 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1455 ni := krpc.NodeInfo{
1456 Addr: krpc.NodeAddr{
1461 cl.eachDhtServer(func(s DhtServer) {
1467 func (cl *Client) banPeerIP(ip net.IP) {
1468 cl.logger.Printf("banning ip %v", ip)
1469 if cl.badPeerIPs == nil {
1470 cl.badPeerIPs = make(map[string]struct{})
1472 cl.badPeerIPs[ip.String()] = struct{}{}
1475 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1484 PeerMaxRequests: 250,
1486 RemoteAddr: remoteAddr,
1488 callbacks: &cl.config.Callbacks,
1490 connString: connString,
1494 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1495 c.setRW(connStatsReadWriter{nc, c})
1496 c.r = &rateLimitedReader{
1497 l: cl.config.DownloadRateLimiter,
1500 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1501 for _, f := range cl.config.Callbacks.NewPeer {
1507 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1514 t.addPeers([]PeerInfo{{
1515 Addr: ipPortAddr{ip, port},
1516 Source: PeerSourceDhtAnnouncePeer,
1520 func firstNotNil(ips ...net.IP) net.IP {
1521 for _, ip := range ips {
1529 func (cl *Client) eachListener(f func(Listener) bool) {
1530 for _, s := range cl.listeners {
1537 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1538 for i := 0; i < len(cl.listeners); i += 1 {
1539 if ret = cl.listeners[i]; f(ret) {
1546 func (cl *Client) publicIp(peer net.IP) net.IP {
1547 // TODO: Use BEP 10 to determine how peers are seeing us.
1548 if peer.To4() != nil {
1550 cl.config.PublicIp4,
1551 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1556 cl.config.PublicIp6,
1557 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1561 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1562 l := cl.findListener(
1563 func(l Listener) bool {
1564 return f(addrIpOrNil(l.Addr()))
1570 return addrIpOrNil(l.Addr())
1573 // Our IP as a peer should see it.
1574 func (cl *Client) publicAddr(peer net.IP) IpPort {
1575 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1578 // ListenAddrs addresses currently being listened to.
1579 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1581 ret = make([]net.Addr, len(cl.listeners))
1582 for i := 0; i < len(cl.listeners); i += 1 {
1583 ret[i] = cl.listeners[i].Addr()
1589 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1590 ipa, ok := tryIpPortFromNetAddr(addr)
1594 ip := maskIpForAcceptLimiting(ipa.IP)
1595 if cl.acceptLimiter == nil {
1596 cl.acceptLimiter = make(map[ipStr]int)
1598 cl.acceptLimiter[ipStr(ip.String())]++
1601 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1602 if ip4 := ip.To4(); ip4 != nil {
1603 return ip4.Mask(net.CIDRMask(24, 32))
1608 func (cl *Client) clearAcceptLimits() {
1609 cl.acceptLimiter = nil
1612 func (cl *Client) acceptLimitClearer() {
1615 case <-cl.closed.Done():
1617 case <-time.After(15 * time.Minute):
1619 cl.clearAcceptLimits()
1625 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1626 if cl.config.DisableAcceptRateLimiting {
1629 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1632 func (cl *Client) rLock() {
1636 func (cl *Client) rUnlock() {
1640 func (cl *Client) lock() {
1644 func (cl *Client) unlock() {
1648 func (cl *Client) locker() *lockWithDeferreds {
1652 func (cl *Client) String() string {
1653 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1656 // Returns connection-level aggregate stats at the Client level. See the comment on
1657 // TorrentStats.ConnStats.
1658 func (cl *Client) ConnStats() ConnStats {
1659 return cl.stats.Copy()