17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/missinggo/v2"
26 "github.com/anacrolix/missinggo/v2/conntrack"
27 "github.com/anacrolix/sync"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "golang.org/x/time/rate"
32 "golang.org/x/xerrors"
34 "github.com/anacrolix/torrent/bencode"
35 "github.com/anacrolix/torrent/iplist"
36 "github.com/anacrolix/torrent/metainfo"
37 "github.com/anacrolix/torrent/mse"
38 pp "github.com/anacrolix/torrent/peer_protocol"
39 "github.com/anacrolix/torrent/storage"
42 // Clients contain zero or more Torrents. A Client manages a blocklist, the
43 // TCP/UDP protocol ports, and DHT as desired.
45 // An aggregate of stats over all connections. First in struct to ensure
46 // 64-bit alignment of fields. See #262.
51 closed missinggo.Event
57 defaultStorage *storage.Client
61 dhtServers []DhtServer
62 ipBlockList iplist.Ranger
63 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
64 extensionBytes pp.PeerExtensionBits
66 // Set of addresses that have our client ID. This intentionally will
67 // include ourselves if we end up trying to connect to our own address
68 // through legitimate channels.
69 dopplegangerAddrs map[string]struct{}
70 badPeerIPs map[string]struct{}
71 torrents map[InfoHash]*Torrent
73 acceptLimiter map[ipStr]int
74 dialRateLimiter *rate.Limiter
// BadPeerIPs returns the banned peer IPs as produced by badPeerIPsLocked.
// NOTE(review): the lines between the signature and the return are not visible
// in this excerpt; presumably they acquire the client lock — confirm.
79 func (cl *Client) BadPeerIPs() []string {
82 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock — confirm against callers.
85 func (cl *Client) badPeerIPsLocked() []string {
86 return slices.FromMapKeys(cl.badPeerIPs).([]string)
89 func (cl *Client) PeerID() PeerID {
93 // LocalPort returns the port number for the first listener that has one. No longer assumes that all port
94 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
96 func (cl *Client) LocalPort() (port int) {
97 cl.eachListener(func(l Listener) bool {
// Store this listener's port in the named result.
// NOTE(review): addrPortOrZero presumably yields 0 when the address carries no port — confirm.
98 port = addrPortOrZero(l.Addr())
// writeDhtServerStatus writes the DHT server's ID in hex to w, followed by a
// spew dump of the server's stats.
104 func writeDhtServerStatus(w io.Writer, s DhtServer) {
105 dhtStats := s.Stats()
106 fmt.Fprintf(w, " ID: %x\n", s.ID())
107 spew.Fdump(w, dhtStats)
110 // WriteStatus writes out a human readable status of the client, such as for writing to a
112 func (cl *Client) WriteStatus(_w io.Writer) {
// Buffer the output; everything below is written to w rather than _w directly.
// NOTE(review): the matching Flush is not visible in this excerpt.
115 w := bufio.NewWriter(_w)
117 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
118 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
119 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
120 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
// One section per DHT server: its network and address, then its ID and stats.
121 cl.eachDhtServer(func(s DhtServer) {
122 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
123 writeDhtServerStatus(w, s)
125 spew.Fdump(w, &cl.stats)
126 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// Torrents are ordered by comparing their info hash strings with "<".
128 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
129 return l.InfoHash().AsString() < r.InfoHash().AsString()
132 fmt.Fprint(w, "<unknown name>")
134 fmt.Fprint(w, t.name())
// Percentage complete is computed from the bytes still missing relative to the total length.
138 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
140 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to debug-level log messages; debugLogFilter
// tests messages for it.
148 const debugLogValue = log.Debug
// debugLogFilter is installed as the filter on the client logger by initLogger.
// The visible return yields true for messages that do not carry debugLogValue.
// NOTE(review): the lines between the signature and this return are not
// visible in this excerpt.
150 func (cl *Client) debugLogFilter(m log.Msg) bool {
154 return !m.HasValue(debugLogValue)
// initLogger derives cl.logger from the configured logger, attaching cl as a
// value and installing debugLogFilter as the filter.
157 func (cl *Client) initLogger() {
158 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey reads bytes 16 through 19 of the peer ID as a big-endian uint32
// and returns the result converted to int32.
161 func (cl *Client) announceKey() int32 {
162 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
// NewClient creates a Client from cfg. The visible lines show it setting up
// default storage, the IP blocklist, the peer ID, listening sockets and one DHT
// server per packet-capable socket, registering cleanup hooks in cl.onClose as
// it goes.
// NOTE(review): error checks, returns and closing braces are not visible in
// this excerpt.
165 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
// NOTE(review): the guard is not visible here; presumably the default config is used when cfg is nil — confirm.
167 cfg = NewDefaultClientConfig()
177 dopplegangerAddrs: make(map[string]struct{}),
178 torrents: make(map[metainfo.Hash]*Torrent),
179 dialRateLimiter: rate.NewLimiter(10, 10),
181 go cl.acceptLimitClearer()
189 cl.extensionBytes = defaultPeerExtensionBytes()
// cl.event uses the client's locker.
190 cl.event.L = cl.locker()
191 storageImpl := cfg.DefaultStorage
192 if storageImpl == nil {
193 // We'd use mmap by default but HFS+ doesn't support sparse files.
194 storageImplCloser := storage.NewFile(cfg.DataDir)
// This file storage was created here, so register a hook that closes it.
195 cl.onClose = append(cl.onClose, func() {
196 if err := storageImplCloser.Close(); err != nil {
197 cl.logger.Printf("error closing default storage: %s", err)
200 storageImpl = storageImplCloser
202 cl.defaultStorage = storage.NewClient(storageImpl)
203 if cfg.IPBlocklist != nil {
204 cl.ipBlockList = cfg.IPBlocklist
// An explicitly configured peer ID is copied as-is.
207 if cfg.PeerID != "" {
208 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
// Here the peer ID starts with the Bep20 prefix and the remainder is filled with random bytes.
// NOTE(review): the branch structure joining this to the check above is not visible.
210 o := copy(cl.peerID[:], cfg.Bep20)
211 _, err = rand.Read(cl.peerID[o:])
213 panic("error generating peer id")
217 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
225 for _, _s := range sockets {
226 s := _s // Copy the loop variable so the closure and goroutine below each capture this iteration's socket (needed before Go 1.22).
227 cl.onClose = append(cl.onClose, func() { s.Close() })
// A socket whose network is enabled for peers is used both to dial and to accept.
228 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
229 cl.dialers = append(cl.dialers, s)
230 cl.listeners = append(cl.listeners, s)
231 go cl.acceptConnections(s)
// Create a DHT server on each socket that is also a net.PacketConn.
237 for _, s := range sockets {
238 if pc, ok := s.(net.PacketConn); ok {
239 ds, err := cl.newAnacrolixDhtServer(pc)
243 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
244 cl.onClose = append(cl.onClose, func() { ds.Close() })
// AddDhtServer appends d to the client's list of DHT servers.
252 func (cl *Client) AddDhtServer(d DhtServer) {
253 cl.dhtServers = append(cl.dhtServers, d)
256 // AddDialer adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
257 // given address for any Torrent.
258 func (cl *Client) AddDialer(d Dialer) {
259 cl.dialers = append(cl.dialers, d)
262 // AddListener registers a Listener, and starts accepting on it in a new goroutine. You must Close Listeners provided this way
264 func (cl *Client) AddListener(l Listener) {
265 cl.listeners = append(cl.listeners, l)
266 go cl.acceptConnections(l)
// firewallCallback is passed to listenAll by NewClient. It computes block as
// "the client does not currently want connections" and counts the outcome under
// "connections firewalled" or "connections not firewalled".
// NOTE(review): the branch selecting the counter and the return statement are
// not visible in this excerpt; presumably block is returned — confirm.
269 func (cl *Client) firewallCallback(net.Addr) bool {
271 block := !cl.wantConns()
274 torrent.Add("connections firewalled", 1)
276 torrent.Add("connections not firewalled", 1)
// listenOnNetwork reports whether the client should listen on network n. Each
// visible condition pairs a property of n with the config flag that disables it.
// NOTE(review): the branch bodies and the final return are not visible;
// presumably each matching condition returns false — confirm.
281 func (cl *Client) listenOnNetwork(n network) bool {
282 if n.Ipv4 && cl.config.DisableIPv4 {
285 if n.Ipv6 && cl.config.DisableIPv6 {
288 if n.Tcp && cl.config.DisableTCP {
// UDP only matches when both uTP and DHT are turned off.
291 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
// listenNetworks walks allPeerNetworks and selects those accepted by
// listenOnNetwork.
// NOTE(review): the statement collecting a selected network into ns is not
// visible in this excerpt.
297 func (cl *Client) listenNetworks() (ns []network) {
298 for _, n := range allPeerNetworks {
299 if cl.listenOnNetwork(n) {
// newAnacrolixDhtServer builds a dht.ServerConfig for conn from the client's
// configuration, creates the server with dht.NewServer, and bootstraps it,
// logging the outcome.
// NOTE(review): some config fields, the error checks, and whether Bootstrap
// runs synchronously or in a goroutine are not visible in this excerpt.
306 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
307 cfg := dht.ServerConfig{
308 IPBlocklist: cl.ipBlockList,
310 OnAnnouncePeer: cl.onDHTAnnouncePeer,
311 PublicIP: func() net.IP {
// Use the configured IPv6 address for IPv6 conns when one is set; otherwise fall back to the IPv4 one.
312 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
313 return cl.config.PublicIp6
315 return cl.config.PublicIp4
317 StartingNodes: cl.config.DhtStartingNodes,
318 ConnectionTracking: cl.config.ConnTracker,
319 OnQuery: cl.config.DHTOnQuery,
320 Logger: cl.logger.WithValues("dht", conn.LocalAddr().String()),
322 s, err = dht.NewServer(&cfg)
325 ts, err := s.Bootstrap()
327 cl.logger.Printf("error bootstrapping dht: %s", err)
329 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
335 func (cl *Client) Closed() <-chan struct{} {
// eachDhtServer iterates over cl.dhtServers.
// NOTE(review): the loop body is not visible; presumably it calls f with each
// server — confirm.
341 func (cl *Client) eachDhtServer(f func(DhtServer)) {
342 for _, ds := range cl.dhtServers {
347 // Close stops the client. All connections to peers are closed and all activity will
349 func (cl *Client) Close() {
// NOTE(review): the per-torrent action in this loop is not visible in this excerpt.
353 for _, t := range cl.torrents {
// Run the onClose hooks in reverse order: the most recently appended hook runs first.
356 for i := range cl.onClose {
357 cl.onClose[len(cl.onClose)-1-i]()
// ipBlockRange looks ip up in the client's IP blocklist and returns the
// matching range along with whether ip is blocked.
// NOTE(review): the body of the nil check is not visible; presumably it returns
// the zero values (not blocked) when no blocklist is configured — confirm.
362 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
363 if cl.ipBlockList == nil {
366 return cl.ipBlockList.Lookup(ip)
// ipIsBlocked asks ipBlockRange whether ip is blocked, discarding the range.
// NOTE(review): the return statement is not visible; presumably it returns
// blocked — confirm.
369 func (cl *Client) ipIsBlocked(ip net.IP) bool {
370 _, blocked := cl.ipBlockRange(ip)
374 func (cl *Client) wantConns() bool {
375 for _, t := range cl.torrents {
383 func (cl *Client) waitAccept() {
385 if cl.closed.IsSet() {
// rejectAccepted returns an error describing why an accepted connection should
// be rejected, based on its remote IP: a disabled address family, accept rate
// limiting, or a bad source address. The checks only run when an IP can be
// extracted from the remote address.
// NOTE(review): the final return is not visible; presumably nil when no check
// fires — confirm.
395 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
396 func (cl *Client) rejectAccepted(conn net.Conn) error {
397 ra := conn.RemoteAddr()
398 if rip := addrIpOrNil(ra); rip != nil {
// To4 is non-nil for any IPv4 address, whichever byte length it is stored in.
399 if cl.config.DisableIPv4Peers && rip.To4() != nil {
400 return errors.New("ipv4 peers disabled")
// This check only matches addresses stored in the 4-byte form.
402 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
403 return errors.New("ipv4 disabled")
// A 16-byte address that does not convert to IPv4 is treated as IPv6.
406 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
407 return errors.New("ipv6 disabled")
409 if cl.rateLimitAccept(rip) {
410 return errors.New("source IP accepted rate limited")
412 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
413 return errors.New("bad source addr")
// acceptConnections accepts connections from l, counting each accept, and
// hands connections off to incomingConnection in a new goroutine. Accept errors
// and rejections (as decided by rejectAccepted) are logged with debugLogValue.
// NOTE(review): the loop and branch structure tying these statements together
// are not visible in this excerpt.
419 func (cl *Client) acceptConnections(l net.Listener) {
421 conn, err := l.Accept()
422 torrent.Add("client listener accepts", 1)
423 conn = pproffd.WrapNetConn(conn)
425 closed := cl.closed.IsSet()
428 reject = cl.rejectAccepted(conn)
438 log.Fmsg("error accepting connection: %s", err).AddValue(debugLogValue).Log(cl.logger)
443 torrent.Add("rejected accepted connections", 1)
444 log.Fmsg("rejecting accepted conn: %v", reject).AddValue(debugLogValue).Log(cl.logger)
447 go cl.incomingConnection(conn)
449 log.Fmsg("accepted %q connection at %q from %q",
453 ).AddValue(debugLogValue).Log(cl.logger)
// Counters keyed by remote IP length, remote network, and listener network.
454 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
455 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
456 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
// incomingConnection wraps nc in a PeerConn created with outgoing=false, marks
// its discovery source as PeerSourceIncoming, and runs it as a received
// connection.
// NOTE(review): what is done with tc for TCP connections is not visible in
// this excerpt.
461 func (cl *Client) incomingConnection(nc net.Conn) {
463 if tc, ok := nc.(*net.TCPConn); ok {
466 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network())
467 c.Discovery = PeerSourceIncoming
468 cl.runReceivedConn(c)
471 // Torrent returns a handle to the torrent with the given info hash, if it's present in the client.
// ok reports whether the info hash was found in cl.torrents.
// NOTE(review): lines around the lookup are not visible; presumably they take
// the client lock — confirm.
472 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
475 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored under ih in cl.torrents, or nil when there
// is no such entry.
479 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
480 return cl.torrents[ih]
483 type dialResult struct {
// countDialResult increments either the "successful dials" or the
// "unsuccessful dials" counter.
// NOTE(review): the condition is not visible; presumably success means
// err == nil — confirm.
488 func countDialResult(err error) {
490 torrent.Add("successful dials", 1)
492 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales max down as the number of pending peers grows: max
// is divided by (pendingPeers+halfOpenLimit)/halfOpenLimit, computed with
// integer division. The result is then compared against minDialTimeout.
// NOTE(review): halfOpenLimit == 0 would cause an integer divide-by-zero panic
// here; confirm callers never pass zero.
// NOTE(review): the body of the minimum check and the return are not visible;
// presumably the result is clamped up to minDialTimeout — confirm.
496 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
497 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
498 if ret < minDialTimeout {
504 // dopplegangerAddr returns whether an address is known to connect to a client with our own ID.
// The answer comes from a lookup of addr in cl.dopplegangerAddrs.
// NOTE(review): the return statement is not visible; presumably it returns ok.
505 func (cl *Client) dopplegangerAddr(addr string) bool {
506 _, ok := cl.dopplegangerAddrs[addr]
510 // dialFirst returns a connection over UTP or TCP, whichever is first to connect.
// Every dialer is tried via dialFromSocket and results are collected on a
// channel; the remaining results are drained after the first success.
// NOTE(review): many lines (goroutine launches, the drain loop's body, the
// deferred calls) are not visible in this excerpt.
511 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
513 t := perf.NewTimer(perf.CallerName(0))
516 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
518 t.Mark("returned conn over " + res.Network)
522 ctx, cancel := context.WithCancel(ctx)
523 // As soon as we return one connection, cancel the others.
// The channel is buffered with capacity left.
// NOTE(review): presumably left counts the dials started, so no sender blocks — confirm.
526 resCh := make(chan dialResult, left)
530 cl.eachDialer(func(s Dialer) bool {
533 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
536 cl.dialFromSocket(ctx, s, addr),
537 s.LocalAddr().Network(),
544 // Wait for a successful connection.
546 defer perf.ScopeTimer()()
547 for ; left > 0 && res.Conn == nil; left-- {
551 // There are still incomplete dials.
553 for ; left > 0; left-- {
554 conn := (<-resCh).Conn
561 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
563 //if res.Conn != nil {
564 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
566 // cl.logger.Printf("failed to dial %s", addr)
// dialFromSocket dials addr using s, first obtaining a connection-tracking
// entry from the configured ConnTracker. A successful connection is returned
// inside a closeWrapper.
// NOTE(review): how cte is released on each path (context done, dial error,
// close of the wrapper) is not visible in this excerpt.
571 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
572 network := s.LocalAddr().Network()
573 cte := cl.config.ConnTracker.Wait(
575 conntrack.Entry{network, s.LocalAddr().String(), addr},
576 "dial torrent client",
579 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
580 // which dial errors allow us to forget the connection tracking entry handle.
581 if ctx.Err() != nil {
587 c, err := s.Dial(ctx, addr)
588 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
589 // it now in case we close the connection forthwith.
590 if tc, ok := c.(*net.TCPConn); ok {
// Only errors recognised by forgettableDialError take this branch.
595 if err != nil && forgettableDialError(err) {
602 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found". The match is a substring test on err.Error(),
// so err must be non-nil.
609 func forgettableDialError(err error) bool {
610 return strings.Contains(err.Error(), "no suitable address found")
// noLongerHalfOpen removes addr from t.halfOpen. It panics if addr is not
// currently recorded there, since that means the half-open bookkeeping is
// inconsistent.
// NOTE(review): any statements after the delete are not visible in this excerpt.
613 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
614 if _, ok := t.halfOpen[addr]; !ok {
615 panic("invariant broken")
617 delete(t.halfOpen, addr)
621 // handshakesConnection performs initiator handshakes and returns a connection. Returns nil
622 // *connection if no connection for valid reasons.
623 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr net.Addr, network string) (c *PeerConn, err error) {
// The PeerConn is created as an outgoing connection.
624 c = cl.newConnection(nc, true, remoteAddr, network)
625 c.headerEncrypted = encryptHeader
// Bound the handshakes by the configured timeout and apply that deadline to the socket.
626 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
628 dl, ok := ctx.Deadline()
632 err = nc.SetDeadline(dl)
636 err = cl.initiateHandshakes(c, t)
640 // Returns nil connection and nil error if no connection could be established
641 // for valid reasons.
// NOTE(review): the visible failure paths below return a non-nil error
// ("dialing: ..." or "dial failed"), which does not match the sentence above —
// confirm which is intended.
642 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
// The dial timeout is computed by an inline function; only its t.dialTimeout() return is visible here.
643 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
646 return t.dialTimeout()
649 dr := cl.dialFirst(dialCtx, addr.String())
// Report the context's error when it has one (for example the deadline passing).
652 if dialCtx.Err() != nil {
653 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
655 return nil, errors.New("dial failed")
// The handshake gets a fresh background context rather than the dial context.
657 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
664 // Returns nil connection and nil error if no connection could be established
665 // for valid reasons.
// establishOutgoingConn first attempts the connection with the preferred header
// obfuscation setting and, unless the preferred setting is required, retries
// with the opposite setting.
// NOTE(review): the success/failure checks between these statements are not
// visible in this excerpt.
666 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
667 torrent.Add("establish outgoing connection", 1)
668 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
669 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
671 torrent.Add("initiated conn with preferred header obfuscation", 1)
674 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
675 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
676 // We should have just tried with the preferred header obfuscation. If it was required,
677 // there's nothing else to try.
680 // Try again with encryption if we didn't earlier, or without if we did.
681 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
683 torrent.Add("initiated conn with fallback header obfuscation", 1)
685 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
689 // outgoingConnection is called to dial out and run a connection. The addr we're given is already
690 // considered half-open.
691 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
// Throttle outgoing dials through the client-wide limiter; Wait's error is not checked.
692 cl.dialRateLimiter.Wait(context.Background())
693 c, err := cl.establishOutgoingConn(t, addr)
696 // Don't release lock between here and addConnection, unless it's for
// The attempt has finished, so addr stops counting as half-open.
698 cl.noLongerHalfOpen(t, addr.String())
701 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
708 cl.runHandshookConn(c, t)
711 // incomingPeerPort returns the port number for incoming peer connections. 0 if the client isn't listening.
// It delegates to LocalPort.
712 func (cl *Client) incomingPeerPort() int {
713 return cl.LocalPort()
// initiateHandshakes performs the handshakes for a connection we initiated: the
// MSE header obfuscation handshake when c.headerEncrypted is set, then the
// BitTorrent protocol handshake for t's info hash. It returns an error if
// either handshake fails or the peer answers with a different info hash.
// NOTE(review): most arguments to mse.InitiateHandshake and the error checks
// are not visible in this excerpt.
716 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
717 if c.headerEncrypted {
720 rw, c.cryptoMethod, err = mse.InitiateHandshake(
727 cl.config.CryptoProvides,
731 return xerrors.Errorf("header obfuscation handshake: %w", err)
734 ih, err := cl.connBtHandshake(c, &t.infoHash)
736 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
738 if ih != t.infoHash {
739 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
744 // forSkeys calls f with any secret keys.
// NOTE(review): the loop bodies are not visible in this excerpt; the loops
// range over cl.torrents, so the keys presumably derive from the torrents'
// info hashes — confirm.
745 func (cl *Client) forSkeys(f func([]byte) bool) {
// This branch is compiled in but never executed.
748 if false { // Emulate the bug from #114
750 for ih := range cl.torrents {
754 for range cl.torrents {
761 for ih := range cl.torrents {
768 // receiveHandshakes does encryption and bittorrent handshakes as receiver.
// NOTE(review): the lookup that sets t from the received info hash, and the
// returns after each error, are not visible in this excerpt.
769 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
770 defer perf.ScopeTimerErr(&err)()
772 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
// ErrNoSecretKeyMatch is counted with the encrypted/unencrypted outcomes rather than as an encryption error.
774 if err == nil || err == mse.ErrNoSecretKeyMatch {
775 if c.headerEncrypted {
776 torrent.Add("handshakes received encrypted", 1)
778 torrent.Add("handshakes received unencrypted", 1)
781 torrent.Add("handshakes received with error while handling encryption", 1)
784 if err == mse.ErrNoSecretKeyMatch {
// When the preferred obfuscation setting is mandatory, a connection that differs from it is refused.
789 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
790 err = errors.New("connection not have required header obfuscation")
// nil is passed as the info hash here, unlike the initiator side which passes the torrent's.
793 ih, err := cl.connBtHandshake(c, nil)
795 err = xerrors.Errorf("during bt handshake: %w", err)
// connBtHandshake runs the BitTorrent protocol handshake on c with our peer ID
// and extension bytes, then records the peer's extension bits, peer ID and the
// completion time on c.
// NOTE(review): the error check and the assignment of ret are not visible in
// this excerpt.
804 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
805 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
810 c.PeerExtensionBytes = res.PeerExtensionBits
811 c.PeerID = res.PeerID
812 c.completedHandshake = time.Now()
// runReceivedConn handles an accepted connection: it sets a deadline of
// HandshakesTimeout from now, receives the handshakes, and runs the connection
// against the matching torrent. Handshake errors and handshakes for unloaded
// torrents are counted and reported to onBadAccept.
// NOTE(review): the conditions and returns separating these paths are not
// visible in this excerpt.
816 func (cl *Client) runReceivedConn(c *PeerConn) {
817 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
821 t, err := cl.receiveHandshakes(c)
824 "error receiving handshakes on %v: %s", c, err,
828 "network", c.network,
830 torrent.Add("error receiving handshake", 1)
832 cl.onBadAccept(c.remoteAddr)
837 torrent.Add("received handshake for unloaded torrent", 1)
838 log.Fmsg("received handshake for unloaded torrent").AddValue(debugLogValue).Log(cl.logger)
840 cl.onBadAccept(c.remoteAddr)
844 torrent.Add("received handshake for loaded torrent", 1)
847 cl.runHandshookConn(c, t)
// runHandshookConn runs a connection whose handshakes have completed: it adds
// the connection to t, starts the writer goroutine, sends the initial messages
// and then blocks in the main read loop. The connection is dropped from t when
// this function returns.
// NOTE(review): the early returns and the branch distinguishing the two
// doppleganger cases are not visible in this excerpt.
850 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
// The peer presented our own peer ID.
852 if c.PeerID == cl.peerID {
855 addr := c.conn.RemoteAddr().String()
856 cl.dopplegangerAddrs[addr] = struct{}{}
858 // Because the remote address is not necessarily the same as its client's torrent listen
859 // address, we won't record the remote address as a doppleganger. Instead, the initiator
860 // can record *us* as the doppleganger.
// The zero time clears the write deadline.
864 c.conn.SetWriteDeadline(time.Time{})
865 c.r = deadlineReader{c.conn, c.r}
866 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
867 if connIsIpv6(c.conn) {
868 torrent.Add("completed handshake over ipv6", 1)
870 if err := t.addConnection(c); err != nil {
871 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
874 defer t.dropConnection(c)
875 go c.writer(time.Minute)
876 cl.sendInitialMessages(c, t)
877 err := c.mainReadLoop()
// Read loop errors are only logged when debugging is enabled.
878 if err != nil && cl.config.Debug {
879 cl.logger.Printf("error during connection main read loop: %s", err)
// sendInitialMessages posts the messages sent right after the handshake: the
// extended handshake (when both sides support the extension protocol),
// HaveAll/HaveNone (when the fast extension is enabled), and a further message
// when both sides support DHT and the client has a DHT server.
// NOTE(review): several lines, including the non-fast bitfield path and the
// body of the DHT message, are not visible in this excerpt.
883 // See the order given in Transmission's tr_peerMsgsNew.
884 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
885 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
886 conn.post(pp.Message{
888 ExtendedID: pp.HandshakeExtendedID,
889 ExtendedPayload: func() []byte {
890 msg := pp.ExtendedHandshakeMessage{
891 M: map[pp.ExtensionName]pp.ExtensionNumber{
892 pp.ExtensionNameMetadata: metadataExtendedId,
894 V: cl.config.ExtendedHandshakeClientVersion,
895 Reqq: 64, // TODO: Really?
896 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
// Encryption is advertised unless unobfuscated headers are both preferred and required.
897 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
898 Port: cl.incomingPeerPort(),
899 MetadataSize: torrent.metadataSize(),
900 // TODO: We can figure these out specific to the socket
902 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
903 Ipv6: cl.config.PublicIp6.To16(),
// PEX is only advertised when it has not been disabled in the config.
905 if !cl.config.DisablePEX {
906 msg.M[pp.ExtensionNamePex] = pexExtendedId
908 return bencode.MustMarshal(msg)
913 if conn.fastEnabled() {
914 if torrent.haveAllPieces() {
915 conn.post(pp.Message{Type: pp.HaveAll})
// Record every piece as already announced to this peer.
916 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
918 } else if !torrent.haveAnyPieces() {
919 conn.post(pp.Message{Type: pp.HaveNone})
920 conn.sentHaves.Clear()
926 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
927 conn.post(pp.Message{
// dhtPort returns a DHT server's port as a uint16. The callback overwrites ret
// for each server visited by eachDhtServer.
// NOTE(review): with several DHT servers the last one visited appears to win —
// confirm that is intended.
934 func (cl *Client) dhtPort() (ret uint16) {
935 cl.eachDhtServer(func(s DhtServer) {
936 ret = uint16(missinggo.AddrPort(s.Addr()))
// haveDhtServer reports whether the client has any DHT server.
// NOTE(review): the callback body is not visible; presumably it sets ret to
// true for any server visited — confirm.
941 func (cl *Client) haveDhtServer() (ret bool) {
942 cl.eachDhtServer(func(_ DhtServer) {
948 // gotMetadataExtensionMsg processes an incoming ut_metadata message.
// The payload is a bencoded dict carrying "msg_type" (and "piece",
// "total_size"); data messages carry the metadata piece bytes after the dict.
// NOTE(review): the declaration of d, the extraction of piece, and the switch
// header are not visible in this excerpt.
949 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
951 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dict are tolerated; any other decode error is returned.
952 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
953 } else if err != nil {
954 return fmt.Errorf("error unmarshalling bencode: %s", err)
956 msgType, ok := d["msg_type"]
958 return errors.New("missing msg_type field")
962 case pp.DataMetadataExtensionMsgType:
963 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
// Only pieces we asked this peer for are accepted.
964 if !c.requestedMetadataPiece(piece) {
965 return fmt.Errorf("got unexpected piece %d", piece)
967 c.metadataRequests[piece] = false
// The piece data is the tail of the payload; its start is found by subtracting the expected piece size.
968 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
969 if begin < 0 || begin >= len(payload) {
970 return fmt.Errorf("data has bad offset in payload: %d", begin)
972 t.saveMetadataPiece(piece, payload[begin:])
973 c.lastUsefulChunkReceived = time.Now()
974 return t.maybeCompleteMetadata()
975 case pp.RequestMetadataExtensionMsgType:
// Pieces we don't have are answered with a reject message.
976 if !t.haveMetadataPiece(piece) {
977 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Metadata pieces start at multiples of 16 KiB (1 << 14).
980 start := (1 << 14) * piece
981 c.logger.Printf("sending metadata piece %d", piece)
982 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
984 case pp.RejectMetadataExtensionMsgType:
987 return errors.New("unknown msg_type value")
// badPeerAddr reports whether addr is a bad peer address by extracting its IP
// and port and deferring to badPeerIPPort.
// NOTE(review): the result for addresses without an IP and port is not visible
// in this excerpt.
991 func (cl *Client) badPeerAddr(addr net.Addr) bool {
992 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
993 return cl.badPeerIPPort(ipa.IP, ipa.Port)
// badPeerIPPort checks ip and port against three sources: the doppleganger
// addresses (as "host:port"), the IP blocklist, and the banned peer IPs.
// NOTE(review): the branch bodies and final return are not visible; presumably
// any match returns true — confirm.
998 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1002 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1005 if _, ok := cl.ipBlockRange(ip); ok {
1008 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1014 // newTorrent returns a Torrent ready for insertion into a Client.
// NOTE(review): several fields of the Torrent literal are not visible in this
// excerpt.
1015 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1016 // use provided storage, if provided
1017 storageClient := cl.defaultStorage
1018 if specStorage != nil {
1019 storageClient = storage.NewClient(specStorage)
1025 peers: prioritizedPeers{
// Peer priority is computed from our public address (as that peer would see it) and the peer's address.
1027 getPrio: func(p Peer) peerPriority {
1028 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
// The conns map is pre-sized to twice the per-torrent established connection limit.
1031 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1033 halfOpen: make(map[string]Peer),
1034 pieceStateChanges: pubsub.NewPubSub(),
1036 storageOpener: storageClient,
1037 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1039 networkingEnabled: true,
1040 metadataChanged: sync.Cond{
1044 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
// The torrent's logger prefixes each message with the torrent itself.
1045 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1046 return fmt.Sprintf("%v: %s", t, m.Text())
1048 t.setChunkSize(defaultChunkSize)
1052 // A file-like handle to some torrent data resource.
1053 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without a custom storage; it
// calls AddTorrentInfoHashWithStorage with a nil storage.
1060 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1061 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1064 // AddTorrentInfoHashWithStorage adds a torrent by InfoHash with a custom Storage implementation.
1065 // If the torrent already exists then this Storage is ignored and the
1066 // existing torrent returned with `new` set to `false`
// NOTE(review): the locking and the early return for an existing torrent are
// not visible in this excerpt.
1067 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1070 t, ok := cl.torrents[infoHash]
1076 t = cl.newTorrent(infoHash, specStorage)
// Start one DHT announcer goroutine per DHT server for the new torrent.
1077 cl.eachDhtServer(func(s DhtServer) {
1078 go t.dhtAnnouncer(s)
1080 cl.torrents[infoHash] = t
1081 cl.clearAcceptLimits()
1082 t.updateWantPeersEvent()
1083 // Tickle Client.waitAccept, new torrent may want conns.
1084 cl.event.Broadcast()
1088 // AddTorrentSpec adds or merges a torrent spec. If the torrent is already present, the
1089 // trackers will be merged with the existing ones. If the Info isn't yet
1090 // known, it will be set. The display name is replaced if the new spec
1091 // provides one. Returns new if the torrent wasn't already in the client.
1092 // Note that any `Storage` defined on the spec will be ignored if the
1093 // torrent is already present (i.e. `new` return value is `false`)
1094 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1095 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
// Each optional field of the spec is only applied when it is set.
1096 if spec.DisplayName != "" {
1097 t.SetDisplayName(spec.DisplayName)
1099 if spec.InfoBytes != nil {
1100 err = t.SetInfoBytes(spec.InfoBytes)
1107 if spec.ChunkSize != 0 {
1108 t.setChunkSize(pp.Integer(spec.ChunkSize))
1110 t.addTrackers(spec.Trackers)
// dropTorrent removes the torrent with the given info hash from cl.torrents.
// It reports "no such torrent" when the info hash is unknown.
// NOTE(review): the steps between the lookup and the delete (presumably closing
// the torrent) are not visible in this excerpt.
1115 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1116 t, ok := cl.torrents[infoHash]
1118 err = fmt.Errorf("no such torrent")
1125 delete(cl.torrents, infoHash)
// allTorrentsCompleted checks every torrent in the client for whether it has
// all of its pieces.
// NOTE(review): the branch bodies, any additional per-torrent check, and the
// returns are not visible in this excerpt.
1129 func (cl *Client) allTorrentsCompleted() bool {
1130 for _, t := range cl.torrents {
1134 if !t.haveAllPieces() {
1141 // WaitAll returns true when all torrents are completely downloaded and false if the
1142 // client is stopped before that.
// NOTE(review): the locking and the wait inside the loop are not visible in
// this excerpt.
1143 func (cl *Client) WaitAll() bool {
1146 for !cl.allTorrentsCompleted() {
1147 if cl.closed.IsSet() {
1155 // Torrents returns handles to all the torrents loaded in the Client.
// The slice is built by torrentsAsSlice.
// NOTE(review): the lines before the return are not visible; presumably they
// take the client lock — confirm.
1156 func (cl *Client) Torrents() []*Torrent {
1159 return cl.torrentsAsSlice()
// torrentsAsSlice collects the values of cl.torrents into a new slice. The
// order follows map iteration and is therefore unspecified.
1162 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1163 for _, t := range cl.torrents {
1164 ret = append(ret, t)
// AddMagnet parses a magnet URI into a TorrentSpec and adds it with
// AddTorrentSpec.
// NOTE(review): the handling of a parse error is not visible in this excerpt.
1169 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1170 spec, err := TorrentSpecFromMagnetURI(uri)
1174 T, _, err = cl.AddTorrentSpec(spec)
// AddTorrent adds the torrent described by mi, via a TorrentSpec built from it.
// NOTE(review): mi.Nodes is converted into ss, but the declaration of ss and
// what is done with it afterwards are not visible in this excerpt.
1178 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1179 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1181 slices.MakeInto(&ss, mi.Nodes)
// AddTorrentFromFile loads metainfo from the named file and adds it with
// AddTorrent.
// NOTE(review): the handling of a load error is not visible in this excerpt.
1186 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1187 mi, err := metainfo.LoadFromFile(filename)
1191 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The returned slice is the
// client's own slice, not a copy.
1194 func (cl *Client) DhtServers() []DhtServer {
1195 return cl.dhtServers
// AddDHTNodes takes "host" or "host:port" strings, parses each host as an IP
// address, and offers the resulting node to every DHT server. Entries that
// fail are logged.
// NOTE(review): the failure condition, the NodeInfo fields and the per-server
// call are not visible in this excerpt.
1198 func (cl *Client) AddDHTNodes(nodes []string) {
1199 for _, n := range nodes {
1200 hmp := missinggo.SplitHostMaybePort(n)
// net.ParseIP only accepts IP literals; it does not resolve hostnames.
1201 ip := net.ParseIP(hmp.Host)
1203 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1206 ni := krpc.NodeInfo{
1207 Addr: krpc.NodeAddr{
1212 cl.eachDhtServer(func(s DhtServer) {
// banPeerIP logs the ban and records ip (in string form) in cl.badPeerIPs,
// creating the map on first use.
1218 func (cl *Client) banPeerIP(ip net.IP) {
1219 cl.logger.Printf("banning ip %v", ip)
1220 if cl.badPeerIPs == nil {
1221 cl.badPeerIPs = make(map[string]struct{})
1223 cl.badPeerIPs[ip.String()] = struct{}{}
// newConnection creates a PeerConn around nc and wires up its logger, writer
// condition, stats-counting read/writer and rate-limited reader.
// NOTE(review): several fields of the PeerConn literal are not visible in this
// excerpt.
1226 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network string) (c *PeerConn) {
1232 PeerMaxRequests: 250,
1233 writeBuffer: new(bytes.Buffer),
1234 remoteAddr: remoteAddr,
1237 c.logger = cl.logger.WithValues(c,
1238 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1239 ).WithText(func(m log.Msg) string {
1240 return fmt.Sprintf("%v: %s", c, m.Text())
// The writer condition uses the client's locker.
1242 c.writerCond.L = cl.locker()
1243 c.setRW(connStatsReadWriter{nc, c})
// Reads go through the configured download rate limiter.
1244 c.r = &rateLimitedReader{
1245 l: cl.config.DownloadRateLimiter,
1248 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
// onDHTAnnouncePeer is registered as the DHT server's OnAnnouncePeer callback
// by newAnacrolixDhtServer. The visible lines build a peer entry from ip and
// port with source PeerSourceDhtAnnouncePeer.
// NOTE(review): the torrent lookup and how the peer is added are not visible
// in this excerpt.
1252 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1260 Addr: ipPortAddr{ip, port},
1261 Source: PeerSourceDhtAnnouncePeer,
1265 func firstNotNil(ips ...net.IP) net.IP {
1266 for _, ip := range ips {
// eachDialer iterates over cl.dialers.
// NOTE(review): the loop body is not visible; presumably it calls f with each
// dialer and stops when f returns false — confirm.
1274 func (cl *Client) eachDialer(f func(Dialer) bool) {
1275 for _, s := range cl.dialers {
// eachListener iterates over cl.listeners.
// NOTE(review): the loop body is not visible; presumably it calls f with each
// listener and stops when f returns false — confirm.
1282 func (cl *Client) eachListener(f func(Listener) bool) {
1283 for _, s := range cl.listeners {
// findListener searches the client's listeners using the predicate f.
// NOTE(review): the callback body is not visible; presumably ret is set to the
// first listener for which f returns true — confirm.
1290 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1291 cl.eachListener(func(l Listener) bool {
// publicIp picks the IP we present to peer, matching the peer's address
// family. The candidates are the configured public IP for that family and the
// IP of a listener in that family.
// NOTE(review): the call that chooses among the candidates is not visible;
// presumably the first non-nil one is used — confirm.
1298 func (cl *Client) publicIp(peer net.IP) net.IP {
1299 // TODO: Use BEP 10 to determine how peers are seeing us.
1300 if peer.To4() != nil {
1302 cl.config.PublicIp4,
1303 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1308 cl.config.PublicIp6,
1309 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
// findListenerIp applies f to the IP of each listener's address.
// NOTE(review): the surrounding call is not visible; presumably it returns the
// IP of the first listener accepted by f — confirm.
1313 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1316 func(l net.Listener) bool {
1317 return f(addrIpOrNil(l.Addr()))
1323 // publicAddr returns our address as the given peer should see it: the IP
// chosen by publicIp for that peer, paired with the incoming peer port
// converted to uint16.
1324 func (cl *Client) publicAddr(peer net.IP) IpPort {
1325 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1328 // ListenAddrs returns the addresses currently being listened on.
// One address is collected per listener.
// NOTE(review): the lines before the iteration are not visible; presumably
// they take the client lock — confirm.
1329 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1332 cl.eachListener(func(l Listener) bool {
1333 ret = append(ret, l.Addr())
// onBadAccept records a bad accepted connection from addr by incrementing the
// accept limiter count for its masked IP, creating the map on first use.
// NOTE(review): the handling of addresses without an IP and port is not
// visible; presumably they are ignored — confirm.
1339 func (cl *Client) onBadAccept(addr net.Addr) {
1340 ipa, ok := tryIpPortFromNetAddr(addr)
1344 ip := maskIpForAcceptLimiting(ipa.IP)
1345 if cl.acceptLimiter == nil {
1346 cl.acceptLimiter = make(map[ipStr]int)
1348 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting reduces an IPv4 address to its /24 network, so that
// accept limiting applies per /24 rather than per host.
// NOTE(review): the handling of non-IPv4 addresses is not visible in this
// excerpt.
1351 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1352 if ip4 := ip.To4(); ip4 != nil {
1353 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all accept limiter counts by resetting the map to
// nil; onBadAccept recreates it on demand.
1358 func (cl *Client) clearAcceptLimits() {
1359 cl.acceptLimiter = nil
// acceptLimitClearer is started as a goroutine by NewClient. It waits on two
// events: the client being closed, and a 15 minute timer after which the accept
// limits are cleared.
// NOTE(review): the enclosing loop/select, the locking, and the return on close
// are not visible in this excerpt.
1362 func (cl *Client) acceptLimitClearer() {
1365 case <-cl.closed.LockedChan(cl.locker()):
1367 case <-time.After(15 * time.Minute):
1369 cl.clearAcceptLimits()
// rateLimitAccept reports whether the masked form of ip has any recorded bad
// accepts. Indexing a nil map yields zero, so nothing is limited after
// clearAcceptLimits.
// NOTE(review): the body of the DisableAcceptRateLimiting check is not visible;
// presumably it returns false — confirm.
1375 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1376 if cl.config.DisableAcceptRateLimiting {
1379 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1382 func (cl *Client) rLock() {
1386 func (cl *Client) rUnlock() {
1390 func (cl *Client) lock() {
1394 func (cl *Client) unlock() {
1398 func (cl *Client) locker() *lockWithDeferreds {
// String formats the client as "<TYPE POINTER>", using the dynamic type and
// pointer value of cl.
1402 func (cl *Client) String() string {
1403 return fmt.Sprintf("<%[1]T %[1]p>", cl)