18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/v2"
24 "github.com/anacrolix/missinggo/v2/bitmap"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "github.com/pion/datachannel"
31 "golang.org/x/time/rate"
33 "github.com/anacrolix/chansync"
35 "github.com/anacrolix/torrent/bencode"
36 "github.com/anacrolix/torrent/internal/limiter"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
42 "github.com/anacrolix/torrent/tracker"
43 "github.com/anacrolix/torrent/webtorrent"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
55 closed chansync.SetOnce
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
81 activeAnnounceLimiter limiter.Instance
83 updateRequests chansync.BroadcastCond
88 func (cl *Client) BadPeerIPs() (ips []string) {
90 ips = cl.badPeerIPsLocked()
95 func (cl *Client) badPeerIPsLocked() (ips []string) {
96 ips = make([]string, len(cl.badPeerIPs))
98 for k := range cl.badPeerIPs {
105 func (cl *Client) PeerID() PeerID {
109 // Returns the port number for the first listener that has one. No longer assumes that all port
110 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
112 func (cl *Client) LocalPort() (port int) {
113 cl.eachListener(func(l Listener) bool {
114 port = addrPortOrZero(l.Addr())
120 func writeDhtServerStatus(w io.Writer, s DhtServer) {
121 dhtStats := s.Stats()
122 fmt.Fprintf(w, " ID: %x\n", s.ID())
123 spew.Fdump(w, dhtStats)
126 // Writes out a human readable status of the client, such as for writing to a
128 func (cl *Client) WriteStatus(_w io.Writer) {
131 w := bufio.NewWriter(_w)
133 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
134 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
135 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
136 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
137 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
138 cl.eachDhtServer(func(s DhtServer) {
139 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
140 writeDhtServerStatus(w, s)
142 spew.Fdump(w, &cl.stats)
143 torrentsSlice := cl.torrentsAsSlice()
144 fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
146 sort.Slice(torrentsSlice, func(l, r int) bool {
147 return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
149 for _, t := range torrentsSlice {
151 fmt.Fprint(w, "<unknown name>")
153 fmt.Fprint(w, t.name())
159 "%f%% of %d bytes (%s)",
160 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
162 humanize.Bytes(uint64(*t.length)))
164 w.WriteString("<missing metainfo>")
172 // Filters things that are less than warning from UPnP discovery.
173 func upnpDiscoverLogFilter(m log.Msg) bool {
174 level, ok := m.GetLevel()
175 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
178 func (cl *Client) initLogger() {
179 logger := cl.config.Logger
182 if !cl.config.Debug {
183 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
186 cl.logger = logger.WithValues(cl)
189 func (cl *Client) announceKey() int32 {
190 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
193 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
195 cfg = NewDefaultClientConfig()
205 dopplegangerAddrs: make(map[string]struct{}),
206 torrents: make(map[metainfo.Hash]*Torrent),
207 dialRateLimiter: rate.NewLimiter(10, 10),
209 cl.activeAnnounceLimiter.SlotsPerKey = 2
210 go cl.acceptLimitClearer()
218 cl.event.L = cl.locker()
219 storageImpl := cfg.DefaultStorage
220 if storageImpl == nil {
221 // We'd use mmap by default but HFS+ doesn't support sparse files.
222 storageImplCloser := storage.NewFile(cfg.DataDir)
223 cl.onClose = append(cl.onClose, func() {
224 if err := storageImplCloser.Close(); err != nil {
225 cl.logger.Printf("error closing default storage: %s", err)
228 storageImpl = storageImplCloser
230 cl.defaultStorage = storage.NewClient(storageImpl)
231 if cfg.IPBlocklist != nil {
232 cl.ipBlockList = cfg.IPBlocklist
235 if cfg.PeerID != "" {
236 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
238 o := copy(cl.peerID[:], cfg.Bep20)
239 _, err = rand.Read(cl.peerID[o:])
241 panic("error generating peer id")
245 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
253 for _, _s := range sockets {
254 s := _s // Go is fucking retarded.
255 cl.onClose = append(cl.onClose, func() { s.Close() })
256 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
257 cl.dialers = append(cl.dialers, s)
258 cl.listeners = append(cl.listeners, s)
259 if cl.config.AcceptPeerConnections {
260 go cl.acceptConnections(s)
267 for _, s := range sockets {
268 if pc, ok := s.(net.PacketConn); ok {
269 ds, err := cl.NewAnacrolixDhtServer(pc)
273 cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
274 cl.onClose = append(cl.onClose, func() { ds.Close() })
279 cl.websocketTrackers = websocketTrackers{
282 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
285 t, ok := cl.torrents[infoHash]
287 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
289 return t.announceRequest(event), nil
291 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
294 t, ok := cl.torrents[dcc.InfoHash]
296 cl.logger.WithDefaultLevel(log.Warning).Printf(
297 "got webrtc conn for unloaded torrent with infohash %x",
303 go t.onWebRtcConn(dc, dcc)
312 func (cl *Client) AddDhtServer(d DhtServer) {
313 cl.dhtServers = append(cl.dhtServers, d)
316 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
317 // given address for any Torrent.
318 func (cl *Client) AddDialer(d Dialer) {
321 cl.dialers = append(cl.dialers, d)
322 for _, t := range cl.torrents {
327 func (cl *Client) Listeners() []Listener {
331 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
333 func (cl *Client) AddListener(l Listener) {
334 cl.listeners = append(cl.listeners, l)
335 if cl.config.AcceptPeerConnections {
336 go cl.acceptConnections(l)
340 func (cl *Client) firewallCallback(net.Addr) bool {
342 block := !cl.wantConns() || !cl.config.AcceptPeerConnections
345 torrent.Add("connections firewalled", 1)
347 torrent.Add("connections not firewalled", 1)
352 func (cl *Client) listenOnNetwork(n network) bool {
353 if n.Ipv4 && cl.config.DisableIPv4 {
356 if n.Ipv6 && cl.config.DisableIPv6 {
359 if n.Tcp && cl.config.DisableTCP {
362 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
368 func (cl *Client) listenNetworks() (ns []network) {
369 for _, n := range allPeerNetworks {
370 if cl.listenOnNetwork(n) {
377 // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
378 func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
379 cfg := dht.ServerConfig{
380 IPBlocklist: cl.ipBlockList,
382 OnAnnouncePeer: cl.onDHTAnnouncePeer,
383 PublicIP: func() net.IP {
384 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
385 return cl.config.PublicIp6
387 return cl.config.PublicIp4
389 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
390 OnQuery: cl.config.DHTOnQuery,
391 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
393 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
396 s, err = dht.NewServer(&cfg)
399 ts, err := s.Bootstrap()
401 cl.logger.Printf("error bootstrapping dht: %s", err)
403 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
409 func (cl *Client) Closed() chansync.Done {
410 return cl.closed.Done()
413 func (cl *Client) eachDhtServer(f func(DhtServer)) {
414 for _, ds := range cl.dhtServers {
419 // Stops the client. All connections to peers are closed and all activity will
421 func (cl *Client) Close() {
423 var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
426 for _, t := range cl.torrents {
430 closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
432 for i := range cl.onClose {
433 cl.onClose[len(cl.onClose)-1-i]()
438 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
439 if cl.ipBlockList == nil {
442 return cl.ipBlockList.Lookup(ip)
445 func (cl *Client) ipIsBlocked(ip net.IP) bool {
446 _, blocked := cl.ipBlockRange(ip)
450 func (cl *Client) wantConns() bool {
451 for _, t := range cl.torrents {
459 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
460 func (cl *Client) rejectAccepted(conn net.Conn) error {
462 return errors.New("don't want conns right now")
464 ra := conn.RemoteAddr()
465 if rip := addrIpOrNil(ra); rip != nil {
466 if cl.config.DisableIPv4Peers && rip.To4() != nil {
467 return errors.New("ipv4 peers disabled")
469 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
470 return errors.New("ipv4 disabled")
473 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
474 return errors.New("ipv6 disabled")
476 if cl.rateLimitAccept(rip) {
477 return errors.New("source IP accepted rate limited")
479 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
480 return errors.New("bad source addr")
486 func (cl *Client) acceptConnections(l Listener) {
488 conn, err := l.Accept()
489 torrent.Add("client listener accepts", 1)
490 conn = pproffd.WrapNetConn(conn)
492 closed := cl.closed.IsSet()
495 reject = cl.rejectAccepted(conn)
505 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
510 torrent.Add("rejected accepted connections", 1)
511 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
514 go cl.incomingConnection(conn)
516 log.Fmsg("accepted %q connection at %q from %q",
520 ).SetLevel(log.Debug).Log(cl.logger)
521 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
522 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
523 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
528 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
529 func regularNetConnPeerConnConnString(nc net.Conn) string {
530 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
533 func (cl *Client) incomingConnection(nc net.Conn) {
535 if tc, ok := nc.(*net.TCPConn); ok {
538 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
539 regularNetConnPeerConnConnString(nc))
545 c.Discovery = PeerSourceIncoming
546 cl.runReceivedConn(c)
549 // Returns a handle to the given torrent, if it's present in the client.
550 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
553 t, ok = cl.torrents[ih]
557 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
558 return cl.torrents[ih]
561 type DialResult struct {
566 func countDialResult(err error) {
568 torrent.Add("successful dials", 1)
570 torrent.Add("unsuccessful dials", 1)
574 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
575 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
576 if ret < minDialTimeout {
582 // Returns whether an address is known to connect to a client with our own ID.
583 func (cl *Client) dopplegangerAddr(addr string) bool {
584 _, ok := cl.dopplegangerAddrs[addr]
588 // Returns a connection over UTP or TCP, whichever is first to connect.
589 func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
590 return DialFirst(ctx, addr, cl.dialers)
593 // Returns a connection over UTP or TCP, whichever is first to connect.
594 func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
596 t := perf.NewTimer(perf.CallerName(0))
599 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
601 t.Mark("returned conn over " + res.Dialer.DialerNetwork())
605 ctx, cancel := context.WithCancel(ctx)
606 // As soon as we return one connection, cancel the others.
609 resCh := make(chan DialResult, left)
610 for _, _s := range dialers {
615 dialFromSocket(ctx, s, addr),
620 // Wait for a successful connection.
622 defer perf.ScopeTimer()()
623 for ; left > 0 && res.Conn == nil; left-- {
627 // There are still incompleted dials.
629 for ; left > 0; left-- {
630 conn := (<-resCh).Conn
637 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
642 func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
643 c, err := s.Dial(ctx, addr)
644 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
645 // it now in case we close the connection forthwith.
646 if tc, ok := c.(*net.TCPConn); ok {
653 func forgettableDialError(err error) bool {
654 return strings.Contains(err.Error(), "no suitable address found")
657 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
658 if _, ok := t.halfOpen[addr]; !ok {
659 panic("invariant broken")
661 delete(t.halfOpen, addr)
663 for _, t := range cl.torrents {
668 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
669 // for valid reasons.
670 func (cl *Client) initiateProtocolHandshakes(
674 outgoing, encryptHeader bool,
675 remoteAddr PeerRemoteAddr,
676 network, connString string,
678 c *PeerConn, err error,
680 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
681 c.headerEncrypted = encryptHeader
682 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
684 dl, ok := ctx.Deadline()
688 err = nc.SetDeadline(dl)
692 err = cl.initiateHandshakes(c, t)
696 // Returns nil connection and nil error if no connection could be established for valid reasons.
697 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
698 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
701 return t.dialTimeout()
704 dr := cl.dialFirst(dialCtx, addr.String())
707 if dialCtx.Err() != nil {
708 return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
710 return nil, errors.New("dial failed")
712 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
719 // Returns nil connection and nil error if no connection could be established
720 // for valid reasons.
721 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
722 torrent.Add("establish outgoing connection", 1)
723 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
724 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
726 torrent.Add("initiated conn with preferred header obfuscation", 1)
729 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
730 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
731 // We should have just tried with the preferred header obfuscation. If it was required,
732 // there's nothing else to try.
735 // Try again with encryption if we didn't earlier, or without if we did.
736 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
738 torrent.Add("initiated conn with fallback header obfuscation", 1)
740 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
744 // Called to dial out and run a connection. The addr we're given is already
745 // considered half-open.
746 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
747 cl.dialRateLimiter.Wait(context.Background())
748 c, err := cl.establishOutgoingConn(t, addr)
751 // Don't release lock between here and addPeerConn, unless it's for
753 cl.noLongerHalfOpen(t, addr.String())
756 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
763 t.runHandshookConnLoggingErr(c)
766 // The port number for incoming peer connections. 0 if the client isn't listening.
767 func (cl *Client) incomingPeerPort() int {
768 return cl.LocalPort()
771 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
772 if c.headerEncrypted {
775 rw, c.cryptoMethod, err = mse.InitiateHandshake(
782 cl.config.CryptoProvides,
786 return fmt.Errorf("header obfuscation handshake: %w", err)
789 ih, err := cl.connBtHandshake(c, &t.infoHash)
791 return fmt.Errorf("bittorrent protocol handshake: %w", err)
793 if ih != t.infoHash {
794 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
799 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
800 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
801 func (cl *Client) forSkeys(f func([]byte) bool) {
804 if false { // Emulate the bug from #114
806 for ih := range cl.torrents {
810 for range cl.torrents {
817 for ih := range cl.torrents {
824 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
825 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
831 // Do encryption and bittorrent handshakes as receiver.
832 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
833 defer perf.ScopeTimerErr(&err)()
835 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
837 if err == nil || err == mse.ErrNoSecretKeyMatch {
838 if c.headerEncrypted {
839 torrent.Add("handshakes received encrypted", 1)
841 torrent.Add("handshakes received unencrypted", 1)
844 torrent.Add("handshakes received with error while handling encryption", 1)
847 if err == mse.ErrNoSecretKeyMatch {
852 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
853 err = errors.New("connection does not have required header obfuscation")
856 ih, err := cl.connBtHandshake(c, nil)
858 return nil, fmt.Errorf("during bt handshake: %w", err)
866 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
867 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
872 c.PeerExtensionBytes = res.PeerExtensionBits
873 c.PeerID = res.PeerID
874 c.completedHandshake = time.Now()
875 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
881 func (cl *Client) runReceivedConn(c *PeerConn) {
882 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
886 t, err := cl.receiveHandshakes(c)
889 "error receiving handshakes on %v: %s", c, err,
890 ).SetLevel(log.Debug).
892 "network", c.Network,
894 torrent.Add("error receiving handshake", 1)
896 cl.onBadAccept(c.RemoteAddr)
901 torrent.Add("received handshake for unloaded torrent", 1)
902 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
904 cl.onBadAccept(c.RemoteAddr)
908 torrent.Add("received handshake for loaded torrent", 1)
911 t.runHandshookConnLoggingErr(c)
914 // Client lock must be held before entering this.
915 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
917 if c.PeerID == cl.peerID {
920 addr := c.conn.RemoteAddr().String()
921 cl.dopplegangerAddrs[addr] = struct{}{}
923 // Because the remote address is not necessarily the same as its client's torrent listen
924 // address, we won't record the remote address as a doppleganger. Instead, the initiator
925 // can record *us* as the doppleganger.
927 return errors.New("local and remote peer ids are the same")
929 c.conn.SetWriteDeadline(time.Time{})
930 c.r = deadlineReader{c.conn, c.r}
931 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
932 if connIsIpv6(c.conn) {
933 torrent.Add("completed handshake over ipv6", 1)
935 if err := t.addPeerConn(c); err != nil {
936 return fmt.Errorf("adding connection: %w", err)
938 defer t.dropConnection(c)
940 cl.sendInitialMessages(c, t)
941 err := c.mainReadLoop()
943 return fmt.Errorf("main read loop: %w", err)
948 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
949 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
950 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
951 const localClientReqq = 1 << 5
953 // See the order given in Transmission's tr_peerMsgsNew.
954 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
955 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
956 conn.write(pp.Message{
958 ExtendedID: pp.HandshakeExtendedID,
959 ExtendedPayload: func() []byte {
960 msg := pp.ExtendedHandshakeMessage{
961 M: map[pp.ExtensionName]pp.ExtensionNumber{
962 pp.ExtensionNameMetadata: metadataExtendedId,
964 V: cl.config.ExtendedHandshakeClientVersion,
965 Reqq: localClientReqq,
966 YourIp: pp.CompactIp(conn.remoteIp()),
967 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
968 Port: cl.incomingPeerPort(),
969 MetadataSize: torrent.metadataSize(),
970 // TODO: We can figured these out specific to the socket
972 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
973 Ipv6: cl.config.PublicIp6.To16(),
975 if !cl.config.DisablePEX {
976 msg.M[pp.ExtensionNamePex] = pexExtendedId
978 return bencode.MustMarshal(msg)
983 if conn.fastEnabled() {
984 if torrent.haveAllPieces() {
985 conn.write(pp.Message{Type: pp.HaveAll})
986 conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
988 } else if !torrent.haveAnyPieces() {
989 conn.write(pp.Message{Type: pp.HaveNone})
990 conn.sentHaves.Clear()
996 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
997 conn.write(pp.Message{
1004 func (cl *Client) dhtPort() (ret uint16) {
1005 if len(cl.dhtServers) == 0 {
1008 return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
1011 func (cl *Client) haveDhtServer() bool {
1012 return len(cl.dhtServers) > 0
1015 // Process incoming ut_metadata message.
1016 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1017 var d pp.ExtendedMetadataRequestMsg
1018 err := bencode.Unmarshal(payload, &d)
1019 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1020 } else if err != nil {
1021 return fmt.Errorf("error unmarshalling bencode: %s", err)
1025 case pp.DataMetadataExtensionMsgType:
1026 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1027 if !c.requestedMetadataPiece(piece) {
1028 return fmt.Errorf("got unexpected piece %d", piece)
1030 c.metadataRequests[piece] = false
1031 begin := len(payload) - d.PieceSize()
1032 if begin < 0 || begin >= len(payload) {
1033 return fmt.Errorf("data has bad offset in payload: %d", begin)
1035 t.saveMetadataPiece(piece, payload[begin:])
1036 c.lastUsefulChunkReceived = time.Now()
1037 err = t.maybeCompleteMetadata()
1039 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1040 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1041 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1042 // log consumers can filter for this message.
1043 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1046 case pp.RequestMetadataExtensionMsgType:
1047 if !t.haveMetadataPiece(piece) {
1048 c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
1051 start := (1 << 14) * piece
1052 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1053 c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1055 case pp.RejectMetadataExtensionMsgType:
1058 return errors.New("unknown msg_type value")
1062 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1063 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1064 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1069 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1073 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1076 if _, ok := cl.ipBlockRange(ip); ok {
1079 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1085 // Return a Torrent ready for insertion into a Client.
1086 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1087 // use provided storage, if provided
1088 storageClient := cl.defaultStorage
1089 if specStorage != nil {
1090 storageClient = storage.NewClient(specStorage)
1096 peers: prioritizedPeers{
1098 getPrio: func(p PeerInfo) peerPriority {
1100 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1103 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1105 halfOpen: make(map[string]PeerInfo),
1106 pieceStateChanges: pubsub.NewPubSub(),
1108 storageOpener: storageClient,
1109 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1111 metadataChanged: sync.Cond{
1114 webSeeds: make(map[string]*Peer),
1115 gotMetainfoC: make(chan struct{}),
1117 t.networkingEnabled.Set()
1118 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1119 t.logger = cl.logger.WithContextValue(t)
1120 t.setChunkSize(defaultChunkSize)
1124 // A file-like handle to some torrent data resource.
1125 type Handle interface {
1132 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1133 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1136 // Adds a torrent by InfoHash with a custom Storage implementation.
1137 // If the torrent already exists then this Storage is ignored and the
1138 // existing torrent returned with `new` set to `false`
1139 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1142 t, ok := cl.torrents[infoHash]
1148 t = cl.newTorrent(infoHash, specStorage)
1149 cl.eachDhtServer(func(s DhtServer) {
1150 if cl.config.PeriodicallyAnnounceTorrentsToDht {
1151 go t.dhtAnnouncer(s)
1154 cl.torrents[infoHash] = t
1155 cl.clearAcceptLimits()
1156 t.updateWantPeersEvent()
1157 // Tickle Client.waitAccept, new torrent may want conns.
1158 cl.event.Broadcast()
1162 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1163 // Torrent.MergeSpec.
1164 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1165 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1166 err = t.MergeSpec(spec)
1167 if err != nil && new {
1173 type stringAddr string
1175 var _ net.Addr = stringAddr("")
1177 func (stringAddr) Network() string { return "" }
1178 func (me stringAddr) String() string { return string(me) }
1180 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1181 // spec.DisallowDataDownload/Upload will be read and applied
1182 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1183 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1184 if spec.DisplayName != "" {
1185 t.SetDisplayName(spec.DisplayName)
1187 if spec.InfoBytes != nil {
1188 err := t.SetInfoBytes(spec.InfoBytes)
1194 cl.AddDhtNodes(spec.DhtNodes)
1197 useTorrentSources(spec.Sources, t)
1198 for _, url := range spec.Webseeds {
1201 for _, peerAddr := range spec.PeerAddrs {
1203 Addr: stringAddr(peerAddr),
1204 Source: PeerSourceDirect,
1208 if spec.ChunkSize != 0 {
1209 t.setChunkSize(pp.Integer(spec.ChunkSize))
1211 t.addTrackers(spec.Trackers)
1213 t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
1214 t.dataUploadDisallowed = spec.DisallowDataUpload
1218 func useTorrentSources(sources []string, t *Torrent) {
1219 // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes
1220 ctx := context.Background()
1221 for i := 0; i < len(sources); i += 1 {
1224 if err := useTorrentSource(ctx, s, t); err != nil {
1225 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1227 t.logger.Printf("successfully used source %q", s)
1233 func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) {
1234 ctx, cancel := context.WithCancel(ctx)
1244 var req *http.Request
1245 if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil {
1248 var resp *http.Response
1249 if resp, err = http.DefaultClient.Do(req); err != nil {
1252 var mi metainfo.MetaInfo
1253 err = bencode.NewDecoder(resp.Body).Decode(&mi)
1256 if ctx.Err() != nil {
1261 return t.MergeSpec(TorrentSpecFromMetaInfo(&mi))
1264 func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
1265 t, ok := cl.torrents[infoHash]
1267 err = fmt.Errorf("no such torrent")
1274 delete(cl.torrents, infoHash)
1278 func (cl *Client) allTorrentsCompleted() bool {
1279 for _, t := range cl.torrents {
1283 if !t.haveAllPieces() {
1290 // Returns true when all torrents are completely downloaded and false if the
1291 // client is stopped before that.
1292 func (cl *Client) WaitAll() bool {
1295 for !cl.allTorrentsCompleted() {
1296 if cl.closed.IsSet() {
1304 // Returns handles to all the torrents loaded in the Client.
1305 func (cl *Client) Torrents() []*Torrent {
1308 return cl.torrentsAsSlice()
1311 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1312 for _, t := range cl.torrents {
1313 ret = append(ret, t)
1318 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1319 spec, err := TorrentSpecFromMagnetUri(uri)
1323 T, _, err = cl.AddTorrentSpec(spec)
1327 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1328 ts, err := TorrentSpecFromMetaInfoErr(mi)
1332 T, _, err = cl.AddTorrentSpec(ts)
1336 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1337 mi, err := metainfo.LoadFromFile(filename)
1341 return cl.AddTorrent(mi)
1344 func (cl *Client) DhtServers() []DhtServer {
1345 return cl.dhtServers
1348 func (cl *Client) AddDhtNodes(nodes []string) {
1349 for _, n := range nodes {
1350 hmp := missinggo.SplitHostMaybePort(n)
1351 ip := net.ParseIP(hmp.Host)
1353 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1356 ni := krpc.NodeInfo{
1357 Addr: krpc.NodeAddr{
1362 cl.eachDhtServer(func(s DhtServer) {
1368 func (cl *Client) banPeerIP(ip net.IP) {
1369 cl.logger.Printf("banning ip %v", ip)
1370 if cl.badPeerIPs == nil {
1371 cl.badPeerIPs = make(map[string]struct{})
1373 cl.badPeerIPs[ip.String()] = struct{}{}
1376 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1385 PeerMaxRequests: 250,
1387 RemoteAddr: remoteAddr,
1389 callbacks: &cl.config.Callbacks,
1391 connString: connString,
1395 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1396 c.setRW(connStatsReadWriter{nc, c})
1397 c.r = &rateLimitedReader{
1398 l: cl.config.DownloadRateLimiter,
1401 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1402 for _, f := range cl.config.Callbacks.NewPeer {
1408 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1415 t.addPeers([]PeerInfo{{
1416 Addr: ipPortAddr{ip, port},
1417 Source: PeerSourceDhtAnnouncePeer,
1421 func firstNotNil(ips ...net.IP) net.IP {
1422 for _, ip := range ips {
1430 func (cl *Client) eachListener(f func(Listener) bool) {
1431 for _, s := range cl.listeners {
1438 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1439 cl.eachListener(func(l Listener) bool {
1446 func (cl *Client) publicIp(peer net.IP) net.IP {
1447 // TODO: Use BEP 10 to determine how peers are seeing us.
1448 if peer.To4() != nil {
1450 cl.config.PublicIp4,
1451 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1456 cl.config.PublicIp6,
1457 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1461 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1462 l := cl.findListener(
1463 func(l Listener) bool {
1464 return f(addrIpOrNil(l.Addr()))
1470 return addrIpOrNil(l.Addr())
1473 // Our IP as a peer should see it.
1474 func (cl *Client) publicAddr(peer net.IP) IpPort {
1475 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1478 // ListenAddrs addresses currently being listened to.
1479 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1482 cl.eachListener(func(l Listener) bool {
1483 ret = append(ret, l.Addr())
1489 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1490 ipa, ok := tryIpPortFromNetAddr(addr)
1494 ip := maskIpForAcceptLimiting(ipa.IP)
1495 if cl.acceptLimiter == nil {
1496 cl.acceptLimiter = make(map[ipStr]int)
1498 cl.acceptLimiter[ipStr(ip.String())]++
1501 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1502 if ip4 := ip.To4(); ip4 != nil {
1503 return ip4.Mask(net.CIDRMask(24, 32))
1508 func (cl *Client) clearAcceptLimits() {
1509 cl.acceptLimiter = nil
1512 func (cl *Client) acceptLimitClearer() {
1515 case <-cl.closed.Done():
1517 case <-time.After(15 * time.Minute):
1519 cl.clearAcceptLimits()
1525 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1526 if cl.config.DisableAcceptRateLimiting {
1529 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1532 func (cl *Client) rLock() {
1536 func (cl *Client) rUnlock() {
1540 func (cl *Client) lock() {
1544 func (cl *Client) unlock() {
1548 func (cl *Client) locker() *lockWithDeferreds {
1552 func (cl *Client) String() string {
1553 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1556 // Returns connection-level aggregate stats at the Client level. See the comment on
1557 // TorrentStats.ConnStats.
1558 func (cl *Client) ConnStats() ConnStats {
1559 return cl.stats.Copy()