18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
85 clientPieceRequestOrder
90 func (cl *Client) BadPeerIPs() []string {
93 return cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() []string {
97 return slices.FromMapKeys(cl.badPeerIPs).([]string)
100 func (cl *Client) PeerID() PeerID {
104 // Returns the port number for the first listener that has one. No longer assumes that all port
105 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
107 func (cl *Client) LocalPort() (port int) {
108 cl.eachListener(func(l Listener) bool {
109 port = addrPortOrZero(l.Addr())
115 func writeDhtServerStatus(w io.Writer, s DhtServer) {
116 dhtStats := s.Stats()
117 fmt.Fprintf(w, " ID: %x\n", s.ID())
118 spew.Fdump(w, dhtStats)
121 // Writes out a human readable status of the client, such as for writing to a
123 func (cl *Client) WriteStatus(_w io.Writer) {
126 w := bufio.NewWriter(_w)
128 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
129 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
130 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
131 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
132 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
133 cl.eachDhtServer(func(s DhtServer) {
134 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
135 writeDhtServerStatus(w, s)
137 spew.Fdump(w, &cl.stats)
138 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
140 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
141 return l.InfoHash().AsString() < r.InfoHash().AsString()
144 fmt.Fprint(w, "<unknown name>")
146 fmt.Fprint(w, t.name())
152 "%f%% of %d bytes (%s)",
153 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
155 humanize.Bytes(uint64(*t.length)))
157 w.WriteString("<missing metainfo>")
165 // Filters things that are less than warning from UPnP discovery.
166 func upnpDiscoverLogFilter(m log.Msg) bool {
167 level, ok := m.GetLevel()
168 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
171 func (cl *Client) initLogger() {
172 logger := cl.config.Logger
175 if !cl.config.Debug {
176 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
179 cl.logger = logger.WithValues(cl)
182 func (cl *Client) announceKey() int32 {
183 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
186 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
188 cfg = NewDefaultClientConfig()
198 dopplegangerAddrs: make(map[string]struct{}),
199 torrents: make(map[metainfo.Hash]*Torrent),
200 dialRateLimiter: rate.NewLimiter(10, 10),
202 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 go cl.acceptLimitClearer()
211 cl.event.L = cl.locker()
212 storageImpl := cfg.DefaultStorage
213 if storageImpl == nil {
214 // We'd use mmap by default but HFS+ doesn't support sparse files.
215 storageImplCloser := storage.NewFile(cfg.DataDir)
216 cl.onClose = append(cl.onClose, func() {
217 if err := storageImplCloser.Close(); err != nil {
218 cl.logger.Printf("error closing default storage: %s", err)
221 storageImpl = storageImplCloser
223 cl.defaultStorage = storage.NewClient(storageImpl)
224 if cfg.IPBlocklist != nil {
225 cl.ipBlockList = cfg.IPBlocklist
228 if cfg.PeerID != "" {
229 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
231 o := copy(cl.peerID[:], cfg.Bep20)
232 _, err = rand.Read(cl.peerID[o:])
234 panic("error generating peer id")
238 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
246 for _, _s := range sockets {
247 s := _s // Go is fucking retarded.
248 cl.onClose = append(cl.onClose, func() { s.Close() })
249 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
250 cl.dialers = append(cl.dialers, s)
251 cl.listeners = append(cl.listeners, s)
252 go cl.acceptConnections(s)
258 for _, s := range sockets {
259 if pc, ok := s.(net.PacketConn); ok {
260 ds, err := cl.newAnacrolixDhtServer(pc)
264 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
265 cl.onClose = append(cl.onClose, func() { ds.Close() })
270 cl.websocketTrackers = websocketTrackers{
273 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
276 t, ok := cl.torrents[infoHash]
278 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
280 return t.announceRequest(event), nil
282 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
285 t, ok := cl.torrents[dcc.InfoHash]
287 cl.logger.WithDefaultLevel(log.Warning).Printf(
288 "got webrtc conn for unloaded torrent with infohash %x",
294 go t.onWebRtcConn(dc, dcc)
303 func (cl *Client) AddDhtServer(d DhtServer) {
304 cl.dhtServers = append(cl.dhtServers, d)
307 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
308 // given address for any Torrent.
309 func (cl *Client) AddDialer(d Dialer) {
312 cl.dialers = append(cl.dialers, d)
313 for _, t := range cl.torrents {
318 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
320 func (cl *Client) AddListener(l Listener) {
321 cl.listeners = append(cl.listeners, l)
322 go cl.acceptConnections(l)
325 func (cl *Client) firewallCallback(net.Addr) bool {
327 block := !cl.wantConns()
330 torrent.Add("connections firewalled", 1)
332 torrent.Add("connections not firewalled", 1)
337 func (cl *Client) listenOnNetwork(n network) bool {
338 if n.Ipv4 && cl.config.DisableIPv4 {
341 if n.Ipv6 && cl.config.DisableIPv6 {
344 if n.Tcp && cl.config.DisableTCP {
347 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
353 func (cl *Client) listenNetworks() (ns []network) {
354 for _, n := range allPeerNetworks {
355 if cl.listenOnNetwork(n) {
362 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
363 cfg := dht.ServerConfig{
364 IPBlocklist: cl.ipBlockList,
366 OnAnnouncePeer: cl.onDHTAnnouncePeer,
367 PublicIP: func() net.IP {
368 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
369 return cl.config.PublicIp6
371 return cl.config.PublicIp4
373 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
374 ConnectionTracking: cl.config.ConnTracker,
375 OnQuery: cl.config.DHTOnQuery,
376 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
378 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
381 s, err = dht.NewServer(&cfg)
384 ts, err := s.Bootstrap()
386 cl.logger.Printf("error bootstrapping dht: %s", err)
388 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
394 func (cl *Client) Closed() <-chan struct{} {
400 func (cl *Client) eachDhtServer(f func(DhtServer)) {
401 for _, ds := range cl.dhtServers {
406 // Stops the client. All connections to peers are closed and all activity will
408 func (cl *Client) Close() {
412 for _, t := range cl.torrents {
415 for i := range cl.onClose {
416 cl.onClose[len(cl.onClose)-1-i]()
421 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
422 if cl.ipBlockList == nil {
425 return cl.ipBlockList.Lookup(ip)
428 func (cl *Client) ipIsBlocked(ip net.IP) bool {
429 _, blocked := cl.ipBlockRange(ip)
433 func (cl *Client) wantConns() bool {
434 for _, t := range cl.torrents {
442 func (cl *Client) waitAccept() {
444 if cl.closed.IsSet() {
454 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
455 func (cl *Client) rejectAccepted(conn net.Conn) error {
456 ra := conn.RemoteAddr()
457 if rip := addrIpOrNil(ra); rip != nil {
458 if cl.config.DisableIPv4Peers && rip.To4() != nil {
459 return errors.New("ipv4 peers disabled")
461 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
462 return errors.New("ipv4 disabled")
465 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
466 return errors.New("ipv6 disabled")
468 if cl.rateLimitAccept(rip) {
469 return errors.New("source IP accepted rate limited")
471 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
472 return errors.New("bad source addr")
478 func (cl *Client) acceptConnections(l Listener) {
480 conn, err := l.Accept()
481 torrent.Add("client listener accepts", 1)
482 conn = pproffd.WrapNetConn(conn)
484 closed := cl.closed.IsSet()
487 reject = cl.rejectAccepted(conn)
497 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
502 torrent.Add("rejected accepted connections", 1)
503 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
506 go cl.incomingConnection(conn)
508 log.Fmsg("accepted %q connection at %q from %q",
512 ).SetLevel(log.Debug).Log(cl.logger)
513 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
514 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
515 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
520 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
521 func regularNetConnPeerConnConnString(nc net.Conn) string {
522 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
525 func (cl *Client) incomingConnection(nc net.Conn) {
527 if tc, ok := nc.(*net.TCPConn); ok {
530 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
531 regularNetConnPeerConnConnString(nc))
533 c.Discovery = PeerSourceIncoming
534 cl.runReceivedConn(c)
537 // Returns a handle to the given torrent, if it's present in the client.
538 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
541 t, ok = cl.torrents[ih]
545 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
546 return cl.torrents[ih]
549 type dialResult struct {
554 func countDialResult(err error) {
556 torrent.Add("successful dials", 1)
558 torrent.Add("unsuccessful dials", 1)
562 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
563 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
564 if ret < minDialTimeout {
570 // Returns whether an address is known to connect to a client with our own ID.
571 func (cl *Client) dopplegangerAddr(addr string) bool {
572 _, ok := cl.dopplegangerAddrs[addr]
576 // Returns a connection over UTP or TCP, whichever is first to connect.
577 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
579 t := perf.NewTimer(perf.CallerName(0))
582 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
584 t.Mark("returned conn over " + res.Network)
588 ctx, cancel := context.WithCancel(ctx)
589 // As soon as we return one connection, cancel the others.
592 resCh := make(chan dialResult, left)
596 cl.eachDialer(func(s Dialer) bool {
599 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
602 cl.dialFromSocket(ctx, s, addr),
603 s.LocalAddr().Network(),
610 // Wait for a successful connection.
612 defer perf.ScopeTimer()()
613 for ; left > 0 && res.Conn == nil; left-- {
617 // There are still incompleted dials.
619 for ; left > 0; left-- {
620 conn := (<-resCh).Conn
627 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
629 //if res.Conn != nil {
630 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
632 // cl.logger.Printf("failed to dial %s", addr)
637 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
638 network := s.LocalAddr().Network()
639 cte := cl.config.ConnTracker.Wait(
641 conntrack.Entry{network, s.LocalAddr().String(), addr},
642 "dial torrent client",
645 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
646 // which dial errors allow us to forget the connection tracking entry handle.
647 if ctx.Err() != nil {
653 c, err := s.Dial(ctx, addr)
654 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
655 // it now in case we close the connection forthwith.
656 if tc, ok := c.(*net.TCPConn); ok {
661 if err != nil && forgettableDialError(err) {
668 return closeWrapper{c, func() error {
675 func forgettableDialError(err error) bool {
676 return strings.Contains(err.Error(), "no suitable address found")
679 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
680 if _, ok := t.halfOpen[addr]; !ok {
681 panic("invariant broken")
683 delete(t.halfOpen, addr)
685 for _, t := range cl.torrents {
690 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
691 // for valid reasons.
692 func (cl *Client) initiateProtocolHandshakes(
696 outgoing, encryptHeader bool,
697 remoteAddr PeerRemoteAddr,
698 network, connString string,
700 c *PeerConn, err error,
702 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
703 c.headerEncrypted = encryptHeader
704 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
706 dl, ok := ctx.Deadline()
710 err = nc.SetDeadline(dl)
714 err = cl.initiateHandshakes(c, t)
718 // Returns nil connection and nil error if no connection could be established for valid reasons.
719 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
720 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
723 return t.dialTimeout()
726 dr := cl.dialFirst(dialCtx, addr.String())
729 if dialCtx.Err() != nil {
730 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
732 return nil, errors.New("dial failed")
734 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
741 // Returns nil connection and nil error if no connection could be established
742 // for valid reasons.
743 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
744 torrent.Add("establish outgoing connection", 1)
745 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
746 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
748 torrent.Add("initiated conn with preferred header obfuscation", 1)
751 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
752 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
753 // We should have just tried with the preferred header obfuscation. If it was required,
754 // there's nothing else to try.
757 // Try again with encryption if we didn't earlier, or without if we did.
758 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
760 torrent.Add("initiated conn with fallback header obfuscation", 1)
762 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
766 // Called to dial out and run a connection. The addr we're given is already
767 // considered half-open.
768 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
769 cl.dialRateLimiter.Wait(context.Background())
770 c, err := cl.establishOutgoingConn(t, addr)
773 // Don't release lock between here and addPeerConn, unless it's for
775 cl.noLongerHalfOpen(t, addr.String())
778 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
785 t.runHandshookConnLoggingErr(c)
788 // The port number for incoming peer connections. 0 if the client isn't listening.
789 func (cl *Client) incomingPeerPort() int {
790 return cl.LocalPort()
793 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
794 if c.headerEncrypted {
797 rw, c.cryptoMethod, err = mse.InitiateHandshake(
804 cl.config.CryptoProvides,
808 return xerrors.Errorf("header obfuscation handshake: %w", err)
811 ih, err := cl.connBtHandshake(c, &t.infoHash)
813 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
815 if ih != t.infoHash {
816 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
821 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
822 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
823 func (cl *Client) forSkeys(f func([]byte) bool) {
826 if false { // Emulate the bug from #114
828 for ih := range cl.torrents {
832 for range cl.torrents {
839 for ih := range cl.torrents {
846 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
847 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
853 // Do encryption and bittorrent handshakes as receiver.
854 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
855 defer perf.ScopeTimerErr(&err)()
857 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
859 if err == nil || err == mse.ErrNoSecretKeyMatch {
860 if c.headerEncrypted {
861 torrent.Add("handshakes received encrypted", 1)
863 torrent.Add("handshakes received unencrypted", 1)
866 torrent.Add("handshakes received with error while handling encryption", 1)
869 if err == mse.ErrNoSecretKeyMatch {
874 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
875 err = errors.New("connection does not have required header obfuscation")
878 ih, err := cl.connBtHandshake(c, nil)
880 err = xerrors.Errorf("during bt handshake: %w", err)
889 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
890 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
895 c.PeerExtensionBytes = res.PeerExtensionBits
896 c.PeerID = res.PeerID
897 c.completedHandshake = time.Now()
898 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
904 func (cl *Client) runReceivedConn(c *PeerConn) {
905 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
909 t, err := cl.receiveHandshakes(c)
912 "error receiving handshakes on %v: %s", c, err,
913 ).SetLevel(log.Debug).
915 "network", c.Network,
917 torrent.Add("error receiving handshake", 1)
919 cl.onBadAccept(c.RemoteAddr)
924 torrent.Add("received handshake for unloaded torrent", 1)
925 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
927 cl.onBadAccept(c.RemoteAddr)
931 torrent.Add("received handshake for loaded torrent", 1)
934 t.runHandshookConnLoggingErr(c)
937 // Client lock must be held before entering this.
938 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
940 if c.PeerID == cl.peerID {
943 addr := c.conn.RemoteAddr().String()
944 cl.dopplegangerAddrs[addr] = struct{}{}
946 // Because the remote address is not necessarily the same as its client's torrent listen
947 // address, we won't record the remote address as a doppleganger. Instead, the initiator
948 // can record *us* as the doppleganger.
950 return errors.New("local and remote peer ids are the same")
952 c.conn.SetWriteDeadline(time.Time{})
953 c.r = deadlineReader{c.conn, c.r}
954 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
955 if connIsIpv6(c.conn) {
956 torrent.Add("completed handshake over ipv6", 1)
958 if err := t.addPeerConn(c); err != nil {
959 return fmt.Errorf("adding connection: %w", err)
961 defer t.dropConnection(c)
962 go c.writer(time.Minute)
963 cl.sendInitialMessages(c, t)
964 err := c.mainReadLoop()
966 return fmt.Errorf("main read loop: %w", err)
971 // If peer requests are buffered on read, this instructs the amount of memory that might be used to
972 // cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks.
973 const localClientReqq = 1 << 5
975 // See the order given in Transmission's tr_peerMsgsNew.
976 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
977 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
978 conn.post(pp.Message{
980 ExtendedID: pp.HandshakeExtendedID,
981 ExtendedPayload: func() []byte {
982 msg := pp.ExtendedHandshakeMessage{
983 M: map[pp.ExtensionName]pp.ExtensionNumber{
984 pp.ExtensionNameMetadata: metadataExtendedId,
986 V: cl.config.ExtendedHandshakeClientVersion,
987 Reqq: localClientReqq,
988 YourIp: pp.CompactIp(conn.remoteIp()),
989 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
990 Port: cl.incomingPeerPort(),
991 MetadataSize: torrent.metadataSize(),
992 // TODO: We can figured these out specific to the socket
994 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
995 Ipv6: cl.config.PublicIp6.To16(),
997 if !cl.config.DisablePEX {
998 msg.M[pp.ExtensionNamePex] = pexExtendedId
1000 return bencode.MustMarshal(msg)
1005 if conn.fastEnabled() {
1006 if torrent.haveAllPieces() {
1007 conn.post(pp.Message{Type: pp.HaveAll})
1008 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1010 } else if !torrent.haveAnyPieces() {
1011 conn.post(pp.Message{Type: pp.HaveNone})
1012 conn.sentHaves.Clear()
1018 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1019 conn.post(pp.Message{
1026 func (cl *Client) dhtPort() (ret uint16) {
1027 cl.eachDhtServer(func(s DhtServer) {
1028 ret = uint16(missinggo.AddrPort(s.Addr()))
1033 func (cl *Client) haveDhtServer() (ret bool) {
1034 cl.eachDhtServer(func(_ DhtServer) {
1040 // Process incoming ut_metadata message.
1041 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1042 var d map[string]int
1043 err := bencode.Unmarshal(payload, &d)
1044 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1045 } else if err != nil {
1046 return fmt.Errorf("error unmarshalling bencode: %s", err)
1048 msgType, ok := d["msg_type"]
1050 return errors.New("missing msg_type field")
1054 case pp.DataMetadataExtensionMsgType:
1055 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1056 if !c.requestedMetadataPiece(piece) {
1057 return fmt.Errorf("got unexpected piece %d", piece)
1059 c.metadataRequests[piece] = false
1060 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1061 if begin < 0 || begin >= len(payload) {
1062 return fmt.Errorf("data has bad offset in payload: %d", begin)
1064 t.saveMetadataPiece(piece, payload[begin:])
1065 c.lastUsefulChunkReceived = time.Now()
1066 err = t.maybeCompleteMetadata()
1068 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1069 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1070 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1071 // log consumers can filter for this message.
1072 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1075 case pp.RequestMetadataExtensionMsgType:
1076 if !t.haveMetadataPiece(piece) {
1077 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1080 start := (1 << 14) * piece
1081 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1082 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1084 case pp.RejectMetadataExtensionMsgType:
1087 return errors.New("unknown msg_type value")
1091 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1092 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1093 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1098 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1102 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1105 if _, ok := cl.ipBlockRange(ip); ok {
1108 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1114 // Return a Torrent ready for insertion into a Client.
1115 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1116 // use provided storage, if provided
1117 storageClient := cl.defaultStorage
1118 if specStorage != nil {
1119 storageClient = storage.NewClient(specStorage)
1125 peers: prioritizedPeers{
1127 getPrio: func(p PeerInfo) peerPriority {
1129 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1132 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1134 halfOpen: make(map[string]PeerInfo),
1135 pieceStateChanges: pubsub.NewPubSub(),
1137 storageOpener: storageClient,
1138 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1140 networkingEnabled: true,
1141 metadataChanged: sync.Cond{
1144 webSeeds: make(map[string]*Peer),
1146 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1147 t.logger = cl.logger.WithContextValue(t)
1148 t.setChunkSize(defaultChunkSize)
1152 // A file-like handle to some torrent data resource.
1153 type Handle interface {
1160 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1161 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1164 // Adds a torrent by InfoHash with a custom Storage implementation.
1165 // If the torrent already exists then this Storage is ignored and the
1166 // existing torrent returned with `new` set to `false`
1167 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1170 t, ok := cl.torrents[infoHash]
1176 t = cl.newTorrent(infoHash, specStorage)
1177 cl.eachDhtServer(func(s DhtServer) {
1178 go t.dhtAnnouncer(s)
1180 cl.torrents[infoHash] = t
1181 cl.clearAcceptLimits()
1182 t.updateWantPeersEvent()
1183 // Tickle Client.waitAccept, new torrent may want conns.
1184 cl.event.Broadcast()
1188 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1189 // Torrent.MergeSpec.
1190 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1191 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1192 err = t.MergeSpec(spec)
1193 if err != nil && new {
1199 type stringAddr string
1201 var _ net.Addr = stringAddr("")
1203 func (stringAddr) Network() string { return "" }
1204 func (me stringAddr) String() string { return string(me) }
1206 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1207 // spec.DisallowDataDownload/Upload will be read and applied
1208 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1209 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1210 if spec.DisplayName != "" {
1211 t.SetDisplayName(spec.DisplayName)
1213 if spec.InfoBytes != nil {
1214 err := t.SetInfoBytes(spec.InfoBytes)
1220 cl.AddDhtNodes(spec.DhtNodes)
1223 useTorrentSources(spec.Sources, t)
1224 for _, url := range spec.Webseeds {
1227 for _, peerAddr := range spec.PeerAddrs {
1229 Addr: stringAddr(peerAddr),
1230 Source: PeerSourceDirect,
1234 if spec.ChunkSize != 0 {
1235 t.setChunkSize(pp.Integer(spec.ChunkSize))
1237 t.addTrackers(spec.Trackers)
1239 t.dataDownloadDisallowed = spec.DisallowDataDownload
1240 t.dataUploadDisallowed = spec.DisallowDataUpload
1244 func useTorrentSources(sources []string, t *Torrent) {
1245 for _, s := range sources {
1247 err := useTorrentSource(s, t)
1249 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1251 t.logger.Printf("successfully used source %q", s)
1257 func useTorrentSource(source string, t *Torrent) error {
1258 req, err := http.NewRequest(http.MethodGet, source, nil)
1262 ctx, cancel := context.WithCancel(context.Background())
1272 req = req.WithContext(ctx)
1273 resp, err := http.DefaultClient.Do(req)
1277 mi, err := metainfo.Load(resp.Body)
1279 if ctx.Err() != nil {
1284 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1287 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1288 t, ok := cl.torrents[infoHash]
1290 err = fmt.Errorf("no such torrent")
1297 delete(cl.torrents, infoHash)
1301 func (cl *Client) allTorrentsCompleted() bool {
1302 for _, t := range cl.torrents {
1306 if !t.haveAllPieces() {
1313 // Returns true when all torrents are completely downloaded and false if the
1314 // client is stopped before that.
1315 func (cl *Client) WaitAll() bool {
1318 for !cl.allTorrentsCompleted() {
1319 if cl.closed.IsSet() {
1327 // Returns handles to all the torrents loaded in the Client.
1328 func (cl *Client) Torrents() []*Torrent {
1331 return cl.torrentsAsSlice()
1334 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1335 for _, t := range cl.torrents {
1336 ret = append(ret, t)
1341 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1342 spec, err := TorrentSpecFromMagnetUri(uri)
1346 T, _, err = cl.AddTorrentSpec(spec)
1350 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1351 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1355 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1356 mi, err := metainfo.LoadFromFile(filename)
1360 return cl.AddTorrent(mi)
1363 func (cl *Client) DhtServers() []DhtServer {
1364 return cl.dhtServers
1367 func (cl *Client) AddDhtNodes(nodes []string) {
1368 for _, n := range nodes {
1369 hmp := missinggo.SplitHostMaybePort(n)
1370 ip := net.ParseIP(hmp.Host)
1372 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1375 ni := krpc.NodeInfo{
1376 Addr: krpc.NodeAddr{
1381 cl.eachDhtServer(func(s DhtServer) {
1387 func (cl *Client) banPeerIP(ip net.IP) {
1388 cl.logger.Printf("banning ip %v", ip)
1389 if cl.badPeerIPs == nil {
1390 cl.badPeerIPs = make(map[string]struct{})
1392 cl.badPeerIPs[ip.String()] = struct{}{}
1395 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1404 PeerMaxRequests: 250,
1406 RemoteAddr: remoteAddr,
1408 callbacks: &cl.config.Callbacks,
1410 connString: connString,
1412 writeBuffer: new(bytes.Buffer),
1415 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1416 c.writerCond.L = cl.locker()
1417 c.setRW(connStatsReadWriter{nc, c})
1418 c.r = &rateLimitedReader{
1419 l: cl.config.DownloadRateLimiter,
1422 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1423 for _, f := range cl.config.Callbacks.NewPeer {
1429 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1436 t.addPeers([]PeerInfo{{
1437 Addr: ipPortAddr{ip, port},
1438 Source: PeerSourceDhtAnnouncePeer,
1442 func firstNotNil(ips ...net.IP) net.IP {
1443 for _, ip := range ips {
1451 func (cl *Client) eachDialer(f func(Dialer) bool) {
1452 for _, s := range cl.dialers {
1459 func (cl *Client) eachListener(f func(Listener) bool) {
1460 for _, s := range cl.listeners {
1467 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1468 cl.eachListener(func(l Listener) bool {
1475 func (cl *Client) publicIp(peer net.IP) net.IP {
1476 // TODO: Use BEP 10 to determine how peers are seeing us.
1477 if peer.To4() != nil {
1479 cl.config.PublicIp4,
1480 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1485 cl.config.PublicIp6,
1486 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1490 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1491 l := cl.findListener(
1492 func(l Listener) bool {
1493 return f(addrIpOrNil(l.Addr()))
1499 return addrIpOrNil(l.Addr())
1502 // Our IP as a peer should see it.
1503 func (cl *Client) publicAddr(peer net.IP) IpPort {
1504 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1507 // ListenAddrs addresses currently being listened to.
1508 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1511 cl.eachListener(func(l Listener) bool {
1512 ret = append(ret, l.Addr())
1518 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1519 ipa, ok := tryIpPortFromNetAddr(addr)
1523 ip := maskIpForAcceptLimiting(ipa.IP)
1524 if cl.acceptLimiter == nil {
1525 cl.acceptLimiter = make(map[ipStr]int)
1527 cl.acceptLimiter[ipStr(ip.String())]++
1530 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1531 if ip4 := ip.To4(); ip4 != nil {
1532 return ip4.Mask(net.CIDRMask(24, 32))
1537 func (cl *Client) clearAcceptLimits() {
1538 cl.acceptLimiter = nil
1541 func (cl *Client) acceptLimitClearer() {
1544 case <-cl.closed.LockedChan(cl.locker()):
1546 case <-time.After(15 * time.Minute):
1548 cl.clearAcceptLimits()
1554 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1555 if cl.config.DisableAcceptRateLimiting {
1558 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1561 func (cl *Client) rLock() {
1565 func (cl *Client) rUnlock() {
1569 func (cl *Client) lock() {
1573 func (cl *Client) unlock() {
1577 func (cl *Client) locker() *lockWithDeferreds {
1581 func (cl *Client) String() string {
1582 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1585 // Returns connection-level aggregate stats at the Client level. See the comment on
1586 // TorrentStats.ConnStats.
1587 func (cl *Client) ConnStats() ConnStats {
1588 return cl.stats.Copy()