18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/tracker"
28 "github.com/anacrolix/torrent/webtorrent"
29 "github.com/davecgh/go-spew/spew"
30 "github.com/dustin/go-humanize"
31 "github.com/google/btree"
32 "github.com/pion/datachannel"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/missinggo/v2"
37 "github.com/anacrolix/missinggo/v2/conntrack"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure
51 // 64-bit alignment of fields. See #262.
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
84 func (cl *Client) BadPeerIPs() []string {
87 return cl.badPeerIPsLocked()
90 func (cl *Client) badPeerIPsLocked() []string {
91 return slices.FromMapKeys(cl.badPeerIPs).([]string)
94 func (cl *Client) PeerID() PeerID {
98 // Returns the port number for the first listener that has one. No longer assumes that all port
99 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
101 func (cl *Client) LocalPort() (port int) {
102 cl.eachListener(func(l Listener) bool {
103 port = addrPortOrZero(l.Addr())
109 func writeDhtServerStatus(w io.Writer, s DhtServer) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, " ID: %x\n", s.ID())
112 spew.Fdump(w, dhtStats)
115 // Writes out a human readable status of the client, such as for writing to a
117 func (cl *Client) WriteStatus(_w io.Writer) {
120 w := bufio.NewWriter(_w)
122 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
123 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
124 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
125 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
126 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
127 cl.eachDhtServer(func(s DhtServer) {
128 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
129 writeDhtServerStatus(w, s)
131 spew.Fdump(w, &cl.stats)
132 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135 return l.InfoHash().AsString() < r.InfoHash().AsString()
138 fmt.Fprint(w, "<unknown name>")
140 fmt.Fprint(w, t.name())
146 "%f%% of %d bytes (%s)",
147 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
149 humanize.Bytes(uint64(*t.length)))
151 w.WriteString("<missing metainfo>")
159 func (cl *Client) initLogger() {
160 cl.logger = cl.config.Logger.WithValues(cl)
161 if !cl.config.Debug {
162 cl.logger = cl.logger.FilterLevel(log.Info)
166 func (cl *Client) announceKey() int32 {
167 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
170 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
172 cfg = NewDefaultClientConfig()
182 dopplegangerAddrs: make(map[string]struct{}),
183 torrents: make(map[metainfo.Hash]*Torrent),
184 dialRateLimiter: rate.NewLimiter(10, 10),
186 go cl.acceptLimitClearer()
194 cl.event.L = cl.locker()
195 storageImpl := cfg.DefaultStorage
196 if storageImpl == nil {
197 // We'd use mmap by default but HFS+ doesn't support sparse files.
198 storageImplCloser := storage.NewFile(cfg.DataDir)
199 cl.onClose = append(cl.onClose, func() {
200 if err := storageImplCloser.Close(); err != nil {
201 cl.logger.Printf("error closing default storage: %s", err)
204 storageImpl = storageImplCloser
206 cl.defaultStorage = storage.NewClient(storageImpl)
207 if cfg.IPBlocklist != nil {
208 cl.ipBlockList = cfg.IPBlocklist
211 if cfg.PeerID != "" {
212 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
214 o := copy(cl.peerID[:], cfg.Bep20)
215 _, err = rand.Read(cl.peerID[o:])
217 panic("error generating peer id")
221 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
229 for _, _s := range sockets {
230 s := _s // Copy the loop variable so each closure below captures its own socket.
231 cl.onClose = append(cl.onClose, func() { s.Close() })
232 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
233 cl.dialers = append(cl.dialers, s)
234 cl.listeners = append(cl.listeners, s)
235 go cl.acceptConnections(s)
241 for _, s := range sockets {
242 if pc, ok := s.(net.PacketConn); ok {
243 ds, err := cl.newAnacrolixDhtServer(pc)
247 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
248 cl.onClose = append(cl.onClose, func() { ds.Close() })
253 cl.websocketTrackers = websocketTrackers{
256 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
259 t, ok := cl.torrents[infoHash]
261 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
263 return t.announceRequest(event), nil
265 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
268 t, ok := cl.torrents[dcc.InfoHash]
270 cl.logger.WithDefaultLevel(log.Warning).Printf(
271 "got webrtc conn for unloaded torrent with infohash %x",
277 go t.onWebRtcConn(dc, dcc)
284 func (cl *Client) AddDhtServer(d DhtServer) {
285 cl.dhtServers = append(cl.dhtServers, d)
288 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
289 // given address for any Torrent.
290 func (cl *Client) AddDialer(d Dialer) {
293 cl.dialers = append(cl.dialers, d)
294 for _, t := range cl.torrents {
299 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
301 func (cl *Client) AddListener(l Listener) {
302 cl.listeners = append(cl.listeners, l)
303 go cl.acceptConnections(l)
306 func (cl *Client) firewallCallback(net.Addr) bool {
308 block := !cl.wantConns()
311 torrent.Add("connections firewalled", 1)
313 torrent.Add("connections not firewalled", 1)
318 func (cl *Client) listenOnNetwork(n network) bool {
319 if n.Ipv4 && cl.config.DisableIPv4 {
322 if n.Ipv6 && cl.config.DisableIPv6 {
325 if n.Tcp && cl.config.DisableTCP {
328 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
334 func (cl *Client) listenNetworks() (ns []network) {
335 for _, n := range allPeerNetworks {
336 if cl.listenOnNetwork(n) {
343 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
344 cfg := dht.ServerConfig{
345 IPBlocklist: cl.ipBlockList,
347 OnAnnouncePeer: cl.onDHTAnnouncePeer,
348 PublicIP: func() net.IP {
349 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
350 return cl.config.PublicIp6
352 return cl.config.PublicIp4
354 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
355 ConnectionTracking: cl.config.ConnTracker,
356 OnQuery: cl.config.DHTOnQuery,
357 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
359 s, err = dht.NewServer(&cfg)
362 ts, err := s.Bootstrap()
364 cl.logger.Printf("error bootstrapping dht: %s", err)
366 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
372 func (cl *Client) Closed() <-chan struct{} {
378 func (cl *Client) eachDhtServer(f func(DhtServer)) {
379 for _, ds := range cl.dhtServers {
384 // Stops the client. All connections to peers are closed and all activity will
386 func (cl *Client) Close() {
390 for _, t := range cl.torrents {
393 for i := range cl.onClose {
394 cl.onClose[len(cl.onClose)-1-i]()
399 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
400 if cl.ipBlockList == nil {
403 return cl.ipBlockList.Lookup(ip)
406 func (cl *Client) ipIsBlocked(ip net.IP) bool {
407 _, blocked := cl.ipBlockRange(ip)
411 func (cl *Client) wantConns() bool {
412 for _, t := range cl.torrents {
420 func (cl *Client) waitAccept() {
422 if cl.closed.IsSet() {
432 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
433 func (cl *Client) rejectAccepted(conn net.Conn) error {
434 ra := conn.RemoteAddr()
435 if rip := addrIpOrNil(ra); rip != nil {
436 if cl.config.DisableIPv4Peers && rip.To4() != nil {
437 return errors.New("ipv4 peers disabled")
439 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
440 return errors.New("ipv4 disabled")
443 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
444 return errors.New("ipv6 disabled")
446 if cl.rateLimitAccept(rip) {
447 return errors.New("source IP accepted rate limited")
449 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
450 return errors.New("bad source addr")
456 func (cl *Client) acceptConnections(l net.Listener) {
458 conn, err := l.Accept()
459 torrent.Add("client listener accepts", 1)
460 conn = pproffd.WrapNetConn(conn)
462 closed := cl.closed.IsSet()
465 reject = cl.rejectAccepted(conn)
475 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
480 torrent.Add("rejected accepted connections", 1)
481 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
484 go cl.incomingConnection(conn)
486 log.Fmsg("accepted %q connection at %q from %q",
490 ).SetLevel(log.Debug).Log(cl.logger)
491 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
492 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
493 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
498 func regularConnString(nc net.Conn) string {
499 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
502 func (cl *Client) incomingConnection(nc net.Conn) {
504 if tc, ok := nc.(*net.TCPConn); ok {
507 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
508 regularConnString(nc))
509 c.Discovery = PeerSourceIncoming
510 cl.runReceivedConn(c)
513 // Returns a handle to the given torrent, if it's present in the client.
514 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
517 t, ok = cl.torrents[ih]
521 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
522 return cl.torrents[ih]
525 type dialResult struct {
530 func countDialResult(err error) {
532 torrent.Add("successful dials", 1)
534 torrent.Add("unsuccessful dials", 1)
538 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
539 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
540 if ret < minDialTimeout {
546 // Returns whether an address is known to connect to a client with our own ID.
547 func (cl *Client) dopplegangerAddr(addr string) bool {
548 _, ok := cl.dopplegangerAddrs[addr]
552 // Returns a connection over UTP or TCP, whichever is first to connect.
553 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
555 t := perf.NewTimer(perf.CallerName(0))
558 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
560 t.Mark("returned conn over " + res.Network)
564 ctx, cancel := context.WithCancel(ctx)
565 // As soon as we return one connection, cancel the others.
568 resCh := make(chan dialResult, left)
572 cl.eachDialer(func(s Dialer) bool {
575 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
578 cl.dialFromSocket(ctx, s, addr),
579 s.LocalAddr().Network(),
586 // Wait for a successful connection.
588 defer perf.ScopeTimer()()
589 for ; left > 0 && res.Conn == nil; left-- {
593 // There are still incomplete dials.
595 for ; left > 0; left-- {
596 conn := (<-resCh).Conn
603 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
605 //if res.Conn != nil {
606 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
608 // cl.logger.Printf("failed to dial %s", addr)
613 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
614 network := s.LocalAddr().Network()
615 cte := cl.config.ConnTracker.Wait(
617 conntrack.Entry{network, s.LocalAddr().String(), addr},
618 "dial torrent client",
621 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
622 // which dial errors allow us to forget the connection tracking entry handle.
623 if ctx.Err() != nil {
629 c, err := s.Dial(ctx, addr)
630 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
631 // it now in case we close the connection forthwith.
632 if tc, ok := c.(*net.TCPConn); ok {
637 if err != nil && forgettableDialError(err) {
644 return closeWrapper{c, func() error {
651 func forgettableDialError(err error) bool {
652 return strings.Contains(err.Error(), "no suitable address found")
655 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
656 if _, ok := t.halfOpen[addr]; !ok {
657 panic("invariant broken")
659 delete(t.halfOpen, addr)
663 // Performs initiator handshakes and returns a connection. Returns a nil *PeerConn if no
664 // connection could be established for valid reasons.
665 func (cl *Client) initiateProtocolHandshakes(
669 outgoing, encryptHeader bool,
671 network, connString string,
673 c *PeerConn, err error,
675 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
676 c.headerEncrypted = encryptHeader
677 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
679 dl, ok := ctx.Deadline()
683 err = nc.SetDeadline(dl)
687 err = cl.initiateHandshakes(c, t)
691 // Returns nil connection and nil error if no connection could be established for valid reasons.
692 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
693 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
696 return t.dialTimeout()
699 dr := cl.dialFirst(dialCtx, addr.String())
702 if dialCtx.Err() != nil {
703 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
705 return nil, errors.New("dial failed")
707 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
714 // Returns nil connection and nil error if no connection could be established
715 // for valid reasons.
716 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
717 torrent.Add("establish outgoing connection", 1)
718 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
719 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
721 torrent.Add("initiated conn with preferred header obfuscation", 1)
724 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
725 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
726 // We should have just tried with the preferred header obfuscation. If it was required,
727 // there's nothing else to try.
730 // Try again with encryption if we didn't earlier, or without if we did.
731 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
733 torrent.Add("initiated conn with fallback header obfuscation", 1)
735 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
739 // Called to dial out and run a connection. The addr we're given is already
740 // considered half-open.
741 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
742 cl.dialRateLimiter.Wait(context.Background())
743 c, err := cl.establishOutgoingConn(t, addr)
746 // Don't release lock between here and addConnection, unless it's for
748 cl.noLongerHalfOpen(t, addr.String())
751 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
758 t.runHandshookConnLoggingErr(c)
761 // The port number for incoming peer connections. 0 if the client isn't listening.
762 func (cl *Client) incomingPeerPort() int {
763 return cl.LocalPort()
766 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
767 if c.headerEncrypted {
770 rw, c.cryptoMethod, err = mse.InitiateHandshake(
777 cl.config.CryptoProvides,
781 return xerrors.Errorf("header obfuscation handshake: %w", err)
784 ih, err := cl.connBtHandshake(c, &t.infoHash)
786 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
788 if ih != t.infoHash {
789 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
794 // Calls f with any secret keys.
795 func (cl *Client) forSkeys(f func([]byte) bool) {
798 if false { // Emulate the bug from #114
800 for ih := range cl.torrents {
804 for range cl.torrents {
811 for ih := range cl.torrents {
818 // Do encryption and bittorrent handshakes as receiver.
819 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
820 defer perf.ScopeTimerErr(&err)()
822 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
824 if err == nil || err == mse.ErrNoSecretKeyMatch {
825 if c.headerEncrypted {
826 torrent.Add("handshakes received encrypted", 1)
828 torrent.Add("handshakes received unencrypted", 1)
831 torrent.Add("handshakes received with error while handling encryption", 1)
834 if err == mse.ErrNoSecretKeyMatch {
839 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
840 err = errors.New("connection not have required header obfuscation")
843 ih, err := cl.connBtHandshake(c, nil)
845 err = xerrors.Errorf("during bt handshake: %w", err)
854 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
855 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
860 c.PeerExtensionBytes = res.PeerExtensionBits
861 c.PeerID = res.PeerID
862 c.completedHandshake = time.Now()
863 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
869 func (cl *Client) runReceivedConn(c *PeerConn) {
870 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
874 t, err := cl.receiveHandshakes(c)
877 "error receiving handshakes on %v: %s", c, err,
878 ).SetLevel(log.Debug).
880 "network", c.network,
882 torrent.Add("error receiving handshake", 1)
884 cl.onBadAccept(c.RemoteAddr)
889 torrent.Add("received handshake for unloaded torrent", 1)
890 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
892 cl.onBadAccept(c.RemoteAddr)
896 torrent.Add("received handshake for loaded torrent", 1)
899 t.runHandshookConnLoggingErr(c)
902 // Client lock must be held before entering this.
903 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
905 if c.PeerID == cl.peerID {
908 addr := c.conn.RemoteAddr().String()
909 cl.dopplegangerAddrs[addr] = struct{}{}
911 // Because the remote address is not necessarily the same as its client's torrent listen
912 // address, we won't record the remote address as a doppleganger. Instead, the initiator
913 // can record *us* as the doppleganger.
915 return errors.New("local and remote peer ids are the same")
917 c.conn.SetWriteDeadline(time.Time{})
918 c.r = deadlineReader{c.conn, c.r}
919 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
920 if connIsIpv6(c.conn) {
921 torrent.Add("completed handshake over ipv6", 1)
923 if err := t.addConnection(c); err != nil {
924 return fmt.Errorf("adding connection: %w", err)
926 defer t.dropConnection(c)
927 go c.writer(time.Minute)
928 cl.sendInitialMessages(c, t)
929 err := c.mainReadLoop()
931 return fmt.Errorf("main read loop: %w", err)
936 // See the order given in Transmission's tr_peerMsgsNew.
937 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
938 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
939 conn.post(pp.Message{
941 ExtendedID: pp.HandshakeExtendedID,
942 ExtendedPayload: func() []byte {
943 msg := pp.ExtendedHandshakeMessage{
944 M: map[pp.ExtensionName]pp.ExtensionNumber{
945 pp.ExtensionNameMetadata: metadataExtendedId,
947 V: cl.config.ExtendedHandshakeClientVersion,
948 Reqq: 64, // TODO: Really?
949 YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
950 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
951 Port: cl.incomingPeerPort(),
952 MetadataSize: torrent.metadataSize(),
953 // TODO: We can figure these out specific to the socket
955 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
956 Ipv6: cl.config.PublicIp6.To16(),
958 if !cl.config.DisablePEX {
959 msg.M[pp.ExtensionNamePex] = pexExtendedId
961 return bencode.MustMarshal(msg)
966 if conn.fastEnabled() {
967 if torrent.haveAllPieces() {
968 conn.post(pp.Message{Type: pp.HaveAll})
969 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
971 } else if !torrent.haveAnyPieces() {
972 conn.post(pp.Message{Type: pp.HaveNone})
973 conn.sentHaves.Clear()
979 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
980 conn.post(pp.Message{
987 func (cl *Client) dhtPort() (ret uint16) {
988 cl.eachDhtServer(func(s DhtServer) {
989 ret = uint16(missinggo.AddrPort(s.Addr()))
994 func (cl *Client) haveDhtServer() (ret bool) {
995 cl.eachDhtServer(func(_ DhtServer) {
1001 // Process incoming ut_metadata message.
1002 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1003 var d map[string]int
1004 err := bencode.Unmarshal(payload, &d)
1005 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1006 } else if err != nil {
1007 return fmt.Errorf("error unmarshalling bencode: %s", err)
1009 msgType, ok := d["msg_type"]
1011 return errors.New("missing msg_type field")
1015 case pp.DataMetadataExtensionMsgType:
1016 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1017 if !c.requestedMetadataPiece(piece) {
1018 return fmt.Errorf("got unexpected piece %d", piece)
1020 c.metadataRequests[piece] = false
1021 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1022 if begin < 0 || begin >= len(payload) {
1023 return fmt.Errorf("data has bad offset in payload: %d", begin)
1025 t.saveMetadataPiece(piece, payload[begin:])
1026 c.lastUsefulChunkReceived = time.Now()
1027 return t.maybeCompleteMetadata()
1028 case pp.RequestMetadataExtensionMsgType:
1029 if !t.haveMetadataPiece(piece) {
1030 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1033 start := (1 << 14) * piece
1034 c.logger.Printf("sending metadata piece %d", piece)
1035 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1037 case pp.RejectMetadataExtensionMsgType:
1040 return errors.New("unknown msg_type value")
1044 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1045 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1046 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1051 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1055 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1058 if _, ok := cl.ipBlockRange(ip); ok {
1061 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1067 // Return a Torrent ready for insertion into a Client.
1068 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1069 // use provided storage, if provided
1070 storageClient := cl.defaultStorage
1071 if specStorage != nil {
1072 storageClient = storage.NewClient(specStorage)
1078 peers: prioritizedPeers{
1080 getPrio: func(p PeerInfo) peerPriority {
1081 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1084 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1086 halfOpen: make(map[string]PeerInfo),
1087 pieceStateChanges: pubsub.NewPubSub(),
1089 storageOpener: storageClient,
1090 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1092 networkingEnabled: true,
1093 metadataChanged: sync.Cond{
1096 webSeeds: make(map[string]*peer),
1098 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1099 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1100 t.logger = cl.logger.WithContextValue(t)
1101 t.setChunkSize(defaultChunkSize)
1105 // A file-like handle to some torrent data resource.
1106 type Handle interface {
1113 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1114 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1117 // Adds a torrent by InfoHash with a custom Storage implementation.
1118 // If the torrent already exists then this Storage is ignored and the
1119 // existing torrent is returned with `new` set to `false`.
1120 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1123 t, ok := cl.torrents[infoHash]
1129 t = cl.newTorrent(infoHash, specStorage)
1130 cl.eachDhtServer(func(s DhtServer) {
1131 go t.dhtAnnouncer(s)
1133 cl.torrents[infoHash] = t
1134 cl.clearAcceptLimits()
1135 t.updateWantPeersEvent()
1136 // Tickle Client.waitAccept, new torrent may want conns.
1137 cl.event.Broadcast()
1141 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1142 // Torrent.MergeSpec.
1143 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1144 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1145 err = t.MergeSpec(spec)
1149 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1150 // spec.DisallowDataDownload/Upload will be read and applied.
1151 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1152 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1153 if spec.DisplayName != "" {
1154 t.SetDisplayName(spec.DisplayName)
1156 if spec.InfoBytes != nil {
1157 err := t.SetInfoBytes(spec.InfoBytes)
1163 cl.AddDHTNodes(spec.DhtNodes)
1166 useTorrentSources(spec.Sources, t)
1167 for _, url := range spec.Webseeds {
1170 if spec.ChunkSize != 0 {
1171 t.setChunkSize(pp.Integer(spec.ChunkSize))
1173 t.addTrackers(spec.Trackers)
1175 t.dataDownloadDisallowed = spec.DisallowDataDownload
1176 t.dataUploadDisallowed = spec.DisallowDataUpload
1180 func useTorrentSources(sources []string, t *Torrent) {
1181 for _, s := range sources {
1183 err := useTorrentSource(s, t)
1185 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1187 t.logger.Printf("successfully used source %q", s)
1193 func useTorrentSource(source string, t *Torrent) error {
1194 req, err := http.NewRequest(http.MethodGet, source, nil)
1198 ctx, cancel := context.WithCancel(context.Background())
1208 req = req.WithContext(ctx)
1209 resp, err := http.DefaultClient.Do(req)
1213 mi, err := metainfo.Load(resp.Body)
1215 if ctx.Err() != nil {
1220 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1223 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1224 t, ok := cl.torrents[infoHash]
1226 err = fmt.Errorf("no such torrent")
1233 delete(cl.torrents, infoHash)
1237 func (cl *Client) allTorrentsCompleted() bool {
1238 for _, t := range cl.torrents {
1242 if !t.haveAllPieces() {
1249 // Returns true when all torrents are completely downloaded and false if the
1250 // client is stopped before that.
1251 func (cl *Client) WaitAll() bool {
1254 for !cl.allTorrentsCompleted() {
1255 if cl.closed.IsSet() {
1263 // Returns handles to all the torrents loaded in the Client.
1264 func (cl *Client) Torrents() []*Torrent {
1267 return cl.torrentsAsSlice()
1270 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1271 for _, t := range cl.torrents {
1272 ret = append(ret, t)
1277 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1278 spec, err := TorrentSpecFromMagnetURI(uri)
1282 T, _, err = cl.AddTorrentSpec(spec)
1286 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1287 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1291 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1292 mi, err := metainfo.LoadFromFile(filename)
1296 return cl.AddTorrent(mi)
1299 func (cl *Client) DhtServers() []DhtServer {
1300 return cl.dhtServers
1303 func (cl *Client) AddDHTNodes(nodes []string) {
1304 for _, n := range nodes {
1305 hmp := missinggo.SplitHostMaybePort(n)
1306 ip := net.ParseIP(hmp.Host)
1308 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1311 ni := krpc.NodeInfo{
1312 Addr: krpc.NodeAddr{
1317 cl.eachDhtServer(func(s DhtServer) {
1323 func (cl *Client) banPeerIP(ip net.IP) {
1324 cl.logger.Printf("banning ip %v", ip)
1325 if cl.badPeerIPs == nil {
1326 cl.badPeerIPs = make(map[string]struct{})
1328 cl.badPeerIPs[ip.String()] = struct{}{}
1331 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1337 PeerMaxRequests: 250,
1339 RemoteAddr: remoteAddr,
1341 connString: connString,
1344 writeBuffer: new(bytes.Buffer),
1347 c.logger = cl.logger.WithDefaultLevel(log.Debug).WithContextValue(c)
1348 c.writerCond.L = cl.locker()
1349 c.setRW(connStatsReadWriter{nc, c})
1350 c.r = &rateLimitedReader{
1351 l: cl.config.DownloadRateLimiter,
1354 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1358 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1365 t.addPeers([]PeerInfo{{
1366 Addr: ipPortAddr{ip, port},
1367 Source: PeerSourceDhtAnnouncePeer,
1371 func firstNotNil(ips ...net.IP) net.IP {
1372 for _, ip := range ips {
1380 func (cl *Client) eachDialer(f func(Dialer) bool) {
1381 for _, s := range cl.dialers {
1388 func (cl *Client) eachListener(f func(Listener) bool) {
1389 for _, s := range cl.listeners {
1396 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1397 cl.eachListener(func(l Listener) bool {
1404 func (cl *Client) publicIp(peer net.IP) net.IP {
1405 // TODO: Use BEP 10 to determine how peers are seeing us.
1406 if peer.To4() != nil {
1408 cl.config.PublicIp4,
1409 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1414 cl.config.PublicIp6,
1415 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1419 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1420 l := cl.findListener(
1421 func(l net.Listener) bool {
1422 return f(addrIpOrNil(l.Addr()))
1428 return addrIpOrNil(l.Addr())
1431 // Our public IP and port as the given peer should see them.
1432 func (cl *Client) publicAddr(peer net.IP) IpPort {
1433 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1436 // ListenAddrs returns the addresses currently being listened to.
1437 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1440 cl.eachListener(func(l Listener) bool {
1441 ret = append(ret, l.Addr())
1447 func (cl *Client) onBadAccept(addr net.Addr) {
1448 ipa, ok := tryIpPortFromNetAddr(addr)
1452 ip := maskIpForAcceptLimiting(ipa.IP)
1453 if cl.acceptLimiter == nil {
1454 cl.acceptLimiter = make(map[ipStr]int)
1456 cl.acceptLimiter[ipStr(ip.String())]++
1459 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1460 if ip4 := ip.To4(); ip4 != nil {
1461 return ip4.Mask(net.CIDRMask(24, 32))
1466 func (cl *Client) clearAcceptLimits() {
1467 cl.acceptLimiter = nil
1470 func (cl *Client) acceptLimitClearer() {
1473 case <-cl.closed.LockedChan(cl.locker()):
1475 case <-time.After(15 * time.Minute):
1477 cl.clearAcceptLimits()
1483 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1484 if cl.config.DisableAcceptRateLimiting {
1487 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1490 func (cl *Client) rLock() {
1494 func (cl *Client) rUnlock() {
1498 func (cl *Client) lock() {
1502 func (cl *Client) unlock() {
1506 func (cl *Client) locker() *lockWithDeferreds {
1510 func (cl *Client) String() string {
1511 return fmt.Sprintf("<%[1]T %[1]p>", cl)