18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/tracker"
28 "github.com/anacrolix/torrent/webtorrent"
29 "github.com/davecgh/go-spew/spew"
30 "github.com/dustin/go-humanize"
31 "github.com/google/btree"
32 "github.com/pion/datachannel"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/missinggo/v2"
37 "github.com/anacrolix/missinggo/v2/conntrack"
39 "github.com/anacrolix/torrent/bencode"
40 "github.com/anacrolix/torrent/iplist"
41 "github.com/anacrolix/torrent/metainfo"
42 "github.com/anacrolix/torrent/mse"
43 pp "github.com/anacrolix/torrent/peer_protocol"
44 "github.com/anacrolix/torrent/storage"
47 // Clients contain zero or more Torrents. A Client manages a blocklist, the
48 // TCP/UDP protocol ports, and DHT as desired.
50 // An aggregate of stats over all connections. First in struct to ensure
51 // 64-bit alignment of fields. See #262.
56 closed missinggo.Event
62 defaultStorage *storage.Client
66 dhtServers []DhtServer
67 ipBlockList iplist.Ranger
69 // Set of addresses that have our client ID. This intentionally will
70 // include ourselves if we end up trying to connect to our own address
71 // through legitimate channels.
72 dopplegangerAddrs map[string]struct{}
73 badPeerIPs map[string]struct{}
74 torrents map[InfoHash]*Torrent
76 acceptLimiter map[ipStr]int
77 dialRateLimiter *rate.Limiter
79 websocketTrackers websocketTrackers
84 func (cl *Client) BadPeerIPs() []string {
87 return cl.badPeerIPsLocked()
90 func (cl *Client) badPeerIPsLocked() []string {
91 return slices.FromMapKeys(cl.badPeerIPs).([]string)
94 func (cl *Client) PeerID() PeerID {
98 // Returns the port number for the first listener that has one. No longer assumes that all port
99 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
101 func (cl *Client) LocalPort() (port int) {
102 cl.eachListener(func(l Listener) bool {
103 port = addrPortOrZero(l.Addr())
109 func writeDhtServerStatus(w io.Writer, s DhtServer) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, " ID: %x\n", s.ID())
112 spew.Fdump(w, dhtStats)
115 // Writes out a human readable status of the client, such as for writing to a
117 func (cl *Client) WriteStatus(_w io.Writer) {
120 w := bufio.NewWriter(_w)
122 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
123 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
124 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
125 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
126 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
127 cl.eachDhtServer(func(s DhtServer) {
128 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
129 writeDhtServerStatus(w, s)
131 spew.Fdump(w, &cl.stats)
132 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135 return l.InfoHash().AsString() < r.InfoHash().AsString()
138 fmt.Fprint(w, "<unknown name>")
140 fmt.Fprint(w, t.name())
146 "%f%% of %d bytes (%s)",
147 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
149 humanize.Bytes(uint64(*t.length)))
151 w.WriteString("<missing metainfo>")
159 func (cl *Client) initLogger() {
160 cl.logger = cl.config.Logger.WithValues(cl)
161 if !cl.config.Debug {
162 cl.logger = cl.logger.FilterLevel(log.Info)
166 func (cl *Client) announceKey() int32 {
167 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
170 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
172 cfg = NewDefaultClientConfig()
182 dopplegangerAddrs: make(map[string]struct{}),
183 torrents: make(map[metainfo.Hash]*Torrent),
184 dialRateLimiter: rate.NewLimiter(10, 10),
186 go cl.acceptLimitClearer()
194 cl.event.L = cl.locker()
195 storageImpl := cfg.DefaultStorage
196 if storageImpl == nil {
197 // We'd use mmap by default but HFS+ doesn't support sparse files.
198 storageImplCloser := storage.NewFile(cfg.DataDir)
199 cl.onClose = append(cl.onClose, func() {
200 if err := storageImplCloser.Close(); err != nil {
201 cl.logger.Printf("error closing default storage: %s", err)
204 storageImpl = storageImplCloser
206 cl.defaultStorage = storage.NewClient(storageImpl)
207 if cfg.IPBlocklist != nil {
208 cl.ipBlockList = cfg.IPBlocklist
211 if cfg.PeerID != "" {
212 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
214 o := copy(cl.peerID[:], cfg.Bep20)
215 _, err = rand.Read(cl.peerID[o:])
217 panic("error generating peer id")
221 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
229 for _, _s := range sockets {
230 s := _s // Go is fucking retarded.
231 cl.onClose = append(cl.onClose, func() { s.Close() })
232 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
233 cl.dialers = append(cl.dialers, s)
234 cl.listeners = append(cl.listeners, s)
235 go cl.acceptConnections(s)
241 for _, s := range sockets {
242 if pc, ok := s.(net.PacketConn); ok {
243 ds, err := cl.newAnacrolixDhtServer(pc)
247 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
248 cl.onClose = append(cl.onClose, func() { ds.Close() })
253 cl.websocketTrackers = websocketTrackers{
256 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
259 t, ok := cl.torrents[infoHash]
261 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
263 return t.announceRequest(event), nil
265 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
268 t, ok := cl.torrents[dcc.InfoHash]
270 cl.logger.WithDefaultLevel(log.Warning).Printf(
271 "got webrtc conn for unloaded torrent with infohash %x",
277 go t.onWebRtcConn(dc, dcc)
284 func (cl *Client) AddDhtServer(d DhtServer) {
285 cl.dhtServers = append(cl.dhtServers, d)
288 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
289 // given address for any Torrent.
290 func (cl *Client) AddDialer(d Dialer) {
293 cl.dialers = append(cl.dialers, d)
294 for _, t := range cl.torrents {
299 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
301 func (cl *Client) AddListener(l Listener) {
302 cl.listeners = append(cl.listeners, l)
303 go cl.acceptConnections(l)
306 func (cl *Client) firewallCallback(net.Addr) bool {
308 block := !cl.wantConns()
311 torrent.Add("connections firewalled", 1)
313 torrent.Add("connections not firewalled", 1)
318 func (cl *Client) listenOnNetwork(n network) bool {
319 if n.Ipv4 && cl.config.DisableIPv4 {
322 if n.Ipv6 && cl.config.DisableIPv6 {
325 if n.Tcp && cl.config.DisableTCP {
328 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
334 func (cl *Client) listenNetworks() (ns []network) {
335 for _, n := range allPeerNetworks {
336 if cl.listenOnNetwork(n) {
343 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
344 cfg := dht.ServerConfig{
345 IPBlocklist: cl.ipBlockList,
347 OnAnnouncePeer: cl.onDHTAnnouncePeer,
348 PublicIP: func() net.IP {
349 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
350 return cl.config.PublicIp6
352 return cl.config.PublicIp4
354 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
355 ConnectionTracking: cl.config.ConnTracker,
356 OnQuery: cl.config.DHTOnQuery,
357 Logger: cl.logger.WithText(func(m log.Msg) string {
358 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
361 s, err = dht.NewServer(&cfg)
364 ts, err := s.Bootstrap()
366 cl.logger.Printf("error bootstrapping dht: %s", err)
368 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
374 func (cl *Client) Closed() <-chan struct{} {
380 func (cl *Client) eachDhtServer(f func(DhtServer)) {
381 for _, ds := range cl.dhtServers {
386 // Stops the client. All connections to peers are closed and all activity will
388 func (cl *Client) Close() {
392 for _, t := range cl.torrents {
395 for i := range cl.onClose {
396 cl.onClose[len(cl.onClose)-1-i]()
401 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
402 if cl.ipBlockList == nil {
405 return cl.ipBlockList.Lookup(ip)
408 func (cl *Client) ipIsBlocked(ip net.IP) bool {
409 _, blocked := cl.ipBlockRange(ip)
413 func (cl *Client) wantConns() bool {
414 for _, t := range cl.torrents {
422 func (cl *Client) waitAccept() {
424 if cl.closed.IsSet() {
434 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
435 func (cl *Client) rejectAccepted(conn net.Conn) error {
436 ra := conn.RemoteAddr()
437 if rip := addrIpOrNil(ra); rip != nil {
438 if cl.config.DisableIPv4Peers && rip.To4() != nil {
439 return errors.New("ipv4 peers disabled")
441 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
442 return errors.New("ipv4 disabled")
445 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
446 return errors.New("ipv6 disabled")
448 if cl.rateLimitAccept(rip) {
449 return errors.New("source IP accepted rate limited")
451 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
452 return errors.New("bad source addr")
458 func (cl *Client) acceptConnections(l net.Listener) {
460 conn, err := l.Accept()
461 torrent.Add("client listener accepts", 1)
462 conn = pproffd.WrapNetConn(conn)
464 closed := cl.closed.IsSet()
467 reject = cl.rejectAccepted(conn)
477 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
482 torrent.Add("rejected accepted connections", 1)
483 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
486 go cl.incomingConnection(conn)
488 log.Fmsg("accepted %q connection at %q from %q",
492 ).SetLevel(log.Debug).Log(cl.logger)
493 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
494 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
495 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
500 func regularConnString(nc net.Conn) string {
501 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
504 func (cl *Client) incomingConnection(nc net.Conn) {
506 if tc, ok := nc.(*net.TCPConn); ok {
509 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
510 regularConnString(nc))
511 c.Discovery = PeerSourceIncoming
512 cl.runReceivedConn(c)
515 // Returns a handle to the given torrent, if it's present in the client.
516 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
519 t, ok = cl.torrents[ih]
523 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
524 return cl.torrents[ih]
527 type dialResult struct {
532 func countDialResult(err error) {
534 torrent.Add("successful dials", 1)
536 torrent.Add("unsuccessful dials", 1)
540 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
541 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
542 if ret < minDialTimeout {
548 // Returns whether an address is known to connect to a client with our own ID.
549 func (cl *Client) dopplegangerAddr(addr string) bool {
550 _, ok := cl.dopplegangerAddrs[addr]
554 // Returns a connection over UTP or TCP, whichever is first to connect.
555 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
557 t := perf.NewTimer(perf.CallerName(0))
560 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
562 t.Mark("returned conn over " + res.Network)
566 ctx, cancel := context.WithCancel(ctx)
567 // As soon as we return one connection, cancel the others.
570 resCh := make(chan dialResult, left)
574 cl.eachDialer(func(s Dialer) bool {
577 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
580 cl.dialFromSocket(ctx, s, addr),
581 s.LocalAddr().Network(),
588 // Wait for a successful connection.
590 defer perf.ScopeTimer()()
591 for ; left > 0 && res.Conn == nil; left-- {
595 // There are still incompleted dials.
597 for ; left > 0; left-- {
598 conn := (<-resCh).Conn
605 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
607 //if res.Conn != nil {
608 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
610 // cl.logger.Printf("failed to dial %s", addr)
615 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
616 network := s.LocalAddr().Network()
617 cte := cl.config.ConnTracker.Wait(
619 conntrack.Entry{network, s.LocalAddr().String(), addr},
620 "dial torrent client",
623 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
624 // which dial errors allow us to forget the connection tracking entry handle.
625 if ctx.Err() != nil {
631 c, err := s.Dial(ctx, addr)
632 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
633 // it now in case we close the connection forthwith.
634 if tc, ok := c.(*net.TCPConn); ok {
639 if err != nil && forgettableDialError(err) {
646 return closeWrapper{c, func() error {
653 func forgettableDialError(err error) bool {
654 return strings.Contains(err.Error(), "no suitable address found")
657 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
658 if _, ok := t.halfOpen[addr]; !ok {
659 panic("invariant broken")
661 delete(t.halfOpen, addr)
665 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
666 // for valid reasons.
667 func (cl *Client) initiateProtocolHandshakes(
671 outgoing, encryptHeader bool,
673 network, connString string,
675 c *PeerConn, err error,
677 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
678 c.headerEncrypted = encryptHeader
679 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
681 dl, ok := ctx.Deadline()
685 err = nc.SetDeadline(dl)
689 err = cl.initiateHandshakes(c, t)
693 // Returns nil connection and nil error if no connection could be established for valid reasons.
694 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
695 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
698 return t.dialTimeout()
701 dr := cl.dialFirst(dialCtx, addr.String())
704 if dialCtx.Err() != nil {
705 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
707 return nil, errors.New("dial failed")
709 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
716 // Returns nil connection and nil error if no connection could be established
717 // for valid reasons.
718 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
719 torrent.Add("establish outgoing connection", 1)
720 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
721 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
723 torrent.Add("initiated conn with preferred header obfuscation", 1)
726 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
727 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
728 // We should have just tried with the preferred header obfuscation. If it was required,
729 // there's nothing else to try.
732 // Try again with encryption if we didn't earlier, or without if we did.
733 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
735 torrent.Add("initiated conn with fallback header obfuscation", 1)
737 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
741 // Called to dial out and run a connection. The addr we're given is already
742 // considered half-open.
743 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
744 cl.dialRateLimiter.Wait(context.Background())
745 c, err := cl.establishOutgoingConn(t, addr)
748 // Don't release lock between here and addConnection, unless it's for
750 cl.noLongerHalfOpen(t, addr.String())
753 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
760 t.runHandshookConnLoggingErr(c)
763 // The port number for incoming peer connections. 0 if the client isn't listening.
764 func (cl *Client) incomingPeerPort() int {
765 return cl.LocalPort()
768 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
769 if c.headerEncrypted {
772 rw, c.cryptoMethod, err = mse.InitiateHandshake(
779 cl.config.CryptoProvides,
783 return xerrors.Errorf("header obfuscation handshake: %w", err)
786 ih, err := cl.connBtHandshake(c, &t.infoHash)
788 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
790 if ih != t.infoHash {
791 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
796 // Calls f with any secret keys.
797 func (cl *Client) forSkeys(f func([]byte) bool) {
800 if false { // Emulate the bug from #114
802 for ih := range cl.torrents {
806 for range cl.torrents {
813 for ih := range cl.torrents {
820 // Do encryption and bittorrent handshakes as receiver.
821 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
822 defer perf.ScopeTimerErr(&err)()
824 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
826 if err == nil || err == mse.ErrNoSecretKeyMatch {
827 if c.headerEncrypted {
828 torrent.Add("handshakes received encrypted", 1)
830 torrent.Add("handshakes received unencrypted", 1)
833 torrent.Add("handshakes received with error while handling encryption", 1)
836 if err == mse.ErrNoSecretKeyMatch {
841 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
842 err = errors.New("connection not have required header obfuscation")
845 ih, err := cl.connBtHandshake(c, nil)
847 err = xerrors.Errorf("during bt handshake: %w", err)
856 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
857 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
862 c.PeerExtensionBytes = res.PeerExtensionBits
863 c.PeerID = res.PeerID
864 c.completedHandshake = time.Now()
868 func (cl *Client) runReceivedConn(c *PeerConn) {
869 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
873 t, err := cl.receiveHandshakes(c)
876 "error receiving handshakes on %v: %s", c, err,
877 ).SetLevel(log.Debug).
879 "network", c.network,
881 torrent.Add("error receiving handshake", 1)
883 cl.onBadAccept(c.remoteAddr)
888 torrent.Add("received handshake for unloaded torrent", 1)
889 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
891 cl.onBadAccept(c.remoteAddr)
895 torrent.Add("received handshake for loaded torrent", 1)
898 t.runHandshookConnLoggingErr(c)
901 // Client lock must be held before entering this.
902 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
904 if c.PeerID == cl.peerID {
907 addr := c.conn.RemoteAddr().String()
908 cl.dopplegangerAddrs[addr] = struct{}{}
910 // Because the remote address is not necessarily the same as its client's torrent listen
911 // address, we won't record the remote address as a doppleganger. Instead, the initiator
912 // can record *us* as the doppleganger.
914 return errors.New("local and remote peer ids are the same")
916 c.conn.SetWriteDeadline(time.Time{})
917 c.r = deadlineReader{c.conn, c.r}
918 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
919 if connIsIpv6(c.conn) {
920 torrent.Add("completed handshake over ipv6", 1)
922 if err := t.addConnection(c); err != nil {
923 return fmt.Errorf("adding connection: %w", err)
925 defer t.dropConnection(c)
926 go c.writer(time.Minute)
927 cl.sendInitialMessages(c, t)
928 err := c.mainReadLoop()
930 return fmt.Errorf("main read loop: %w", err)
935 // See the order given in Transmission's tr_peerMsgsNew.
936 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
937 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
938 conn.post(pp.Message{
940 ExtendedID: pp.HandshakeExtendedID,
941 ExtendedPayload: func() []byte {
942 msg := pp.ExtendedHandshakeMessage{
943 M: map[pp.ExtensionName]pp.ExtensionNumber{
944 pp.ExtensionNameMetadata: metadataExtendedId,
946 V: cl.config.ExtendedHandshakeClientVersion,
947 Reqq: 64, // TODO: Really?
948 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
949 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
950 Port: cl.incomingPeerPort(),
951 MetadataSize: torrent.metadataSize(),
952 // TODO: We can figured these out specific to the socket
954 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
955 Ipv6: cl.config.PublicIp6.To16(),
957 if !cl.config.DisablePEX {
958 msg.M[pp.ExtensionNamePex] = pexExtendedId
960 return bencode.MustMarshal(msg)
965 if conn.fastEnabled() {
966 if torrent.haveAllPieces() {
967 conn.post(pp.Message{Type: pp.HaveAll})
968 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
970 } else if !torrent.haveAnyPieces() {
971 conn.post(pp.Message{Type: pp.HaveNone})
972 conn.sentHaves.Clear()
978 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
979 conn.post(pp.Message{
986 func (cl *Client) dhtPort() (ret uint16) {
987 cl.eachDhtServer(func(s DhtServer) {
988 ret = uint16(missinggo.AddrPort(s.Addr()))
993 func (cl *Client) haveDhtServer() (ret bool) {
994 cl.eachDhtServer(func(_ DhtServer) {
1000 // Process incoming ut_metadata message.
1001 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1002 var d map[string]int
1003 err := bencode.Unmarshal(payload, &d)
1004 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1005 } else if err != nil {
1006 return fmt.Errorf("error unmarshalling bencode: %s", err)
1008 msgType, ok := d["msg_type"]
1010 return errors.New("missing msg_type field")
1014 case pp.DataMetadataExtensionMsgType:
1015 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1016 if !c.requestedMetadataPiece(piece) {
1017 return fmt.Errorf("got unexpected piece %d", piece)
1019 c.metadataRequests[piece] = false
1020 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1021 if begin < 0 || begin >= len(payload) {
1022 return fmt.Errorf("data has bad offset in payload: %d", begin)
1024 t.saveMetadataPiece(piece, payload[begin:])
1025 c.lastUsefulChunkReceived = time.Now()
1026 return t.maybeCompleteMetadata()
1027 case pp.RequestMetadataExtensionMsgType:
1028 if !t.haveMetadataPiece(piece) {
1029 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1032 start := (1 << 14) * piece
1033 c.logger.Printf("sending metadata piece %d", piece)
1034 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1036 case pp.RejectMetadataExtensionMsgType:
1039 return errors.New("unknown msg_type value")
1043 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1044 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1045 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1050 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1054 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1057 if _, ok := cl.ipBlockRange(ip); ok {
1060 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1066 // Return a Torrent ready for insertion into a Client.
1067 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1068 // use provided storage, if provided
1069 storageClient := cl.defaultStorage
1070 if specStorage != nil {
1071 storageClient = storage.NewClient(specStorage)
1077 peers: prioritizedPeers{
1079 getPrio: func(p PeerInfo) peerPriority {
1080 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1083 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1085 halfOpen: make(map[string]PeerInfo),
1086 pieceStateChanges: pubsub.NewPubSub(),
1088 storageOpener: storageClient,
1089 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1091 networkingEnabled: true,
1092 metadataChanged: sync.Cond{
1095 webSeeds: make(map[string]*peer),
1097 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1098 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1099 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1100 return fmt.Sprintf("%v: %s", t, m.Text())
1102 t.setChunkSize(defaultChunkSize)
1106 // A file-like handle to some torrent data resource.
1107 type Handle interface {
1114 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1115 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1118 // Adds a torrent by InfoHash with a custom Storage implementation.
1119 // If the torrent already exists then this Storage is ignored and the
1120 // existing torrent returned with `new` set to `false`
1121 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1124 t, ok := cl.torrents[infoHash]
1130 t = cl.newTorrent(infoHash, specStorage)
1131 cl.eachDhtServer(func(s DhtServer) {
1132 go t.dhtAnnouncer(s)
1134 cl.torrents[infoHash] = t
1135 cl.clearAcceptLimits()
1136 t.updateWantPeersEvent()
1137 // Tickle Client.waitAccept, new torrent may want conns.
1138 cl.event.Broadcast()
1142 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1143 // Torrent.MergeSpec.
1144 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1145 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1146 err = t.MergeSpec(spec)
1150 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1151 // spec.DisallowDataDownload/Upload will be read and applied
1152 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1153 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1154 if spec.DisplayName != "" {
1155 t.SetDisplayName(spec.DisplayName)
1157 if spec.InfoBytes != nil {
1158 err := t.SetInfoBytes(spec.InfoBytes)
1164 cl.AddDHTNodes(spec.DhtNodes)
1167 useTorrentSources(spec.Sources, t)
1168 for _, url := range spec.Webseeds {
1171 if spec.ChunkSize != 0 {
1172 t.setChunkSize(pp.Integer(spec.ChunkSize))
1174 t.addTrackers(spec.Trackers)
1176 t.dataDownloadDisallowed = spec.DisallowDataDownload
1177 t.dataUploadDisallowed = spec.DisallowDataUpload
1181 func useTorrentSources(sources []string, t *Torrent) {
1182 for _, s := range sources {
1184 err := useTorrentSource(s, t)
1186 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1188 t.logger.Printf("successfully used source %q", s)
1194 func useTorrentSource(source string, t *Torrent) error {
1195 req, err := http.NewRequest(http.MethodGet, source, nil)
1199 ctx, cancel := context.WithCancel(context.Background())
1209 req = req.WithContext(ctx)
1210 resp, err := http.DefaultClient.Do(req)
1212 if ctx.Err() != nil {
1217 mi, err := metainfo.Load(resp.Body)
1219 if ctx.Err() != nil {
1224 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1227 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1228 t, ok := cl.torrents[infoHash]
1230 err = fmt.Errorf("no such torrent")
1237 delete(cl.torrents, infoHash)
1241 func (cl *Client) allTorrentsCompleted() bool {
1242 for _, t := range cl.torrents {
1246 if !t.haveAllPieces() {
1253 // Returns true when all torrents are completely downloaded and false if the
1254 // client is stopped before that.
1255 func (cl *Client) WaitAll() bool {
1258 for !cl.allTorrentsCompleted() {
1259 if cl.closed.IsSet() {
1267 // Returns handles to all the torrents loaded in the Client.
1268 func (cl *Client) Torrents() []*Torrent {
1271 return cl.torrentsAsSlice()
1274 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1275 for _, t := range cl.torrents {
1276 ret = append(ret, t)
1281 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1282 spec, err := TorrentSpecFromMagnetURI(uri)
1286 T, _, err = cl.AddTorrentSpec(spec)
1290 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1291 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1295 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1296 mi, err := metainfo.LoadFromFile(filename)
1300 return cl.AddTorrent(mi)
1303 func (cl *Client) DhtServers() []DhtServer {
1304 return cl.dhtServers
1307 func (cl *Client) AddDHTNodes(nodes []string) {
1308 for _, n := range nodes {
1309 hmp := missinggo.SplitHostMaybePort(n)
1310 ip := net.ParseIP(hmp.Host)
1312 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1315 ni := krpc.NodeInfo{
1316 Addr: krpc.NodeAddr{
1321 cl.eachDhtServer(func(s DhtServer) {
1327 func (cl *Client) banPeerIP(ip net.IP) {
1328 cl.logger.Printf("banning ip %v", ip)
1329 if cl.badPeerIPs == nil {
1330 cl.badPeerIPs = make(map[string]struct{})
1332 cl.badPeerIPs[ip.String()] = struct{}{}
1335 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1341 PeerMaxRequests: 250,
1343 remoteAddr: remoteAddr,
1345 connString: connString,
1348 writeBuffer: new(bytes.Buffer),
1351 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1352 return fmt.Sprintf("%v: %s", c, m.Text())
1354 c.writerCond.L = cl.locker()
1355 c.setRW(connStatsReadWriter{nc, c})
1356 c.r = &rateLimitedReader{
1357 l: cl.config.DownloadRateLimiter,
1360 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1364 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1371 t.addPeers([]PeerInfo{{
1372 Addr: ipPortAddr{ip, port},
1373 Source: PeerSourceDhtAnnouncePeer,
1377 func firstNotNil(ips ...net.IP) net.IP {
1378 for _, ip := range ips {
1386 func (cl *Client) eachDialer(f func(Dialer) bool) {
1387 for _, s := range cl.dialers {
1394 func (cl *Client) eachListener(f func(Listener) bool) {
1395 for _, s := range cl.listeners {
1402 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1403 cl.eachListener(func(l Listener) bool {
1410 func (cl *Client) publicIp(peer net.IP) net.IP {
1411 // TODO: Use BEP 10 to determine how peers are seeing us.
1412 if peer.To4() != nil {
1414 cl.config.PublicIp4,
1415 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1420 cl.config.PublicIp6,
1421 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1425 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1426 l := cl.findListener(
1427 func(l net.Listener) bool {
1428 return f(addrIpOrNil(l.Addr()))
1434 return addrIpOrNil(l.Addr())
1437 // Our IP as a peer should see it.
1438 func (cl *Client) publicAddr(peer net.IP) IpPort {
1439 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1442 // ListenAddrs addresses currently being listened to.
1443 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1446 cl.eachListener(func(l Listener) bool {
1447 ret = append(ret, l.Addr())
1453 func (cl *Client) onBadAccept(addr net.Addr) {
1454 ipa, ok := tryIpPortFromNetAddr(addr)
1458 ip := maskIpForAcceptLimiting(ipa.IP)
1459 if cl.acceptLimiter == nil {
1460 cl.acceptLimiter = make(map[ipStr]int)
1462 cl.acceptLimiter[ipStr(ip.String())]++
1465 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1466 if ip4 := ip.To4(); ip4 != nil {
1467 return ip4.Mask(net.CIDRMask(24, 32))
1472 func (cl *Client) clearAcceptLimits() {
1473 cl.acceptLimiter = nil
1476 func (cl *Client) acceptLimitClearer() {
1479 case <-cl.closed.LockedChan(cl.locker()):
1481 case <-time.After(15 * time.Minute):
1483 cl.clearAcceptLimits()
1489 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1490 if cl.config.DisableAcceptRateLimiting {
1493 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1496 func (cl *Client) rLock() {
1500 func (cl *Client) rUnlock() {
1504 func (cl *Client) lock() {
1508 func (cl *Client) unlock() {
1512 func (cl *Client) locker() *lockWithDeferreds {
1516 func (cl *Client) String() string {
1517 return fmt.Sprintf("<%[1]T %[1]p>", cl)