18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
88 func (cl *Client) BadPeerIPs() []string {
91 return cl.badPeerIPsLocked()
94 func (cl *Client) badPeerIPsLocked() []string {
95 return slices.FromMapKeys(cl.badPeerIPs).([]string)
98 func (cl *Client) PeerID() PeerID {
102 // Returns the port number for the first listener that has one. No longer assumes that all port
103 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
105 func (cl *Client) LocalPort() (port int) {
106 cl.eachListener(func(l Listener) bool {
107 port = addrPortOrZero(l.Addr())
113 func writeDhtServerStatus(w io.Writer, s DhtServer) {
114 dhtStats := s.Stats()
115 fmt.Fprintf(w, " ID: %x\n", s.ID())
116 spew.Fdump(w, dhtStats)
119 // Writes out a human readable status of the client, such as for writing to a
121 func (cl *Client) WriteStatus(_w io.Writer) {
124 w := bufio.NewWriter(_w)
126 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
127 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
128 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
129 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131 cl.eachDhtServer(func(s DhtServer) {
132 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133 writeDhtServerStatus(w, s)
135 spew.Fdump(w, &cl.stats)
136 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
138 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139 return l.InfoHash().AsString() < r.InfoHash().AsString()
142 fmt.Fprint(w, "<unknown name>")
144 fmt.Fprint(w, t.name())
150 "%f%% of %d bytes (%s)",
151 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
153 humanize.Bytes(uint64(*t.length)))
155 w.WriteString("<missing metainfo>")
163 func (cl *Client) initLogger() {
164 cl.logger = cl.config.Logger.WithValues(cl)
165 if !cl.config.Debug {
166 cl.logger = cl.logger.FilterLevel(log.Info)
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
186 dopplegangerAddrs: make(map[string]struct{}),
187 torrents: make(map[metainfo.Hash]*Torrent),
188 dialRateLimiter: rate.NewLimiter(10, 10),
190 cl.activeAnnounceLimiter.SlotsPerKey = 2
191 go cl.acceptLimitClearer()
199 cl.event.L = cl.locker()
200 storageImpl := cfg.DefaultStorage
201 if storageImpl == nil {
202 // We'd use mmap by default but HFS+ doesn't support sparse files.
203 storageImplCloser := storage.NewFile(cfg.DataDir)
204 cl.onClose = append(cl.onClose, func() {
205 if err := storageImplCloser.Close(); err != nil {
206 cl.logger.Printf("error closing default storage: %s", err)
209 storageImpl = storageImplCloser
211 cl.defaultStorage = storage.NewClient(storageImpl)
212 if cfg.IPBlocklist != nil {
213 cl.ipBlockList = cfg.IPBlocklist
216 if cfg.PeerID != "" {
217 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
219 o := copy(cl.peerID[:], cfg.Bep20)
220 _, err = rand.Read(cl.peerID[o:])
222 panic("error generating peer id")
226 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
234 for _, _s := range sockets {
235 s := _s // Go is fucking retarded.
236 cl.onClose = append(cl.onClose, func() { s.Close() })
237 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
238 cl.dialers = append(cl.dialers, s)
239 cl.listeners = append(cl.listeners, s)
240 go cl.acceptConnections(s)
246 for _, s := range sockets {
247 if pc, ok := s.(net.PacketConn); ok {
248 ds, err := cl.newAnacrolixDhtServer(pc)
252 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
253 cl.onClose = append(cl.onClose, func() { ds.Close() })
258 cl.websocketTrackers = websocketTrackers{
261 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
264 t, ok := cl.torrents[infoHash]
266 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
268 return t.announceRequest(event), nil
270 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
273 t, ok := cl.torrents[dcc.InfoHash]
275 cl.logger.WithDefaultLevel(log.Warning).Printf(
276 "got webrtc conn for unloaded torrent with infohash %x",
282 go t.onWebRtcConn(dc, dcc)
289 func (cl *Client) AddDhtServer(d DhtServer) {
290 cl.dhtServers = append(cl.dhtServers, d)
293 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
294 // given address for any Torrent.
295 func (cl *Client) AddDialer(d Dialer) {
298 cl.dialers = append(cl.dialers, d)
299 for _, t := range cl.torrents {
304 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
306 func (cl *Client) AddListener(l Listener) {
307 cl.listeners = append(cl.listeners, l)
308 go cl.acceptConnections(l)
311 func (cl *Client) firewallCallback(net.Addr) bool {
313 block := !cl.wantConns()
316 torrent.Add("connections firewalled", 1)
318 torrent.Add("connections not firewalled", 1)
323 func (cl *Client) listenOnNetwork(n network) bool {
324 if n.Ipv4 && cl.config.DisableIPv4 {
327 if n.Ipv6 && cl.config.DisableIPv6 {
330 if n.Tcp && cl.config.DisableTCP {
333 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
339 func (cl *Client) listenNetworks() (ns []network) {
340 for _, n := range allPeerNetworks {
341 if cl.listenOnNetwork(n) {
348 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
349 cfg := dht.ServerConfig{
350 IPBlocklist: cl.ipBlockList,
352 OnAnnouncePeer: cl.onDHTAnnouncePeer,
353 PublicIP: func() net.IP {
354 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
355 return cl.config.PublicIp6
357 return cl.config.PublicIp4
359 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
360 ConnectionTracking: cl.config.ConnTracker,
361 OnQuery: cl.config.DHTOnQuery,
362 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
364 s, err = dht.NewServer(&cfg)
367 ts, err := s.Bootstrap()
369 cl.logger.Printf("error bootstrapping dht: %s", err)
371 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
377 func (cl *Client) Closed() <-chan struct{} {
383 func (cl *Client) eachDhtServer(f func(DhtServer)) {
384 for _, ds := range cl.dhtServers {
389 // Stops the client. All connections to peers are closed and all activity will
391 func (cl *Client) Close() {
395 for _, t := range cl.torrents {
398 for i := range cl.onClose {
399 cl.onClose[len(cl.onClose)-1-i]()
404 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
405 if cl.ipBlockList == nil {
408 return cl.ipBlockList.Lookup(ip)
411 func (cl *Client) ipIsBlocked(ip net.IP) bool {
412 _, blocked := cl.ipBlockRange(ip)
416 func (cl *Client) wantConns() bool {
417 for _, t := range cl.torrents {
425 func (cl *Client) waitAccept() {
427 if cl.closed.IsSet() {
437 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
438 func (cl *Client) rejectAccepted(conn net.Conn) error {
439 ra := conn.RemoteAddr()
440 if rip := addrIpOrNil(ra); rip != nil {
441 if cl.config.DisableIPv4Peers && rip.To4() != nil {
442 return errors.New("ipv4 peers disabled")
444 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
445 return errors.New("ipv4 disabled")
448 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
449 return errors.New("ipv6 disabled")
451 if cl.rateLimitAccept(rip) {
452 return errors.New("source IP accepted rate limited")
454 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
455 return errors.New("bad source addr")
461 func (cl *Client) acceptConnections(l Listener) {
463 conn, err := l.Accept()
464 torrent.Add("client listener accepts", 1)
465 conn = pproffd.WrapNetConn(conn)
467 closed := cl.closed.IsSet()
470 reject = cl.rejectAccepted(conn)
480 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
485 torrent.Add("rejected accepted connections", 1)
486 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
489 go cl.incomingConnection(conn)
491 log.Fmsg("accepted %q connection at %q from %q",
495 ).SetLevel(log.Debug).Log(cl.logger)
496 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
497 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
498 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// regularNetConnPeerConnConnString builds the PeerConn.connString for a
// PeerConn backed by a regular net.Conn: the connection's local address and
// remote address joined by a hyphen.
func regularNetConnPeerConnConnString(nc net.Conn) string {
	local, remote := nc.LocalAddr(), nc.RemoteAddr()
	return fmt.Sprintf("%s-%s", local, remote)
}
508 func (cl *Client) incomingConnection(nc net.Conn) {
510 if tc, ok := nc.(*net.TCPConn); ok {
513 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
514 regularNetConnPeerConnConnString(nc))
516 c.Discovery = PeerSourceIncoming
517 cl.runReceivedConn(c)
520 // Returns a handle to the given torrent, if it's present in the client.
521 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
524 t, ok = cl.torrents[ih]
528 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
529 return cl.torrents[ih]
532 type dialResult struct {
537 func countDialResult(err error) {
539 torrent.Add("successful dials", 1)
541 torrent.Add("unsuccessful dials", 1)
545 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
546 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
547 if ret < minDialTimeout {
553 // Returns whether an address is known to connect to a client with our own ID.
554 func (cl *Client) dopplegangerAddr(addr string) bool {
555 _, ok := cl.dopplegangerAddrs[addr]
559 // Returns a connection over UTP or TCP, whichever is first to connect.
560 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
562 t := perf.NewTimer(perf.CallerName(0))
565 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
567 t.Mark("returned conn over " + res.Network)
571 ctx, cancel := context.WithCancel(ctx)
572 // As soon as we return one connection, cancel the others.
575 resCh := make(chan dialResult, left)
579 cl.eachDialer(func(s Dialer) bool {
582 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
585 cl.dialFromSocket(ctx, s, addr),
586 s.LocalAddr().Network(),
593 // Wait for a successful connection.
595 defer perf.ScopeTimer()()
596 for ; left > 0 && res.Conn == nil; left-- {
600 // There are still incomplete dials.
602 for ; left > 0; left-- {
603 conn := (<-resCh).Conn
610 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
612 //if res.Conn != nil {
613 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
615 // cl.logger.Printf("failed to dial %s", addr)
620 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
621 network := s.LocalAddr().Network()
622 cte := cl.config.ConnTracker.Wait(
624 conntrack.Entry{network, s.LocalAddr().String(), addr},
625 "dial torrent client",
628 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
629 // which dial errors allow us to forget the connection tracking entry handle.
630 if ctx.Err() != nil {
636 c, err := s.Dial(ctx, addr)
637 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
638 // it now in case we close the connection forthwith.
639 if tc, ok := c.(*net.TCPConn); ok {
644 if err != nil && forgettableDialError(err) {
651 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found". err must be non-nil.
func forgettableDialError(err error) bool {
	const marker = "no suitable address found"
	msg := err.Error()
	return strings.Contains(msg, marker)
}
662 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
663 if _, ok := t.halfOpen[addr]; !ok {
664 panic("invariant broken")
666 delete(t.halfOpen, addr)
668 for _, t := range cl.torrents {
673 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
674 // for valid reasons.
675 func (cl *Client) initiateProtocolHandshakes(
679 outgoing, encryptHeader bool,
681 network, connString string,
683 c *PeerConn, err error,
685 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
686 c.headerEncrypted = encryptHeader
687 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
689 dl, ok := ctx.Deadline()
693 err = nc.SetDeadline(dl)
697 err = cl.initiateHandshakes(c, t)
701 // Returns nil connection and nil error if no connection could be established for valid reasons.
702 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
703 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
706 return t.dialTimeout()
709 dr := cl.dialFirst(dialCtx, addr.String())
712 if dialCtx.Err() != nil {
713 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
715 return nil, errors.New("dial failed")
717 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
724 // Returns nil connection and nil error if no connection could be established
725 // for valid reasons.
726 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
727 torrent.Add("establish outgoing connection", 1)
728 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
729 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
731 torrent.Add("initiated conn with preferred header obfuscation", 1)
734 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
735 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
736 // We should have just tried with the preferred header obfuscation. If it was required,
737 // there's nothing else to try.
740 // Try again with encryption if we didn't earlier, or without if we did.
741 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
743 torrent.Add("initiated conn with fallback header obfuscation", 1)
745 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
749 // Called to dial out and run a connection. The addr we're given is already
750 // considered half-open.
751 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
752 cl.dialRateLimiter.Wait(context.Background())
753 c, err := cl.establishOutgoingConn(t, addr)
756 // Don't release lock between here and addConnection, unless it's for
758 cl.noLongerHalfOpen(t, addr.String())
761 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
768 t.runHandshookConnLoggingErr(c)
771 // The port number for incoming peer connections. 0 if the client isn't listening.
772 func (cl *Client) incomingPeerPort() int {
773 return cl.LocalPort()
776 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
777 if c.headerEncrypted {
780 rw, c.cryptoMethod, err = mse.InitiateHandshake(
787 cl.config.CryptoProvides,
791 return xerrors.Errorf("header obfuscation handshake: %w", err)
794 ih, err := cl.connBtHandshake(c, &t.infoHash)
796 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
798 if ih != t.infoHash {
799 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
804 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
805 // that won't also try to take the lock. This saves us copying all the infohashes every time.
806 func (cl *Client) forSkeys(f func([]byte) bool) {
809 if false { // Emulate the bug from #114
811 for ih := range cl.torrents {
815 for range cl.torrents {
822 for ih := range cl.torrents {
829 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
830 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
836 // Do encryption and bittorrent handshakes as receiver.
837 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
838 defer perf.ScopeTimerErr(&err)()
840 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
842 if err == nil || err == mse.ErrNoSecretKeyMatch {
843 if c.headerEncrypted {
844 torrent.Add("handshakes received encrypted", 1)
846 torrent.Add("handshakes received unencrypted", 1)
849 torrent.Add("handshakes received with error while handling encryption", 1)
852 if err == mse.ErrNoSecretKeyMatch {
857 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
858 err = errors.New("connection does not have required header obfuscation")
861 ih, err := cl.connBtHandshake(c, nil)
863 err = xerrors.Errorf("during bt handshake: %w", err)
872 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
873 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
878 c.PeerExtensionBytes = res.PeerExtensionBits
879 c.PeerID = res.PeerID
880 c.completedHandshake = time.Now()
881 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
887 func (cl *Client) runReceivedConn(c *PeerConn) {
888 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
892 t, err := cl.receiveHandshakes(c)
895 "error receiving handshakes on %v: %s", c, err,
896 ).SetLevel(log.Debug).
898 "network", c.network,
900 torrent.Add("error receiving handshake", 1)
902 cl.onBadAccept(c.RemoteAddr)
907 torrent.Add("received handshake for unloaded torrent", 1)
908 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
910 cl.onBadAccept(c.RemoteAddr)
914 torrent.Add("received handshake for loaded torrent", 1)
917 t.runHandshookConnLoggingErr(c)
920 // Client lock must be held before entering this.
921 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
923 if c.PeerID == cl.peerID {
926 addr := c.conn.RemoteAddr().String()
927 cl.dopplegangerAddrs[addr] = struct{}{}
929 // Because the remote address is not necessarily the same as its client's torrent listen
930 // address, we won't record the remote address as a doppleganger. Instead, the initiator
931 // can record *us* as the doppleganger.
933 return errors.New("local and remote peer ids are the same")
935 c.conn.SetWriteDeadline(time.Time{})
936 c.r = deadlineReader{c.conn, c.r}
937 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
938 if connIsIpv6(c.conn) {
939 torrent.Add("completed handshake over ipv6", 1)
941 if err := t.addConnection(c); err != nil {
942 return fmt.Errorf("adding connection: %w", err)
944 defer t.dropConnection(c)
945 go c.writer(time.Minute)
946 cl.sendInitialMessages(c, t)
947 err := c.mainReadLoop()
949 return fmt.Errorf("main read loop: %w", err)
954 // See the order given in Transmission's tr_peerMsgsNew.
955 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
956 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
957 conn.post(pp.Message{
959 ExtendedID: pp.HandshakeExtendedID,
960 ExtendedPayload: func() []byte {
961 msg := pp.ExtendedHandshakeMessage{
962 M: map[pp.ExtensionName]pp.ExtensionNumber{
963 pp.ExtensionNameMetadata: metadataExtendedId,
965 V: cl.config.ExtendedHandshakeClientVersion,
966 // If peer requests are buffered on read, this instructs the amount of memory
967 // that might be used to cache pending writes. Assuming 512KiB cached for
968 // sending, for 16KiB chunks.
970 YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
971 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
972 Port: cl.incomingPeerPort(),
973 MetadataSize: torrent.metadataSize(),
974 // TODO: We can figure these out specific to the socket
976 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
977 Ipv6: cl.config.PublicIp6.To16(),
979 if !cl.config.DisablePEX {
980 msg.M[pp.ExtensionNamePex] = pexExtendedId
982 return bencode.MustMarshal(msg)
987 if conn.fastEnabled() {
988 if torrent.haveAllPieces() {
989 conn.post(pp.Message{Type: pp.HaveAll})
990 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
992 } else if !torrent.haveAnyPieces() {
993 conn.post(pp.Message{Type: pp.HaveNone})
994 conn.sentHaves.Clear()
1000 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1001 conn.post(pp.Message{
1008 func (cl *Client) dhtPort() (ret uint16) {
1009 cl.eachDhtServer(func(s DhtServer) {
1010 ret = uint16(missinggo.AddrPort(s.Addr()))
1015 func (cl *Client) haveDhtServer() (ret bool) {
1016 cl.eachDhtServer(func(_ DhtServer) {
1022 // Process incoming ut_metadata message.
1023 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1024 var d map[string]int
1025 err := bencode.Unmarshal(payload, &d)
1026 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1027 } else if err != nil {
1028 return fmt.Errorf("error unmarshalling bencode: %s", err)
1030 msgType, ok := d["msg_type"]
1032 return errors.New("missing msg_type field")
1036 case pp.DataMetadataExtensionMsgType:
1037 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1038 if !c.requestedMetadataPiece(piece) {
1039 return fmt.Errorf("got unexpected piece %d", piece)
1041 c.metadataRequests[piece] = false
1042 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1043 if begin < 0 || begin >= len(payload) {
1044 return fmt.Errorf("data has bad offset in payload: %d", begin)
1046 t.saveMetadataPiece(piece, payload[begin:])
1047 c.lastUsefulChunkReceived = time.Now()
1048 return t.maybeCompleteMetadata()
1049 case pp.RequestMetadataExtensionMsgType:
1050 if !t.haveMetadataPiece(piece) {
1051 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1054 start := (1 << 14) * piece
1055 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1056 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1058 case pp.RejectMetadataExtensionMsgType:
1061 return errors.New("unknown msg_type value")
1065 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1066 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1067 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1072 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1076 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1079 if _, ok := cl.ipBlockRange(ip); ok {
1082 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1088 // Return a Torrent ready for insertion into a Client.
1089 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1090 // use provided storage, if provided
1091 storageClient := cl.defaultStorage
1092 if specStorage != nil {
1093 storageClient = storage.NewClient(specStorage)
1099 peers: prioritizedPeers{
1101 getPrio: func(p PeerInfo) peerPriority {
1102 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1105 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1107 halfOpen: make(map[string]PeerInfo),
1108 pieceStateChanges: pubsub.NewPubSub(),
1110 storageOpener: storageClient,
1111 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1113 networkingEnabled: true,
1114 metadataChanged: sync.Cond{
1117 webSeeds: make(map[string]*Peer),
1119 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1120 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1121 t.logger = cl.logger.WithContextValue(t)
1122 t.setChunkSize(defaultChunkSize)
1126 // A file-like handle to some torrent data resource.
1127 type Handle interface {
1134 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1135 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1138 // Adds a torrent by InfoHash with a custom Storage implementation.
1139 // If the torrent already exists then this Storage is ignored and the
1140 // existing torrent returned with `new` set to `false`
1141 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1144 t, ok := cl.torrents[infoHash]
1150 t = cl.newTorrent(infoHash, specStorage)
1151 cl.eachDhtServer(func(s DhtServer) {
1152 go t.dhtAnnouncer(s)
1154 cl.torrents[infoHash] = t
1155 cl.clearAcceptLimits()
1156 t.updateWantPeersEvent()
1157 // Tickle Client.waitAccept, new torrent may want conns.
1158 cl.event.Broadcast()
1162 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1163 // Torrent.MergeSpec.
1164 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1165 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1166 err = t.MergeSpec(spec)
1167 if err != nil && new {
// stringAddr is a net.Addr whose address is an arbitrary string.
type stringAddr string

// Compile-time check that stringAddr satisfies net.Addr.
var _ net.Addr = stringAddr("")

// Network always returns the empty string.
func (stringAddr) Network() string {
	return ""
}

// String returns the underlying string unchanged.
func (a stringAddr) String() string {
	return string(a)
}
1180 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1181 // spec.DisallowDataDownload/Upload will be read and applied.
1182 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1183 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1184 if spec.DisplayName != "" {
1185 t.SetDisplayName(spec.DisplayName)
1187 if spec.InfoBytes != nil {
1188 err := t.SetInfoBytes(spec.InfoBytes)
1194 cl.AddDhtNodes(spec.DhtNodes)
1197 useTorrentSources(spec.Sources, t)
1198 for _, url := range spec.Webseeds {
1201 for _, peerAddr := range spec.PeerAddrs {
1203 Addr: stringAddr(peerAddr),
1204 Source: PeerSourceDirect,
1208 if spec.ChunkSize != 0 {
1209 t.setChunkSize(pp.Integer(spec.ChunkSize))
1211 t.addTrackers(spec.Trackers)
1213 t.dataDownloadDisallowed = spec.DisallowDataDownload
1214 t.dataUploadDisallowed = spec.DisallowDataUpload
1218 func useTorrentSources(sources []string, t *Torrent) {
1219 for _, s := range sources {
1221 err := useTorrentSource(s, t)
1223 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1225 t.logger.Printf("successfully used source %q", s)
1231 func useTorrentSource(source string, t *Torrent) error {
1232 req, err := http.NewRequest(http.MethodGet, source, nil)
1236 ctx, cancel := context.WithCancel(context.Background())
1246 req = req.WithContext(ctx)
1247 resp, err := http.DefaultClient.Do(req)
1251 mi, err := metainfo.Load(resp.Body)
1253 if ctx.Err() != nil {
1258 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1261 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1262 t, ok := cl.torrents[infoHash]
1264 err = fmt.Errorf("no such torrent")
1271 delete(cl.torrents, infoHash)
1275 func (cl *Client) allTorrentsCompleted() bool {
1276 for _, t := range cl.torrents {
1280 if !t.haveAllPieces() {
1287 // Returns true when all torrents are completely downloaded and false if the
1288 // client is stopped before that.
1289 func (cl *Client) WaitAll() bool {
1292 for !cl.allTorrentsCompleted() {
1293 if cl.closed.IsSet() {
1301 // Returns handles to all the torrents loaded in the Client.
1302 func (cl *Client) Torrents() []*Torrent {
1305 return cl.torrentsAsSlice()
1308 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1309 for _, t := range cl.torrents {
1310 ret = append(ret, t)
1315 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1316 spec, err := TorrentSpecFromMagnetUri(uri)
1320 T, _, err = cl.AddTorrentSpec(spec)
1324 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1325 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1329 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1330 mi, err := metainfo.LoadFromFile(filename)
1334 return cl.AddTorrent(mi)
1337 func (cl *Client) DhtServers() []DhtServer {
1338 return cl.dhtServers
1341 func (cl *Client) AddDhtNodes(nodes []string) {
1342 for _, n := range nodes {
1343 hmp := missinggo.SplitHostMaybePort(n)
1344 ip := net.ParseIP(hmp.Host)
1346 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1349 ni := krpc.NodeInfo{
1350 Addr: krpc.NodeAddr{
1355 cl.eachDhtServer(func(s DhtServer) {
1361 func (cl *Client) banPeerIP(ip net.IP) {
1362 cl.logger.Printf("banning ip %v", ip)
1363 if cl.badPeerIPs == nil {
1364 cl.badPeerIPs = make(map[string]struct{})
1366 cl.badPeerIPs[ip.String()] = struct{}{}
1369 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1375 PeerMaxRequests: 250,
1377 RemoteAddr: remoteAddr,
1380 connString: connString,
1382 writeBuffer: new(bytes.Buffer),
1383 callbacks: &cl.config.Callbacks,
1386 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1387 c.writerCond.L = cl.locker()
1388 c.setRW(connStatsReadWriter{nc, c})
1389 c.r = &rateLimitedReader{
1390 l: cl.config.DownloadRateLimiter,
1393 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1397 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1404 t.addPeers([]PeerInfo{{
1405 Addr: ipPortAddr{ip, port},
1406 Source: PeerSourceDhtAnnouncePeer,
1410 func firstNotNil(ips ...net.IP) net.IP {
1411 for _, ip := range ips {
1419 func (cl *Client) eachDialer(f func(Dialer) bool) {
1420 for _, s := range cl.dialers {
1427 func (cl *Client) eachListener(f func(Listener) bool) {
1428 for _, s := range cl.listeners {
1435 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1436 cl.eachListener(func(l Listener) bool {
1443 func (cl *Client) publicIp(peer net.IP) net.IP {
1444 // TODO: Use BEP 10 to determine how peers are seeing us.
1445 if peer.To4() != nil {
1447 cl.config.PublicIp4,
1448 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1453 cl.config.PublicIp6,
1454 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1458 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1459 l := cl.findListener(
1460 func(l Listener) bool {
1461 return f(addrIpOrNil(l.Addr()))
1467 return addrIpOrNil(l.Addr())
1470 // Our IP as a peer should see it.
1471 func (cl *Client) publicAddr(peer net.IP) IpPort {
1472 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1475 // ListenAddrs addresses currently being listened to.
1476 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1479 cl.eachListener(func(l Listener) bool {
1480 ret = append(ret, l.Addr())
1486 func (cl *Client) onBadAccept(addr net.Addr) {
1487 ipa, ok := tryIpPortFromNetAddr(addr)
1491 ip := maskIpForAcceptLimiting(ipa.IP)
1492 if cl.acceptLimiter == nil {
1493 cl.acceptLimiter = make(map[ipStr]int)
1495 cl.acceptLimiter[ipStr(ip.String())]++
1498 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1499 if ip4 := ip.To4(); ip4 != nil {
1500 return ip4.Mask(net.CIDRMask(24, 32))
1505 func (cl *Client) clearAcceptLimits() {
1506 cl.acceptLimiter = nil
1509 func (cl *Client) acceptLimitClearer() {
1512 case <-cl.closed.LockedChan(cl.locker()):
1514 case <-time.After(15 * time.Minute):
1516 cl.clearAcceptLimits()
1522 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1523 if cl.config.DisableAcceptRateLimiting {
1526 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1529 func (cl *Client) rLock() {
1533 func (cl *Client) rUnlock() {
1537 func (cl *Client) lock() {
1541 func (cl *Client) unlock() {
1545 func (cl *Client) locker() *lockWithDeferreds {
1549 func (cl *Client) String() string {
1550 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1553 // Returns connection-level aggregate stats at the Client level. See the comment on
1554 // TorrentStats.ConnStats.
1555 func (cl *Client) ConnStats() ConnStats {
1556 return cl.stats.Copy()