18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
33 "github.com/anacrolix/chansync"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/limiter"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
42 "github.com/anacrolix/torrent/tracker"
43 "github.com/anacrolix/torrent/webtorrent"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
55 closed chansync.SetOnce
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
81 activeAnnounceLimiter limiter.Instance
83 updateRequests chansync.BroadcastCond
// BadPeerIPs reports the banned peer IPs as strings. The visible body obtains
// the list from badPeerIPsLocked and hands it back through the named result ips.
// NOTE(review): any locking around the call is not visible in this view — confirm.
88 func (cl *Client) BadPeerIPs() (ips []string) {
90 ips = cl.badPeerIPsLocked()
// badPeerIPsLocked builds a string slice from cl.badPeerIPs.
// NOTE(review): the "Locked" suffix presumably means the caller already holds
// the client lock — confirm against callers.
95 func (cl *Client) badPeerIPsLocked() (ips []string) {
// Sized up front to the number of entries in the map.
96 ips = make([]string, len(cl.badPeerIPs))
// Ranges over the map keys; Go map iteration order is unspecified, so the
// order of the resulting slice is unspecified too.
98 for k := range cl.badPeerIPs {
105 func (cl *Client) PeerID() PeerID {
109 // Returns the port number for the first listener that has one. No longer assumes that all port
110 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
112 func (cl *Client) LocalPort() (port int) {
// Walk the listeners by index, in the order they were appended to cl.listeners.
113 for i := 0; i < len(cl.listeners); i += 1 {
// The first listener whose address yields a non-zero port decides the result
// (addrPortOrZero presumably returns 0 for addresses without a port — confirm).
114 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
121 func writeDhtServerStatus(w io.Writer, s DhtServer) {
122 dhtStats := s.Stats()
123 fmt.Fprintf(w, " ID: %x\n", s.ID())
124 spew.Fdump(w, dhtStats)
127 // Writes out a human readable status of the client, such as for writing to a
129 func (cl *Client) WriteStatus(_w io.Writer) {
132 w := bufio.NewWriter(_w)
134 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
135 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
136 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
137 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
138 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
139 cl.eachDhtServer(func(s DhtServer) {
140 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
141 writeDhtServerStatus(w, s)
143 spew.Fdump(w, &cl.stats)
144 torrentsSlice := cl.torrentsAsSlice()
145 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
147 sort.Slice(torrentsSlice, func(l, r int) bool {
148 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
150 for _, t := range torrentsSlice {
152 fmt.Fprint(w, "<unknown name>")
154 fmt.Fprint(w, t.name())
160 "%f%% of %d bytes (%s)",
161 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
163 humanize.Bytes(uint64(*t.length)))
165 w.WriteString("<missing metainfo>")
173 // Filters things that are less than warning from UPnP discovery.
174 func upnpDiscoverLogFilter(m log.Msg) bool {
175 level, ok := m.GetLevel()
176 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
179 func (cl *Client) initLogger() {
180 logger := cl.config.Logger
183 if !cl.config.Debug {
184 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
187 cl.logger = logger.WithValues(cl)
190 func (cl *Client) announceKey() int32 {
191 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
194 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
195 func (cl *Client) init(cfg *ClientConfig) {
197 cl.dopplegangerAddrs = make(map[string]struct{})
198 cl.torrents = make(map[metainfo.Hash]*Torrent)
199 cl.dialRateLimiter = rate.NewLimiter(10, 10)
200 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 cl.event.L = cl.locker()
203 cl.ipBlockList = cfg.IPBlocklist
206 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
208 cfg = NewDefaultClientConfig()
214 go cl.acceptLimitClearer()
223 storageImpl := cfg.DefaultStorage
224 if storageImpl == nil {
225 // We'd use mmap by default but HFS+ doesn't support sparse files.
226 storageImplCloser := storage.NewFile(cfg.DataDir)
227 cl.onClose = append(cl.onClose, func() {
228 if err := storageImplCloser.Close(); err != nil {
229 cl.logger.Printf("error closing default storage: %s", err)
232 storageImpl = storageImplCloser
234 cl.defaultStorage = storage.NewClient(storageImpl)
236 if cfg.PeerID != "" {
237 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
239 o := copy(cl.peerID[:], cfg.Bep20)
240 _, err = rand.Read(cl.peerID[o:])
242 panic("error generating peer id")
246 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
254 for _, _s := range sockets {
255 s := _s // Go is fucking retarded.
256 cl.onClose = append(cl.onClose, func() { s.Close() })
257 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
258 cl.dialers = append(cl.dialers, s)
259 cl.listeners = append(cl.listeners, s)
260 if cl.config.AcceptPeerConnections {
261 go cl.acceptConnections(s)
268 for _, s := range sockets {
269 if pc, ok := s.(net.PacketConn); ok {
270 ds, err := cl.NewAnacrolixDhtServer(pc)
274 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
275 cl.onClose = append(cl.onClose, func() { ds.Close() })
280 cl.websocketTrackers = websocketTrackers{
283 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
286 t, ok := cl.torrents[infoHash]
288 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
290 return t.announceRequest(event), nil
292 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
295 t, ok := cl.torrents[dcc.InfoHash]
297 cl.logger.WithDefaultLevel(log.Warning).Printf(
298 "got webrtc conn for unloaded torrent with infohash %x",
304 go t.onWebRtcConn(dc, dcc)
315 func (cl *Client) AddDhtServer(d DhtServer) {
316 cl.dhtServers = append(cl.dhtServers, d)
319 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
320 // given address for any Torrent.
321 func (cl *Client) AddDialer(d Dialer) {
324 cl.dialers = append(cl.dialers, d)
325 for _, t := range cl.torrents {
330 func (cl *Client) Listeners() []Listener {
334 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
336 func (cl *Client) AddListener(l Listener) {
337 cl.listeners = append(cl.listeners, l)
338 if cl.config.AcceptPeerConnections {
339 go cl.acceptConnections(l)
343 func (cl *Client) firewallCallback(net.Addr) bool {
345 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
348 torrent.Add("connections firewalled", 1)
350 torrent.Add("connections not firewalled", 1)
// listenOnNetwork decides whether the client should listen on network n, based
// on the Disable* flags in the config.
// NOTE(review): the value returned by each branch is not visible in this view;
// presumably every matching branch rejects the network — confirm.
355 func (cl *Client) listenOnNetwork(n network) bool {
356 if n.Ipv4 && cl.config.DisableIPv4 {
359 if n.Ipv6 && cl.config.DisableIPv6 {
362 if n.Tcp && cl.config.DisableTCP {
// The UDP check requires both DisableUTP and NoDHT to be set before it matches.
365 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
371 func (cl *Client) listenNetworks() (ns []network) {
372 for _, n := range allPeerNetworks {
373 if cl.listenOnNetwork(n) {
380 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
381 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
382 cfg := dht.ServerConfig{
383 IPBlocklist: cl.ipBlockList,
385 OnAnnouncePeer: cl.onDHTAnnouncePeer,
386 PublicIP: func() net.IP {
387 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
388 return cl.config.PublicIp6
390 return cl.config.PublicIp4
392 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
393 OnQuery: cl.config.DHTOnQuery,
394 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
396 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
399 s, err = dht.NewServer(&cfg)
402 ts, err := s.Bootstrap()
404 cl.logger.Printf("error bootstrapping dht: %s", err)
406 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
412 func (cl *Client) Closed() chansync.Done {
413 return cl.closed.Done()
416 func (cl *Client) eachDhtServer(f func(DhtServer)) {
417 for _, ds := range cl.dhtServers {
422 // Stops the client. All connections to peers are closed and all activity will
424 func (cl *Client) Close() {
426 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
429 for _, t := range cl.torrents {
433 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
435 for i := range cl.onClose {
436 cl.onClose[len(cl.onClose)-1-i]()
441 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
442 if cl.ipBlockList == nil {
445 return cl.ipBlockList.Lookup(ip)
448 func (cl *Client) ipIsBlocked(ip net.IP) bool {
449 _, blocked := cl.ipBlockRange(ip)
453 func (cl *Client) wantConns() bool {
454 if cl.config.AlwaysWantConns {
457 for _, t := range cl.torrents {
465 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
466 func (cl *Client) rejectAccepted(conn net.Conn) error {
468 return errors.New("don't want conns right now")
470 ra := conn.RemoteAddr()
471 if rip := addrIpOrNil(ra); rip != nil {
472 if cl.config.DisableIPv4Peers && rip.To4() != nil {
473 return errors.New("ipv4 peers disabled")
475 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
476 return errors.New("ipv4 disabled")
479 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
480 return errors.New("ipv6 disabled")
482 if cl.rateLimitAccept(rip) {
483 return errors.New("source IP accepted rate limited")
485 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
486 return errors.New("bad source addr")
492 func (cl *Client) acceptConnections(l Listener) {
494 conn, err := l.Accept()
495 torrent.Add("client listener accepts", 1)
496 conn = pproffd.WrapNetConn(conn)
498 closed := cl.closed.IsSet()
501 reject = cl.rejectAccepted(conn)
511 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
516 torrent.Add("rejected accepted connections", 1)
517 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
520 go cl.incomingConnection(conn)
522 log.Fmsg("accepted %q connection at %q from %q",
526 ).SetLevel(log.Debug).Log(cl.logger)
527 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
528 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
529 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a regular
// net.Conn PeerConn: the local address and the remote address joined by "-".
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local := nc.LocalAddr()
	remote := nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
539 func (cl *Client) incomingConnection(nc net.Conn) {
541 if tc, ok := nc.(*net.TCPConn); ok {
544 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
545 regularNetConnPeerConnConnString(nc))
551 c.Discovery = PeerSourceIncoming
552 cl.runReceivedConn(c)
555 // Returns a handle to the given torrent, if it's present in the client.
556 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
559 t, ok = cl.torrents[ih]
563 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
564 return cl.torrents[ih]
567 type DialResult struct {
572 func countDialResult(err error) {
574 torrent.Add("successful dials", 1)
576 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the maximum dial timeout down as the backlog of
// pending peers grows relative to the half-open connection limit.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), and is never less than minDialTimeout. If that divisor
// cannot be computed or is zero (halfOpenLimit == 0, or a negative
// pendingPeers that makes the quotient truncate to zero) the full max timeout
// is used instead of panicking with an integer divide by zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		// A zero divisor here used to crash the caller; keep max in that case.
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
			ret = max / time.Duration(divisor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
588 // Returns whether an address is known to connect to a client with our own ID.
589 func (cl *Client) dopplegangerAddr(addr string) bool {
590 _, ok := cl.dopplegangerAddrs[addr]
594 // Returns a connection over UTP or TCP, whichever is first to connect.
595 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
596 return DialFirst(ctx, addr, cl.dialers)
599 // Dials addr using every given Dialer and returns the first successful connection; the remaining dials are cancelled.
600 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
602 t := perf.NewTimer(perf.CallerName(0))
605 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
607 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
611 ctx, cancel := context.WithCancel(ctx)
612 // As soon as we return one connection, cancel the others.
615 resCh := make(chan DialResult, left)
616 for _, _s := range dialers {
621 dialFromSocket(ctx, s, addr),
626 // Wait for a successful connection.
628 defer perf.ScopeTimer()()
629 for ; left > 0 && res.Conn == nil; left-- {
633 // There are still incompleted dials.
635 for ; left > 0; left-- {
636 conn := (<-resCh).Conn
643 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
648 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
649 c, err := s.Dial(ctx, addr)
650 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
651 // it now in case we close the connection forthwith.
652 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether err is a dial error that carries the
// text "no suitable address found". A nil error is never forgettable; the
// explicit check avoids a nil-pointer panic from calling Error on it.
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
663 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
664 if _, ok := t.halfOpen[addr]; !ok {
665 panic("invariant broken")
667 delete(t.halfOpen, addr)
669 for _, t := range cl.torrents {
674 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
675 // for valid reasons.
676 func (cl *Client) initiateProtocolHandshakes(
680 outgoing, encryptHeader bool,
681 remoteAddr PeerRemoteAddr,
682 network, connString string,
684 c *PeerConn, err error,
686 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
687 c.headerEncrypted = encryptHeader
688 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
690 dl, ok := ctx.Deadline()
694 err = nc.SetDeadline(dl)
698 err = cl.initiateHandshakes(c, t)
702 // Returns nil connection and nil error if no connection could be established for valid reasons.
703 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
704 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
707 return t.dialTimeout()
710 dr := cl.dialFirst(dialCtx, addr.String())
713 if dialCtx.Err() != nil {
714 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
716 return nil, errors.New("dial failed")
718 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
725 // Returns nil connection and nil error if no connection could be established
726 // for valid reasons.
727 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
728 torrent.Add("establish outgoing connection", 1)
729 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
730 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
732 torrent.Add("initiated conn with preferred header obfuscation", 1)
735 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
736 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
737 // We should have just tried with the preferred header obfuscation. If it was required,
738 // there's nothing else to try.
741 // Try again with encryption if we didn't earlier, or without if we did.
742 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
744 torrent.Add("initiated conn with fallback header obfuscation", 1)
746 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
750 // Called to dial out and run a connection. The addr we're given is already
751 // considered half-open.
752 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
753 cl.dialRateLimiter.Wait(context.Background())
754 c, err := cl.establishOutgoingConn(t, addr)
757 // Don't release lock between here and addPeerConn, unless it's for
759 cl.noLongerHalfOpen(t, addr.String())
762 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
769 t.runHandshookConnLoggingErr(c)
772 // The port number for incoming peer connections. 0 if the client isn't listening.
773 func (cl *Client) incomingPeerPort() int {
774 return cl.LocalPort()
777 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
778 if c.headerEncrypted {
781 rw, c.cryptoMethod, err = mse.InitiateHandshake(
788 cl.config.CryptoProvides,
792 return fmt.Errorf("header obfuscation handshake: %w", err)
795 ih, err := cl.connBtHandshake(c, &t.infoHash)
797 return fmt.Errorf("bittorrent protocol handshake: %w", err)
799 if ih != t.infoHash {
800 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
805 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
806 // that won't also try to take the lock. This saves us copying all the infohashes every time.
807 func (cl *Client) forSkeys(f func([]byte) bool) {
810 if false { // Emulate the bug from #114
812 for ih := range cl.torrents {
816 for range cl.torrents {
823 for ih := range cl.torrents {
830 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
831 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
837 // Do encryption and bittorrent handshakes as receiver.
838 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
839 defer perf.ScopeTimerErr(&err)()
841 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
843 if err == nil || err == mse.ErrNoSecretKeyMatch {
844 if c.headerEncrypted {
845 torrent.Add("handshakes received encrypted", 1)
847 torrent.Add("handshakes received unencrypted", 1)
850 torrent.Add("handshakes received with error while handling encryption", 1)
853 if err == mse.ErrNoSecretKeyMatch {
858 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
859 err = errors.New("connection does not have required header obfuscation")
862 ih, err := cl.connBtHandshake(c, nil)
864 return nil, fmt.Errorf("during bt handshake: %w", err)
872 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
873 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
878 c.PeerExtensionBytes = res.PeerExtensionBits
879 c.PeerID = res.PeerID
880 c.completedHandshake = time.Now()
881 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
887 func (cl *Client) runReceivedConn(c *PeerConn) {
888 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
892 t, err := cl.receiveHandshakes(c)
895 "error receiving handshakes on %v: %s", c, err,
896 ).SetLevel(log.Debug).
898 "network", c.Network,
900 torrent.Add("error receiving handshake", 1)
902 cl.onBadAccept(c.RemoteAddr)
907 torrent.Add("received handshake for unloaded torrent", 1)
908 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
910 cl.onBadAccept(c.RemoteAddr)
914 torrent.Add("received handshake for loaded torrent", 1)
917 t.runHandshookConnLoggingErr(c)
920 // Client lock must be held before entering this.
921 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
923 if c.PeerID == cl.peerID {
926 addr := c.conn.RemoteAddr().String()
927 cl.dopplegangerAddrs[addr] = struct{}{}
929 // Because the remote address is not necessarily the same as its client's torrent listen
930 // address, we won't record the remote address as a doppleganger. Instead, the initiator
931 // can record *us* as the doppleganger.
933 return errors.New("local and remote peer ids are the same")
935 c.conn.SetWriteDeadline(time.Time{})
936 c.r = deadlineReader{c.conn, c.r}
937 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
938 if connIsIpv6(c.conn) {
939 torrent.Add("completed handshake over ipv6", 1)
941 if err := t.addPeerConn(c); err != nil {
942 return fmt.Errorf("adding connection: %w", err)
944 defer t.dropConnection(c)
946 cl.sendInitialMessages(c, t)
947 err := c.mainReadLoop()
949 return fmt.Errorf("main read loop: %w", err)
954 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
955 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
956 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
957 const localClientReqq = 1 << 5
959 // See the order given in Transmission's tr_peerMsgsNew.
960 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
961 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
962 conn.write(pp.Message{
964 ExtendedID: pp.HandshakeExtendedID,
965 ExtendedPayload: func() []byte {
966 msg := pp.ExtendedHandshakeMessage{
967 M: map[pp.ExtensionName]pp.ExtensionNumber{
968 pp.ExtensionNameMetadata: metadataExtendedId,
970 V: cl.config.ExtendedHandshakeClientVersion,
971 Reqq: localClientReqq,
972 YourIp: pp.CompactIp(conn.remoteIp()),
973 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
974 Port: cl.incomingPeerPort(),
975 MetadataSize: torrent.metadataSize(),
976 // TODO: We can figured these out specific to the socket
978 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
979 Ipv6: cl.config.PublicIp6.To16(),
981 if !cl.config.DisablePEX {
982 msg.M[pp.ExtensionNamePex] = pexExtendedId
984 return bencode.MustMarshal(msg)
989 if conn.fastEnabled() {
990 if torrent.haveAllPieces() {
991 conn.write(pp.Message{Type: pp.HaveAll})
992 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
994 } else if !torrent.haveAnyPieces() {
995 conn.write(pp.Message{Type: pp.HaveNone})
996 conn.sentHaves.Clear()
1002 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1003 conn.write(pp.Message{
1010 func (cl *Client) dhtPort() (ret uint16) {
1011 if len(cl.dhtServers) == 0 {
1014 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1017 func (cl *Client) haveDhtServer() bool {
1018 return len(cl.dhtServers) > 0
1021 // Process incoming ut_metadata message.
1022 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1023 var d pp.ExtendedMetadataRequestMsg
1024 err := bencode.Unmarshal(payload, &d)
1025 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1026 } else if err != nil {
1027 return fmt.Errorf("error unmarshalling bencode: %s", err)
1031 case pp.DataMetadataExtensionMsgType:
1032 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1033 if !c.requestedMetadataPiece(piece) {
1034 return fmt.Errorf("got unexpected piece %d", piece)
1036 c.metadataRequests[piece] = false
1037 begin := len(payload) - d.PieceSize()
1038 if begin < 0 || begin >= len(payload) {
1039 return fmt.Errorf("data has bad offset in payload: %d", begin)
1041 t.saveMetadataPiece(piece, payload[begin:])
1042 c.lastUsefulChunkReceived = time.Now()
1043 err = t.maybeCompleteMetadata()
1045 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1046 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1047 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1048 // log consumers can filter for this message.
1049 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1052 case pp.RequestMetadataExtensionMsgType:
1053 if !t.haveMetadataPiece(piece) {
1054 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1057 start := (1 << 14) * piece
1058 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1059 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1061 case pp.RejectMetadataExtensionMsgType:
1064 return errors.New("unknown msg_type value")
1068 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1069 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1070 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1075 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1079 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1082 if _, ok := cl.ipBlockRange(ip); ok {
1085 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1091 // Return a Torrent ready for insertion into a Client.
1092 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1093 // use provided storage, if provided
1094 storageClient := cl.defaultStorage
1095 if specStorage != nil {
1096 storageClient = storage.NewClient(specStorage)
1102 peers: prioritizedPeers{
1104 getPrio: func(p PeerInfo) peerPriority {
1106 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1109 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1111 halfOpen: make(map[string]PeerInfo),
1112 pieceStateChanges: pubsub.NewPubSub(),
1114 storageOpener: storageClient,
1115 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1117 metadataChanged: sync.Cond{
1120 webSeeds: make(map[string]*Peer),
1121 gotMetainfoC: make(chan struct{}),
1123 t.networkingEnabled.Set()
1124 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1125 t.logger = cl.logger.WithContextValue(t)
1126 t.setChunkSize(defaultChunkSize)
1130 // A file-like handle to some torrent data resource.
1131 type Handle interface {
1138 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1139 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1142 // Adds a torrent by InfoHash with a custom Storage implementation.
1143 // If the torrent already exists then this Storage is ignored and the
1144 // existing torrent returned with `new` set to `false`
1145 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1148 t, ok := cl.torrents[infoHash]
1154 t = cl.newTorrent(infoHash, specStorage)
1155 cl.eachDhtServer(func(s DhtServer) {
1156 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1157 go t.dhtAnnouncer(s)
1160 cl.torrents[infoHash] = t
1161 cl.clearAcceptLimits()
1162 t.updateWantPeersEvent()
1163 // Tickle Client.waitAccept, new torrent may want conns.
1164 cl.event.Broadcast()
1168 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1169 // Torrent.MergeSpec.
1170 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1171 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1172 err = t.MergeSpec(spec)
1173 if err != nil && new {
// stringAddr is a net.Addr whose textual form is the string itself and whose
// network name is empty.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1186 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1187 // spec.DisallowDataDownload/Upload will be read and applied
1188 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1189 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1190 if spec.DisplayName != "" {
1191 t.SetDisplayName(spec.DisplayName)
1193 if spec.InfoBytes != nil {
1194 err := t.SetInfoBytes(spec.InfoBytes)
1200 cl.AddDhtNodes(spec.DhtNodes)
1203 useTorrentSources(spec.Sources, t)
1204 for _, url := range spec.Webseeds {
1207 for _, peerAddr := range spec.PeerAddrs {
1209 Addr: stringAddr(peerAddr),
1210 Source: PeerSourceDirect,
1214 if spec.ChunkSize != 0 {
1215 t.setChunkSize(pp.Integer(spec.ChunkSize))
1217 t.addTrackers(spec.Trackers)
1219 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1220 t.dataUploadDisallowed = spec.DisallowDataUpload
1224 func useTorrentSources(sources []string, t *Torrent) {
1225 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1226 ctx := context.Background()
1227 for i := 0; i < len(sources); i += 1 {
1230 if err := useTorrentSource(ctx, s, t); err != nil {
1231 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1233 t.logger.Printf("successfully used source %q", s)
1239 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1240 ctx, cancel := context.WithCancel(ctx)
1250 var req *http.Request
1251 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1254 var resp *http.Response
1255 if resp, err = http.DefaultClient.Do(req); err != nil {
1258 var mi metainfo.MetaInfo
1259 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1262 if ctx.Err() != nil {
1267 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1270 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1271 t, ok := cl.torrents[infoHash]
1273 err = fmt.Errorf("no such torrent")
1280 delete(cl.torrents, infoHash)
1284 func (cl *Client) allTorrentsCompleted() bool {
1285 for _, t := range cl.torrents {
1289 if !t.haveAllPieces() {
1296 // Returns true when all torrents are completely downloaded and false if the
1297 // client is stopped before that.
1298 func (cl *Client) WaitAll() bool {
1301 for !cl.allTorrentsCompleted() {
1302 if cl.closed.IsSet() {
1310 // Returns handles to all the torrents loaded in the Client.
1311 func (cl *Client) Torrents() []*Torrent {
1314 return cl.torrentsAsSlice()
// torrentsAsSlice collects the values of cl.torrents into a slice.
1317 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
// Map iteration order is unspecified, so callers that need a stable order
// must sort the result themselves (WriteStatus sorts by info hash).
1318 for _, t := range cl.torrents {
1319 ret = append(ret, t)
1324 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1325 spec, err := TorrentSpecFromMagnetUri(uri)
1329 T, _, err = cl.AddTorrentSpec(spec)
1333 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1334 ts, err := TorrentSpecFromMetaInfoErr(mi)
1338 T, _, err = cl.AddTorrentSpec(ts)
1342 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1343 mi, err := metainfo.LoadFromFile(filename)
1347 return cl.AddTorrent(mi)
1350 func (cl *Client) DhtServers() []DhtServer {
1351 return cl.dhtServers
1354 func (cl *Client) AddDhtNodes(nodes []string) {
1355 for _, n := range nodes {
1356 hmp := missinggo.SplitHostMaybePort(n)
1357 ip := net.ParseIP(hmp.Host)
1359 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1362 ni := krpc.NodeInfo{
1363 Addr: krpc.NodeAddr{
1368 cl.eachDhtServer(func(s DhtServer) {
1374 func (cl *Client) banPeerIP(ip net.IP) {
1375 cl.logger.Printf("banning ip %v", ip)
1376 if cl.badPeerIPs == nil {
1377 cl.badPeerIPs = make(map[string]struct{})
1379 cl.badPeerIPs[ip.String()] = struct{}{}
1382 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1391 PeerMaxRequests: 250,
1393 RemoteAddr: remoteAddr,
1395 callbacks: &cl.config.Callbacks,
1397 connString: connString,
1401 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1402 c.setRW(connStatsReadWriter{nc, c})
1403 c.r = &rateLimitedReader{
1404 l: cl.config.DownloadRateLimiter,
1407 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1408 for _, f := range cl.config.Callbacks.NewPeer {
1414 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1421 t.addPeers([]PeerInfo{{
1422 Addr: ipPortAddr{ip, port},
1423 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first argument that is not nil, or nil when every
// argument is nil or no arguments are given.
func firstNotNil(ips ...net.IP) net.IP {
	for i := 0; i < len(ips); i++ {
		if candidate := ips[i]; candidate != nil {
			return candidate
		}
	}
	return nil
}
1436 func (cl *Client) eachListener(f func(Listener) bool) {
1437 for _, s := range cl.listeners {
1444 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1445 for i := 0; i < len(cl.listeners); i += 1 {
1446 if ret = cl.listeners[i]; f(ret) {
1453 func (cl *Client) publicIp(peer net.IP) net.IP {
1454 // TODO: Use BEP 10 to determine how peers are seeing us.
1455 if peer.To4() != nil {
1457 cl.config.PublicIp4,
1458 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1463 cl.config.PublicIp6,
1464 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1468 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1469 l := cl.findListener(
1470 func(l Listener) bool {
1471 return f(addrIpOrNil(l.Addr()))
1477 return addrIpOrNil(l.Addr())
1480 // Our IP as a peer should see it.
1481 func (cl *Client) publicAddr(peer net.IP) IpPort {
1482 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1485 // ListenAddrs returns the addresses currently being listened on.
1486 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1488 ret = make([]net.Addr, len(cl.listeners))
1489 for i := 0; i < len(cl.listeners); i += 1 {
1490 ret[i] = cl.listeners[i].Addr()
// onBadAccept counts a bad accept against the masked IP of addr in
// cl.acceptLimiter.
1496 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
// NOTE(review): the handling of ok is not visible in this view; presumably
// addresses that cannot be resolved to an IP and port are ignored — confirm.
1497 ipa, ok := tryIpPortFromNetAddr(addr)
// Mask the IP so that all IPv4 hosts in the same /24 share one counter.
1501 ip := maskIpForAcceptLimiting(ipa.IP)
// The map is created lazily; clearAcceptLimits resets it to nil.
1502 if cl.acceptLimiter == nil {
1503 cl.acceptLimiter = make(map[ipStr]int)
1505 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IP to the key used for accept limiting.
// Addresses with a 4-byte form are masked to their /24 network and returned in
// 4-byte form; every other address is returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	v4 := ip.To4()
	if v4 == nil {
		return ip
	}
	return v4.Mask(net.CIDRMask(24, 32))
}
1515 func (cl *Client) clearAcceptLimits() {
1516 cl.acceptLimiter = nil
1519 func (cl *Client) acceptLimitClearer() {
1522 case <-cl.closed.Done():
1524 case <-time.After(15 * time.Minute):
1526 cl.clearAcceptLimits()
// rateLimitAccept reports whether accepts from ip should be rate limited.
1532 func (cl *Client) rateLimitAccept(ip net.IP) bool {
// Rate limiting can be switched off in the config.
// NOTE(review): the value returned in that case is not visible in this view;
// presumably false — confirm.
1533 if cl.config.DisableAcceptRateLimiting {
// Limited when the masked IP has a positive count in acceptLimiter. Indexing a
// nil map reads as zero, so a cleared limiter limits nothing.
1536 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1539 func (cl *Client) rLock() {
1543 func (cl *Client) rUnlock() {
1547 func (cl *Client) lock() {
1551 func (cl *Client) unlock() {
1555 func (cl *Client) locker() *lockWithDeferreds {
1559 func (cl *Client) String() string {
1560 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1563 // Returns connection-level aggregate stats at the Client level. See the comment on
1564 // TorrentStats.ConnStats.
1565 func (cl *Client) ConnStats() ConnStats {
1566 return cl.stats.Copy()