19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/perf"
25 "github.com/anacrolix/missinggo/pproffd"
26 "github.com/anacrolix/missinggo/pubsub"
27 "github.com/anacrolix/missinggo/slices"
28 "github.com/anacrolix/sync"
29 "github.com/anacrolix/torrent/bencode"
30 "github.com/anacrolix/torrent/iplist"
31 "github.com/anacrolix/torrent/metainfo"
32 "github.com/anacrolix/torrent/mse"
33 pp "github.com/anacrolix/torrent/peer_protocol"
34 "github.com/anacrolix/torrent/storage"
35 "github.com/davecgh/go-spew/spew"
36 "github.com/dustin/go-humanize"
37 "github.com/google/btree"
38 "golang.org/x/time/rate"
41 // Clients contain zero or more Torrents. A Client manages a blocklist, the
42 // TCP/UDP protocol ports, and DHT as desired.
44 // An aggregate of stats over all connections. First in struct to ensure
45 // 64-bit alignment of fields. See #262.
50 closed missinggo.Event
56 defaultStorage *storage.Client
59 dhtServers []*dht.Server
60 ipBlockList iplist.Ranger
61 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
62 extensionBytes pp.PeerExtensionBits
64 // Set of addresses that have our client ID. This intentionally will
65 // include ourselves if we end up trying to connect to our own address
66 // through legitimate channels.
67 dopplegangerAddrs map[string]struct{}
68 badPeerIPs map[string]struct{}
69 torrents map[InfoHash]*Torrent
71 acceptLimiter map[ipStr]int
72 dialRateLimiter *rate.Limiter
77 func (cl *Client) BadPeerIPs() []string {
80 return cl.badPeerIPsLocked()
83 func (cl *Client) badPeerIPsLocked() []string {
84 return slices.FromMapKeys(cl.badPeerIPs).([]string)
87 func (cl *Client) PeerID() PeerID {
91 type torrentAddr string
93 func (torrentAddr) Network() string { return "" }
95 func (me torrentAddr) String() string { return string(me) }
97 func (cl *Client) LocalPort() (port int) {
98 cl.eachListener(func(l socket) bool {
99 _port := missinggo.AddrPort(l.Addr())
105 } else if port != _port {
106 panic("mismatched ports")
113 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
114 dhtStats := s.Stats()
115 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
116 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
117 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
118 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
121 // Writes out a human readable status of the client, such as for writing to a
123 func (cl *Client) WriteStatus(_w io.Writer) {
126 w := bufio.NewWriter(_w)
128 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
129 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
130 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
131 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
132 cl.eachDhtServer(func(s *dht.Server) {
133 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
134 writeDhtServerStatus(w, s)
136 spew.Fdump(w, cl.stats)
137 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
139 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
140 return l.InfoHash().AsString() < r.InfoHash().AsString()
143 fmt.Fprint(w, "<unknown name>")
145 fmt.Fprint(w, t.name())
149 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
151 w.WriteString("<missing metainfo>")
159 const debugLogValue = "debug"
161 func (cl *Client) debugLogFilter(m *log.Msg) bool {
162 if !cl.config.Debug {
163 _, ok := m.Values()[debugLogValue]
169 func (cl *Client) initLogger() {
170 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
173 func (cl *Client) announceKey() int32 {
174 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
177 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
179 cfg = NewDefaultClientConfig()
188 dopplegangerAddrs: make(map[string]struct{}),
189 torrents: make(map[metainfo.Hash]*Torrent),
190 dialRateLimiter: rate.NewLimiter(10, 10),
192 go cl.acceptLimitClearer()
200 cl.extensionBytes = defaultPeerExtensionBytes()
201 cl.event.L = cl.locker()
202 storageImpl := cfg.DefaultStorage
203 if storageImpl == nil {
204 // We'd use mmap but HFS+ doesn't support sparse files.
205 storageImpl = storage.NewFile(cfg.DataDir)
206 cl.onClose = append(cl.onClose, func() {
207 if err := storageImpl.Close(); err != nil {
208 log.Printf("error closing default storage: %s", err)
212 cl.defaultStorage = storage.NewClient(storageImpl)
213 if cfg.IPBlocklist != nil {
214 cl.ipBlockList = cfg.IPBlocklist
217 if cfg.PeerID != "" {
218 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
220 o := copy(cl.peerID[:], cfg.Bep20)
221 _, err = rand.Read(cl.peerID[o:])
223 panic("error generating peer id")
227 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
228 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
229 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
233 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
240 for _, s := range cl.conns {
241 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
242 go cl.acceptConnections(s)
248 for _, s := range cl.conns {
249 if pc, ok := s.(net.PacketConn); ok {
250 ds, err := cl.newDhtServer(pc)
254 cl.dhtServers = append(cl.dhtServers, ds)
262 func (cl *Client) firewallCallback(net.Addr) bool {
264 block := !cl.wantConns()
267 torrent.Add("connections firewalled", 1)
269 torrent.Add("connections not firewalled", 1)
274 func (cl *Client) enabledPeerNetworks() (ns []string) {
275 for _, n := range allPeerNetworks {
276 if peerNetworkEnabled(n, cl.config) {
283 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
284 cfg := dht.ServerConfig{
285 IPBlocklist: cl.ipBlockList,
287 OnAnnouncePeer: cl.onDHTAnnouncePeer,
288 PublicIP: func() net.IP {
289 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
290 return cl.config.PublicIp6
292 return cl.config.PublicIp4
294 StartingNodes: cl.config.DhtStartingNodes,
296 s, err = dht.NewServer(&cfg)
299 if _, err := s.Bootstrap(); err != nil {
300 log.Printf("error bootstrapping dht: %s", err)
307 func firstNonEmptyString(ss ...string) string {
308 for _, s := range ss {
316 func (cl *Client) Closed() <-chan struct{} {
322 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
323 for _, ds := range cl.dhtServers {
328 func (cl *Client) closeSockets() {
329 cl.eachListener(func(l socket) bool {
336 // Stops the client. All connections to peers are closed and all activity will
338 func (cl *Client) Close() {
342 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
344 for _, t := range cl.torrents {
347 for _, f := range cl.onClose {
353 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
354 if cl.ipBlockList == nil {
357 return cl.ipBlockList.Lookup(ip)
360 func (cl *Client) ipIsBlocked(ip net.IP) bool {
361 _, blocked := cl.ipBlockRange(ip)
365 func (cl *Client) wantConns() bool {
366 for _, t := range cl.torrents {
374 func (cl *Client) waitAccept() {
376 if cl.closed.IsSet() {
386 func (cl *Client) rejectAccepted(conn net.Conn) bool {
387 ra := conn.RemoteAddr()
388 rip := missinggo.AddrIP(ra)
389 if cl.config.DisableIPv4Peers && rip.To4() != nil {
392 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
395 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
398 if cl.rateLimitAccept(rip) {
401 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
404 func (cl *Client) acceptConnections(l net.Listener) {
406 conn, err := l.Accept()
407 conn = pproffd.WrapNetConn(conn)
409 closed := cl.closed.IsSet()
412 reject = cl.rejectAccepted(conn)
422 log.Printf("error accepting connection: %s", err)
427 torrent.Add("rejected accepted connections", 1)
430 go cl.incomingConnection(conn)
432 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
433 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
434 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
435 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
440 func (cl *Client) incomingConnection(nc net.Conn) {
442 if tc, ok := nc.(*net.TCPConn); ok {
445 c := cl.newConnection(nc, false)
446 c.Discovery = peerSourceIncoming
447 cl.runReceivedConn(c)
450 // Returns a handle to the given torrent, if it's present in the client.
451 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
454 t, ok = cl.torrents[ih]
458 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
459 return cl.torrents[ih]
462 type dialResult struct {
466 func countDialResult(err error) {
468 torrent.Add("successful dials", 1)
470 torrent.Add("unsuccessful dials", 1)
474 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
475 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
476 if ret < minDialTimeout {
482 // Returns whether an address is known to connect to a client with our own ID.
483 func (cl *Client) dopplegangerAddr(addr string) bool {
484 _, ok := cl.dopplegangerAddrs[addr]
488 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
490 case allowIpv4 && allowIpv6:
492 case allowIpv4 && !allowIpv6:
494 case !allowIpv4 && allowIpv6:
497 panic("unhandled ip network combination")
501 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
502 return sock.DialContext(ctx, "", addr)
505 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
507 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
508 c := func(s string) bool {
509 return strings.Contains(network, s)
512 if c("udp") || c("utp") {
516 if cfg.DisableTCP && c("tcp") {
519 if cfg.DisableIPv6 && c("6") {
525 // Returns a connection over UTP or TCP, whichever is first to connect.
526 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
527 ctx, cancel := context.WithCancel(ctx)
528 // As soon as we return one connection, cancel the others.
531 resCh := make(chan dialResult, left)
532 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
535 c, err := f(ctx, addr)
536 // This is a bit optimistic, but it looks non-trivial to thread
537 // this through the proxy code. Set it now in case we close the
538 // connection forthwith.
539 if tc, ok := c.(*net.TCPConn); ok {
543 resCh <- dialResult{c}
549 cl.eachListener(func(s socket) bool {
550 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
557 // Wait for a successful connection.
559 defer perf.ScopeTimer()()
560 for ; left > 0 && res.Conn == nil; left-- {
564 // There are still incompleted dials.
566 for ; left > 0; left-- {
567 conn := (<-resCh).Conn
574 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
579 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
580 if _, ok := t.halfOpen[addr]; !ok {
581 panic("invariant broken")
583 delete(t.halfOpen, addr)
587 // Performs initiator handshakes and returns a connection. Returns nil
588 // *connection if no connection for valid reasons.
589 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
590 c = cl.newConnection(nc, true)
591 c.headerEncrypted = encryptHeader
592 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
594 dl, ok := ctx.Deadline()
598 err = nc.SetDeadline(dl)
602 ok, err = cl.initiateHandshakes(c, t)
609 // Returns nil connection and nil error if no connection could be established
610 // for valid reasons.
611 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
612 nc := cl.dialFirst(ctx, addr)
617 if c == nil || err != nil {
621 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
624 // Returns nil connection and nil error if no connection could be established
625 // for valid reasons.
626 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
627 torrent.Add("establish outgoing connection", 1)
628 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
631 return t.dialTimeout()
634 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
635 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
640 torrent.Add("initiated conn with preferred header obfuscation", 1)
643 if cl.config.ForceEncryption {
644 // We should have just tried with an obfuscated header. A plaintext
645 // header can't result in an encrypted connection, so we're done.
646 if !obfuscatedHeaderFirst {
647 panic(cl.config.EncryptionPolicy)
651 // Try again with encryption if we didn't earlier, or without if we did.
652 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
654 torrent.Add("initiated conn with fallback header obfuscation", 1)
659 // Called to dial out and run a connection. The addr we're given is already
660 // considered half-open.
661 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
662 cl.dialRateLimiter.Wait(context.Background())
663 c, err := cl.establishOutgoingConn(t, addr)
666 // Don't release lock between here and addConnection, unless it's for
668 cl.noLongerHalfOpen(t, addr)
671 log.Printf("error establishing outgoing connection: %s", err)
680 cl.runHandshookConn(c, t)
683 // The port number for incoming peer connections. 0 if the client isn't
685 func (cl *Client) incomingPeerPort() int {
686 return cl.LocalPort()
689 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
690 if c.headerEncrypted {
692 rw, c.cryptoMethod, err = mse.InitiateHandshake(
699 func() mse.CryptoMethod {
701 case cl.config.ForceEncryption:
702 return mse.CryptoMethodRC4
703 case cl.config.DisableEncryption:
704 return mse.CryptoMethodPlaintext
706 return mse.AllSupportedCrypto
715 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
716 if ih != t.infoHash {
722 // Calls f with any secret keys.
723 func (cl *Client) forSkeys(f func([]byte) bool) {
726 for ih := range cl.torrents {
733 // Do encryption and bittorrent handshakes as receiver.
734 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
735 defer perf.ScopeTimerErr(&err)()
737 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
739 if err == nil || err == mse.ErrNoSecretKeyMatch {
740 if c.headerEncrypted {
741 torrent.Add("handshakes received encrypted", 1)
743 torrent.Add("handshakes received unencrypted", 1)
746 torrent.Add("handshakes received with error while handling encryption", 1)
749 if err == mse.ErrNoSecretKeyMatch {
754 if cl.config.ForceEncryption && !c.headerEncrypted {
755 err = errors.New("connection not encrypted")
758 ih, ok, err := cl.connBTHandshake(c, nil)
760 err = fmt.Errorf("error during bt handshake: %s", err)
772 // Returns !ok if handshake failed for valid reasons.
773 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
774 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
775 if err != nil || !ok {
779 c.PeerExtensionBytes = res.PeerExtensionBits
780 c.PeerID = res.PeerID
781 c.completedHandshake = time.Now()
785 func (cl *Client) runReceivedConn(c *connection) {
786 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
790 t, err := cl.receiveHandshakes(c)
793 "error receiving handshakes: %s", err,
797 "network", c.remoteAddr().Network(),
799 torrent.Add("error receiving handshake", 1)
801 cl.onBadAccept(c.remoteAddr())
806 torrent.Add("received handshake for unloaded torrent", 1)
808 cl.onBadAccept(c.remoteAddr())
812 torrent.Add("received handshake for loaded torrent", 1)
815 cl.runHandshookConn(c, t)
818 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
820 if c.PeerID == cl.peerID {
823 addr := c.conn.RemoteAddr().String()
824 cl.dopplegangerAddrs[addr] = struct{}{}
826 // Because the remote address is not necessarily the same as its
827 // client's torrent listen address, we won't record the remote address
828 // as a doppleganger. Instead, the initiator can record *us* as the
833 c.conn.SetWriteDeadline(time.Time{})
834 c.r = deadlineReader{c.conn, c.r}
835 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
836 if connIsIpv6(c.conn) {
837 torrent.Add("completed handshake over ipv6", 1)
839 if err := t.addConnection(c); err != nil {
840 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
843 defer t.dropConnection(c)
844 go c.writer(time.Minute)
845 cl.sendInitialMessages(c, t)
846 err := c.mainReadLoop()
847 if err != nil && cl.config.Debug {
848 log.Printf("error during connection main read loop: %s", err)
852 // See the order given in Transmission's tr_peerMsgsNew.
853 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
854 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
855 conn.Post(pp.Message{
857 ExtendedID: pp.HandshakeExtendedID,
858 ExtendedPayload: func() []byte {
859 msg := pp.ExtendedHandshakeMessage{
860 M: map[pp.ExtensionName]pp.ExtensionNumber{
861 pp.ExtensionNameMetadata: metadataExtendedId,
863 V: cl.config.ExtendedHandshakeClientVersion,
864 Reqq: 64, // TODO: Really?
865 YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
866 Encryption: !cl.config.DisableEncryption,
867 Port: cl.incomingPeerPort(),
868 MetadataSize: torrent.metadataSize(),
869 // TODO: We can figured these out specific to the socket
871 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
872 Ipv6: cl.config.PublicIp6.To16(),
874 if !cl.config.DisablePEX {
875 msg.M[pp.ExtensionNamePex] = pexExtendedId
877 return bencode.MustMarshal(msg)
882 if conn.fastEnabled() {
883 if torrent.haveAllPieces() {
884 conn.Post(pp.Message{Type: pp.HaveAll})
885 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
887 } else if !torrent.haveAnyPieces() {
888 conn.Post(pp.Message{Type: pp.HaveNone})
889 conn.sentHaves.Clear()
895 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
896 conn.Post(pp.Message{
903 func (cl *Client) dhtPort() (ret uint16) {
904 cl.eachDhtServer(func(s *dht.Server) {
905 ret = uint16(missinggo.AddrPort(s.Addr()))
910 func (cl *Client) haveDhtServer() (ret bool) {
911 cl.eachDhtServer(func(_ *dht.Server) {
917 // Process incoming ut_metadata message.
918 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
920 err := bencode.Unmarshal(payload, &d)
921 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
922 } else if err != nil {
923 return fmt.Errorf("error unmarshalling bencode: %s", err)
925 msgType, ok := d["msg_type"]
927 return errors.New("missing msg_type field")
931 case pp.DataMetadataExtensionMsgType:
932 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
933 if !c.requestedMetadataPiece(piece) {
934 return fmt.Errorf("got unexpected piece %d", piece)
936 c.metadataRequests[piece] = false
937 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
938 if begin < 0 || begin >= len(payload) {
939 return fmt.Errorf("data has bad offset in payload: %d", begin)
941 t.saveMetadataPiece(piece, payload[begin:])
942 c.lastUsefulChunkReceived = time.Now()
943 return t.maybeCompleteMetadata()
944 case pp.RequestMetadataExtensionMsgType:
945 if !t.haveMetadataPiece(piece) {
946 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
949 start := (1 << 14) * piece
950 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
952 case pp.RejectMetadataExtensionMsgType:
955 return errors.New("unknown msg_type value")
959 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
963 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
966 if _, ok := cl.ipBlockRange(ip); ok {
969 if _, ok := cl.badPeerIPs[ip.String()]; ok {
975 // Return a Torrent ready for insertion into a Client.
976 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
977 // use provided storage, if provided
978 storageClient := cl.defaultStorage
979 if specStorage != nil {
980 storageClient = storage.NewClient(specStorage)
986 peers: prioritizedPeers{
988 getPrio: func(p Peer) peerPriority {
989 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
992 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
994 halfOpen: make(map[string]Peer),
995 pieceStateChanges: pubsub.NewPubSub(),
997 storageOpener: storageClient,
998 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1000 networkingEnabled: true,
1002 metadataChanged: sync.Cond{
1005 duplicateRequestTimeout: 1 * time.Second,
1007 t.logger = cl.logger.Clone().AddValue(t)
1008 t.setChunkSize(defaultChunkSize)
1012 // A file-like handle to some torrent data resource.
1013 type Handle interface {
1020 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1021 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1024 // Adds a torrent by InfoHash with a custom Storage implementation.
1025 // If the torrent already exists then this Storage is ignored and the
1026 // existing torrent returned with `new` set to `false`
1027 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1030 t, ok := cl.torrents[infoHash]
1036 t = cl.newTorrent(infoHash, specStorage)
1037 cl.eachDhtServer(func(s *dht.Server) {
1038 go t.dhtAnnouncer(s)
1040 cl.torrents[infoHash] = t
1041 cl.clearAcceptLimits()
1042 t.updateWantPeersEvent()
1043 // Tickle Client.waitAccept, new torrent may want conns.
1044 cl.event.Broadcast()
1048 // Add or merge a torrent spec. If the torrent is already present, the
1049 // trackers will be merged with the existing ones. If the Info isn't yet
1050 // known, it will be set. The display name is replaced if the new spec
1051 // provides one. Returns new if the torrent wasn't already in the client.
1052 // Note that any `Storage` defined on the spec will be ignored if the
1053 // torrent is already present (i.e. `new` return value is `true`)
1054 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1055 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1056 if spec.DisplayName != "" {
1057 t.SetDisplayName(spec.DisplayName)
1059 if spec.InfoBytes != nil {
1060 err = t.SetInfoBytes(spec.InfoBytes)
1067 if spec.ChunkSize != 0 {
1068 t.setChunkSize(pp.Integer(spec.ChunkSize))
1070 t.addTrackers(spec.Trackers)
1075 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1076 t, ok := cl.torrents[infoHash]
1078 err = fmt.Errorf("no such torrent")
1085 delete(cl.torrents, infoHash)
1089 func (cl *Client) allTorrentsCompleted() bool {
1090 for _, t := range cl.torrents {
1094 if !t.haveAllPieces() {
1101 // Returns true when all torrents are completely downloaded and false if the
1102 // client is stopped before that.
1103 func (cl *Client) WaitAll() bool {
1106 for !cl.allTorrentsCompleted() {
1107 if cl.closed.IsSet() {
1115 // Returns handles to all the torrents loaded in the Client.
1116 func (cl *Client) Torrents() []*Torrent {
1119 return cl.torrentsAsSlice()
1122 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1123 for _, t := range cl.torrents {
1124 ret = append(ret, t)
1129 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1130 spec, err := TorrentSpecFromMagnetURI(uri)
1134 T, _, err = cl.AddTorrentSpec(spec)
1138 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1139 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1141 slices.MakeInto(&ss, mi.Nodes)
1146 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1147 mi, err := metainfo.LoadFromFile(filename)
1151 return cl.AddTorrent(mi)
1154 func (cl *Client) DhtServers() []*dht.Server {
1155 return cl.dhtServers
1158 func (cl *Client) AddDHTNodes(nodes []string) {
1159 for _, n := range nodes {
1160 hmp := missinggo.SplitHostMaybePort(n)
1161 ip := net.ParseIP(hmp.Host)
1163 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1166 ni := krpc.NodeInfo{
1167 Addr: krpc.NodeAddr{
1172 cl.eachDhtServer(func(s *dht.Server) {
1178 func (cl *Client) banPeerIP(ip net.IP) {
1179 if cl.badPeerIPs == nil {
1180 cl.badPeerIPs = make(map[string]struct{})
1182 cl.badPeerIPs[ip.String()] = struct{}{}
1185 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1191 PeerMaxRequests: 250,
1192 writeBuffer: new(bytes.Buffer),
1194 c.writerCond.L = cl.locker()
1195 c.setRW(connStatsReadWriter{nc, c})
1196 c.r = &rateLimitedReader{
1197 l: cl.config.DownloadRateLimiter,
1203 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1213 Source: peerSourceDHTAnnouncePeer,
1217 func firstNotNil(ips ...net.IP) net.IP {
1218 for _, ip := range ips {
1226 func (cl *Client) eachListener(f func(socket) bool) {
1227 for _, s := range cl.conns {
1234 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1235 cl.eachListener(func(l socket) bool {
1242 func (cl *Client) publicIp(peer net.IP) net.IP {
1243 // TODO: Use BEP 10 to determine how peers are seeing us.
1244 if peer.To4() != nil {
1246 cl.config.PublicIp4,
1247 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1251 cl.config.PublicIp6,
1252 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1257 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1258 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1259 return f(missinggo.AddrIP(l.Addr()))
1263 // Our IP as a peer should see it.
1264 func (cl *Client) publicAddr(peer net.IP) ipPort {
1265 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1268 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1271 cl.eachListener(func(l socket) bool {
1272 ret = append(ret, l.Addr())
1278 func (cl *Client) onBadAccept(addr net.Addr) {
1279 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1280 if cl.acceptLimiter == nil {
1281 cl.acceptLimiter = make(map[ipStr]int)
1283 cl.acceptLimiter[ipStr(ip.String())]++
1286 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1287 if ip4 := ip.To4(); ip4 != nil {
1288 return ip4.Mask(net.CIDRMask(24, 32))
1293 func (cl *Client) clearAcceptLimits() {
1294 cl.acceptLimiter = nil
1297 func (cl *Client) acceptLimitClearer() {
1300 case <-cl.closed.LockedChan(cl.locker()):
1302 case <-time.After(15 * time.Minute):
1304 cl.clearAcceptLimits()
1310 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1311 if cl.config.DisableAcceptRateLimiting {
1314 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1317 func (cl *Client) rLock() {
1321 func (cl *Client) rUnlock() {
1325 func (cl *Client) lock() {
1329 func (cl *Client) unlock() {
1333 func (cl *Client) locker() sync.Locker {
1334 return clientLocker{cl}
1337 type clientLocker struct {
1341 func (cl clientLocker) Lock() {
1345 func (cl clientLocker) Unlock() {