18 "github.com/anacrolix/dht/v2"
19 "github.com/anacrolix/dht/v2/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2/pproffd"
26 "github.com/anacrolix/sync"
27 "github.com/anacrolix/torrent/internal/limiter"
28 "github.com/anacrolix/torrent/tracker"
29 "github.com/anacrolix/torrent/webtorrent"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "github.com/pion/datachannel"
34 "golang.org/x/time/rate"
35 "golang.org/x/xerrors"
37 "github.com/anacrolix/missinggo/v2"
38 "github.com/anacrolix/missinggo/v2/conntrack"
40 "github.com/anacrolix/torrent/bencode"
41 "github.com/anacrolix/torrent/iplist"
42 "github.com/anacrolix/torrent/metainfo"
43 "github.com/anacrolix/torrent/mse"
44 pp "github.com/anacrolix/torrent/peer_protocol"
45 "github.com/anacrolix/torrent/storage"
48 // Clients contain zero or more Torrents. A Client manages a blocklist, the
49 // TCP/UDP protocol ports, and DHT as desired.
51 // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
57 closed missinggo.Event
63 defaultStorage *storage.Client
67 dhtServers []DhtServer
68 ipBlockList iplist.Ranger
70 // Set of addresses that have our client ID. This intentionally will
71 // include ourselves if we end up trying to connect to our own address
72 // through legitimate channels.
73 dopplegangerAddrs map[string]struct{}
74 badPeerIPs map[string]struct{}
75 torrents map[InfoHash]*Torrent
77 acceptLimiter map[ipStr]int
78 dialRateLimiter *rate.Limiter
81 websocketTrackers websocketTrackers
83 activeAnnounceLimiter limiter.Instance
85 clientPieceRequestOrder
90 func (cl *Client) BadPeerIPs() []string {
93 return cl.badPeerIPsLocked()
96 func (cl *Client) badPeerIPsLocked() []string {
97 return slices.FromMapKeys(cl.badPeerIPs).([]string)
100 func (cl *Client) PeerID() PeerID {
104 // Returns the port number for the first listener that has one. No longer assumes that all port
105 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
107 func (cl *Client) LocalPort() (port int) {
108 cl.eachListener(func(l Listener) bool {
109 port = addrPortOrZero(l.Addr())
115 func writeDhtServerStatus(w io.Writer, s DhtServer) {
116 dhtStats := s.Stats()
117 fmt.Fprintf(w, " ID: %x\n", s.ID())
118 spew.Fdump(w, dhtStats)
121 // Writes out a human readable status of the client, such as for writing to a
123 func (cl *Client) WriteStatus(_w io.Writer) {
126 w := bufio.NewWriter(_w)
128 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
129 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
130 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
131 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
132 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
133 cl.eachDhtServer(func(s DhtServer) {
134 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
135 writeDhtServerStatus(w, s)
137 spew.Fdump(w, &cl.stats)
138 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
140 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
141 return l.InfoHash().AsString() < r.InfoHash().AsString()
144 fmt.Fprint(w, "<unknown name>")
146 fmt.Fprint(w, t.name())
152 "%f%% of %d bytes (%s)",
153 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
155 humanize.Bytes(uint64(*t.length)))
157 w.WriteString("<missing metainfo>")
165 // Filters things that are less than warning from UPnP discovery.
166 func upnpDiscoverLogFilter(m log.Msg) bool {
167 level, ok := m.GetLevel()
168 return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok)
171 func (cl *Client) initLogger() {
172 logger := cl.config.Logger
175 if !cl.config.Debug {
176 logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter)
179 cl.logger = logger.WithValues(cl)
182 func (cl *Client) announceKey() int32 {
183 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
186 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
188 cfg = NewDefaultClientConfig()
198 dopplegangerAddrs: make(map[string]struct{}),
199 torrents: make(map[metainfo.Hash]*Torrent),
200 dialRateLimiter: rate.NewLimiter(10, 10),
202 cl.activeAnnounceLimiter.SlotsPerKey = 2
203 go cl.acceptLimitClearer()
211 cl.event.L = cl.locker()
212 storageImpl := cfg.DefaultStorage
213 if storageImpl == nil {
214 // We'd use mmap by default but HFS+ doesn't support sparse files.
215 storageImplCloser := storage.NewFile(cfg.DataDir)
216 cl.onClose = append(cl.onClose, func() {
217 if err := storageImplCloser.Close(); err != nil {
218 cl.logger.Printf("error closing default storage: %s", err)
221 storageImpl = storageImplCloser
223 cl.defaultStorage = storage.NewClient(storageImpl)
224 if cfg.IPBlocklist != nil {
225 cl.ipBlockList = cfg.IPBlocklist
228 if cfg.PeerID != "" {
229 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
231 o := copy(cl.peerID[:], cfg.Bep20)
232 _, err = rand.Read(cl.peerID[o:])
234 panic("error generating peer id")
238 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
246 for _, _s := range sockets {
247 s := _s // Go is fucking retarded.
248 cl.onClose = append(cl.onClose, func() { s.Close() })
249 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
250 cl.dialers = append(cl.dialers, s)
251 cl.listeners = append(cl.listeners, s)
252 go cl.acceptConnections(s)
258 for _, s := range sockets {
259 if pc, ok := s.(net.PacketConn); ok {
260 ds, err := cl.newAnacrolixDhtServer(pc)
264 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
265 cl.onClose = append(cl.onClose, func() { ds.Close() })
270 cl.websocketTrackers = websocketTrackers{
273 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
276 t, ok := cl.torrents[infoHash]
278 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
280 return t.announceRequest(event), nil
282 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
285 t, ok := cl.torrents[dcc.InfoHash]
287 cl.logger.WithDefaultLevel(log.Warning).Printf(
288 "got webrtc conn for unloaded torrent with infohash %x",
294 go t.onWebRtcConn(dc, dcc)
303 func (cl *Client) AddDhtServer(d DhtServer) {
304 cl.dhtServers = append(cl.dhtServers, d)
307 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
308 // given address for any Torrent.
309 func (cl *Client) AddDialer(d Dialer) {
312 cl.dialers = append(cl.dialers, d)
313 for _, t := range cl.torrents {
318 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
320 func (cl *Client) AddListener(l Listener) {
321 cl.listeners = append(cl.listeners, l)
322 go cl.acceptConnections(l)
325 func (cl *Client) firewallCallback(net.Addr) bool {
327 block := !cl.wantConns()
330 torrent.Add("connections firewalled", 1)
332 torrent.Add("connections not firewalled", 1)
337 func (cl *Client) listenOnNetwork(n network) bool {
338 if n.Ipv4 && cl.config.DisableIPv4 {
341 if n.Ipv6 && cl.config.DisableIPv6 {
344 if n.Tcp && cl.config.DisableTCP {
347 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
353 func (cl *Client) listenNetworks() (ns []network) {
354 for _, n := range allPeerNetworks {
355 if cl.listenOnNetwork(n) {
362 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
363 cfg := dht.ServerConfig{
364 IPBlocklist: cl.ipBlockList,
366 OnAnnouncePeer: cl.onDHTAnnouncePeer,
367 PublicIP: func() net.IP {
368 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
369 return cl.config.PublicIp6
371 return cl.config.PublicIp4
373 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
374 ConnectionTracking: cl.config.ConnTracker,
375 OnQuery: cl.config.DHTOnQuery,
376 Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
378 if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
381 s, err = dht.NewServer(&cfg)
384 ts, err := s.Bootstrap()
386 cl.logger.Printf("error bootstrapping dht: %s", err)
388 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
394 func (cl *Client) Closed() <-chan struct{} {
400 func (cl *Client) eachDhtServer(f func(DhtServer)) {
401 for _, ds := range cl.dhtServers {
406 // Stops the client. All connections to peers are closed and all activity will
408 func (cl *Client) Close() {
412 for _, t := range cl.torrents {
415 for i := range cl.onClose {
416 cl.onClose[len(cl.onClose)-1-i]()
421 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
422 if cl.ipBlockList == nil {
425 return cl.ipBlockList.Lookup(ip)
428 func (cl *Client) ipIsBlocked(ip net.IP) bool {
429 _, blocked := cl.ipBlockRange(ip)
433 func (cl *Client) wantConns() bool {
434 for _, t := range cl.torrents {
442 func (cl *Client) waitAccept() {
444 if cl.closed.IsSet() {
454 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
455 func (cl *Client) rejectAccepted(conn net.Conn) error {
456 ra := conn.RemoteAddr()
457 if rip := addrIpOrNil(ra); rip != nil {
458 if cl.config.DisableIPv4Peers && rip.To4() != nil {
459 return errors.New("ipv4 peers disabled")
461 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
462 return errors.New("ipv4 disabled")
465 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
466 return errors.New("ipv6 disabled")
468 if cl.rateLimitAccept(rip) {
469 return errors.New("source IP accepted rate limited")
471 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
472 return errors.New("bad source addr")
478 func (cl *Client) acceptConnections(l Listener) {
480 conn, err := l.Accept()
481 torrent.Add("client listener accepts", 1)
482 conn = pproffd.WrapNetConn(conn)
484 closed := cl.closed.IsSet()
487 reject = cl.rejectAccepted(conn)
497 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
502 torrent.Add("rejected accepted connections", 1)
503 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
506 go cl.incomingConnection(conn)
508 log.Fmsg("accepted %q connection at %q from %q",
512 ).SetLevel(log.Debug).Log(cl.logger)
513 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
514 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
515 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
520 // Creates the PeerConn.connString for a regular net.Conn PeerConn.
521 func regularNetConnPeerConnConnString(nc net.Conn) string {
522 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
525 func (cl *Client) incomingConnection(nc net.Conn) {
527 if tc, ok := nc.(*net.TCPConn); ok {
530 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
531 regularNetConnPeerConnConnString(nc))
533 c.Discovery = PeerSourceIncoming
534 cl.runReceivedConn(c)
537 // Returns a handle to the given torrent, if it's present in the client.
538 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
541 t, ok = cl.torrents[ih]
545 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
546 return cl.torrents[ih]
549 type dialResult struct {
554 func countDialResult(err error) {
556 torrent.Add("successful dials", 1)
558 torrent.Add("unsuccessful dials", 1)
562 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
563 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
564 if ret < minDialTimeout {
570 // Returns whether an address is known to connect to a client with our own ID.
571 func (cl *Client) dopplegangerAddr(addr string) bool {
572 _, ok := cl.dopplegangerAddrs[addr]
576 // Returns a connection over UTP or TCP, whichever is first to connect.
577 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
579 t := perf.NewTimer(perf.CallerName(0))
582 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
584 t.Mark("returned conn over " + res.Network)
588 ctx, cancel := context.WithCancel(ctx)
589 // As soon as we return one connection, cancel the others.
592 resCh := make(chan dialResult, left)
596 cl.eachDialer(func(s Dialer) bool {
599 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
602 cl.dialFromSocket(ctx, s, addr),
603 s.LocalAddr().Network(),
610 // Wait for a successful connection.
612 defer perf.ScopeTimer()()
613 for ; left > 0 && res.Conn == nil; left-- {
617 // There are still incompleted dials.
619 for ; left > 0; left-- {
620 conn := (<-resCh).Conn
627 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
629 //if res.Conn != nil {
630 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
632 // cl.logger.Printf("failed to dial %s", addr)
637 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
638 network := s.LocalAddr().Network()
639 cte := cl.config.ConnTracker.Wait(
641 conntrack.Entry{network, s.LocalAddr().String(), addr},
642 "dial torrent client",
645 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
646 // which dial errors allow us to forget the connection tracking entry handle.
647 if ctx.Err() != nil {
653 c, err := s.Dial(ctx, addr)
654 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
655 // it now in case we close the connection forthwith.
656 if tc, ok := c.(*net.TCPConn); ok {
661 if err != nil && forgettableDialError(err) {
668 return closeWrapper{c, func() error {
675 func forgettableDialError(err error) bool {
676 return strings.Contains(err.Error(), "no suitable address found")
679 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
680 if _, ok := t.halfOpen[addr]; !ok {
681 panic("invariant broken")
683 delete(t.halfOpen, addr)
685 for _, t := range cl.torrents {
690 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
691 // for valid reasons.
692 func (cl *Client) initiateProtocolHandshakes(
696 outgoing, encryptHeader bool,
697 remoteAddr PeerRemoteAddr,
698 network, connString string,
700 c *PeerConn, err error,
702 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
703 c.headerEncrypted = encryptHeader
704 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
706 dl, ok := ctx.Deadline()
710 err = nc.SetDeadline(dl)
714 err = cl.initiateHandshakes(c, t)
718 // Returns nil connection and nil error if no connection could be established for valid reasons.
719 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
720 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
723 return t.dialTimeout()
726 dr := cl.dialFirst(dialCtx, addr.String())
729 if dialCtx.Err() != nil {
730 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
732 return nil, errors.New("dial failed")
734 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularNetConnPeerConnConnString(nc))
741 // Returns nil connection and nil error if no connection could be established
742 // for valid reasons.
743 func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
744 torrent.Add("establish outgoing connection", 1)
745 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
746 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
748 torrent.Add("initiated conn with preferred header obfuscation", 1)
751 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
752 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
753 // We should have just tried with the preferred header obfuscation. If it was required,
754 // there's nothing else to try.
757 // Try again with encryption if we didn't earlier, or without if we did.
758 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
760 torrent.Add("initiated conn with fallback header obfuscation", 1)
762 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
766 // Called to dial out and run a connection. The addr we're given is already
767 // considered half-open.
768 func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
769 cl.dialRateLimiter.Wait(context.Background())
770 c, err := cl.establishOutgoingConn(t, addr)
773 // Don't release lock between here and addPeerConn, unless it's for
775 cl.noLongerHalfOpen(t, addr.String())
778 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
785 t.runHandshookConnLoggingErr(c)
788 // The port number for incoming peer connections. 0 if the client isn't listening.
789 func (cl *Client) incomingPeerPort() int {
790 return cl.LocalPort()
793 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
794 if c.headerEncrypted {
797 rw, c.cryptoMethod, err = mse.InitiateHandshake(
804 cl.config.CryptoProvides,
808 return xerrors.Errorf("header obfuscation handshake: %w", err)
811 ih, err := cl.connBtHandshake(c, &t.infoHash)
813 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
815 if ih != t.infoHash {
816 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
821 // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
822 // that won't also try to take the lock. This saves us copying all the infohashes everytime.
823 func (cl *Client) forSkeys(f func([]byte) bool) {
826 if false { // Emulate the bug from #114
828 for ih := range cl.torrents {
832 for range cl.torrents {
839 for ih := range cl.torrents {
846 func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
847 if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
853 // Do encryption and bittorrent handshakes as receiver.
854 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
855 defer perf.ScopeTimerErr(&err)()
857 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
859 if err == nil || err == mse.ErrNoSecretKeyMatch {
860 if c.headerEncrypted {
861 torrent.Add("handshakes received encrypted", 1)
863 torrent.Add("handshakes received unencrypted", 1)
866 torrent.Add("handshakes received with error while handling encryption", 1)
869 if err == mse.ErrNoSecretKeyMatch {
874 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
875 err = errors.New("connection does not have required header obfuscation")
878 ih, err := cl.connBtHandshake(c, nil)
880 err = xerrors.Errorf("during bt handshake: %w", err)
889 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
890 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
895 c.PeerExtensionBytes = res.PeerExtensionBits
896 c.PeerID = res.PeerID
897 c.completedHandshake = time.Now()
898 if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
904 func (cl *Client) runReceivedConn(c *PeerConn) {
905 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
909 t, err := cl.receiveHandshakes(c)
912 "error receiving handshakes on %v: %s", c, err,
913 ).SetLevel(log.Debug).
915 "network", c.Network,
917 torrent.Add("error receiving handshake", 1)
919 cl.onBadAccept(c.RemoteAddr)
924 torrent.Add("received handshake for unloaded torrent", 1)
925 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
927 cl.onBadAccept(c.RemoteAddr)
931 torrent.Add("received handshake for loaded torrent", 1)
934 t.runHandshookConnLoggingErr(c)
937 // Client lock must be held before entering this.
938 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
940 if c.PeerID == cl.peerID {
943 addr := c.conn.RemoteAddr().String()
944 cl.dopplegangerAddrs[addr] = struct{}{}
946 // Because the remote address is not necessarily the same as its client's torrent listen
947 // address, we won't record the remote address as a doppleganger. Instead, the initiator
948 // can record *us* as the doppleganger.
950 return errors.New("local and remote peer ids are the same")
952 c.conn.SetWriteDeadline(time.Time{})
953 c.r = deadlineReader{c.conn, c.r}
954 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
955 if connIsIpv6(c.conn) {
956 torrent.Add("completed handshake over ipv6", 1)
958 if err := t.addPeerConn(c); err != nil {
959 return fmt.Errorf("adding connection: %w", err)
961 defer t.dropConnection(c)
962 go c.writer(time.Minute)
963 cl.sendInitialMessages(c, t)
964 err := c.mainReadLoop()
966 return fmt.Errorf("main read loop: %w", err)
971 // See the order given in Transmission's tr_peerMsgsNew.
972 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
973 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
974 conn.post(pp.Message{
976 ExtendedID: pp.HandshakeExtendedID,
977 ExtendedPayload: func() []byte {
978 msg := pp.ExtendedHandshakeMessage{
979 M: map[pp.ExtensionName]pp.ExtensionNumber{
980 pp.ExtensionNameMetadata: metadataExtendedId,
982 V: cl.config.ExtendedHandshakeClientVersion,
983 // If peer requests are buffered on read, this instructs the amount of memory
984 // that might be used to cache pending writes. Assuming 512KiB cached for
985 // sending, for 16KiB chunks.
987 YourIp: pp.CompactIp(conn.remoteIp()),
988 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
989 Port: cl.incomingPeerPort(),
990 MetadataSize: torrent.metadataSize(),
991 // TODO: We can figured these out specific to the socket
993 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
994 Ipv6: cl.config.PublicIp6.To16(),
996 if !cl.config.DisablePEX {
997 msg.M[pp.ExtensionNamePex] = pexExtendedId
999 return bencode.MustMarshal(msg)
1004 if conn.fastEnabled() {
1005 if torrent.haveAllPieces() {
1006 conn.post(pp.Message{Type: pp.HaveAll})
1007 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
1009 } else if !torrent.haveAnyPieces() {
1010 conn.post(pp.Message{Type: pp.HaveNone})
1011 conn.sentHaves.Clear()
1017 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
1018 conn.post(pp.Message{
1025 func (cl *Client) dhtPort() (ret uint16) {
1026 cl.eachDhtServer(func(s DhtServer) {
1027 ret = uint16(missinggo.AddrPort(s.Addr()))
1032 func (cl *Client) haveDhtServer() (ret bool) {
1033 cl.eachDhtServer(func(_ DhtServer) {
1039 // Process incoming ut_metadata message.
1040 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1041 var d map[string]int
1042 err := bencode.Unmarshal(payload, &d)
1043 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1044 } else if err != nil {
1045 return fmt.Errorf("error unmarshalling bencode: %s", err)
1047 msgType, ok := d["msg_type"]
1049 return errors.New("missing msg_type field")
1053 case pp.DataMetadataExtensionMsgType:
1054 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1055 if !c.requestedMetadataPiece(piece) {
1056 return fmt.Errorf("got unexpected piece %d", piece)
1058 c.metadataRequests[piece] = false
1059 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1060 if begin < 0 || begin >= len(payload) {
1061 return fmt.Errorf("data has bad offset in payload: %d", begin)
1063 t.saveMetadataPiece(piece, payload[begin:])
1064 c.lastUsefulChunkReceived = time.Now()
1065 err = t.maybeCompleteMetadata()
1067 // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
1068 // don't know who to blame. TODO: Also errors can be returned here that aren't related
1069 // to verifying metadata, which should be fixed. This should be tagged with metadata, so
1070 // log consumers can filter for this message.
1071 t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
1074 case pp.RequestMetadataExtensionMsgType:
1075 if !t.haveMetadataPiece(piece) {
1076 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1079 start := (1 << 14) * piece
1080 c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
1081 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1083 case pp.RejectMetadataExtensionMsgType:
1086 return errors.New("unknown msg_type value")
1090 func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
1091 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1092 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1097 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1101 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1104 if _, ok := cl.ipBlockRange(ip); ok {
1107 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1113 // Return a Torrent ready for insertion into a Client.
1114 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1115 // use provided storage, if provided
1116 storageClient := cl.defaultStorage
1117 if specStorage != nil {
1118 storageClient = storage.NewClient(specStorage)
1124 peers: prioritizedPeers{
1126 getPrio: func(p PeerInfo) peerPriority {
1128 return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
1131 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1133 halfOpen: make(map[string]PeerInfo),
1134 pieceStateChanges: pubsub.NewPubSub(),
1136 storageOpener: storageClient,
1137 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1139 networkingEnabled: true,
1140 metadataChanged: sync.Cond{
1143 webSeeds: make(map[string]*Peer),
1145 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1146 t.logger = cl.logger.WithContextValue(t)
1147 t.setChunkSize(defaultChunkSize)
1151 // A file-like handle to some torrent data resource.
1152 type Handle interface {
1159 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1160 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1163 // Adds a torrent by InfoHash with a custom Storage implementation.
1164 // If the torrent already exists then this Storage is ignored and the
1165 // existing torrent returned with `new` set to `false`
1166 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1169 t, ok := cl.torrents[infoHash]
1175 t = cl.newTorrent(infoHash, specStorage)
1176 cl.eachDhtServer(func(s DhtServer) {
1177 go t.dhtAnnouncer(s)
1179 cl.torrents[infoHash] = t
1180 cl.clearAcceptLimits()
1181 t.updateWantPeersEvent()
1182 // Tickle Client.waitAccept, new torrent may want conns.
1183 cl.event.Broadcast()
1187 // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
1188 // Torrent.MergeSpec.
1189 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1190 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1191 err = t.MergeSpec(spec)
1192 if err != nil && new {
1198 type stringAddr string
1200 var _ net.Addr = stringAddr("")
1202 func (stringAddr) Network() string { return "" }
1203 func (me stringAddr) String() string { return string(me) }
1205 // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
1206 // spec.DisallowDataDownload/Upload will be read and applied
1207 // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
1208 func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
1209 if spec.DisplayName != "" {
1210 t.SetDisplayName(spec.DisplayName)
1212 if spec.InfoBytes != nil {
1213 err := t.SetInfoBytes(spec.InfoBytes)
1219 cl.AddDhtNodes(spec.DhtNodes)
1222 useTorrentSources(spec.Sources, t)
1223 for _, url := range spec.Webseeds {
1226 for _, peerAddr := range spec.PeerAddrs {
1228 Addr: stringAddr(peerAddr),
1229 Source: PeerSourceDirect,
1233 if spec.ChunkSize != 0 {
1234 t.setChunkSize(pp.Integer(spec.ChunkSize))
1236 t.addTrackers(spec.Trackers)
1238 t.dataDownloadDisallowed = spec.DisallowDataDownload
1239 t.dataUploadDisallowed = spec.DisallowDataUpload
1243 func useTorrentSources(sources []string, t *Torrent) {
1244 for _, s := range sources {
1246 err := useTorrentSource(s, t)
1248 t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
1250 t.logger.Printf("successfully used source %q", s)
1256 func useTorrentSource(source string, t *Torrent) error {
1257 req, err := http.NewRequest(http.MethodGet, source, nil)
1261 ctx, cancel := context.WithCancel(context.Background())
1271 req = req.WithContext(ctx)
1272 resp, err := http.DefaultClient.Do(req)
1276 mi, err := metainfo.Load(resp.Body)
1278 if ctx.Err() != nil {
1283 return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
1286 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1287 t, ok := cl.torrents[infoHash]
1289 err = fmt.Errorf("no such torrent")
1296 delete(cl.torrents, infoHash)
1300 func (cl *Client) allTorrentsCompleted() bool {
1301 for _, t := range cl.torrents {
1305 if !t.haveAllPieces() {
1312 // Returns true when all torrents are completely downloaded and false if the
1313 // client is stopped before that.
1314 func (cl *Client) WaitAll() bool {
1317 for !cl.allTorrentsCompleted() {
1318 if cl.closed.IsSet() {
1326 // Returns handles to all the torrents loaded in the Client.
1327 func (cl *Client) Torrents() []*Torrent {
1330 return cl.torrentsAsSlice()
1333 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1334 for _, t := range cl.torrents {
1335 ret = append(ret, t)
1340 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1341 spec, err := TorrentSpecFromMagnetUri(uri)
1345 T, _, err = cl.AddTorrentSpec(spec)
1349 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1350 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1354 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1355 mi, err := metainfo.LoadFromFile(filename)
1359 return cl.AddTorrent(mi)
1362 func (cl *Client) DhtServers() []DhtServer {
1363 return cl.dhtServers
1366 func (cl *Client) AddDhtNodes(nodes []string) {
1367 for _, n := range nodes {
1368 hmp := missinggo.SplitHostMaybePort(n)
1369 ip := net.ParseIP(hmp.Host)
1371 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1374 ni := krpc.NodeInfo{
1375 Addr: krpc.NodeAddr{
1380 cl.eachDhtServer(func(s DhtServer) {
1386 func (cl *Client) banPeerIP(ip net.IP) {
1387 cl.logger.Printf("banning ip %v", ip)
1388 if cl.badPeerIPs == nil {
1389 cl.badPeerIPs = make(map[string]struct{})
1391 cl.badPeerIPs[ip.String()] = struct{}{}
1394 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
1403 PeerMaxRequests: 250,
1405 RemoteAddr: remoteAddr,
1407 callbacks: &cl.config.Callbacks,
1409 connString: connString,
1411 writeBuffer: new(bytes.Buffer),
1414 c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
1415 c.writerCond.L = cl.locker()
1416 c.setRW(connStatsReadWriter{nc, c})
1417 c.r = &rateLimitedReader{
1418 l: cl.config.DownloadRateLimiter,
1421 c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1422 for _, f := range cl.config.Callbacks.NewPeer {
1428 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1435 t.addPeers([]PeerInfo{{
1436 Addr: ipPortAddr{ip, port},
1437 Source: PeerSourceDhtAnnouncePeer,
1441 func firstNotNil(ips ...net.IP) net.IP {
1442 for _, ip := range ips {
1450 func (cl *Client) eachDialer(f func(Dialer) bool) {
1451 for _, s := range cl.dialers {
1458 func (cl *Client) eachListener(f func(Listener) bool) {
1459 for _, s := range cl.listeners {
1466 func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
1467 cl.eachListener(func(l Listener) bool {
1474 func (cl *Client) publicIp(peer net.IP) net.IP {
1475 // TODO: Use BEP 10 to determine how peers are seeing us.
1476 if peer.To4() != nil {
1478 cl.config.PublicIp4,
1479 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1484 cl.config.PublicIp6,
1485 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1489 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1490 l := cl.findListener(
1491 func(l Listener) bool {
1492 return f(addrIpOrNil(l.Addr()))
1498 return addrIpOrNil(l.Addr())
1501 // Our IP as a peer should see it.
1502 func (cl *Client) publicAddr(peer net.IP) IpPort {
1503 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1506 // ListenAddrs addresses currently being listened to.
1507 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1510 cl.eachListener(func(l Listener) bool {
1511 ret = append(ret, l.Addr())
1517 func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
1518 ipa, ok := tryIpPortFromNetAddr(addr)
1522 ip := maskIpForAcceptLimiting(ipa.IP)
1523 if cl.acceptLimiter == nil {
1524 cl.acceptLimiter = make(map[ipStr]int)
1526 cl.acceptLimiter[ipStr(ip.String())]++
1529 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1530 if ip4 := ip.To4(); ip4 != nil {
1531 return ip4.Mask(net.CIDRMask(24, 32))
1536 func (cl *Client) clearAcceptLimits() {
1537 cl.acceptLimiter = nil
1540 func (cl *Client) acceptLimitClearer() {
1543 case <-cl.closed.LockedChan(cl.locker()):
1545 case <-time.After(15 * time.Minute):
1547 cl.clearAcceptLimits()
1553 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1554 if cl.config.DisableAcceptRateLimiting {
1557 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1560 func (cl *Client) rLock() {
1564 func (cl *Client) rUnlock() {
1568 func (cl *Client) lock() {
1572 func (cl *Client) unlock() {
1576 func (cl *Client) locker() *lockWithDeferreds {
1580 func (cl *Client) String() string {
1581 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1584 // Returns connection-level aggregate stats at the Client level. See the comment on
1585 // TorrentStats.ConnStats.
1586 func (cl *Client) ConnStats() ConnStats {
1587 return cl.stats.Copy()