18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
33 "github.com/anacrolix/chansync"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/limiter"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
42 "github.com/anacrolix/torrent/tracker"
43 "github.com/anacrolix/torrent/webtorrent"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
55 closed chansync.SetOnce
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
81 activeAnnounceLimiter limiter.Instance
83 updateRequests chansync.BroadcastCond
88 func (cl *Client) BadPeerIPs() (ips []string) {
90 ips = cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() (ips []string) {
96 ips = make([]string, len(cl.badPeerIPs))
98 for k := range cl.badPeerIPs {
105 func (cl *Client) PeerID() PeerID {
109 // Returns the port number for the first listener that has one. No longer assumes that all port
110 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
112 func (cl *Client) LocalPort() (port int) {
113 for i := 0; i < len(cl.listeners); i += 1 {
114 if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
121 func writeDhtServerStatus(w io.Writer, s DhtServer) {
122 dhtStats := s.Stats()
123 fmt.Fprintf(w, " ID: %x\n", s.ID())
124 spew.Fdump(w, dhtStats)
127 // Writes out a human readable status of the client, such as for writing to a
129 func (cl *Client) WriteStatus(_w io.Writer) {
132 w := bufio.NewWriter(_w)
134 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
135 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
136 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
137 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
138 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
139 cl.eachDhtServer(func(s DhtServer) {
140 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
141 writeDhtServerStatus(w, s)
143 spew.Fdump(w, &cl.stats)
144 torrentsSlice := cl.torrentsAsSlice()
145 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
147 sort.Slice(torrentsSlice, func(l, r int) bool {
148 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
150 for _, t := range torrentsSlice {
152 fmt.Fprint(w, "<unknown name>")
154 fmt.Fprint(w, t.name())
160 "%f%% of %d bytes (%s)",
161 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
163 humanize.Bytes(uint64(*t.length)))
165 w.WriteString("<missing metainfo>")
173 // Filters things that are less than warning from UPnP discovery.
174 func upnpDiscoverLogFilter(m log.Msg) bool {
175 level, ok := m.GetLevel()
176 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
179 func (cl *Client) initLogger() {
180 logger := cl.config.Logger
183 if !cl.config.Debug {
184 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
187 cl.logger = logger.WithValues(cl)
190 func (cl *Client) announceKey() int32 {
191 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
194 // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
195 func (cl *Client) init(cfg *ClientConfig) {
197 cl.dopplegangerAddrs = make(map[string]struct{})
198 cl.torrents = make(map[metainfo.Hash]*Torrent)
199 cl.dialRateLimiter = rate.NewLimiter(10, 10)
200 cl.activeAnnounceLimiter.SlotsPerKey = 2
202 cl.event.L = cl.locker()
203 cl.ipBlockList = cfg.IPBlocklist
206 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
208 cfg = NewDefaultClientConfig()
214 go cl.acceptLimitClearer()
223 storageImpl := cfg.DefaultStorage
224 if storageImpl == nil {
225 // We'd use mmap by default but HFS+ doesn't support sparse files.
226 storageImplCloser := storage.NewFile(cfg.DataDir)
227 cl.onClose = append(cl.onClose, func() {
228 if err := storageImplCloser.Close(); err != nil {
229 cl.logger.Printf("error closing default storage: %s", err)
232 storageImpl = storageImplCloser
234 cl.defaultStorage = storage.NewClient(storageImpl)
236 if cfg.PeerID != "" {
237 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
239 o := copy(cl.peerID[:], cfg.Bep20)
240 _, err = rand.Read(cl.peerID[o:])
242 panic("error generating peer id")
246 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
254 for _, _s := range sockets {
255 s := _s // Go is fucking retarded.
256 cl.onClose = append(cl.onClose, func() { s.Close() })
257 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
258 cl.dialers = append(cl.dialers, s)
259 cl.listeners = append(cl.listeners, s)
260 if cl.config.AcceptPeerConnections {
261 go cl.acceptConnections(s)
268 for _, s := range sockets {
269 if pc, ok := s.(net.PacketConn); ok {
270 ds, err := cl.NewAnacrolixDhtServer(pc)
274 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
275 cl.onClose = append(cl.onClose, func() { ds.Close() })
280 cl.websocketTrackers = websocketTrackers{
283 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
286 t, ok := cl.torrents[infoHash]
288 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
290 return t.announceRequest(event), nil
292 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
295 t, ok := cl.torrents[dcc.InfoHash]
297 cl.logger.WithDefaultLevel(log.Warning).Printf(
298 "got webrtc conn for unloaded torrent with infohash %x",
304 go t.onWebRtcConn(dc, dcc)
315 func (cl *Client) AddDhtServer(d DhtServer) {
316 cl.dhtServers = append(cl.dhtServers, d)
319 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
320 // given address for any Torrent.
321 func (cl *Client) AddDialer(d Dialer) {
324 cl.dialers = append(cl.dialers, d)
325 for _, t := range cl.torrents {
330 func (cl *Client) Listeners() []Listener {
334 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
336 func (cl *Client) AddListener(l Listener) {
337 cl.listeners = append(cl.listeners, l)
338 if cl.config.AcceptPeerConnections {
339 go cl.acceptConnections(l)
343 func (cl *Client) firewallCallback(net.Addr) bool {
345 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
348 torrent.Add("connections firewalled", 1)
350 torrent.Add("connections not firewalled", 1)
355 func (cl *Client) listenOnNetwork(n network) bool {
356 if n.Ipv4 && cl.config.DisableIPv4 {
359 if n.Ipv6 && cl.config.DisableIPv6 {
362 if n.Tcp && cl.config.DisableTCP {
365 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
371 func (cl *Client) listenNetworks() (ns []network) {
372 for _, n := range allPeerNetworks {
373 if cl.listenOnNetwork(n) {
380 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
381 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
382 cfg := dht.ServerConfig{
383 IPBlocklist: cl.ipBlockList,
385 OnAnnouncePeer: cl.onDHTAnnouncePeer,
386 PublicIP: func() net.IP {
387 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
388 return cl.config.PublicIp6
390 return cl.config.PublicIp4
392 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
393 OnQuery: cl.config.DHTOnQuery,
394 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
396 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
399 s, err = dht.NewServer(&cfg)
402 ts, err := s.Bootstrap()
404 cl.logger.Printf("error bootstrapping dht: %s", err)
406 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
412 func (cl *Client) Closed() chansync.Done {
413 return cl.closed.Done()
416 func (cl *Client) eachDhtServer(f func(DhtServer)) {
417 for _, ds := range cl.dhtServers {
422 // Stops the client. All connections to peers are closed and all activity will
424 func (cl *Client) Close() {
426 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
429 for _, t := range cl.torrents {
433 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
435 for i := range cl.onClose {
436 cl.onClose[len(cl.onClose)-1-i]()
441 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
442 if cl.ipBlockList == nil {
445 return cl.ipBlockList.Lookup(ip)
448 func (cl *Client) ipIsBlocked(ip net.IP) bool {
449 _, blocked := cl.ipBlockRange(ip)
453 func (cl *Client) wantConns() bool {
454 for _, t := range cl.torrents {
462 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
463 func (cl *Client) rejectAccepted(conn net.Conn) error {
465 return errors.New("don't want conns right now")
467 ra := conn.RemoteAddr()
468 if rip := addrIpOrNil(ra); rip != nil {
469 if cl.config.DisableIPv4Peers && rip.To4() != nil {
470 return errors.New("ipv4 peers disabled")
472 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
473 return errors.New("ipv4 disabled")
476 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
477 return errors.New("ipv6 disabled")
479 if cl.rateLimitAccept(rip) {
480 return errors.New("source IP accepted rate limited")
482 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
483 return errors.New("bad source addr")
489 func (cl *Client) acceptConnections(l Listener) {
491 conn, err := l.Accept()
492 torrent.Add("client listener accepts", 1)
493 conn = pproffd.WrapNetConn(conn)
495 closed := cl.closed.IsSet()
498 reject = cl.rejectAccepted(conn)
508 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
513 torrent.Add("rejected accepted connections", 1)
514 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
517 go cl.incomingConnection(conn)
519 log.Fmsg("accepted %q connection at %q from %q",
523 ).SetLevel(log.Debug).Log(cl.logger)
524 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
525 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
526 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a PeerConn backed by a
// regular net.Conn: the local address and the remote address joined by a hyphen.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local := nc.LocalAddr()
	remote := nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
536 func (cl *Client) incomingConnection(nc net.Conn) {
538 if tc, ok := nc.(*net.TCPConn); ok {
541 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
542 regularNetConnPeerConnConnString(nc))
548 c.Discovery = PeerSourceIncoming
549 cl.runReceivedConn(c)
552 // Returns a handle to the given torrent, if it's present in the client.
553 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
556 t, ok = cl.torrents[ih]
560 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
561 return cl.torrents[ih]
564 type DialResult struct {
569 func countDialResult(err error) {
571 torrent.Add("successful dials", 1)
573 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shortens the dial timeout as the backlog of pending peers grows relative to
// the half-open connection limit. The result is max divided by
// (pendingPeers+halfOpenLimit)/halfOpenLimit (integer division), and is never less than
// minDialTimeout.
//
// A non-positive halfOpenLimit, or a combination of inputs that would give a divisor below one
// (only possible with a negative pendingPeers), is treated as "no backlog": max is used unscaled.
// This avoids the integer divide-by-zero panic the unguarded expression would raise.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	divisor := 1
	if halfOpenLimit > 0 {
		divisor = (pendingPeers + halfOpenLimit) / halfOpenLimit
	}
	if divisor < 1 {
		// Guard against a zero or negative divisor from nonsensical inputs.
		divisor = 1
	}
	ret = max / time.Duration(divisor)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
585 // Returns whether an address is known to connect to a client with our own ID.
586 func (cl *Client) dopplegangerAddr(addr string) bool {
587 _, ok := cl.dopplegangerAddrs[addr]
591 // Returns a connection over UTP or TCP, whichever is first to connect.
592 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
593 return DialFirst(ctx, addr, cl.dialers)
596 // Returns a connection over UTP or TCP, whichever is first to connect.
597 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
599 t := perf.NewTimer(perf.CallerName(0))
602 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
604 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
608 ctx, cancel := context.WithCancel(ctx)
609 // As soon as we return one connection, cancel the others.
612 resCh := make(chan DialResult, left)
613 for _, _s := range dialers {
618 dialFromSocket(ctx, s, addr),
623 // Wait for a successful connection.
625 defer perf.ScopeTimer()()
626 for ; left > 0 && res.Conn == nil; left-- {
630 // There are still incomplete dials.
632 for ; left > 0; left-- {
633 conn := (<-resCh).Conn
640 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
645 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
646 c, err := s.Dial(ctx, addr)
647 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
648 // it now in case we close the connection forthwith.
649 if tc, ok := c.(*net.TCPConn); ok {
// forgettableDialError reports whether a dial error's message contains the text
// "no suitable address found". err must be non-nil.
func forgettableDialError(err error) bool {
	const marker = "no suitable address found"
	message := err.Error()
	return strings.Contains(message, marker)
}
660 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
661 if _, ok := t.halfOpen[addr]; !ok {
662 panic("invariant broken")
664 delete(t.halfOpen, addr)
666 for _, t := range cl.torrents {
671 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
672 // for valid reasons.
673 func (cl *Client) initiateProtocolHandshakes(
677 outgoing, encryptHeader bool,
678 remoteAddr PeerRemoteAddr,
679 network, connString string,
681 c *PeerConn, err error,
683 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
684 c.headerEncrypted = encryptHeader
685 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
687 dl, ok := ctx.Deadline()
691 err = nc.SetDeadline(dl)
695 err = cl.initiateHandshakes(c, t)
699 // Returns nil connection and nil error if no connection could be established for valid reasons.
700 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
701 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
704 return t.dialTimeout()
707 dr := cl.dialFirst(dialCtx, addr.String())
710 if dialCtx.Err() != nil {
711 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
713 return nil, errors.New("dial failed")
715 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
722 // Returns nil connection and nil error if no connection could be established
723 // for valid reasons.
724 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
725 torrent.Add("establish outgoing connection", 1)
726 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
727 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
729 torrent.Add("initiated conn with preferred header obfuscation", 1)
732 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
733 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
734 // We should have just tried with the preferred header obfuscation. If it was required,
735 // there's nothing else to try.
738 // Try again with encryption if we didn't earlier, or without if we did.
739 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
741 torrent.Add("initiated conn with fallback header obfuscation", 1)
743 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
747 // Called to dial out and run a connection. The addr we're given is already
748 // considered half-open.
749 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
750 cl.dialRateLimiter.Wait(context.Background())
751 c, err := cl.establishOutgoingConn(t, addr)
754 // Don't release lock between here and addPeerConn, unless it's for
756 cl.noLongerHalfOpen(t, addr.String())
759 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
766 t.runHandshookConnLoggingErr(c)
769 // The port number for incoming peer connections. 0 if the client isn't listening.
770 func (cl *Client) incomingPeerPort() int {
771 return cl.LocalPort()
774 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
775 if c.headerEncrypted {
778 rw, c.cryptoMethod, err = mse.InitiateHandshake(
785 cl.config.CryptoProvides,
789 return fmt.Errorf("header obfuscation handshake: %w", err)
792 ih, err := cl.connBtHandshake(c, &t.infoHash)
794 return fmt.Errorf("bittorrent protocol handshake: %w", err)
796 if ih != t.infoHash {
797 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
802 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
803 // that won't also try to take the lock. This saves us copying all the infohashes every time.
804 func (cl *Client) forSkeys(f func([]byte) bool) {
807 if false { // Emulate the bug from #114
809 for ih := range cl.torrents {
813 for range cl.torrents {
820 for ih := range cl.torrents {
827 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
828 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
834 // Do encryption and bittorrent handshakes as receiver.
835 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
836 defer perf.ScopeTimerErr(&err)()
838 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
840 if err == nil || err == mse.ErrNoSecretKeyMatch {
841 if c.headerEncrypted {
842 torrent.Add("handshakes received encrypted", 1)
844 torrent.Add("handshakes received unencrypted", 1)
847 torrent.Add("handshakes received with error while handling encryption", 1)
850 if err == mse.ErrNoSecretKeyMatch {
855 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
856 err = errors.New("connection does not have required header obfuscation")
859 ih, err := cl.connBtHandshake(c, nil)
861 return nil, fmt.Errorf("during bt handshake: %w", err)
869 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
870 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
875 c.PeerExtensionBytes = res.PeerExtensionBits
876 c.PeerID = res.PeerID
877 c.completedHandshake = time.Now()
878 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
884 func (cl *Client) runReceivedConn(c *PeerConn) {
885 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
889 t, err := cl.receiveHandshakes(c)
892 "error receiving handshakes on %v: %s", c, err,
893 ).SetLevel(log.Debug).
895 "network", c.Network,
897 torrent.Add("error receiving handshake", 1)
899 cl.onBadAccept(c.RemoteAddr)
904 torrent.Add("received handshake for unloaded torrent", 1)
905 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
907 cl.onBadAccept(c.RemoteAddr)
911 torrent.Add("received handshake for loaded torrent", 1)
914 t.runHandshookConnLoggingErr(c)
917 // Client lock must be held before entering this.
918 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
920 if c.PeerID == cl.peerID {
923 addr := c.conn.RemoteAddr().String()
924 cl.dopplegangerAddrs[addr] = struct{}{}
926 // Because the remote address is not necessarily the same as its client's torrent listen
927 // address, we won't record the remote address as a doppleganger. Instead, the initiator
928 // can record *us* as the doppleganger.
930 return errors.New("local and remote peer ids are the same")
932 c.conn.SetWriteDeadline(time.Time{})
933 c.r = deadlineReader{c.conn, c.r}
934 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
935 if connIsIpv6(c.conn) {
936 torrent.Add("completed handshake over ipv6", 1)
938 if err := t.addPeerConn(c); err != nil {
939 return fmt.Errorf("adding connection: %w", err)
941 defer t.dropConnection(c)
943 cl.sendInitialMessages(c, t)
944 err := c.mainReadLoop()
946 return fmt.Errorf("main read loop: %w", err)
951 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
952 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
953 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
954 const localClientReqq = 1 << 5
956 // See the order given in Transmission's tr_peerMsgsNew.
957 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
958 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
959 conn.write(pp.Message{
961 ExtendedID: pp.HandshakeExtendedID,
962 ExtendedPayload: func() []byte {
963 msg := pp.ExtendedHandshakeMessage{
964 M: map[pp.ExtensionName]pp.ExtensionNumber{
965 pp.ExtensionNameMetadata: metadataExtendedId,
967 V: cl.config.ExtendedHandshakeClientVersion,
968 Reqq: localClientReqq,
969 YourIp: pp.CompactIp(conn.remoteIp()),
970 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
971 Port: cl.incomingPeerPort(),
972 MetadataSize: torrent.metadataSize(),
973 // TODO: We can figure these out specific to the socket
975 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
976 Ipv6: cl.config.PublicIp6.To16(),
978 if !cl.config.DisablePEX {
979 msg.M[pp.ExtensionNamePex] = pexExtendedId
981 return bencode.MustMarshal(msg)
986 if conn.fastEnabled() {
987 if torrent.haveAllPieces() {
988 conn.write(pp.Message{Type: pp.HaveAll})
989 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
991 } else if !torrent.haveAnyPieces() {
992 conn.write(pp.Message{Type: pp.HaveNone})
993 conn.sentHaves.Clear()
999 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1000 conn.write(pp.Message{
1007 func (cl *Client) dhtPort() (ret uint16) {
1008 if len(cl.dhtServers) == 0 {
1011 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1014 func (cl *Client) haveDhtServer() bool {
1015 return len(cl.dhtServers) > 0
1018 // Process incoming ut_metadata message.
1019 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1020 var d pp.ExtendedMetadataRequestMsg
1021 err := bencode.Unmarshal(payload, &d)
1022 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1023 } else if err != nil {
1024 return fmt.Errorf("error unmarshalling bencode: %s", err)
1028 case pp.DataMetadataExtensionMsgType:
1029 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1030 if !c.requestedMetadataPiece(piece) {
1031 return fmt.Errorf("got unexpected piece %d", piece)
1033 c.metadataRequests[piece] = false
1034 begin := len(payload) - d.PieceSize()
1035 if begin < 0 || begin >= len(payload) {
1036 return fmt.Errorf("data has bad offset in payload: %d", begin)
1038 t.saveMetadataPiece(piece, payload[begin:])
1039 c.lastUsefulChunkReceived = time.Now()
1040 err = t.maybeCompleteMetadata()
1042 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1043 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1044 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1045 // log consumers can filter for this message.
1046 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1049 case pp.RequestMetadataExtensionMsgType:
1050 if !t.haveMetadataPiece(piece) {
1051 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1054 start := (1 << 14) * piece
1055 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1056 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1058 case pp.RejectMetadataExtensionMsgType:
1061 return errors.New("unknown msg_type value")
1065 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1066 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1067 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1072 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1076 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1079 if _, ok := cl.ipBlockRange(ip); ok {
1082 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1088 // Return a Torrent ready for insertion into a Client.
1089 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1090 // use provided storage, if provided
1091 storageClient := cl.defaultStorage
1092 if specStorage != nil {
1093 storageClient = storage.NewClient(specStorage)
1099 peers: prioritizedPeers{
1101 getPrio: func(p PeerInfo) peerPriority {
1103 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1106 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1108 halfOpen: make(map[string]PeerInfo),
1109 pieceStateChanges: pubsub.NewPubSub(),
1111 storageOpener: storageClient,
1112 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1114 metadataChanged: sync.Cond{
1117 webSeeds: make(map[string]*Peer),
1118 gotMetainfoC: make(chan struct{}),
1120 t.networkingEnabled.Set()
1121 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1122 t.logger = cl.logger.WithContextValue(t)
1123 t.setChunkSize(defaultChunkSize)
1127 // A file-like handle to some torrent data resource.
1128 type Handle interface {
1135 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1136 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1139 // Adds a torrent by InfoHash with a custom Storage implementation.
1140 // If the torrent already exists then this Storage is ignored and the
1141 // existing torrent returned with `new` set to `false`
1142 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1145 t, ok := cl.torrents[infoHash]
1151 t = cl.newTorrent(infoHash, specStorage)
1152 cl.eachDhtServer(func(s DhtServer) {
1153 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1154 go t.dhtAnnouncer(s)
1157 cl.torrents[infoHash] = t
1158 cl.clearAcceptLimits()
1159 t.updateWantPeersEvent()
1160 // Tickle Client.waitAccept, new torrent may want conns.
1161 cl.event.Broadcast()
1165 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1166 // Torrent.MergeSpec.
1167 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1168 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1169 err = t.MergeSpec(spec)
1170 if err != nil && new {
// stringAddr adapts a plain address string to the net.Addr interface.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string; a stringAddr carries no network name.
func (stringAddr) Network() string {
	return ""
}

// String returns the underlying address string unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1183 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1184 // spec.DisallowDataDownload/Upload will be read and applied
1185 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1186 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1187 if spec.DisplayName != "" {
1188 t.SetDisplayName(spec.DisplayName)
1190 if spec.InfoBytes != nil {
1191 err := t.SetInfoBytes(spec.InfoBytes)
1197 cl.AddDhtNodes(spec.DhtNodes)
1200 useTorrentSources(spec.Sources, t)
1201 for _, url := range spec.Webseeds {
1204 for _, peerAddr := range spec.PeerAddrs {
1206 Addr: stringAddr(peerAddr),
1207 Source: PeerSourceDirect,
1211 if spec.ChunkSize != 0 {
1212 t.setChunkSize(pp.Integer(spec.ChunkSize))
1214 t.addTrackers(spec.Trackers)
1216 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1217 t.dataUploadDisallowed = spec.DisallowDataUpload
1221 func useTorrentSources(sources []string, t *Torrent) {
1222 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1223 ctx := context.Background()
1224 for i := 0; i < len(sources); i += 1 {
1227 if err := useTorrentSource(ctx, s, t); err != nil {
1228 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1230 t.logger.Printf("successfully used source %q", s)
1236 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1237 ctx, cancel := context.WithCancel(ctx)
1247 var req *http.Request
1248 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1251 var resp *http.Response
1252 if resp, err = http.DefaultClient.Do(req); err != nil {
1255 var mi metainfo.MetaInfo
1256 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1259 if ctx.Err() != nil {
1264 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1267 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1268 t, ok := cl.torrents[infoHash]
1270 err = fmt.Errorf("no such torrent")
1277 delete(cl.torrents, infoHash)
1281 func (cl *Client) allTorrentsCompleted() bool {
1282 for _, t := range cl.torrents {
1286 if !t.haveAllPieces() {
1293 // Returns true when all torrents are completely downloaded and false if the
1294 // client is stopped before that.
1295 func (cl *Client) WaitAll() bool {
1298 for !cl.allTorrentsCompleted() {
1299 if cl.closed.IsSet() {
1307 // Returns handles to all the torrents loaded in the Client.
1308 func (cl *Client) Torrents() []*Torrent {
1311 return cl.torrentsAsSlice()
1314 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1315 for _, t := range cl.torrents {
1316 ret = append(ret, t)
1321 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1322 spec, err := TorrentSpecFromMagnetUri(uri)
1326 T, _, err = cl.AddTorrentSpec(spec)
1330 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1331 ts, err := TorrentSpecFromMetaInfoErr(mi)
1335 T, _, err = cl.AddTorrentSpec(ts)
1339 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1340 mi, err := metainfo.LoadFromFile(filename)
1344 return cl.AddTorrent(mi)
1347 func (cl *Client) DhtServers() []DhtServer {
1348 return cl.dhtServers
1351 func (cl *Client) AddDhtNodes(nodes []string) {
1352 for _, n := range nodes {
1353 hmp := missinggo.SplitHostMaybePort(n)
1354 ip := net.ParseIP(hmp.Host)
1356 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1359 ni := krpc.NodeInfo{
1360 Addr: krpc.NodeAddr{
1365 cl.eachDhtServer(func(s DhtServer) {
1371 func (cl *Client) banPeerIP(ip net.IP) {
1372 cl.logger.Printf("banning ip %v", ip)
1373 if cl.badPeerIPs == nil {
1374 cl.badPeerIPs = make(map[string]struct{})
1376 cl.badPeerIPs[ip.String()] = struct{}{}
1379 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1388 PeerMaxRequests: 250,
1390 RemoteAddr: remoteAddr,
1392 callbacks: &cl.config.Callbacks,
1394 connString: connString,
1398 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1399 c.setRW(connStatsReadWriter{nc, c})
1400 c.r = &rateLimitedReader{
1401 l: cl.config.DownloadRateLimiter,
1404 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1405 for _, f := range cl.config.Callbacks.NewPeer {
1411 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1418 t.addPeers([]PeerInfo{{
1419 Addr: ipPortAddr{ip, port},
1420 Source: PeerSourceDhtAnnouncePeer,
// firstNotNil returns the first non-nil IP among ips, in argument order, or nil if every argument
// is nil or none are given.
func firstNotNil(ips ...net.IP) net.IP {
	for i := 0; i < len(ips); i++ {
		if candidate := ips[i]; candidate != nil {
			return candidate
		}
	}
	return nil
}
1433 func (cl *Client) eachListener(f func(Listener) bool) {
1434 for _, s := range cl.listeners {
1441 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1442 for i := 0; i < len(cl.listeners); i += 1 {
1443 if ret = cl.listeners[i]; f(ret) {
1450 func (cl *Client) publicIp(peer net.IP) net.IP {
1451 // TODO: Use BEP 10 to determine how peers are seeing us.
1452 if peer.To4() != nil {
1454 cl.config.PublicIp4,
1455 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1460 cl.config.PublicIp6,
1461 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1465 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1466 l := cl.findListener(
1467 func(l Listener) bool {
1468 return f(addrIpOrNil(l.Addr()))
1474 return addrIpOrNil(l.Addr())
1477 // Our IP as a peer should see it.
1478 func (cl *Client) publicAddr(peer net.IP) IpPort {
1479 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1482 // ListenAddrs addresses currently being listened to.
1483 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1485 ret = make([]net.Addr, len(cl.listeners))
1486 for i := 0; i < len(cl.listeners); i += 1 {
1487 ret[i] = cl.listeners[i].Addr()
1493 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1494 ipa, ok := tryIpPortFromNetAddr(addr)
1498 ip := maskIpForAcceptLimiting(ipa.IP)
1499 if cl.acceptLimiter == nil {
1500 cl.acceptLimiter = make(map[ipStr]int)
1502 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address to its /24 network so that accept limiting
// applies per /24 block. Addresses that are not IPv4 are returned unchanged.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	v4 := ip.To4()
	if v4 == nil {
		return ip
	}
	return v4.Mask(net.CIDRMask(24, 32))
}
1512 func (cl *Client) clearAcceptLimits() {
1513 cl.acceptLimiter = nil
1516 func (cl *Client) acceptLimitClearer() {
1519 case <-cl.closed.Done():
1521 case <-time.After(15 * time.Minute):
1523 cl.clearAcceptLimits()
1529 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1530 if cl.config.DisableAcceptRateLimiting {
1533 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1536 func (cl *Client) rLock() {
1540 func (cl *Client) rUnlock() {
1544 func (cl *Client) lock() {
1548 func (cl *Client) unlock() {
1552 func (cl *Client) locker() *lockWithDeferreds {
1556 func (cl *Client) String() string {
1557 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1560 // Returns connection-level aggregate stats at the Client level. See the comment on
1561 // TorrentStats.ConnStats.
1562 func (cl *Client) ConnStats() ConnStats {
1563 return cl.stats.Copy()