20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 updateRequests chansync.BroadcastCond
91 func (cl *Client) BadPeerIPs() (ips []string) {
93 ips = cl.badPeerIPsLocked()
98 func (cl *Client) badPeerIPsLocked() (ips []string) {
99 ips = make([]string, len(cl.badPeerIPs))
101 for k := range cl.badPeerIPs {
108 func (cl *Client) PeerID() PeerID {
112 // Returns the port number for the first listener that has one. No longer assumes that all port
113 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 func (cl *Client) LocalPort() (port int) {
116 for i := 0; i < len(cl.listeners); i += 1 {
117 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
124 func writeDhtServerStatus(w io.Writer, s DhtServer) {
125 dhtStats := s.Stats()
126 fmt.Fprintf(w, " ID: %x\n", s.ID())
127 spew.Fdump(w, dhtStats)
130 // Writes out a human readable status of the client, such as for writing to a
132 func (cl *Client) WriteStatus(_w io.Writer) {
135 w := bufio.NewWriter(_w)
137 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
138 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
139 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
140 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
141 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
142 cl.eachDhtServer(func(s DhtServer) {
143 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
144 writeDhtServerStatus(w, s)
146 spew.Fdump(w, &cl.stats)
147 torrentsSlice := cl.torrentsAsSlice()
148 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150 sort.Slice(torrentsSlice, func(l, r int) bool {
151 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153 for _, t := range torrentsSlice {
155 fmt.Fprint(w, "<unknown name>")
157 fmt.Fprint(w, t.name())
163 "%f%% of %d bytes (%s)",
164 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166 humanize.Bytes(uint64(*t.length)))
168 w.WriteString("<missing metainfo>")
176 // Filters things that are less than warning from UPnP discovery.
177 func upnpDiscoverLogFilter(m log.Msg) bool {
178 level, ok := m.GetLevel()
179 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
182 func (cl *Client) initLogger() {
183 logger := cl.config.Logger
186 if !cl.config.Debug {
187 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
190 cl.logger = logger.WithValues(cl)
193 func (cl *Client) announceKey() int32 {
194 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
197 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
198 func (cl *Client) init(cfg *ClientConfig) {
200 cl.dopplegangerAddrs = make(map[string]struct{})
201 cl.torrents = make(map[metainfo.Hash]*Torrent)
202 cl.dialRateLimiter = rate.NewLimiter(10, 10)
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
211 cfg = NewDefaultClientConfig()
217 go cl.acceptLimitClearer()
226 storageImpl := cfg.DefaultStorage
227 if storageImpl == nil {
228 // We'd use mmap by default but HFS+ doesn't support sparse files.
229 storageImplCloser := storage.NewFile(cfg.DataDir)
230 cl.onClose = append(cl.onClose, func() {
231 if err := storageImplCloser.Close(); err != nil {
232 cl.logger.Printf("error closing default storage: %s", err)
235 storageImpl = storageImplCloser
237 cl.defaultStorage = storage.NewClient(storageImpl)
239 if cfg.PeerID != "" {
240 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
242 o := copy(cl.peerID[:], cfg.Bep20)
243 _, err = rand.Read(cl.peerID[o:])
245 panic("error generating peer id")
249 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
257 for _, _s := range sockets {
258 s := _s // Go is fucking retarded.
259 cl.onClose = append(cl.onClose, func() { s.Close() })
260 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261 cl.dialers = append(cl.dialers, s)
262 cl.listeners = append(cl.listeners, s)
263 if cl.config.AcceptPeerConnections {
264 go cl.acceptConnections(s)
271 for _, s := range sockets {
272 if pc, ok := s.(net.PacketConn); ok {
273 ds, err := cl.NewAnacrolixDhtServer(pc)
277 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278 cl.onClose = append(cl.onClose, func() { ds.Close() })
283 cl.websocketTrackers = websocketTrackers{
286 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
289 t, ok := cl.torrents[infoHash]
291 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
293 return t.announceRequest(event), nil
295 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
298 t, ok := cl.torrents[dcc.InfoHash]
300 cl.logger.WithDefaultLevel(log.Warning).Printf(
301 "got webrtc conn for unloaded torrent with infohash %x",
307 go t.onWebRtcConn(dc, dcc)
314 func (cl *Client) AddDhtServer(d DhtServer) {
315 cl.dhtServers = append(cl.dhtServers, d)
318 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
319 // given address for any Torrent.
320 func (cl *Client) AddDialer(d Dialer) {
323 cl.dialers = append(cl.dialers, d)
324 for _, t := range cl.torrents {
329 func (cl *Client) Listeners() []Listener {
333 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
335 func (cl *Client) AddListener(l Listener) {
336 cl.listeners = append(cl.listeners, l)
337 if cl.config.AcceptPeerConnections {
338 go cl.acceptConnections(l)
342 func (cl *Client) firewallCallback(net.Addr) bool {
344 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
347 torrent.Add("connections firewalled", 1)
349 torrent.Add("connections not firewalled", 1)
354 func (cl *Client) listenOnNetwork(n network) bool {
355 if n.Ipv4 && cl.config.DisableIPv4 {
358 if n.Ipv6 && cl.config.DisableIPv6 {
361 if n.Tcp && cl.config.DisableTCP {
364 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
370 func (cl *Client) listenNetworks() (ns []network) {
371 for _, n := range allPeerNetworks {
372 if cl.listenOnNetwork(n) {
379 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
380 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
381 cfg := dht.ServerConfig{
382 IPBlocklist: cl.ipBlockList,
384 OnAnnouncePeer: cl.onDHTAnnouncePeer,
385 PublicIP: func() net.IP {
386 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
387 return cl.config.PublicIp6
389 return cl.config.PublicIp4
391 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
392 OnQuery: cl.config.DHTOnQuery,
393 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
398 s, err = dht.NewServer(&cfg)
401 ts, err := s.Bootstrap()
403 cl.logger.Printf("error bootstrapping dht: %s", err)
405 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
411 func (cl *Client) Closed() events.Done {
412 return cl.closed.Done()
415 func (cl *Client) eachDhtServer(f func(DhtServer)) {
416 for _, ds := range cl.dhtServers {
421 // Stops the client. All connections to peers are closed and all activity will
423 func (cl *Client) Close() (errs []error) {
425 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
428 for _, t := range cl.torrents {
429 err := t.close(&closeGroup)
431 errs = append(errs, err)
435 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
444 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
445 if cl.ipBlockList == nil {
448 return cl.ipBlockList.Lookup(ip)
451 func (cl *Client) ipIsBlocked(ip net.IP) bool {
452 _, blocked := cl.ipBlockRange(ip)
456 func (cl *Client) wantConns() bool {
457 if cl.config.AlwaysWantConns {
460 for _, t := range cl.torrents {
468 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
469 func (cl *Client) rejectAccepted(conn net.Conn) error {
471 return errors.New("don't want conns right now")
473 ra := conn.RemoteAddr()
474 if rip := addrIpOrNil(ra); rip != nil {
475 if cl.config.DisableIPv4Peers && rip.To4() != nil {
476 return errors.New("ipv4 peers disabled")
478 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
479 return errors.New("ipv4 disabled")
482 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
483 return errors.New("ipv6 disabled")
485 if cl.rateLimitAccept(rip) {
486 return errors.New("source IP accepted rate limited")
488 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
489 return errors.New("bad source addr")
495 func (cl *Client) acceptConnections(l Listener) {
497 conn, err := l.Accept()
498 torrent.Add("client listener accepts", 1)
499 conn = pproffd.WrapNetConn(conn)
501 closed := cl.closed.IsSet()
504 reject = cl.rejectAccepted(conn)
514 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add("rejected accepted connections", 1)
520 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
523 go cl.incomingConnection(conn)
525 log.Fmsg("accepted %q connection at %q from %q",
529 ).SetLevel(log.Debug).Log(cl.logger)
530 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
531 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
532 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
537 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
538 func regularNetConnPeerConnConnString(nc net.Conn) string {
539 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
542 func (cl *Client) incomingConnection(nc net.Conn) {
544 if tc, ok := nc.(*net.TCPConn); ok {
547 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
548 regularNetConnPeerConnConnString(nc))
554 c.Discovery = PeerSourceIncoming
555 cl.runReceivedConn(c)
558 // Returns a handle to the given torrent, if it's present in the client.
559 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
562 t, ok = cl.torrents[ih]
566 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
567 return cl.torrents[ih]
570 type DialResult struct {
575 func countDialResult(err error) {
577 torrent.Add("successful dials", 1)
579 torrent.Add("unsuccessful dials", 1)
583 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
584 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
585 if ret < minDialTimeout {
591 // Returns whether an address is known to connect to a client with our own ID.
592 func (cl *Client) dopplegangerAddr(addr string) bool {
593 _, ok := cl.dopplegangerAddrs[addr]
597 // Returns a connection over UTP or TCP, whichever is first to connect.
598 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
599 return DialFirst(ctx, addr, cl.dialers)
602 // Returns a connection over UTP or TCP, whichever is first to connect.
603 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
605 t := perf.NewTimer(perf.CallerName(0))
608 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
614 ctx, cancel := context.WithCancel(ctx)
615 // As soon as we return one connection, cancel the others.
618 resCh := make(chan DialResult, left)
619 for _, _s := range dialers {
624 dialFromSocket(ctx, s, addr),
629 // Wait for a successful connection.
631 defer perf.ScopeTimer()()
632 for ; left > 0 && res.Conn == nil; left-- {
636 // There are still incompleted dials.
638 for ; left > 0; left-- {
639 conn := (<-resCh).Conn
646 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
651 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
662 func forgettableDialError(err error) bool {
663 return strings.Contains(err.Error(), "no suitable address found")
666 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
667 if _, ok := t.halfOpen[addr]; !ok {
668 panic("invariant broken")
670 delete(t.halfOpen, addr)
672 for _, t := range cl.torrents {
677 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
678 // for valid reasons.
679 func (cl *Client) initiateProtocolHandshakes(
683 outgoing, encryptHeader bool,
684 remoteAddr PeerRemoteAddr,
685 network, connString string,
687 c *PeerConn, err error,
689 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
690 c.headerEncrypted = encryptHeader
691 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693 dl, ok := ctx.Deadline()
697 err = nc.SetDeadline(dl)
701 err = cl.initiateHandshakes(c, t)
705 // Returns nil connection and nil error if no connection could be established for valid reasons.
706 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
707 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
710 return t.dialTimeout()
713 dr := cl.dialFirst(dialCtx, addr.String())
716 if dialCtx.Err() != nil {
717 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719 return nil, errors.New("dial failed")
721 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
728 // Returns nil connection and nil error if no connection could be established
729 // for valid reasons.
730 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
731 torrent.Add("establish outgoing connection", 1)
732 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
733 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with preferred header obfuscation", 1)
738 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
739 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
740 // We should have just tried with the preferred header obfuscation. If it was required,
741 // there's nothing else to try.
744 // Try again with encryption if we didn't earlier, or without if we did.
745 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with fallback header obfuscation", 1)
749 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
753 // Called to dial out and run a connection. The addr we're given is already
754 // considered half-open.
755 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
756 cl.dialRateLimiter.Wait(context.Background())
757 c, err := cl.establishOutgoingConn(t, addr)
760 // Don't release lock between here and addPeerConn, unless it's for
762 cl.noLongerHalfOpen(t, addr.String())
765 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
772 t.runHandshookConnLoggingErr(c)
775 // The port number for incoming peer connections. 0 if the client isn't listening.
776 func (cl *Client) incomingPeerPort() int {
777 return cl.LocalPort()
780 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
781 if c.headerEncrypted {
784 rw, c.cryptoMethod, err = mse.InitiateHandshake(
791 cl.config.CryptoProvides,
795 return fmt.Errorf("header obfuscation handshake: %w", err)
798 ih, err := cl.connBtHandshake(c, &t.infoHash)
800 return fmt.Errorf("bittorrent protocol handshake: %w", err)
802 if ih != t.infoHash {
803 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
808 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
809 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
810 func (cl *Client) forSkeys(f func([]byte) bool) {
813 if false { // Emulate the bug from #114
815 for ih := range cl.torrents {
819 for range cl.torrents {
826 for ih := range cl.torrents {
833 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
834 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
840 // Do encryption and bittorrent handshakes as receiver.
841 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
842 defer perf.ScopeTimerErr(&err)()
844 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
846 if err == nil || err == mse.ErrNoSecretKeyMatch {
847 if c.headerEncrypted {
848 torrent.Add("handshakes received encrypted", 1)
850 torrent.Add("handshakes received unencrypted", 1)
853 torrent.Add("handshakes received with error while handling encryption", 1)
856 if err == mse.ErrNoSecretKeyMatch {
861 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
862 err = errors.New("connection does not have required header obfuscation")
865 ih, err := cl.connBtHandshake(c, nil)
867 return nil, fmt.Errorf("during bt handshake: %w", err)
875 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
879 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
880 &successfulPeerWireProtocolHandshakePeerReservedBytes)
883 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
884 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
888 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
890 c.PeerExtensionBytes = res.PeerExtensionBits
891 c.PeerID = res.PeerID
892 c.completedHandshake = time.Now()
893 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
899 func (cl *Client) runReceivedConn(c *PeerConn) {
900 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
904 t, err := cl.receiveHandshakes(c)
907 "error receiving handshakes on %v: %s", c, err,
908 ).SetLevel(log.Debug).
910 "network", c.Network,
912 torrent.Add("error receiving handshake", 1)
914 cl.onBadAccept(c.RemoteAddr)
919 torrent.Add("received handshake for unloaded torrent", 1)
920 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
922 cl.onBadAccept(c.RemoteAddr)
926 torrent.Add("received handshake for loaded torrent", 1)
929 t.runHandshookConnLoggingErr(c)
932 // Client lock must be held before entering this.
933 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
935 if c.PeerID == cl.peerID {
938 addr := c.conn.RemoteAddr().String()
939 cl.dopplegangerAddrs[addr] = struct{}{}
941 // Because the remote address is not necessarily the same as its client's torrent listen
942 // address, we won't record the remote address as a doppleganger. Instead, the initiator
943 // can record *us* as the doppleganger.
945 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
948 c.conn.SetWriteDeadline(time.Time{})
949 c.r = deadlineReader{c.conn, c.r}
950 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
951 if connIsIpv6(c.conn) {
952 torrent.Add("completed handshake over ipv6", 1)
954 if err := t.addPeerConn(c); err != nil {
955 return fmt.Errorf("adding connection: %w", err)
957 defer t.dropConnection(c)
959 cl.sendInitialMessages(c, t)
960 c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc)
961 c.updateRequestsTimer.Stop()
962 err := c.mainReadLoop()
964 return fmt.Errorf("main read loop: %w", err)
969 func (c *PeerConn) updateRequestsTimerFunc() {
971 defer c.locker().Unlock()
972 if c.needRequestUpdate != "" {
975 if c.actualRequestState.Requests.IsEmpty() {
976 panic("updateRequestsTimer should have been stopped")
978 c.updateRequests("updateRequestsTimer")
981 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
982 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
983 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
984 const localClientReqq = 1 << 5
986 // See the order given in Transmission's tr_peerMsgsNew.
987 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
988 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
989 conn.write(pp.Message{
991 ExtendedID: pp.HandshakeExtendedID,
992 ExtendedPayload: func() []byte {
993 msg := pp.ExtendedHandshakeMessage{
994 M: map[pp.ExtensionName]pp.ExtensionNumber{
995 pp.ExtensionNameMetadata: metadataExtendedId,
997 V: cl.config.ExtendedHandshakeClientVersion,
998 Reqq: localClientReqq,
999 YourIp: pp.CompactIp(conn.remoteIp()),
1000 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
1001 Port: cl.incomingPeerPort(),
1002 MetadataSize: torrent.metadataSize(),
1003 // TODO: We can figured these out specific to the socket
1005 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1006 Ipv6: cl.config.PublicIp6.To16(),
1008 if !cl.config.DisablePEX {
1009 msg.M[pp.ExtensionNamePex] = pexExtendedId
1011 return bencode.MustMarshal(msg)
1016 if conn.fastEnabled() {
1017 if torrent.haveAllPieces() {
1018 conn.write(pp.Message{Type: pp.HaveAll})
1019 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1021 } else if !torrent.haveAnyPieces() {
1022 conn.write(pp.Message{Type: pp.HaveNone})
1023 conn.sentHaves.Clear()
1029 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1030 conn.write(pp.Message{
1037 func (cl *Client) dhtPort() (ret uint16) {
1038 if len(cl.dhtServers) == 0 {
1041 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1044 func (cl *Client) haveDhtServer() bool {
1045 return len(cl.dhtServers) > 0
1048 // Process incoming ut_metadata message.
1049 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1050 var d pp.ExtendedMetadataRequestMsg
1051 err := bencode.Unmarshal(payload, &d)
1052 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1053 } else if err != nil {
1054 return fmt.Errorf("error unmarshalling bencode: %s", err)
1058 case pp.DataMetadataExtensionMsgType:
1059 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1060 if !c.requestedMetadataPiece(piece) {
1061 return fmt.Errorf("got unexpected piece %d", piece)
1063 c.metadataRequests[piece] = false
1064 begin := len(payload) - d.PieceSize()
1065 if begin < 0 || begin >= len(payload) {
1066 return fmt.Errorf("data has bad offset in payload: %d", begin)
1068 t.saveMetadataPiece(piece, payload[begin:])
1069 c.lastUsefulChunkReceived = time.Now()
1070 err = t.maybeCompleteMetadata()
1072 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1073 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1074 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1075 // log consumers can filter for this message.
1076 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1079 case pp.RequestMetadataExtensionMsgType:
1080 if !t.haveMetadataPiece(piece) {
1081 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1084 start := (1 << 14) * piece
1085 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1086 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1088 case pp.RejectMetadataExtensionMsgType:
1091 return errors.New("unknown msg_type value")
1095 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1096 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1097 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1102 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1106 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1109 if _, ok := cl.ipBlockRange(ip); ok {
1112 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1118 // Return a Torrent ready for insertion into a Client.
1119 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1120 return cl.newTorrentOpt(addTorrentOpts{
1122 Storage: specStorage,
1126 // Return a Torrent ready for insertion into a Client.
1127 func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) {
1128 // use provided storage, if provided
1129 storageClient := cl.defaultStorage
1130 if opts.Storage != nil {
1131 storageClient = storage.NewClient(opts.Storage)
1136 infoHash: opts.InfoHash,
1137 peers: prioritizedPeers{
1139 getPrio: func(p PeerInfo) peerPriority {
1141 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1144 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1146 halfOpen: make(map[string]PeerInfo),
1147 pieceStateChanges: pubsub.NewPubSub(),
1149 storageOpener: storageClient,
1150 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1152 metadataChanged: sync.Cond{
1155 webSeeds: make(map[string]*Peer),
1156 gotMetainfoC: make(chan struct{}),
1158 t.networkingEnabled.Set()
1159 t.logger = cl.logger.WithContextValue(t)
1160 if opts.ChunkSize == 0 {
1161 opts.ChunkSize = defaultChunkSize
1163 t.setChunkSize(opts.ChunkSize)
1167 // A file-like handle to some torrent data resource.
1168 type Handle interface {
1175 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1176 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1179 // Adds a torrent by InfoHash with a custom Storage implementation.
1180 // If the torrent already exists then this Storage is ignored and the
1181 // existing torrent returned with `new` set to `false`
1182 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1185 t, ok := cl.torrents[infoHash]
1191 t = cl.newTorrent(infoHash, specStorage)
1192 cl.eachDhtServer(func(s DhtServer) {
1193 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1194 go t.dhtAnnouncer(s)
1197 cl.torrents[infoHash] = t
1198 cl.clearAcceptLimits()
1199 t.updateWantPeersEvent()
1200 // Tickle Client.waitAccept, new torrent may want conns.
1201 cl.event.Broadcast()
1205 // Adds a torrent by InfoHash with a custom Storage implementation.
1206 // If the torrent already exists then this Storage is ignored and the
1207 // existing torrent returned with `new` set to `false`
1208 func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) {
1209 infoHash := opts.InfoHash
1212 t, ok := cl.torrents[infoHash]
1218 t = cl.newTorrentOpt(opts)
1219 cl.eachDhtServer(func(s DhtServer) {
1220 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1221 go t.dhtAnnouncer(s)
1224 cl.torrents[infoHash] = t
1225 cl.clearAcceptLimits()
1226 t.updateWantPeersEvent()
1227 // Tickle Client.waitAccept, new torrent may want conns.
1228 cl.event.Broadcast()
1232 type addTorrentOpts struct {
1234 Storage storage.ClientImpl
1235 ChunkSize pp.Integer
1238 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1239 // Torrent.MergeSpec.
1240 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1241 t, new = cl.AddTorrentOpt(addTorrentOpts{
1242 InfoHash: spec.InfoHash,
1243 Storage: spec.Storage,
1244 ChunkSize: spec.ChunkSize,
1248 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1250 modSpec.ChunkSize = 0
1252 err = t.MergeSpec(&modSpec)
1253 if err != nil && new {
1259 type stringAddr string
1261 var _ net.Addr = stringAddr("")
1263 func (stringAddr) Network() string { return "" }
1264 func (me stringAddr) String() string { return string(me) }
1266 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1267 // spec.DisallowDataDownload/Upload will be read and applied
1268 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1269 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1270 if spec.DisplayName != "" {
1271 t.SetDisplayName(spec.DisplayName)
1273 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1274 if spec.InfoBytes != nil {
1275 err := t.SetInfoBytes(spec.InfoBytes)
1281 cl.AddDhtNodes(spec.DhtNodes)
1284 useTorrentSources(spec.Sources, t)
1285 for _, url := range spec.Webseeds {
1288 for _, peerAddr := range spec.PeerAddrs {
1290 Addr: stringAddr(peerAddr),
1291 Source: PeerSourceDirect,
1295 if spec.ChunkSize != 0 {
1296 panic("chunk size cannot be changed for existing Torrent")
1298 t.addTrackers(spec.Trackers)
1300 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1301 t.dataUploadDisallowed = spec.DisallowDataUpload
1305 func useTorrentSources(sources []string, t *Torrent) {
1306 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1307 ctx := context.Background()
1308 for i := 0; i < len(sources); i += 1 {
1311 if err := useTorrentSource(ctx, s, t); err != nil {
1312 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1314 t.logger.Printf("successfully used source %q", s)
1320 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1321 ctx, cancel := context.WithCancel(ctx)
1331 var req *http.Request
1332 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1335 var resp *http.Response
1336 if resp, err = http.DefaultClient.Do(req); err != nil {
1339 var mi metainfo.MetaInfo
1340 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1343 if ctx.Err() != nil {
1348 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1351 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1352 t, ok := cl.torrents[infoHash]
1354 err = fmt.Errorf("no such torrent")
1361 delete(cl.torrents, infoHash)
1365 func (cl *Client) allTorrentsCompleted() bool {
1366 for _, t := range cl.torrents {
1370 if !t.haveAllPieces() {
1377 // Returns true when all torrents are completely downloaded and false if the
1378 // client is stopped before that.
1379 func (cl *Client) WaitAll() bool {
1382 for !cl.allTorrentsCompleted() {
1383 if cl.closed.IsSet() {
1391 // Returns handles to all the torrents loaded in the Client.
1392 func (cl *Client) Torrents() []*Torrent {
1395 return cl.torrentsAsSlice()
1398 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1399 for _, t := range cl.torrents {
1400 ret = append(ret, t)
1405 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1406 spec, err := TorrentSpecFromMagnetUri(uri)
1410 T, _, err = cl.AddTorrentSpec(spec)
1414 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1415 ts, err := TorrentSpecFromMetaInfoErr(mi)
1419 T, _, err = cl.AddTorrentSpec(ts)
1423 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1424 mi, err := metainfo.LoadFromFile(filename)
1428 return cl.AddTorrent(mi)
1431 func (cl *Client) DhtServers() []DhtServer {
1432 return cl.dhtServers
1435 func (cl *Client) AddDhtNodes(nodes []string) {
1436 for _, n := range nodes {
1437 hmp := missinggo.SplitHostMaybePort(n)
1438 ip := net.ParseIP(hmp.Host)
1440 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1443 ni := krpc.NodeInfo{
1444 Addr: krpc.NodeAddr{
1449 cl.eachDhtServer(func(s DhtServer) {
1455 func (cl *Client) banPeerIP(ip net.IP) {
1456 cl.logger.Printf("banning ip %v", ip)
1457 if cl.badPeerIPs == nil {
1458 cl.badPeerIPs = make(map[string]struct{})
1460 cl.badPeerIPs[ip.String()] = struct{}{}
1463 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1472 PeerMaxRequests: 250,
1474 RemoteAddr: remoteAddr,
1476 callbacks: &cl.config.Callbacks,
1478 connString: connString,
1482 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1483 c.setRW(connStatsReadWriter{nc, c})
1484 c.r = &rateLimitedReader{
1485 l: cl.config.DownloadRateLimiter,
1488 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1489 for _, f := range cl.config.Callbacks.NewPeer {
1495 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1502 t.addPeers([]PeerInfo{{
1503 Addr: ipPortAddr{ip, port},
1504 Source: PeerSourceDhtAnnouncePeer,
1508 func firstNotNil(ips ...net.IP) net.IP {
1509 for _, ip := range ips {
1517 func (cl *Client) eachListener(f func(Listener) bool) {
1518 for _, s := range cl.listeners {
1525 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1526 for i := 0; i < len(cl.listeners); i += 1 {
1527 if ret = cl.listeners[i]; f(ret) {
1534 func (cl *Client) publicIp(peer net.IP) net.IP {
1535 // TODO: Use BEP 10 to determine how peers are seeing us.
1536 if peer.To4() != nil {
1538 cl.config.PublicIp4,
1539 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1544 cl.config.PublicIp6,
1545 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1549 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1550 l := cl.findListener(
1551 func(l Listener) bool {
1552 return f(addrIpOrNil(l.Addr()))
1558 return addrIpOrNil(l.Addr())
1561 // Our IP as a peer should see it.
1562 func (cl *Client) publicAddr(peer net.IP) IpPort {
1563 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1566 // ListenAddrs addresses currently being listened to.
1567 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1569 ret = make([]net.Addr, len(cl.listeners))
1570 for i := 0; i < len(cl.listeners); i += 1 {
1571 ret[i] = cl.listeners[i].Addr()
1577 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1578 ipa, ok := tryIpPortFromNetAddr(addr)
1582 ip := maskIpForAcceptLimiting(ipa.IP)
1583 if cl.acceptLimiter == nil {
1584 cl.acceptLimiter = make(map[ipStr]int)
1586 cl.acceptLimiter[ipStr(ip.String())]++
1589 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1590 if ip4 := ip.To4(); ip4 != nil {
1591 return ip4.Mask(net.CIDRMask(24, 32))
1596 func (cl *Client) clearAcceptLimits() {
1597 cl.acceptLimiter = nil
1600 func (cl *Client) acceptLimitClearer() {
1603 case <-cl.closed.Done():
1605 case <-time.After(15 * time.Minute):
1607 cl.clearAcceptLimits()
1613 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1614 if cl.config.DisableAcceptRateLimiting {
1617 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1620 func (cl *Client) rLock() {
1624 func (cl *Client) rUnlock() {
1628 func (cl *Client) lock() {
1632 func (cl *Client) unlock() {
1636 func (cl *Client) locker() *lockWithDeferreds {
1640 func (cl *Client) String() string {
1641 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1644 // Returns connection-level aggregate stats at the Client level. See the comment on
1645 // TorrentStats.ConnStats.
1646 func (cl *Client) ConnStats() ConnStats {
1647 return cl.stats.Copy()