18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/tracker"
28 "github.com/anacrolix/torrent/webtorrent"
29 "github.com/davecgh/go-spew/spew"
30 "github.com/dustin/go-humanize"
31 "github.com/google/btree"
32 "github.com/pion/datachannel"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/missinggo/v2"
37 "github.com/anacrolix/missinggo/v2/conntrack"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure
51 // 64-bit alignment of fields. See #262.
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
80 websocketTrackers websocketTrackers
85 func (cl *Client) BadPeerIPs() []string {
88 return cl.badPeerIPsLocked()
91 func (cl *Client) badPeerIPsLocked() []string {
92 return slices.FromMapKeys(cl.badPeerIPs).([]string)
95 func (cl *Client) PeerID() PeerID {
99 // Returns the port number for the first listener that has one. No longer assumes that all port
100 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
102 func (cl *Client) LocalPort() (port int) {
103 cl.eachListener(func(l Listener) bool {
104 port = addrPortOrZero(l.Addr())
110 func writeDhtServerStatus(w io.Writer, s DhtServer) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, " ID: %x\n", s.ID())
113 spew.Fdump(w, dhtStats)
116 // Writes out a human readable status of the client, such as for writing to a
118 func (cl *Client) WriteStatus(_w io.Writer) {
121 w := bufio.NewWriter(_w)
123 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
124 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s DhtServer) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, &cl.stats)
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
147 "%f%% of %d bytes (%s)",
148 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
150 humanize.Bytes(uint64(*t.length)))
152 w.WriteString("<missing metainfo>")
160 func (cl *Client) initLogger() {
161 cl.logger = cl.config.Logger.WithValues(cl)
162 if !cl.config.Debug {
163 cl.logger = cl.logger.FilterLevel(log.Info)
167 func (cl *Client) announceKey() int32 {
168 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
171 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
173 cfg = NewDefaultClientConfig()
183 dopplegangerAddrs: make(map[string]struct{}),
184 torrents: make(map[metainfo.Hash]*Torrent),
185 dialRateLimiter: rate.NewLimiter(10, 10),
187 go cl.acceptLimitClearer()
195 cl.event.L = cl.locker()
196 storageImpl := cfg.DefaultStorage
197 if storageImpl == nil {
198 // We'd use mmap by default but HFS+ doesn't support sparse files.
199 storageImplCloser := storage.NewFile(cfg.DataDir)
200 cl.onClose = append(cl.onClose, func() {
201 if err := storageImplCloser.Close(); err != nil {
202 cl.logger.Printf("error closing default storage: %s", err)
205 storageImpl = storageImplCloser
207 cl.defaultStorage = storage.NewClient(storageImpl)
208 if cfg.IPBlocklist != nil {
209 cl.ipBlockList = cfg.IPBlocklist
212 if cfg.PeerID != "" {
213 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
215 o := copy(cl.peerID[:], cfg.Bep20)
216 _, err = rand.Read(cl.peerID[o:])
218 panic("error generating peer id")
222 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
230 for _, _s := range sockets {
231 s := _s // Copy the loop variable so each closure and goroutine below captures its own socket.
232 cl.onClose = append(cl.onClose, func() { s.Close() })
233 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
234 cl.dialers = append(cl.dialers, s)
235 cl.listeners = append(cl.listeners, s)
236 go cl.acceptConnections(s)
242 for _, s := range sockets {
243 if pc, ok := s.(net.PacketConn); ok {
244 ds, err := cl.newAnacrolixDhtServer(pc)
248 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
249 cl.onClose = append(cl.onClose, func() { ds.Close() })
254 cl.websocketTrackers = websocketTrackers{
257 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
260 t, ok := cl.torrents[infoHash]
262 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
264 return t.announceRequest(event), nil
266 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
269 t, ok := cl.torrents[dcc.InfoHash]
271 cl.logger.WithDefaultLevel(log.Warning).Printf(
272 "got webrtc conn for unloaded torrent with infohash %x",
278 go t.onWebRtcConn(dc, dcc)
285 func (cl *Client) AddDhtServer(d DhtServer) {
286 cl.dhtServers = append(cl.dhtServers, d)
289 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
290 // given address for any Torrent.
291 func (cl *Client) AddDialer(d Dialer) {
294 cl.dialers = append(cl.dialers, d)
295 for _, t := range cl.torrents {
300 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
302 func (cl *Client) AddListener(l Listener) {
303 cl.listeners = append(cl.listeners, l)
304 go cl.acceptConnections(l)
307 func (cl *Client) firewallCallback(net.Addr) bool {
309 block := !cl.wantConns()
312 torrent.Add("connections firewalled", 1)
314 torrent.Add("connections not firewalled", 1)
319 func (cl *Client) listenOnNetwork(n network) bool {
320 if n.Ipv4 && cl.config.DisableIPv4 {
323 if n.Ipv6 && cl.config.DisableIPv6 {
326 if n.Tcp && cl.config.DisableTCP {
329 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
335 func (cl *Client) listenNetworks() (ns []network) {
336 for _, n := range allPeerNetworks {
337 if cl.listenOnNetwork(n) {
344 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
345 cfg := dht.ServerConfig{
346 IPBlocklist: cl.ipBlockList,
348 OnAnnouncePeer: cl.onDHTAnnouncePeer,
349 PublicIP: func() net.IP {
350 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
351 return cl.config.PublicIp6
353 return cl.config.PublicIp4
355 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
356 ConnectionTracking: cl.config.ConnTracker,
357 OnQuery: cl.config.DHTOnQuery,
358 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
360 s, err = dht.NewServer(&cfg)
363 ts, err := s.Bootstrap()
365 cl.logger.Printf("error bootstrapping dht: %s", err)
367 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
373 func (cl *Client) Closed() <-chan struct{} {
379 func (cl *Client) eachDhtServer(f func(DhtServer)) {
380 for _, ds := range cl.dhtServers {
385 // Stops the client. All connections to peers are closed and all activity will
387 func (cl *Client) Close() {
391 for _, t := range cl.torrents {
394 for i := range cl.onClose {
395 cl.onClose[len(cl.onClose)-1-i]()
400 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
401 if cl.ipBlockList == nil {
404 return cl.ipBlockList.Lookup(ip)
407 func (cl *Client) ipIsBlocked(ip net.IP) bool {
408 _, blocked := cl.ipBlockRange(ip)
412 func (cl *Client) wantConns() bool {
413 for _, t := range cl.torrents {
421 func (cl *Client) waitAccept() {
423 if cl.closed.IsSet() {
433 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
434 func (cl *Client) rejectAccepted(conn net.Conn) error {
435 ra := conn.RemoteAddr()
436 if rip := addrIpOrNil(ra); rip != nil {
437 if cl.config.DisableIPv4Peers && rip.To4() != nil {
438 return errors.New("ipv4 peers disabled")
440 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
441 return errors.New("ipv4 disabled")
444 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
445 return errors.New("ipv6 disabled")
447 if cl.rateLimitAccept(rip) {
448 return errors.New("source IP accepted rate limited")
450 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
451 return errors.New("bad source addr")
457 func (cl *Client) acceptConnections(l net.Listener) {
459 conn, err := l.Accept()
460 torrent.Add("client listener accepts", 1)
461 conn = pproffd.WrapNetConn(conn)
463 closed := cl.closed.IsSet()
466 reject = cl.rejectAccepted(conn)
476 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
481 torrent.Add("rejected accepted connections", 1)
482 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
485 go cl.incomingConnection(conn)
487 log.Fmsg("accepted %q connection at %q from %q",
491 ).SetLevel(log.Debug).Log(cl.logger)
492 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
493 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
494 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
499 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
500 func regularNetConnPeerConnConnString(nc net.Conn) string {
501 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
504 func (cl *Client) incomingConnection(nc net.Conn) {
506 if tc, ok := nc.(*net.TCPConn); ok {
509 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
510 regularNetConnPeerConnConnString(nc))
511 c.Discovery = PeerSourceIncoming
512 cl.runReceivedConn(c)
515 // Returns a handle to the given torrent, if it's present in the client.
516 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
519 t, ok = cl.torrents[ih]
523 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
524 return cl.torrents[ih]
527 type dialResult struct {
532 func countDialResult(err error) {
534 torrent.Add("successful dials", 1)
536 torrent.Add("unsuccessful dials", 1)
540 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
541 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
542 if ret < minDialTimeout {
548 // Returns whether an address is known to connect to a client with our own ID.
549 func (cl *Client) dopplegangerAddr(addr string) bool {
550 _, ok := cl.dopplegangerAddrs[addr]
554 // Returns a connection over UTP or TCP, whichever is first to connect.
555 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
557 t := perf.NewTimer(perf.CallerName(0))
560 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
562 t.Mark("returned conn over " + res.Network)
566 ctx, cancel := context.WithCancel(ctx)
567 // As soon as we return one connection, cancel the others.
570 resCh := make(chan dialResult, left)
574 cl.eachDialer(func(s Dialer) bool {
577 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
580 cl.dialFromSocket(ctx, s, addr),
581 s.LocalAddr().Network(),
588 // Wait for a successful connection.
590 defer perf.ScopeTimer()()
591 for ; left > 0 && res.Conn == nil; left-- {
595 // There are still incomplete dials.
597 for ; left > 0; left-- {
598 conn := (<-resCh).Conn
605 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
607 //if res.Conn != nil {
608 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
610 // cl.logger.Printf("failed to dial %s", addr)
615 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
616 network := s.LocalAddr().Network()
617 cte := cl.config.ConnTracker.Wait(
619 conntrack.Entry{network, s.LocalAddr().String(), addr},
620 "dial torrent client",
623 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
624 // which dial errors allow us to forget the connection tracking entry handle.
625 if ctx.Err() != nil {
631 c, err := s.Dial(ctx, addr)
632 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
633 // it now in case we close the connection forthwith.
634 if tc, ok := c.(*net.TCPConn); ok {
639 if err != nil && forgettableDialError(err) {
646 return closeWrapper{c, func() error {
653 func forgettableDialError(err error) bool {
654 return strings.Contains(err.Error(), "no suitable address found")
657 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
658 if _, ok := t.halfOpen[addr]; !ok {
659 panic("invariant broken")
661 delete(t.halfOpen, addr)
663 for _, t := range cl.torrents {
668 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no connection
669 // could be established for valid reasons.
670 func (cl *Client) initiateProtocolHandshakes(
674 outgoing, encryptHeader bool,
676 network, connString string,
678 c *PeerConn, err error,
680 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
681 c.headerEncrypted = encryptHeader
682 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
684 dl, ok := ctx.Deadline()
688 err = nc.SetDeadline(dl)
692 err = cl.initiateHandshakes(c, t)
696 // Returns nil connection and nil error if no connection could be established for valid reasons.
697 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
698 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
701 return t.dialTimeout()
704 dr := cl.dialFirst(dialCtx, addr.String())
707 if dialCtx.Err() != nil {
708 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
710 return nil, errors.New("dial failed")
712 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
719 // Returns nil connection and nil error if no connection could be established
720 // for valid reasons.
721 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
722 torrent.Add("establish outgoing connection", 1)
723 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
724 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
726 torrent.Add("initiated conn with preferred header obfuscation", 1)
729 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
730 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
731 // We should have just tried with the preferred header obfuscation. If it was required,
732 // there's nothing else to try.
735 // Try again with encryption if we didn't earlier, or without if we did.
736 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
738 torrent.Add("initiated conn with fallback header obfuscation", 1)
740 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
744 // Called to dial out and run a connection. The addr we're given is already
745 // considered half-open.
746 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
747 cl.dialRateLimiter.Wait(context.Background())
748 c, err := cl.establishOutgoingConn(t, addr)
751 // Don't release lock between here and addConnection, unless it's for
753 cl.noLongerHalfOpen(t, addr.String())
756 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
763 t.runHandshookConnLoggingErr(c)
766 // The port number for incoming peer connections. 0 if the client isn't listening.
767 func (cl *Client) incomingPeerPort() int {
768 return cl.LocalPort()
771 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
772 if c.headerEncrypted {
775 rw, c.cryptoMethod, err = mse.InitiateHandshake(
782 cl.config.CryptoProvides,
786 return xerrors.Errorf("header obfuscation handshake: %w", err)
789 ih, err := cl.connBtHandshake(c, &t.infoHash)
791 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
793 if ih != t.infoHash {
794 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
799 // Calls f with any secret keys.
800 func (cl *Client) forSkeys(f func([]byte) bool) {
803 if false { // Emulate the bug from #114
805 for ih := range cl.torrents {
809 for range cl.torrents {
816 for ih := range cl.torrents {
823 // Do encryption and bittorrent handshakes as receiver.
824 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
825 defer perf.ScopeTimerErr(&err)()
827 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
829 if err == nil || err == mse.ErrNoSecretKeyMatch {
830 if c.headerEncrypted {
831 torrent.Add("handshakes received encrypted", 1)
833 torrent.Add("handshakes received unencrypted", 1)
836 torrent.Add("handshakes received with error while handling encryption", 1)
839 if err == mse.ErrNoSecretKeyMatch {
844 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
845 err = errors.New("connection not have required header obfuscation")
848 ih, err := cl.connBtHandshake(c, nil)
850 err = xerrors.Errorf("during bt handshake: %w", err)
859 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
860 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
865 c.PeerExtensionBytes = res.PeerExtensionBits
866 c.PeerID = res.PeerID
867 c.completedHandshake = time.Now()
868 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
874 func (cl *Client) runReceivedConn(c *PeerConn) {
875 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
879 t, err := cl.receiveHandshakes(c)
882 "error receiving handshakes on %v: %s", c, err,
883 ).SetLevel(log.Debug).
885 "network", c.network,
887 torrent.Add("error receiving handshake", 1)
889 cl.onBadAccept(c.RemoteAddr)
894 torrent.Add("received handshake for unloaded torrent", 1)
895 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
897 cl.onBadAccept(c.RemoteAddr)
901 torrent.Add("received handshake for loaded torrent", 1)
904 t.runHandshookConnLoggingErr(c)
907 // Client lock must be held before entering this.
908 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
910 if c.PeerID == cl.peerID {
913 addr := c.conn.RemoteAddr().String()
914 cl.dopplegangerAddrs[addr] = struct{}{}
916 // Because the remote address is not necessarily the same as its client's torrent listen
917 // address, we won't record the remote address as a doppleganger. Instead, the initiator
918 // can record *us* as the doppleganger.
920 return errors.New("local and remote peer ids are the same")
922 c.conn.SetWriteDeadline(time.Time{})
923 c.r = deadlineReader{c.conn, c.r}
924 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
925 if connIsIpv6(c.conn) {
926 torrent.Add("completed handshake over ipv6", 1)
928 if err := t.addConnection(c); err != nil {
929 return fmt.Errorf("adding connection: %w", err)
931 defer t.dropConnection(c)
932 go c.writer(time.Minute)
933 cl.sendInitialMessages(c, t)
934 err := c.mainReadLoop()
936 return fmt.Errorf("main read loop: %w", err)
941 // See the order given in Transmission's tr_peerMsgsNew.
942 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
943 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
944 conn.post(pp.Message{
946 ExtendedID: pp.HandshakeExtendedID,
947 ExtendedPayload: func() []byte {
948 msg := pp.ExtendedHandshakeMessage{
949 M: map[pp.ExtensionName]pp.ExtensionNumber{
950 pp.ExtensionNameMetadata: metadataExtendedId,
952 V: cl.config.ExtendedHandshakeClientVersion,
953 Reqq: 64, // TODO: Really?
954 YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
955 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
956 Port: cl.incomingPeerPort(),
957 MetadataSize: torrent.metadataSize(),
958 // TODO: We can figure these out specific to the socket
960 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
961 Ipv6: cl.config.PublicIp6.To16(),
963 if !cl.config.DisablePEX {
964 msg.M[pp.ExtensionNamePex] = pexExtendedId
966 return bencode.MustMarshal(msg)
971 if conn.fastEnabled() {
972 if torrent.haveAllPieces() {
973 conn.post(pp.Message{Type: pp.HaveAll})
974 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
976 } else if !torrent.haveAnyPieces() {
977 conn.post(pp.Message{Type: pp.HaveNone})
978 conn.sentHaves.Clear()
984 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
985 conn.post(pp.Message{
992 func (cl *Client) dhtPort() (ret uint16) {
993 cl.eachDhtServer(func(s DhtServer) {
994 ret = uint16(missinggo.AddrPort(s.Addr()))
999 func (cl *Client) haveDhtServer() (ret bool) {
1000 cl.eachDhtServer(func(_ DhtServer) {
1006 // Process incoming ut_metadata message.
1007 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1008 var d map[string]int
1009 err := bencode.Unmarshal(payload, &d)
1010 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1011 } else if err != nil {
1012 return fmt.Errorf("error unmarshalling bencode: %s", err)
1014 msgType, ok := d["msg_type"]
1016 return errors.New("missing msg_type field")
1020 case pp.DataMetadataExtensionMsgType:
1021 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1022 if !c.requestedMetadataPiece(piece) {
1023 return fmt.Errorf("got unexpected piece %d", piece)
1025 c.metadataRequests[piece] = false
1026 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1027 if begin < 0 || begin >= len(payload) {
1028 return fmt.Errorf("data has bad offset in payload: %d", begin)
1030 t.saveMetadataPiece(piece, payload[begin:])
1031 c.lastUsefulChunkReceived = time.Now()
1032 return t.maybeCompleteMetadata()
1033 case pp.RequestMetadataExtensionMsgType:
1034 if !t.haveMetadataPiece(piece) {
1035 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1038 start := (1 << 14) * piece
1039 c.logger.Printf("sending metadata piece %d", piece)
1040 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1042 case pp.RejectMetadataExtensionMsgType:
1045 return errors.New("unknown msg_type value")
1049 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1050 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1051 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1056 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1060 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1063 if _, ok := cl.ipBlockRange(ip); ok {
1066 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1072 // Return a Torrent ready for insertion into a Client.
1073 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1074 // use provided storage, if provided
1075 storageClient := cl.defaultStorage
1076 if specStorage != nil {
1077 storageClient = storage.NewClient(specStorage)
1083 peers: prioritizedPeers{
1085 getPrio: func(p PeerInfo) peerPriority {
1086 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1089 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1091 halfOpen: make(map[string]PeerInfo),
1092 pieceStateChanges: pubsub.NewPubSub(),
1094 storageOpener: storageClient,
1095 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1097 networkingEnabled: true,
1098 metadataChanged: sync.Cond{
1101 webSeeds: make(map[string]*peer),
1103 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1104 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1105 t.logger = cl.logger.WithContextValue(t)
1106 t.setChunkSize(defaultChunkSize)
1110 // A file-like handle to some torrent data resource.
1111 type Handle interface {
1118 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1119 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1122 // Adds a torrent by InfoHash with a custom Storage implementation.
1123 // If the torrent already exists then this Storage is ignored and the
1124 // existing torrent is returned with `new` set to `false`.
1125 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1128 t, ok := cl.torrents[infoHash]
1134 t = cl.newTorrent(infoHash, specStorage)
1135 cl.eachDhtServer(func(s DhtServer) {
1136 go t.dhtAnnouncer(s)
1138 cl.torrents[infoHash] = t
1139 cl.clearAcceptLimits()
1140 t.updateWantPeersEvent()
1141 // Tickle Client.waitAccept, new torrent may want conns.
1142 cl.event.Broadcast()
1146 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1147 // Torrent.MergeSpec.
1148 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1149 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1150 err = t.MergeSpec(spec)
1154 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1155 // spec.DisallowDataDownload/Upload will be read and applied
1156 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1157 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1158 if spec.DisplayName != "" {
1159 t.SetDisplayName(spec.DisplayName)
1161 if spec.InfoBytes != nil {
1162 err := t.SetInfoBytes(spec.InfoBytes)
1168 cl.AddDHTNodes(spec.DhtNodes)
1171 useTorrentSources(spec.Sources, t)
1172 for _, url := range spec.Webseeds {
1175 if spec.ChunkSize != 0 {
1176 t.setChunkSize(pp.Integer(spec.ChunkSize))
1178 t.addTrackers(spec.Trackers)
1180 t.dataDownloadDisallowed = spec.DisallowDataDownload
1181 t.dataUploadDisallowed = spec.DisallowDataUpload
1185 func useTorrentSources(sources []string, t *Torrent) {
1186 for _, s := range sources {
1188 err := useTorrentSource(s, t)
1190 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1192 t.logger.Printf("successfully used source %q", s)
1198 func useTorrentSource(source string, t *Torrent) error {
1199 req, err := http.NewRequest(http.MethodGet, source, nil)
1203 ctx, cancel := context.WithCancel(context.Background())
1213 req = req.WithContext(ctx)
1214 resp, err := http.DefaultClient.Do(req)
1218 mi, err := metainfo.Load(resp.Body)
1220 if ctx.Err() != nil {
1225 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1228 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1229 t, ok := cl.torrents[infoHash]
1231 err = fmt.Errorf("no such torrent")
1238 delete(cl.torrents, infoHash)
1242 func (cl *Client) allTorrentsCompleted() bool {
1243 for _, t := range cl.torrents {
1247 if !t.haveAllPieces() {
1254 // Returns true when all torrents are completely downloaded and false if the
1255 // client is stopped before that.
1256 func (cl *Client) WaitAll() bool {
1259 for !cl.allTorrentsCompleted() {
1260 if cl.closed.IsSet() {
1268 // Returns handles to all the torrents loaded in the Client.
1269 func (cl *Client) Torrents() []*Torrent {
1272 return cl.torrentsAsSlice()
1275 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1276 for _, t := range cl.torrents {
1277 ret = append(ret, t)
1282 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1283 spec, err := TorrentSpecFromMagnetURI(uri)
1287 T, _, err = cl.AddTorrentSpec(spec)
1291 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1292 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1296 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1297 mi, err := metainfo.LoadFromFile(filename)
1301 return cl.AddTorrent(mi)
1304 func (cl *Client) DhtServers() []DhtServer {
1305 return cl.dhtServers
1308 func (cl *Client) AddDHTNodes(nodes []string) {
1309 for _, n := range nodes {
1310 hmp := missinggo.SplitHostMaybePort(n)
1311 ip := net.ParseIP(hmp.Host)
1313 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1316 ni := krpc.NodeInfo{
1317 Addr: krpc.NodeAddr{
1322 cl.eachDhtServer(func(s DhtServer) {
1328 func (cl *Client) banPeerIP(ip net.IP) {
1329 cl.logger.Printf("banning ip %v", ip)
1330 if cl.badPeerIPs == nil {
1331 cl.badPeerIPs = make(map[string]struct{})
1333 cl.badPeerIPs[ip.String()] = struct{}{}
1336 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1342 PeerMaxRequests: 250,
1344 RemoteAddr: remoteAddr,
1347 connString: connString,
1349 writeBuffer: new(bytes.Buffer),
1352 c.logger = cl.logger.WithDefaultLevel(log.Debug).WithContextValue(c)
1353 c.writerCond.L = cl.locker()
1354 c.setRW(connStatsReadWriter{nc, c})
1355 c.r = &rateLimitedReader{
1356 l: cl.config.DownloadRateLimiter,
1359 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1363 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1370 t.addPeers([]PeerInfo{{
1371 Addr: ipPortAddr{ip, port},
1372 Source: PeerSourceDhtAnnouncePeer,
1376 func firstNotNil(ips ...net.IP) net.IP {
1377 for _, ip := range ips {
1385 func (cl *Client) eachDialer(f func(Dialer) bool) {
1386 for _, s := range cl.dialers {
1393 func (cl *Client) eachListener(f func(Listener) bool) {
1394 for _, s := range cl.listeners {
1401 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1402 cl.eachListener(func(l Listener) bool {
1409 func (cl *Client) publicIp(peer net.IP) net.IP {
1410 // TODO: Use BEP 10 to determine how peers are seeing us.
1411 if peer.To4() != nil {
1413 cl.config.PublicIp4,
1414 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1419 cl.config.PublicIp6,
1420 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1424 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1425 l := cl.findListener(
1426 func(l net.Listener) bool {
1427 return f(addrIpOrNil(l.Addr()))
1433 return addrIpOrNil(l.Addr())
1436 // Our address (IP and port) as the given peer should see it.
1437 func (cl *Client) publicAddr(peer net.IP) IpPort {
1438 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1441 // ListenAddrs returns the addresses currently being listened to.
1442 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1445 cl.eachListener(func(l Listener) bool {
1446 ret = append(ret, l.Addr())
1452 func (cl *Client) onBadAccept(addr net.Addr) {
1453 ipa, ok := tryIpPortFromNetAddr(addr)
1457 ip := maskIpForAcceptLimiting(ipa.IP)
1458 if cl.acceptLimiter == nil {
1459 cl.acceptLimiter = make(map[ipStr]int)
1461 cl.acceptLimiter[ipStr(ip.String())]++
1464 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1465 if ip4 := ip.To4(); ip4 != nil {
1466 return ip4.Mask(net.CIDRMask(24, 32))
1471 func (cl *Client) clearAcceptLimits() {
1472 cl.acceptLimiter = nil
1475 func (cl *Client) acceptLimitClearer() {
1478 case <-cl.closed.LockedChan(cl.locker()):
1480 case <-time.After(15 * time.Minute):
1482 cl.clearAcceptLimits()
1488 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1489 if cl.config.DisableAcceptRateLimiting {
1492 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1495 func (cl *Client) rLock() {
1499 func (cl *Client) rUnlock() {
1503 func (cl *Client) lock() {
1507 func (cl *Client) unlock() {
1511 func (cl *Client) locker() *lockWithDeferreds {
1515 func (cl *Client) String() string {
1516 return fmt.Sprintf("<%[1]T %[1]p>", cl)