20 "github.com/anacrolix/chansync/events"
21 "github.com/anacrolix/dht/v2"
22 "github.com/anacrolix/dht/v2/krpc"
23 "github.com/anacrolix/log"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/v2"
27 "github.com/anacrolix/missinggo/v2/bitmap"
28 "github.com/anacrolix/missinggo/v2/pproffd"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
36 "github.com/anacrolix/chansync"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/internal/limiter"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
45 "github.com/anacrolix/torrent/tracker"
46 "github.com/anacrolix/torrent/webtorrent"
49 // Clients contain zero or more Torrents. A Client manages a blocklist, the
50 // TCP/UDP protocol ports, and DHT as desired.
52 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
58 closed chansync.SetOnce
64 defaultStorage *storage.Client
68 dhtServers []DhtServer
69 ipBlockList iplist.Ranger
71 // Set of addresses that have our client ID. This intentionally will
72 // include ourselves if we end up trying to connect to our own address
73 // through legitimate channels.
74 dopplegangerAddrs map[string]struct{}
75 badPeerIPs map[string]struct{}
76 torrents map[InfoHash]*Torrent
78 acceptLimiter map[ipStr]int
79 dialRateLimiter *rate.Limiter
82 websocketTrackers websocketTrackers
84 activeAnnounceLimiter limiter.Instance
86 updateRequests chansync.BroadcastCond
// BadPeerIPs returns the client's banned peer IPs as strings, as produced by
// badPeerIPsLocked.
// NOTE(review): any locking around the call is not visible in this extract —
// confirm against the full file.
91 func (cl *Client) BadPeerIPs() (ips []string) {
93 ips = cl.badPeerIPsLocked()
// badPeerIPsLocked builds a slice with one entry per key of cl.badPeerIPs.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock — confirm.
98 func (cl *Client) badPeerIPsLocked() (ips []string) {
// Allocate the result at its final length up front.
99 ips = make([]string, len(cl.badPeerIPs))
// Map iteration order is unspecified, so the order of the result is not
// stable. Presumably each key k is stored into ips (loop body not visible).
101 for k := range cl.badPeerIPs {
108 func (cl *Client) PeerID() PeerID {
112 // Returns the port number for the first listener that has one. No longer assumes that all port
113 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
115 func (cl *Client) LocalPort() (port int) {
116 for i := 0; i < len(cl.listeners); i += 1 {
117 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
// writeDhtServerStatus writes a summary of the DHT server s to w: its ID in
// hexadecimal, followed by a spew dump of the value returned by s.Stats().
124 func writeDhtServerStatus(w io.Writer, s DhtServer) {
125 dhtStats := s.Stats()
126 fmt.Fprintf(w, " ID: %x\n", s.ID())
127 spew.Fdump(w, dhtStats)
130 // Writes out a human readable status of the client, such as for writing to a
132 func (cl *Client) WriteStatus(_w io.Writer) {
135 w := bufio.NewWriter(_w)
137 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
138 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
139 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
140 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
141 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
142 cl.eachDhtServer(func(s DhtServer) {
143 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
144 writeDhtServerStatus(w, s)
146 spew.Fdump(w, &cl.stats)
147 torrentsSlice := cl.torrentsAsSlice()
148 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
150 sort.Slice(torrentsSlice, func(l, r int) bool {
151 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
153 for _, t := range torrentsSlice {
155 fmt.Fprint(w, "<unknown name>")
157 fmt.Fprint(w, t.name())
163 "%f%% of %d bytes (%s)",
164 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
166 humanize.Bytes(uint64(*t.length)))
168 w.WriteString("<missing metainfo>")
176 // Filters things that are less than warning from UPnP discovery.
// A message passes (true) when it is not tagged with UpnpDiscoverLogTag, or
// when it carries a level (ok) and that level is not less than log.Warning.
// Tagged messages without a level are therefore filtered out.
177 func upnpDiscoverLogFilter(m log.Msg) bool {
178 level, ok := m.GetLevel()
179 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
182 func (cl *Client) initLogger() {
183 logger := cl.config.Logger
186 if !cl.config.Debug {
187 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
190 cl.logger = logger.WithValues(cl)
// announceKey returns bytes 16 through 19 of the client's peer ID, read as a
// big-endian uint32 and converted to int32.
193 func (cl *Client) announceKey() int32 {
194 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
197 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
198 func (cl *Client) init(cfg *ClientConfig) {
200 cl.dopplegangerAddrs = make(map[string]struct{})
201 cl.torrents = make(map[metainfo.Hash]*Torrent)
202 cl.dialRateLimiter = rate.NewLimiter(10, 10)
203 cl.activeAnnounceLimiter.SlotsPerKey = 2
205 cl.event.L = cl.locker()
206 cl.ipBlockList = cfg.IPBlocklist
// NewClient creates a Client from cfg. From the lines visible here it falls
// back to NewDefaultClientConfig, sets up default storage, chooses a peer ID,
// opens sockets via listenAll, creates DHT servers on packet-capable sockets,
// and installs the websocket tracker callbacks.
// NOTE(review): several lines of this function are not visible in this
// extract, so the conditions guarding each step should be confirmed against
// the full file.
209 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
211 cfg = NewDefaultClientConfig()
217 go cl.acceptLimitClearer()
// Use the configured default storage if there is one; otherwise fall back to
// file storage rooted at cfg.DataDir.
226 storageImpl := cfg.DefaultStorage
227 if storageImpl == nil {
228 // We'd use mmap by default but HFS+ doesn't support sparse files.
229 storageImplCloser := storage.NewFile(cfg.DataDir)
// The client created this storage, so register it in the onClose hooks.
230 cl.onClose = append(cl.onClose, func() {
231 if err := storageImplCloser.Close(); err != nil {
232 cl.logger.Printf("error closing default storage: %s", err)
235 storageImpl = storageImplCloser
237 cl.defaultStorage = storage.NewClient(storageImpl)
// Peer ID: copy a configured ID exactly; otherwise (presumably the else
// branch) start with cfg.Bep20 and fill the remaining bytes randomly.
239 if cfg.PeerID != "" {
240 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
242 o := copy(cl.peerID[:], cfg.Bep20)
243 _, err = rand.Read(cl.peerID[o:])
245 panic("error generating peer id")
249 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
257 for _, _s := range sockets {
258 s := _s // Copy the loop variable so each closure below captures its own socket.
259 cl.onClose = append(cl.onClose, func() { s.Close() })
// Only sockets on an enabled peer network are used for dialing and listening.
260 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
261 cl.dialers = append(cl.dialers, s)
262 cl.listeners = append(cl.listeners, s)
263 if cl.config.AcceptPeerConnections {
264 go cl.acceptConnections(s)
// Sockets that also implement net.PacketConn get a DHT server, which is
// recorded and registered for closing.
271 for _, s := range sockets {
272 if pc, ok := s.(net.PacketConn); ok {
273 ds, err := cl.NewAnacrolixDhtServer(pc)
277 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
278 cl.onClose = append(cl.onClose, func() { ds.Close() })
283 cl.websocketTrackers = websocketTrackers{
// Builds the announce request for a torrent the client tracks; an infohash
// that is not in cl.torrents produces an error.
286 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
289 t, ok := cl.torrents[infoHash]
291 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
293 return t.announceRequest(event), nil
// Hands a new WebRTC data channel to its torrent; a warning is logged when
// the infohash does not match a loaded torrent.
295 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
298 t, ok := cl.torrents[dcc.InfoHash]
300 cl.logger.WithDefaultLevel(log.Warning).Printf(
301 "got webrtc conn for unloaded torrent with infohash %x",
307 go t.onWebRtcConn(dc, dcc)
// AddDhtServer appends d to the client's list of DHT servers.
// NOTE(review): no locking is visible around this append in this extract —
// confirm it cannot race with readers of cl.dhtServers.
314 func (cl *Client) AddDhtServer(d DhtServer) {
315 cl.dhtServers = append(cl.dhtServers, d)
318 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
319 // given address for any Torrent.
320 func (cl *Client) AddDialer(d Dialer) {
323 cl.dialers = append(cl.dialers, d)
324 for _, t := range cl.torrents {
329 func (cl *Client) Listeners() []Listener {
333 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
335 func (cl *Client) AddListener(l Listener) {
336 cl.listeners = append(cl.listeners, l)
337 if cl.config.AcceptPeerConnections {
338 go cl.acceptConnections(l)
342 func (cl *Client) firewallCallback(net.Addr) bool {
344 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
347 torrent.Add("connections firewalled", 1)
349 torrent.Add("connections not firewalled", 1)
// listenOnNetwork decides whether the client should listen on network n,
// based on the Disable* and NoDHT config flags checked below.
// NOTE(review): the return statements are not visible in this extract;
// presumably each matching check returns false and falling through returns
// true.
354 func (cl *Client) listenOnNetwork(n network) bool {
355 if n.Ipv4 && cl.config.DisableIPv4 {
358 if n.Ipv6 && cl.config.DisableIPv6 {
361 if n.Tcp && cl.config.DisableTCP {
// UDP only matches when both uTP and DHT are disabled.
364 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
370 func (cl *Client) listenNetworks() (ns []network) {
371 for _, n := range allPeerNetworks {
372 if cl.listenOnNetwork(n) {
379 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
380 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
381 cfg := dht.ServerConfig{
382 IPBlocklist: cl.ipBlockList,
384 OnAnnouncePeer: cl.onDHTAnnouncePeer,
385 PublicIP: func() net.IP {
386 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
387 return cl.config.PublicIp6
389 return cl.config.PublicIp4
391 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
392 OnQuery: cl.config.DHTOnQuery,
393 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
395 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
398 s, err = dht.NewServer(&cfg)
401 ts, err := s.Bootstrap()
403 cl.logger.Printf("error bootstrapping dht: %s", err)
405 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
// Closed returns the events.Done obtained from cl.closed.Done().
// Presumably it is signalled once the client has been closed — confirm
// against chansync.SetOnce.
411 func (cl *Client) Closed() events.Done {
412 return cl.closed.Done()
// eachDhtServer iterates over cl.dhtServers; presumably f is invoked once for
// each server (the loop body is not visible in this extract).
415 func (cl *Client) eachDhtServer(f func(DhtServer)) {
416 for _, ds := range cl.dhtServers {
421 // Stops the client. All connections to peers are closed and all activity will
423 func (cl *Client) Close() (errs []error) {
425 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
428 for _, t := range cl.torrents {
429 err := t.close(&closeGroup)
431 errs = append(errs, err)
435 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
437 for i := range cl.onClose {
438 cl.onClose[len(cl.onClose)-1-i]()
// ipBlockRange looks ip up in the client's IP blocklist and returns the result
// of cl.ipBlockList.Lookup.
444 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
// No blocklist configured. NOTE(review): what this branch returns is not
// visible here — presumably the zero values, i.e. not blocked.
445 if cl.ipBlockList == nil {
448 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked consults ipBlockRange for ip, discarding the matched range.
// Presumably it returns the blocked flag (the return line is not visible in
// this extract).
451 func (cl *Client) ipIsBlocked(ip net.IP) bool {
452 _, blocked := cl.ipBlockRange(ip)
456 func (cl *Client) wantConns() bool {
457 if cl.config.AlwaysWantConns {
460 for _, t := range cl.torrents {
468 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
469 func (cl *Client) rejectAccepted(conn net.Conn) error {
471 return errors.New("don't want conns right now")
473 ra := conn.RemoteAddr()
474 if rip := addrIpOrNil(ra); rip != nil {
475 if cl.config.DisableIPv4Peers && rip.To4() != nil {
476 return errors.New("ipv4 peers disabled")
478 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
479 return errors.New("ipv4 disabled")
482 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
483 return errors.New("ipv6 disabled")
485 if cl.rateLimitAccept(rip) {
486 return errors.New("source IP accepted rate limited")
488 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
489 return errors.New("bad source addr")
495 func (cl *Client) acceptConnections(l Listener) {
497 conn, err := l.Accept()
498 torrent.Add("client listener accepts", 1)
499 conn = pproffd.WrapNetConn(conn)
501 closed := cl.closed.IsSet()
504 reject = cl.rejectAccepted(conn)
514 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
519 torrent.Add("rejected accepted connections", 1)
520 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
523 go cl.incomingConnection(conn)
525 log.Fmsg("accepted %q connection at %q from %q",
529 ).SetLevel(log.Debug).Log(cl.logger)
530 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
531 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
532 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a
// PeerConn backed by a regular net.Conn: the connection's local and remote
// addresses joined by a hyphen.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
542 func (cl *Client) incomingConnection(nc net.Conn) {
544 if tc, ok := nc.(*net.TCPConn); ok {
547 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
548 regularNetConnPeerConnConnString(nc))
554 c.Discovery = PeerSourceIncoming
555 cl.runReceivedConn(c)
558 // Returns a handle to the given torrent, if it's present in the client.
559 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
562 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored under ih in cl.torrents. It is a plain
// map lookup, so the result is nil when no torrent has that infohash.
566 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
567 return cl.torrents[ih]
570 type DialResult struct {
575 func countDialResult(err error) {
577 torrent.Add("successful dials", 1)
579 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the dial timeout down as the backlog of pending
// peers grows. The result is max divided by
// (pendingPeers+halfOpenLimit)/halfOpenLimit (integer division), and is never
// less than minDialTimeout.
//
// A non-positive halfOpenLimit, or a divisor below one, previously caused an
// integer divide-by-zero panic or a meaningless negative timeout. In those
// cases no reduction is applied and max is used before the minimum clamp.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit > 0 {
		// Number of half-open "rounds" needed to work through the backlog.
		if rounds := (pendingPeers + halfOpenLimit) / halfOpenLimit; rounds >= 1 {
			ret = max / time.Duration(rounds)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
591 // Returns whether an address is known to connect to a client with our own ID.
592 func (cl *Client) dopplegangerAddr(addr string) bool {
593 _, ok := cl.dopplegangerAddrs[addr]
597 // Returns a connection over UTP or TCP, whichever is first to connect.
598 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
599 return DialFirst(ctx, addr, cl.dialers)
602 // DialFirst dials addr using the given dialers and returns the result of whichever is first to connect.
// NOTE(review): the goroutine launch for each dialer is not visible in this
// extract; presumably the dials run concurrently.
603 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
// Timing marks record whether a connection was returned and over which
// dialer network.
605 t := perf.NewTimer(perf.CallerName(0))
608 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
610 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
614 ctx, cancel := context.WithCancel(ctx)
615 // As soon as we return one connection, cancel the others.
// The result channel is buffered with capacity left.
618 resCh := make(chan DialResult, left)
619 for _, _s := range dialers {
624 dialFromSocket(ctx, s, addr),
629 // Wait for a successful connection.
631 defer perf.ScopeTimer()()
632 for ; left > 0 && res.Conn == nil; left-- {
636 // There are still incomplete dials.
// Drain the remaining results. NOTE(review): what happens to a late
// connection is not visible here — presumably it is closed.
638 for ; left > 0; left-- {
639 conn := (<-resCh).Conn
646 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
651 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
652 c, err := s.Dial(ctx, addr)
653 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
654 // it now in case we close the connection forthwith.
655 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err is a dial error considered
// forgettable, identified by the substring "no suitable address found" in its
// message. A nil err is never forgettable; previously it caused a nil
// dereference panic on err.Error().
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
666 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
667 if _, ok := t.halfOpen[addr]; !ok {
668 panic("invariant broken")
670 delete(t.halfOpen, addr)
672 for _, t := range cl.torrents {
677 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
678 // for valid reasons.
679 func (cl *Client) initiateProtocolHandshakes(
683 outgoing, encryptHeader bool,
684 remoteAddr PeerRemoteAddr,
685 network, connString string,
687 c *PeerConn, err error,
689 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
690 c.headerEncrypted = encryptHeader
691 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
693 dl, ok := ctx.Deadline()
697 err = nc.SetDeadline(dl)
701 err = cl.initiateHandshakes(c, t)
705 // Returns nil connection and nil error if no connection could be established for valid reasons.
706 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
707 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
710 return t.dialTimeout()
713 dr := cl.dialFirst(dialCtx, addr.String())
716 if dialCtx.Err() != nil {
717 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
719 return nil, errors.New("dial failed")
721 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
728 // Returns nil connection and nil error if no connection could be established
729 // for valid reasons.
730 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
731 torrent.Add("establish outgoing connection", 1)
732 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
733 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with preferred header obfuscation", 1)
738 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
739 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
740 // We should have just tried with the preferred header obfuscation. If it was required,
741 // there's nothing else to try.
744 // Try again with encryption if we didn't earlier, or without if we did.
745 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
747 torrent.Add("initiated conn with fallback header obfuscation", 1)
749 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
753 // Called to dial out and run a connection. The addr we're given is already
754 // considered half-open.
755 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
756 cl.dialRateLimiter.Wait(context.Background())
757 c, err := cl.establishOutgoingConn(t, addr)
760 // Don't release lock between here and addPeerConn, unless it's for
762 cl.noLongerHalfOpen(t, addr.String())
765 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
772 t.runHandshookConnLoggingErr(c)
775 // The port number for incoming peer connections. 0 if the client isn't listening.
776 func (cl *Client) incomingPeerPort() int {
777 return cl.LocalPort()
780 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
781 if c.headerEncrypted {
784 rw, c.cryptoMethod, err = mse.InitiateHandshake(
791 cl.config.CryptoProvides,
795 return fmt.Errorf("header obfuscation handshake: %w", err)
798 ih, err := cl.connBtHandshake(c, &t.infoHash)
800 return fmt.Errorf("bittorrent protocol handshake: %w", err)
802 if ih != t.infoHash {
803 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
808 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
809 // that won't also try to take the lock. This saves us copying all the infohashes every time.
810 func (cl *Client) forSkeys(f func([]byte) bool) {
// This branch never executes (the condition is the constant false); it is kept
// only to illustrate the behaviour of the bug from #114.
813 if false { // Emulate the bug from #114
815 for ih := range cl.torrents {
819 for range cl.torrents {
// The live path: iterate the infohashes of all torrents. Presumably each one
// is passed to f as a secret key (the loop body is not visible here).
826 for ih := range cl.torrents {
833 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
834 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
840 // Do encryption and bittorrent handshakes as receiver.
841 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
842 defer perf.ScopeTimerErr(&err)()
844 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
846 if err == nil || err == mse.ErrNoSecretKeyMatch {
847 if c.headerEncrypted {
848 torrent.Add("handshakes received encrypted", 1)
850 torrent.Add("handshakes received unencrypted", 1)
853 torrent.Add("handshakes received with error while handling encryption", 1)
856 if err == mse.ErrNoSecretKeyMatch {
861 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
862 err = errors.New("connection does not have required header obfuscation")
865 ih, err := cl.connBtHandshake(c, nil)
867 return nil, fmt.Errorf("during bt handshake: %w", err)
875 var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
879 "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
880 &successfulPeerWireProtocolHandshakePeerReservedBytes)
883 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
884 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
888 successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1)
890 c.PeerExtensionBytes = res.PeerExtensionBits
891 c.PeerID = res.PeerID
892 c.completedHandshake = time.Now()
893 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
899 func (cl *Client) runReceivedConn(c *PeerConn) {
900 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
904 t, err := cl.receiveHandshakes(c)
907 "error receiving handshakes on %v: %s", c, err,
908 ).SetLevel(log.Debug).
910 "network", c.Network,
912 torrent.Add("error receiving handshake", 1)
914 cl.onBadAccept(c.RemoteAddr)
919 torrent.Add("received handshake for unloaded torrent", 1)
920 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
922 cl.onBadAccept(c.RemoteAddr)
926 torrent.Add("received handshake for loaded torrent", 1)
929 t.runHandshookConnLoggingErr(c)
932 // Client lock must be held before entering this.
933 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
935 if c.PeerID == cl.peerID {
938 addr := c.conn.RemoteAddr().String()
939 cl.dopplegangerAddrs[addr] = struct{}{}
941 // Because the remote address is not necessarily the same as its client's torrent listen
942 // address, we won't record the remote address as a doppleganger. Instead, the initiator
943 // can record *us* as the doppleganger.
945 t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same")
948 c.conn.SetWriteDeadline(time.Time{})
949 c.r = deadlineReader{c.conn, c.r}
950 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
951 if connIsIpv6(c.conn) {
952 torrent.Add("completed handshake over ipv6", 1)
954 if err := t.addPeerConn(c); err != nil {
955 return fmt.Errorf("adding connection: %w", err)
957 defer t.dropConnection(c)
959 cl.sendInitialMessages(c, t)
960 c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, func() {
961 if c.needRequestUpdate != "" {
964 if c.actualRequestState.Requests.IsEmpty() {
965 panic("updateRequestsTimer should have been stopped")
967 c.updateRequests("updateRequestsTimer")
969 c.updateRequestsTimer.Stop()
970 err := c.mainReadLoop()
972 return fmt.Errorf("main read loop: %w", err)
977 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
978 // determines the amount of memory that might be used to cache pending writes. Assuming 512KiB
979 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
// That works out to (1<<19)/(1<<14) = 1<<5 = 32 requests.
980 const localClientReqq = 1 << 5
982 // See the order given in Transmission's tr_peerMsgsNew.
// sendInitialMessages writes the first messages on a new connection. The
// visible lines cover an extended handshake (when both sides support the
// extension protocol), a HaveAll/HaveNone message on connections with the fast
// extension enabled, and a further message when both sides support DHT and
// the client has a DHT server.
983 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
984 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
985 conn.write(pp.Message{
987 ExtendedID: pp.HandshakeExtendedID,
// The payload is built in a closure so the PEX entry can be added
// conditionally before marshalling.
988 ExtendedPayload: func() []byte {
989 msg := pp.ExtendedHandshakeMessage{
990 M: map[pp.ExtensionName]pp.ExtensionNumber{
991 pp.ExtensionNameMetadata: metadataExtendedId,
993 V: cl.config.ExtendedHandshakeClientVersion,
994 Reqq: localClientReqq,
995 YourIp: pp.CompactIp(conn.remoteIp()),
// Set when header obfuscation is preferred, or when the preferred setting is
// not required.
996 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
997 Port: cl.incomingPeerPort(),
998 MetadataSize: torrent.metadataSize(),
999 // TODO: We can figure these out specific to the socket
1001 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
1002 Ipv6: cl.config.PublicIp6.To16(),
1004 if !cl.config.DisablePEX {
1005 msg.M[pp.ExtensionNamePex] = pexExtendedId
1007 return bencode.MustMarshal(msg)
// Fast extension: a single HaveAll or HaveNone message describes our pieces,
// and sentHaves is updated to match what was sent.
1012 if conn.fastEnabled() {
1013 if torrent.haveAllPieces() {
1014 conn.write(pp.Message{Type: pp.HaveAll})
1015 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
1017 } else if !torrent.haveAnyPieces() {
1018 conn.write(pp.Message{Type: pp.HaveNone})
1019 conn.sentHaves.Clear()
// NOTE(review): the message written here is not visible in this extract —
// presumably it announces the DHT port.
1025 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1026 conn.write(pp.Message{
// dhtPort returns the port of the last DHT server in cl.dhtServers, converted
// to uint16.
1033 func (cl *Client) dhtPort() (ret uint16) {
// NOTE(review): the body of this branch is not visible here — presumably it
// returns with ret left at zero when there are no DHT servers.
1034 if len(cl.dhtServers) == 0 {
1037 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
// haveDhtServer reports whether the client has at least one DHT server.
1040 func (cl *Client) haveDhtServer() bool {
1041 return len(cl.dhtServers) > 0
1044 // Process incoming ut_metadata message.
1045 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1046 var d pp.ExtendedMetadataRequestMsg
1047 err := bencode.Unmarshal(payload, &d)
1048 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1049 } else if err != nil {
1050 return fmt.Errorf("error unmarshalling bencode: %s", err)
1054 case pp.DataMetadataExtensionMsgType:
1055 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1056 if !c.requestedMetadataPiece(piece) {
1057 return fmt.Errorf("got unexpected piece %d", piece)
1059 c.metadataRequests[piece] = false
1060 begin := len(payload) - d.PieceSize()
1061 if begin < 0 || begin >= len(payload) {
1062 return fmt.Errorf("data has bad offset in payload: %d", begin)
1064 t.saveMetadataPiece(piece, payload[begin:])
1065 c.lastUsefulChunkReceived = time.Now()
1066 err = t.maybeCompleteMetadata()
1068 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1069 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1070 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1071 // log consumers can filter for this message.
1072 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1075 case pp.RequestMetadataExtensionMsgType:
1076 if !t.haveMetadataPiece(piece) {
1077 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1080 start := (1 << 14) * piece
1081 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1082 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1084 case pp.RejectMetadataExtensionMsgType:
1087 return errors.New("unknown msg_type value")
1091 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1092 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1093 return cl.badPeerIPPort(ipa.IP, ipa.Port)
// badPeerIPPort checks ip and port against, in order: the doppleganger
// address set (keyed by "host:port"), the IP blocklist, and the banned-IP map
// (keyed by the IP's string form).
// NOTE(review): the return statements, and any checks before the first one
// shown, are not visible in this extract; presumably any hit yields true.
1098 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1102 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1105 if _, ok := cl.ipBlockRange(ip); ok {
1108 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1114 // Return a Torrent ready for insertion into a Client.
1115 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1116 return cl.newTorrentOpt(addTorrentOpts{
1118 Storage: specStorage,
1122 // Return a Torrent ready for insertion into a Client.
1123 func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) {
1124 // use provided storage, if provided
1125 storageClient := cl.defaultStorage
1126 if opts.Storage != nil {
1127 storageClient = storage.NewClient(opts.Storage)
1132 infoHash: opts.InfoHash,
1133 peers: prioritizedPeers{
1135 getPrio: func(p PeerInfo) peerPriority {
1137 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1140 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1142 halfOpen: make(map[string]PeerInfo),
1143 pieceStateChanges: pubsub.NewPubSub(),
1145 storageOpener: storageClient,
1146 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1148 metadataChanged: sync.Cond{
1151 webSeeds: make(map[string]*Peer),
1152 gotMetainfoC: make(chan struct{}),
1154 t.networkingEnabled.Set()
1155 t.logger = cl.logger.WithContextValue(t)
1156 if opts.ChunkSize == 0 {
1157 opts.ChunkSize = defaultChunkSize
1159 t.setChunkSize(opts.ChunkSize)
1163 // A file-like handle to some torrent data resource.
1164 type Handle interface {
// AddTorrentInfoHash adds a torrent by infohash without a custom storage
// implementation: it delegates to AddTorrentInfoHashWithStorage with a nil
// storage and returns its results unchanged.
1171 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1172 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1175 // Adds a torrent by InfoHash with a custom Storage implementation.
1176 // If the torrent already exists then this Storage is ignored and the
1177 // existing torrent returned with `new` set to `false`
1178 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1181 t, ok := cl.torrents[infoHash]
1187 t = cl.newTorrent(infoHash, specStorage)
1188 cl.eachDhtServer(func(s DhtServer) {
1189 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1190 go t.dhtAnnouncer(s)
1193 cl.torrents[infoHash] = t
1194 cl.clearAcceptLimits()
1195 t.updateWantPeersEvent()
1196 // Tickle Client.waitAccept, new torrent may want conns.
1197 cl.event.Broadcast()
1201 // AddTorrentOpt adds a torrent described by opts, keyed by opts.InfoHash.
1202 // If a torrent with that infohash already exists, the existing torrent is
1203 // returned with `new` set to `false`; otherwise one is built with newTorrentOpt.
1204 func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) {
1205 infoHash := opts.InfoHash
1208 t, ok := cl.torrents[infoHash]
1214 t = cl.newTorrentOpt(opts)
// Start a DHT announcer per DHT server when periodic announcing is enabled.
1215 cl.eachDhtServer(func(s DhtServer) {
1216 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1217 go t.dhtAnnouncer(s)
// Register the torrent before resetting accept limits and waking waiters.
1220 cl.torrents[infoHash] = t
1221 cl.clearAcceptLimits()
1222 t.updateWantPeersEvent()
1223 // Tickle Client.waitAccept, new torrent may want conns.
1224 cl.event.Broadcast()
1228 type addTorrentOpts struct {
1230 Storage storage.ClientImpl
1231 ChunkSize pp.Integer
1234 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1235 // Torrent.MergeSpec.
1236 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1237 t, new = cl.AddTorrentOpt(addTorrentOpts{
1238 InfoHash: spec.InfoHash,
1239 Storage: spec.Storage,
1240 ChunkSize: spec.ChunkSize,
1244 // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
1246 modSpec.ChunkSize = 0
1248 err = t.MergeSpec(&modSpec)
1249 if err != nil && new {
// stringAddr is a net.Addr backed by a plain string. It carries no network
// name; its String form is the string itself.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1262 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1263 // spec.DisallowDataDownload/Upload will be read and applied
1264 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1265 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1266 if spec.DisplayName != "" {
1267 t.SetDisplayName(spec.DisplayName)
1269 t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
1270 if spec.InfoBytes != nil {
1271 err := t.SetInfoBytes(spec.InfoBytes)
1277 cl.AddDhtNodes(spec.DhtNodes)
1280 useTorrentSources(spec.Sources, t)
1281 for _, url := range spec.Webseeds {
1284 for _, peerAddr := range spec.PeerAddrs {
1286 Addr: stringAddr(peerAddr),
1287 Source: PeerSourceDirect,
1291 if spec.ChunkSize != 0 {
1292 panic("chunk size cannot be changed for existing Torrent")
1294 t.addTrackers(spec.Trackers)
1296 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1297 t.dataUploadDisallowed = spec.DisallowDataUpload
1301 func useTorrentSources(sources []string, t *Torrent) {
1302 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1303 ctx := context.Background()
1304 for i := 0; i < len(sources); i += 1 {
1307 if err := useTorrentSource(ctx, s, t); err != nil {
1308 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1310 t.logger.Printf("successfully used source %q", s)
1316 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1317 ctx, cancel := context.WithCancel(ctx)
1327 var req *http.Request
1328 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1331 var resp *http.Response
1332 if resp, err = http.DefaultClient.Do(req); err != nil {
1335 var mi metainfo.MetaInfo
1336 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1339 if ctx.Err() != nil {
1344 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1347 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1348 t, ok := cl.torrents[infoHash]
1350 err = fmt.Errorf("no such torrent")
1357 delete(cl.torrents, infoHash)
1361 func (cl *Client) allTorrentsCompleted() bool {
1362 for _, t := range cl.torrents {
1366 if !t.haveAllPieces() {
1373 // Returns true when all torrents are completely downloaded and false if the
1374 // client is stopped before that.
1375 func (cl *Client) WaitAll() bool {
1378 for !cl.allTorrentsCompleted() {
1379 if cl.closed.IsSet() {
1387 // Returns handles to all the torrents loaded in the Client.
1388 func (cl *Client) Torrents() []*Torrent {
1391 return cl.torrentsAsSlice()
1394 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1395 for _, t := range cl.torrents {
1396 ret = append(ret, t)
1401 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1402 spec, err := TorrentSpecFromMagnetUri(uri)
1406 T, _, err = cl.AddTorrentSpec(spec)
1410 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1411 ts, err := TorrentSpecFromMetaInfoErr(mi)
1415 T, _, err = cl.AddTorrentSpec(ts)
1419 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1420 mi, err := metainfo.LoadFromFile(filename)
1424 return cl.AddTorrent(mi)
1427 func (cl *Client) DhtServers() []DhtServer {
1428 return cl.dhtServers
1431 func (cl *Client) AddDhtNodes(nodes []string) {
1432 for _, n := range nodes {
1433 hmp := missinggo.SplitHostMaybePort(n)
1434 ip := net.ParseIP(hmp.Host)
1436 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1439 ni := krpc.NodeInfo{
1440 Addr: krpc.NodeAddr{
1445 cl.eachDhtServer(func(s DhtServer) {
1451 func (cl *Client) banPeerIP(ip net.IP) {
1452 cl.logger.Printf("banning ip %v", ip)
1453 if cl.badPeerIPs == nil {
1454 cl.badPeerIPs = make(map[string]struct{})
1456 cl.badPeerIPs[ip.String()] = struct{}{}
1459 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1468 PeerMaxRequests: 250,
1470 RemoteAddr: remoteAddr,
1472 callbacks: &cl.config.Callbacks,
1474 connString: connString,
1478 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1479 c.setRW(connStatsReadWriter{nc, c})
1480 c.r = &rateLimitedReader{
1481 l: cl.config.DownloadRateLimiter,
1484 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1485 for _, f := range cl.config.Callbacks.NewPeer {
1491 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1498 t.addPeers([]PeerInfo{{
1499 Addr: ipPortAddr{ip, port},
1500 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first argument that is not nil, or nil if there is
// none. An empty but non-nil IP counts as not nil.
func firstNotNil(ips ...net.IP) net.IP {
	for i := range ips {
		if ips[i] == nil {
			continue
		}
		return ips[i]
	}
	return nil
}
1513 func (cl *Client) eachListener(f func(Listener) bool) {
1514 for _, s := range cl.listeners {
1521 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1522 for i := 0; i < len(cl.listeners); i += 1 {
1523 if ret = cl.listeners[i]; f(ret) {
1530 func (cl *Client) publicIp(peer net.IP) net.IP {
1531 // TODO: Use BEP 10 to determine how peers are seeing us.
1532 if peer.To4() != nil {
1534 cl.config.PublicIp4,
1535 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1540 cl.config.PublicIp6,
1541 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1545 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1546 l := cl.findListener(
1547 func(l Listener) bool {
1548 return f(addrIpOrNil(l.Addr()))
1554 return addrIpOrNil(l.Addr())
1557 // Our IP as a peer should see it.
1558 func (cl *Client) publicAddr(peer net.IP) IpPort {
1559 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1562 // ListenAddrs addresses currently being listened to.
1563 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1565 ret = make([]net.Addr, len(cl.listeners))
1566 for i := 0; i < len(cl.listeners); i += 1 {
1567 ret[i] = cl.listeners[i].Addr()
1573 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1574 ipa, ok := tryIpPortFromNetAddr(addr)
1578 ip := maskIpForAcceptLimiting(ipa.IP)
1579 if cl.acceptLimiter == nil {
1580 cl.acceptLimiter = make(map[ipStr]int)
1582 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address (including IPv4-mapped
// forms) to its /24 network. Any other value is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	v4 := ip.To4()
	if v4 == nil {
		return ip
	}
	return v4.Mask(net.CIDRMask(24, 32))
}
1592 func (cl *Client) clearAcceptLimits() {
1593 cl.acceptLimiter = nil
1596 func (cl *Client) acceptLimitClearer() {
1599 case <-cl.closed.Done():
1601 case <-time.After(15 * time.Minute):
1603 cl.clearAcceptLimits()
1609 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1610 if cl.config.DisableAcceptRateLimiting {
1613 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1616 func (cl *Client) rLock() {
1620 func (cl *Client) rUnlock() {
1624 func (cl *Client) lock() {
1628 func (cl *Client) unlock() {
1632 func (cl *Client) locker() *lockWithDeferreds {
1636 func (cl *Client) String() string {
1637 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1640 // Returns connection-level aggregate stats at the Client level. See the comment on
1641 // TorrentStats.ConnStats.
1642 func (cl *Client) ConnStats() ConnStats {
1643 return cl.stats.Copy()