18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/tracker"
28 "github.com/anacrolix/torrent/webtorrent"
29 "github.com/davecgh/go-spew/spew"
30 "github.com/dustin/go-humanize"
31 "github.com/google/btree"
32 "github.com/pion/datachannel"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/missinggo/v2"
37 "github.com/anacrolix/missinggo/v2/conntrack"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure
51 // 64-bit alignment of fields. See #262.
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
84 func (cl *Client) BadPeerIPs() []string {
87 return cl.badPeerIPsLocked()
90 func (cl *Client) badPeerIPsLocked() []string {
91 return slices.FromMapKeys(cl.badPeerIPs).([]string)
94 func (cl *Client) PeerID() PeerID {
98 // Returns the port number for the first listener that has one. No longer assumes that all port
99 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
101 func (cl *Client) LocalPort() (port int) {
102 cl.eachListener(func(l Listener) bool {
103 port = addrPortOrZero(l.Addr())
109 func writeDhtServerStatus(w io.Writer, s DhtServer) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, " ID: %x\n", s.ID())
112 spew.Fdump(w, dhtStats)
115 // Writes out a human readable status of the client, such as for writing to a
117 func (cl *Client) WriteStatus(_w io.Writer) {
120 w := bufio.NewWriter(_w)
122 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
123 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
124 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
125 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
126 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
127 cl.eachDhtServer(func(s DhtServer) {
128 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
129 writeDhtServerStatus(w, s)
131 spew.Fdump(w, &cl.stats)
132 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135 return l.InfoHash().AsString() < r.InfoHash().AsString()
138 fmt.Fprint(w, "<unknown name>")
140 fmt.Fprint(w, t.name())
146 "%f%% of %d bytes (%s)",
147 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
149 humanize.Bytes(uint64(*t.length)))
151 w.WriteString("<missing metainfo>")
159 func (cl *Client) initLogger() {
160 cl.logger = cl.config.Logger.WithValues(cl)
161 if !cl.config.Debug {
162 cl.logger = cl.logger.FilterLevel(log.Info)
166 func (cl *Client) announceKey() int32 {
167 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
170 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
172 cfg = NewDefaultClientConfig()
182 dopplegangerAddrs: make(map[string]struct{}),
183 torrents: make(map[metainfo.Hash]*Torrent),
184 dialRateLimiter: rate.NewLimiter(10, 10),
186 go cl.acceptLimitClearer()
194 cl.event.L = cl.locker()
195 storageImpl := cfg.DefaultStorage
196 if storageImpl == nil {
197 // We'd use mmap by default but HFS+ doesn't support sparse files.
198 storageImplCloser := storage.NewFile(cfg.DataDir)
199 cl.onClose = append(cl.onClose, func() {
200 if err := storageImplCloser.Close(); err != nil {
201 cl.logger.Printf("error closing default storage: %s", err)
204 storageImpl = storageImplCloser
206 cl.defaultStorage = storage.NewClient(storageImpl)
207 if cfg.IPBlocklist != nil {
208 cl.ipBlockList = cfg.IPBlocklist
211 if cfg.PeerID != "" {
212 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
214 o := copy(cl.peerID[:], cfg.Bep20)
215 _, err = rand.Read(cl.peerID[o:])
217 panic("error generating peer id")
221 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
229 for _, _s := range sockets {
230 s := _s // Go is fucking retarded.
231 cl.onClose = append(cl.onClose, func() { s.Close() })
232 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
233 cl.dialers = append(cl.dialers, s)
234 cl.listeners = append(cl.listeners, s)
235 go cl.acceptConnections(s)
241 for _, s := range sockets {
242 if pc, ok := s.(net.PacketConn); ok {
243 ds, err := cl.newAnacrolixDhtServer(pc)
247 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
248 cl.onClose = append(cl.onClose, func() { ds.Close() })
253 cl.websocketTrackers = websocketTrackers{
256 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
259 t, ok := cl.torrents[infoHash]
261 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
263 return t.announceRequest(event), nil
265 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
268 t, ok := cl.torrents[dcc.InfoHash]
270 cl.logger.WithDefaultLevel(log.Warning).Printf(
271 "got webrtc conn for unloaded torrent with infohash %x",
277 go t.onWebRtcConn(dc, dcc)
284 func (cl *Client) AddDhtServer(d DhtServer) {
285 cl.dhtServers = append(cl.dhtServers, d)
288 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
289 // given address for any Torrent.
290 func (cl *Client) AddDialer(d Dialer) {
293 cl.dialers = append(cl.dialers, d)
294 for _, t := range cl.torrents {
299 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
301 func (cl *Client) AddListener(l Listener) {
302 cl.listeners = append(cl.listeners, l)
303 go cl.acceptConnections(l)
306 func (cl *Client) firewallCallback(net.Addr) bool {
308 block := !cl.wantConns()
311 torrent.Add("connections firewalled", 1)
313 torrent.Add("connections not firewalled", 1)
318 func (cl *Client) listenOnNetwork(n network) bool {
319 if n.Ipv4 && cl.config.DisableIPv4 {
322 if n.Ipv6 && cl.config.DisableIPv6 {
325 if n.Tcp && cl.config.DisableTCP {
328 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
334 func (cl *Client) listenNetworks() (ns []network) {
335 for _, n := range allPeerNetworks {
336 if cl.listenOnNetwork(n) {
343 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
344 cfg := dht.ServerConfig{
345 IPBlocklist: cl.ipBlockList,
347 OnAnnouncePeer: cl.onDHTAnnouncePeer,
348 PublicIP: func() net.IP {
349 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
350 return cl.config.PublicIp6
352 return cl.config.PublicIp4
354 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
355 ConnectionTracking: cl.config.ConnTracker,
356 OnQuery: cl.config.DHTOnQuery,
357 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
359 s, err = dht.NewServer(&cfg)
362 ts, err := s.Bootstrap()
364 cl.logger.Printf("error bootstrapping dht: %s", err)
366 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
372 func (cl *Client) Closed() <-chan struct{} {
378 func (cl *Client) eachDhtServer(f func(DhtServer)) {
379 for _, ds := range cl.dhtServers {
384 // Stops the client. All connections to peers are closed and all activity will
386 func (cl *Client) Close() {
390 for _, t := range cl.torrents {
393 for i := range cl.onClose {
394 cl.onClose[len(cl.onClose)-1-i]()
399 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
400 if cl.ipBlockList == nil {
403 return cl.ipBlockList.Lookup(ip)
406 func (cl *Client) ipIsBlocked(ip net.IP) bool {
407 _, blocked := cl.ipBlockRange(ip)
411 func (cl *Client) wantConns() bool {
412 for _, t := range cl.torrents {
420 func (cl *Client) waitAccept() {
422 if cl.closed.IsSet() {
432 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
433 func (cl *Client) rejectAccepted(conn net.Conn) error {
434 ra := conn.RemoteAddr()
435 if rip := addrIpOrNil(ra); rip != nil {
436 if cl.config.DisableIPv4Peers && rip.To4() != nil {
437 return errors.New("ipv4 peers disabled")
439 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
440 return errors.New("ipv4 disabled")
443 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
444 return errors.New("ipv6 disabled")
446 if cl.rateLimitAccept(rip) {
447 return errors.New("source IP accepted rate limited")
449 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
450 return errors.New("bad source addr")
456 func (cl *Client) acceptConnections(l net.Listener) {
458 conn, err := l.Accept()
459 torrent.Add("client listener accepts", 1)
460 conn = pproffd.WrapNetConn(conn)
462 closed := cl.closed.IsSet()
465 reject = cl.rejectAccepted(conn)
475 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
480 torrent.Add("rejected accepted connections", 1)
481 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
484 go cl.incomingConnection(conn)
486 log.Fmsg("accepted %q connection at %q from %q",
490 ).SetLevel(log.Debug).Log(cl.logger)
491 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
492 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
493 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
498 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
499 func regularNetConnPeerConnConnString(nc net.Conn) string {
500 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
503 func (cl *Client) incomingConnection(nc net.Conn) {
505 if tc, ok := nc.(*net.TCPConn); ok {
508 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
509 regularNetConnPeerConnConnString(nc))
510 c.Discovery = PeerSourceIncoming
511 cl.runReceivedConn(c)
514 // Returns a handle to the given torrent, if it's present in the client.
515 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
518 t, ok = cl.torrents[ih]
522 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
523 return cl.torrents[ih]
526 type dialResult struct {
531 func countDialResult(err error) {
533 torrent.Add("successful dials", 1)
535 torrent.Add("unsuccessful dials", 1)
539 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
540 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
541 if ret < minDialTimeout {
547 // Returns whether an address is known to connect to a client with our own ID.
548 func (cl *Client) dopplegangerAddr(addr string) bool {
549 _, ok := cl.dopplegangerAddrs[addr]
553 // Returns a connection over UTP or TCP, whichever is first to connect.
554 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
556 t := perf.NewTimer(perf.CallerName(0))
559 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
561 t.Mark("returned conn over " + res.Network)
565 ctx, cancel := context.WithCancel(ctx)
566 // As soon as we return one connection, cancel the others.
569 resCh := make(chan dialResult, left)
573 cl.eachDialer(func(s Dialer) bool {
576 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
579 cl.dialFromSocket(ctx, s, addr),
580 s.LocalAddr().Network(),
587 // Wait for a successful connection.
589 defer perf.ScopeTimer()()
590 for ; left > 0 && res.Conn == nil; left-- {
594 // There are still incompleted dials.
596 for ; left > 0; left-- {
597 conn := (<-resCh).Conn
604 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
606 //if res.Conn != nil {
607 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
609 // cl.logger.Printf("failed to dial %s", addr)
614 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
615 network := s.LocalAddr().Network()
616 cte := cl.config.ConnTracker.Wait(
618 conntrack.Entry{network, s.LocalAddr().String(), addr},
619 "dial torrent client",
622 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
623 // which dial errors allow us to forget the connection tracking entry handle.
624 if ctx.Err() != nil {
630 c, err := s.Dial(ctx, addr)
631 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
632 // it now in case we close the connection forthwith.
633 if tc, ok := c.(*net.TCPConn); ok {
638 if err != nil && forgettableDialError(err) {
645 return closeWrapper{c, func() error {
652 func forgettableDialError(err error) bool {
653 return strings.Contains(err.Error(), "no suitable address found")
656 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
657 if _, ok := t.halfOpen[addr]; !ok {
658 panic("invariant broken")
660 delete(t.halfOpen, addr)
664 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
665 // for valid reasons.
666 func (cl *Client) initiateProtocolHandshakes(
670 outgoing, encryptHeader bool,
672 network, connString string,
674 c *PeerConn, err error,
676 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
677 c.headerEncrypted = encryptHeader
678 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
680 dl, ok := ctx.Deadline()
684 err = nc.SetDeadline(dl)
688 err = cl.initiateHandshakes(c, t)
692 // Returns nil connection and nil error if no connection could be established for valid reasons.
693 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
694 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
697 return t.dialTimeout()
700 dr := cl.dialFirst(dialCtx, addr.String())
703 if dialCtx.Err() != nil {
704 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
706 return nil, errors.New("dial failed")
708 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
715 // Returns nil connection and nil error if no connection could be established
716 // for valid reasons.
717 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
718 torrent.Add("establish outgoing connection", 1)
719 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
720 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
722 torrent.Add("initiated conn with preferred header obfuscation", 1)
725 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
726 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
727 // We should have just tried with the preferred header obfuscation. If it was required,
728 // there's nothing else to try.
731 // Try again with encryption if we didn't earlier, or without if we did.
732 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with fallback header obfuscation", 1)
736 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
740 // Called to dial out and run a connection. The addr we're given is already
741 // considered half-open.
742 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
743 cl.dialRateLimiter.Wait(context.Background())
744 c, err := cl.establishOutgoingConn(t, addr)
747 // Don't release lock between here and addConnection, unless it's for
749 cl.noLongerHalfOpen(t, addr.String())
752 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
759 t.runHandshookConnLoggingErr(c)
762 // The port number for incoming peer connections. 0 if the client isn't listening.
763 func (cl *Client) incomingPeerPort() int {
764 return cl.LocalPort()
767 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
768 if c.headerEncrypted {
771 rw, c.cryptoMethod, err = mse.InitiateHandshake(
778 cl.config.CryptoProvides,
782 return xerrors.Errorf("header obfuscation handshake: %w", err)
785 ih, err := cl.connBtHandshake(c, &t.infoHash)
787 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
789 if ih != t.infoHash {
790 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
795 // Calls f with any secret keys.
796 func (cl *Client) forSkeys(f func([]byte) bool) {
799 if false { // Emulate the bug from #114
801 for ih := range cl.torrents {
805 for range cl.torrents {
812 for ih := range cl.torrents {
819 // Do encryption and bittorrent handshakes as receiver.
820 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
821 defer perf.ScopeTimerErr(&err)()
823 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
825 if err == nil || err == mse.ErrNoSecretKeyMatch {
826 if c.headerEncrypted {
827 torrent.Add("handshakes received encrypted", 1)
829 torrent.Add("handshakes received unencrypted", 1)
832 torrent.Add("handshakes received with error while handling encryption", 1)
835 if err == mse.ErrNoSecretKeyMatch {
840 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
841 err = errors.New("connection not have required header obfuscation")
844 ih, err := cl.connBtHandshake(c, nil)
846 err = xerrors.Errorf("during bt handshake: %w", err)
855 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
856 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
861 c.PeerExtensionBytes = res.PeerExtensionBits
862 c.PeerID = res.PeerID
863 c.completedHandshake = time.Now()
864 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
870 func (cl *Client) runReceivedConn(c *PeerConn) {
871 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
875 t, err := cl.receiveHandshakes(c)
878 "error receiving handshakes on %v: %s", c, err,
879 ).SetLevel(log.Debug).
881 "network", c.network,
883 torrent.Add("error receiving handshake", 1)
885 cl.onBadAccept(c.RemoteAddr)
890 torrent.Add("received handshake for unloaded torrent", 1)
891 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
893 cl.onBadAccept(c.RemoteAddr)
897 torrent.Add("received handshake for loaded torrent", 1)
900 t.runHandshookConnLoggingErr(c)
903 // Client lock must be held before entering this.
904 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
906 if c.PeerID == cl.peerID {
909 addr := c.conn.RemoteAddr().String()
910 cl.dopplegangerAddrs[addr] = struct{}{}
912 // Because the remote address is not necessarily the same as its client's torrent listen
913 // address, we won't record the remote address as a doppleganger. Instead, the initiator
914 // can record *us* as the doppleganger.
916 return errors.New("local and remote peer ids are the same")
918 c.conn.SetWriteDeadline(time.Time{})
919 c.r = deadlineReader{c.conn, c.r}
920 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
921 if connIsIpv6(c.conn) {
922 torrent.Add("completed handshake over ipv6", 1)
924 if err := t.addConnection(c); err != nil {
925 return fmt.Errorf("adding connection: %w", err)
927 defer t.dropConnection(c)
928 go c.writer(time.Minute)
929 cl.sendInitialMessages(c, t)
930 err := c.mainReadLoop()
932 return fmt.Errorf("main read loop: %w", err)
937 // See the order given in Transmission's tr_peerMsgsNew.
938 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
939 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
940 conn.post(pp.Message{
942 ExtendedID: pp.HandshakeExtendedID,
943 ExtendedPayload: func() []byte {
944 msg := pp.ExtendedHandshakeMessage{
945 M: map[pp.ExtensionName]pp.ExtensionNumber{
946 pp.ExtensionNameMetadata: metadataExtendedId,
948 V: cl.config.ExtendedHandshakeClientVersion,
949 Reqq: 64, // TODO: Really?
950 YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
951 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
952 Port: cl.incomingPeerPort(),
953 MetadataSize: torrent.metadataSize(),
954 // TODO: We can figured these out specific to the socket
956 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
957 Ipv6: cl.config.PublicIp6.To16(),
959 if !cl.config.DisablePEX {
960 msg.M[pp.ExtensionNamePex] = pexExtendedId
962 return bencode.MustMarshal(msg)
967 if conn.fastEnabled() {
968 if torrent.haveAllPieces() {
969 conn.post(pp.Message{Type: pp.HaveAll})
970 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
972 } else if !torrent.haveAnyPieces() {
973 conn.post(pp.Message{Type: pp.HaveNone})
974 conn.sentHaves.Clear()
980 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
981 conn.post(pp.Message{
988 func (cl *Client) dhtPort() (ret uint16) {
989 cl.eachDhtServer(func(s DhtServer) {
990 ret = uint16(missinggo.AddrPort(s.Addr()))
995 func (cl *Client) haveDhtServer() (ret bool) {
996 cl.eachDhtServer(func(_ DhtServer) {
1002 // Process incoming ut_metadata message.
1003 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1004 var d map[string]int
1005 err := bencode.Unmarshal(payload, &d)
1006 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1007 } else if err != nil {
1008 return fmt.Errorf("error unmarshalling bencode: %s", err)
1010 msgType, ok := d["msg_type"]
1012 return errors.New("missing msg_type field")
1016 case pp.DataMetadataExtensionMsgType:
1017 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1018 if !c.requestedMetadataPiece(piece) {
1019 return fmt.Errorf("got unexpected piece %d", piece)
1021 c.metadataRequests[piece] = false
1022 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1023 if begin < 0 || begin >= len(payload) {
1024 return fmt.Errorf("data has bad offset in payload: %d", begin)
1026 t.saveMetadataPiece(piece, payload[begin:])
1027 c.lastUsefulChunkReceived = time.Now()
1028 return t.maybeCompleteMetadata()
1029 case pp.RequestMetadataExtensionMsgType:
1030 if !t.haveMetadataPiece(piece) {
1031 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1034 start := (1 << 14) * piece
1035 c.logger.Printf("sending metadata piece %d", piece)
1036 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1038 case pp.RejectMetadataExtensionMsgType:
1041 return errors.New("unknown msg_type value")
1045 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1046 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1047 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1052 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1056 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1059 if _, ok := cl.ipBlockRange(ip); ok {
1062 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1068 // Return a Torrent ready for insertion into a Client.
1069 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1070 // use provided storage, if provided
1071 storageClient := cl.defaultStorage
1072 if specStorage != nil {
1073 storageClient = storage.NewClient(specStorage)
1079 peers: prioritizedPeers{
1081 getPrio: func(p PeerInfo) peerPriority {
1082 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1085 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1087 halfOpen: make(map[string]PeerInfo),
1088 pieceStateChanges: pubsub.NewPubSub(),
1090 storageOpener: storageClient,
1091 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1093 networkingEnabled: true,
1094 metadataChanged: sync.Cond{
1097 webSeeds: make(map[string]*peer),
1099 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1100 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1101 t.logger = cl.logger.WithContextValue(t)
1102 t.setChunkSize(defaultChunkSize)
1106 // A file-like handle to some torrent data resource.
1107 type Handle interface {
1114 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1115 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1118 // Adds a torrent by InfoHash with a custom Storage implementation.
1119 // If the torrent already exists then this Storage is ignored and the
1120 // existing torrent returned with `new` set to `false`
1121 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1124 t, ok := cl.torrents[infoHash]
1130 t = cl.newTorrent(infoHash, specStorage)
1131 cl.eachDhtServer(func(s DhtServer) {
1132 go t.dhtAnnouncer(s)
1134 cl.torrents[infoHash] = t
1135 cl.clearAcceptLimits()
1136 t.updateWantPeersEvent()
1137 // Tickle Client.waitAccept, new torrent may want conns.
1138 cl.event.Broadcast()
1142 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1143 // Torrent.MergeSpec.
1144 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1145 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1146 err = t.MergeSpec(spec)
1150 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1151 // spec.DisallowDataDownload/Upload will be read and applied
1152 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1153 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1154 if spec.DisplayName != "" {
1155 t.SetDisplayName(spec.DisplayName)
1157 if spec.InfoBytes != nil {
1158 err := t.SetInfoBytes(spec.InfoBytes)
1164 cl.AddDHTNodes(spec.DhtNodes)
1167 useTorrentSources(spec.Sources, t)
1168 for _, url := range spec.Webseeds {
1171 if spec.ChunkSize != 0 {
1172 t.setChunkSize(pp.Integer(spec.ChunkSize))
1174 t.addTrackers(spec.Trackers)
1176 t.dataDownloadDisallowed = spec.DisallowDataDownload
1177 t.dataUploadDisallowed = spec.DisallowDataUpload
1181 func useTorrentSources(sources []string, t *Torrent) {
1182 for _, s := range sources {
1184 err := useTorrentSource(s, t)
1186 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1188 t.logger.Printf("successfully used source %q", s)
1194 func useTorrentSource(source string, t *Torrent) error {
1195 req, err := http.NewRequest(http.MethodGet, source, nil)
1199 ctx, cancel := context.WithCancel(context.Background())
1209 req = req.WithContext(ctx)
1210 resp, err := http.DefaultClient.Do(req)
1214 mi, err := metainfo.Load(resp.Body)
1216 if ctx.Err() != nil {
1221 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1224 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1225 t, ok := cl.torrents[infoHash]
1227 err = fmt.Errorf("no such torrent")
1234 delete(cl.torrents, infoHash)
1238 func (cl *Client) allTorrentsCompleted() bool {
1239 for _, t := range cl.torrents {
1243 if !t.haveAllPieces() {
1250 // Returns true when all torrents are completely downloaded and false if the
1251 // client is stopped before that.
1252 func (cl *Client) WaitAll() bool {
1255 for !cl.allTorrentsCompleted() {
1256 if cl.closed.IsSet() {
1264 // Returns handles to all the torrents loaded in the Client.
1265 func (cl *Client) Torrents() []*Torrent {
1268 return cl.torrentsAsSlice()
1271 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1272 for _, t := range cl.torrents {
1273 ret = append(ret, t)
1278 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1279 spec, err := TorrentSpecFromMagnetURI(uri)
1283 T, _, err = cl.AddTorrentSpec(spec)
1287 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1288 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1292 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1293 mi, err := metainfo.LoadFromFile(filename)
1297 return cl.AddTorrent(mi)
1300 func (cl *Client) DhtServers() []DhtServer {
1301 return cl.dhtServers
1304 func (cl *Client) AddDHTNodes(nodes []string) {
1305 for _, n := range nodes {
1306 hmp := missinggo.SplitHostMaybePort(n)
1307 ip := net.ParseIP(hmp.Host)
1309 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1312 ni := krpc.NodeInfo{
1313 Addr: krpc.NodeAddr{
1318 cl.eachDhtServer(func(s DhtServer) {
1324 func (cl *Client) banPeerIP(ip net.IP) {
1325 cl.logger.Printf("banning ip %v", ip)
1326 if cl.badPeerIPs == nil {
1327 cl.badPeerIPs = make(map[string]struct{})
1329 cl.badPeerIPs[ip.String()] = struct{}{}
1332 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1338 PeerMaxRequests: 250,
1340 RemoteAddr: remoteAddr,
1343 connString: connString,
1345 writeBuffer: new(bytes.Buffer),
1348 c.logger = cl.logger.WithDefaultLevel(log.Debug).WithContextValue(c)
1349 c.writerCond.L = cl.locker()
1350 c.setRW(connStatsReadWriter{nc, c})
1351 c.r = &rateLimitedReader{
1352 l: cl.config.DownloadRateLimiter,
1355 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1359 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1366 t.addPeers([]PeerInfo{{
1367 Addr: ipPortAddr{ip, port},
1368 Source: PeerSourceDhtAnnouncePeer,
1372 func firstNotNil(ips ...net.IP) net.IP {
1373 for _, ip := range ips {
1381 func (cl *Client) eachDialer(f func(Dialer) bool) {
1382 for _, s := range cl.dialers {
1389 func (cl *Client) eachListener(f func(Listener) bool) {
1390 for _, s := range cl.listeners {
1397 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1398 cl.eachListener(func(l Listener) bool {
1405 func (cl *Client) publicIp(peer net.IP) net.IP {
1406 // TODO: Use BEP 10 to determine how peers are seeing us.
1407 if peer.To4() != nil {
1409 cl.config.PublicIp4,
1410 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1415 cl.config.PublicIp6,
1416 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1420 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1421 l := cl.findListener(
1422 func(l net.Listener) bool {
1423 return f(addrIpOrNil(l.Addr()))
1429 return addrIpOrNil(l.Addr())
1432 // Our IP as a peer should see it.
1433 func (cl *Client) publicAddr(peer net.IP) IpPort {
1434 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1437 // ListenAddrs addresses currently being listened to.
1438 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1441 cl.eachListener(func(l Listener) bool {
1442 ret = append(ret, l.Addr())
1448 func (cl *Client) onBadAccept(addr net.Addr) {
1449 ipa, ok := tryIpPortFromNetAddr(addr)
1453 ip := maskIpForAcceptLimiting(ipa.IP)
1454 if cl.acceptLimiter == nil {
1455 cl.acceptLimiter = make(map[ipStr]int)
1457 cl.acceptLimiter[ipStr(ip.String())]++
1460 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1461 if ip4 := ip.To4(); ip4 != nil {
1462 return ip4.Mask(net.CIDRMask(24, 32))
1467 func (cl *Client) clearAcceptLimits() {
1468 cl.acceptLimiter = nil
1471 func (cl *Client) acceptLimitClearer() {
1474 case <-cl.closed.LockedChan(cl.locker()):
1476 case <-time.After(15 * time.Minute):
1478 cl.clearAcceptLimits()
1484 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1485 if cl.config.DisableAcceptRateLimiting {
1488 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1491 func (cl *Client) rLock() {
1495 func (cl *Client) rUnlock() {
1499 func (cl *Client) lock() {
1503 func (cl *Client) unlock() {
1507 func (cl *Client) locker() *lockWithDeferreds {
1511 func (cl *Client) String() string {
1512 return fmt.Sprintf("<%[1]T %[1]p>", cl)