17 "github.com/anacrolix/dht"
18 "github.com/anacrolix/dht/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
42 // An aggregate of stats over all connections. First in struct to ensure
43 // 64-bit alignment of fields. See #262.
47 closed missinggo.Event
53 defaultStorage *storage.Client
56 dhtServers []*dht.Server
57 ipBlockList iplist.Ranger
58 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
59 extensionBytes pp.PeerExtensionBits
61 // Set of addresses that have our client ID. This intentionally will
62 // include ourselves if we end up trying to connect to our own address
63 // through legitimate channels.
64 dopplegangerAddrs map[string]struct{}
65 badPeerIPs map[string]struct{}
66 torrents map[InfoHash]*Torrent
68 acceptLimiter map[ipStr]int
73 func (cl *Client) BadPeerIPs() []string {
76 return cl.badPeerIPsLocked()
79 func (cl *Client) badPeerIPsLocked() []string {
80 return slices.FromMapKeys(cl.badPeerIPs).([]string)
83 func (cl *Client) PeerID() PeerID {
// torrentAddr is a plain address string. Its method set (Network and String)
// matches the standard library's net.Addr interface.
type torrentAddr string

// Network always returns the empty string; a torrentAddr carries no network
// name.
func (torrentAddr) Network() string { return "" }

// String returns the address text unchanged.
func (a torrentAddr) String() string { return string(a) }
93 func (cl *Client) LocalPort() (port int) {
94 cl.eachListener(func(l socket) bool {
95 _port := missinggo.AddrPort(l.Addr())
101 } else if port != _port {
102 panic("mismatched ports")
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
117 // Writes out a human readable status of the client, such as for writing to a
119 func (cl *Client) WriteStatus(_w io.Writer) {
121 defer cl.mu.RUnlock()
122 w := bufio.NewWriter(_w)
124 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s *dht.Server) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, cl.stats)
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
145 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147 w.WriteString("<missing metainfo>")
// debugLogValue is the marker value attached (via AddValue) to log messages
// that are debug-only. debugLogFilter looks it up in a message's Values()
// when the client's config.Debug flag is off.
155 const debugLogValue = "debug"
157 func (cl *Client) debugLogFilter(m *log.Msg) bool {
158 if !cl.config.Debug {
159 _, ok := m.Values()[debugLogValue]
165 func (cl *Client) initLogger() {
166 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
184 dopplegangerAddrs: make(map[string]struct{}),
185 torrents: make(map[metainfo.Hash]*Torrent),
187 go cl.acceptLimitClearer()
195 cl.extensionBytes = defaultPeerExtensionBytes()
197 storageImpl := cfg.DefaultStorage
198 if storageImpl == nil {
199 // We'd use mmap but HFS+ doesn't support sparse files.
200 storageImpl = storage.NewFile(cfg.DataDir)
201 cl.onClose = append(cl.onClose, func() {
202 if err := storageImpl.Close(); err != nil {
203 log.Printf("error closing default storage: %s", err)
207 cl.defaultStorage = storage.NewClient(storageImpl)
208 if cfg.IPBlocklist != nil {
209 cl.ipBlockList = cfg.IPBlocklist
212 if cfg.PeerID != "" {
213 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
215 o := copy(cl.peerID[:], cfg.Bep20)
216 _, err = rand.Read(cl.peerID[o:])
218 panic("error generating peer id")
222 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
229 for _, s := range cl.conns {
230 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
231 go cl.acceptConnections(s)
237 for _, s := range cl.conns {
238 if pc, ok := s.(net.PacketConn); ok {
239 ds, err := cl.newDhtServer(pc)
243 cl.dhtServers = append(cl.dhtServers, ds)
251 func (cl *Client) enabledPeerNetworks() (ns []string) {
252 for _, n := range allPeerNetworks {
253 if peerNetworkEnabled(n, cl.config) {
260 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
261 cfg := dht.ServerConfig{
262 IPBlocklist: cl.ipBlockList,
264 OnAnnouncePeer: cl.onDHTAnnouncePeer,
265 PublicIP: func() net.IP {
266 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
267 return cl.config.PublicIp6
269 return cl.config.PublicIp4
271 StartingNodes: cl.config.DhtStartingNodes,
273 s, err = dht.NewServer(&cfg)
276 if _, err := s.Bootstrap(); err != nil {
277 log.Printf("error bootstrapping dht: %s", err)
284 func firstNonEmptyString(ss ...string) string {
285 for _, s := range ss {
293 func (cl *Client) Closed() <-chan struct{} {
299 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
300 for _, ds := range cl.dhtServers {
305 func (cl *Client) closeSockets() {
306 cl.eachListener(func(l socket) bool {
313 // Stops the client. All connections to peers are closed and all activity will
315 func (cl *Client) Close() {
319 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
321 for _, t := range cl.torrents {
324 for _, f := range cl.onClose {
330 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
331 if cl.ipBlockList == nil {
334 return cl.ipBlockList.Lookup(ip)
337 func (cl *Client) ipIsBlocked(ip net.IP) bool {
338 _, blocked := cl.ipBlockRange(ip)
342 func (cl *Client) waitAccept() {
344 for _, t := range cl.torrents {
349 if cl.closed.IsSet() {
356 func (cl *Client) rejectAccepted(conn net.Conn) bool {
357 ra := conn.RemoteAddr()
358 rip := missinggo.AddrIP(ra)
359 if cl.config.DisableIPv4Peers && rip.To4() != nil {
362 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
365 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
368 if cl.rateLimitAccept(rip) {
371 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
374 func (cl *Client) acceptConnections(l net.Listener) {
376 conn, err := l.Accept()
377 conn = pproffd.WrapNetConn(conn)
379 closed := cl.closed.IsSet()
382 reject = cl.rejectAccepted(conn)
393 // I think something harsher should happen here? Our accept
394 // routine just fucked off.
399 torrent.Add("rejected accepted connections", 1)
402 go cl.incomingConnection(conn)
404 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
405 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
406 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
407 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
412 func (cl *Client) incomingConnection(nc net.Conn) {
414 if tc, ok := nc.(*net.TCPConn); ok {
417 c := cl.newConnection(nc, false)
418 c.Discovery = peerSourceIncoming
419 cl.runReceivedConn(c)
422 // Returns a handle to the given torrent, if it's present in the client.
423 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
426 t, ok = cl.torrents[ih]
430 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
431 return cl.torrents[ih]
434 type dialResult struct {
438 func countDialResult(err error) {
440 torrent.Add("successful dials", 1)
442 torrent.Add("unsuccessful dials", 1)
446 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
447 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
448 if ret < minDialTimeout {
454 // Returns whether an address is known to connect to a client with our own ID.
455 func (cl *Client) dopplegangerAddr(addr string) bool {
456 _, ok := cl.dopplegangerAddrs[addr]
460 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
462 case allowIpv4 && allowIpv6:
464 case allowIpv4 && !allowIpv6:
466 case !allowIpv4 && allowIpv6:
469 panic("unhandled ip network combination")
473 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
474 return sock.DialContext(ctx, "", addr)
// allPeerNetworks lists the network names that enabledPeerNetworks ranges
// over, passing each one to peerNetworkEnabled.
477 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
479 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
480 c := func(s string) bool {
481 return strings.Contains(network, s)
484 if c("udp") || c("utp") {
488 if cfg.DisableTCP && c("tcp") {
491 if cfg.DisableIPv6 && c("6") {
497 // Returns a connection over UTP or TCP, whichever is first to connect.
498 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
499 ctx, cancel := context.WithCancel(ctx)
500 // As soon as we return one connection, cancel the others.
503 resCh := make(chan dialResult, left)
504 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
507 c, err := f(ctx, addr)
508 // This is a bit optimistic, but it looks non-trivial to thread
509 // this through the proxy code. Set it now in case we close the
510 // connection forthwith.
511 if tc, ok := c.(*net.TCPConn); ok {
515 resCh <- dialResult{c}
521 cl.eachListener(func(s socket) bool {
522 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
529 // Wait for a successful connection.
531 defer perf.ScopeTimer()()
532 for ; left > 0 && res.Conn == nil; left-- {
536 // There are still incomplete dials.
538 for ; left > 0; left-- {
539 conn := (<-resCh).Conn
546 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
551 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
552 if _, ok := t.halfOpen[addr]; !ok {
553 panic("invariant broken")
555 delete(t.halfOpen, addr)
559 // Performs initiator handshakes and returns a connection. Returns nil
560 // *connection if no connection for valid reasons.
561 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
562 c = cl.newConnection(nc, true)
563 c.headerEncrypted = encryptHeader
564 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
566 dl, ok := ctx.Deadline()
570 err = nc.SetDeadline(dl)
574 ok, err = cl.initiateHandshakes(c, t)
581 // Returns nil connection and nil error if no connection could be established
582 // for valid reasons.
583 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
584 nc := cl.dialFirst(ctx, addr)
589 if c == nil || err != nil {
593 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
596 // Returns nil connection and nil error if no connection could be established
597 // for valid reasons.
598 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
599 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
601 defer cl.mu.RUnlock()
602 return t.dialTimeout()
605 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
606 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
611 torrent.Add("initiated conn with preferred header obfuscation", 1)
614 if cl.config.ForceEncryption {
615 // We should have just tried with an obfuscated header. A plaintext
616 // header can't result in an encrypted connection, so we're done.
617 if !obfuscatedHeaderFirst {
618 panic(cl.config.EncryptionPolicy)
622 // Try again with encryption if we didn't earlier, or without if we did.
623 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
625 torrent.Add("initiated conn with fallback header obfuscation", 1)
630 // Called to dial out and run a connection. The addr we're given is already
631 // considered half-open.
632 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
633 c, err := cl.establishOutgoingConn(t, addr)
636 // Don't release lock between here and addConnection, unless it's for
638 cl.noLongerHalfOpen(t, addr)
641 log.Printf("error establishing outgoing connection: %s", err)
650 cl.runHandshookConn(c, t)
653 // The port number for incoming peer connections. 0 if the client isn't
655 func (cl *Client) incomingPeerPort() int {
656 return cl.LocalPort()
659 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
660 if c.headerEncrypted {
662 rw, c.cryptoMethod, err = mse.InitiateHandshake(
669 func() mse.CryptoMethod {
671 case cl.config.ForceEncryption:
672 return mse.CryptoMethodRC4
673 case cl.config.DisableEncryption:
674 return mse.CryptoMethodPlaintext
676 return mse.AllSupportedCrypto
685 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
686 if ih != t.infoHash {
692 // Calls f with any secret keys.
693 func (cl *Client) forSkeys(f func([]byte) bool) {
696 for ih := range cl.torrents {
703 // Do encryption and bittorrent handshakes as receiver.
704 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
705 defer perf.ScopeTimerErr(&err)()
707 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
710 if err == mse.ErrNoSecretKeyMatch {
715 if cl.config.ForceEncryption && !c.headerEncrypted {
716 err = errors.New("connection not encrypted")
719 ih, ok, err := cl.connBTHandshake(c, nil)
721 err = fmt.Errorf("error during bt handshake: %s", err)
733 // Returns !ok if handshake failed for valid reasons.
734 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
735 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
736 if err != nil || !ok {
740 c.PeerExtensionBytes = res.PeerExtensionBits
741 c.PeerID = res.PeerID
742 c.completedHandshake = time.Now()
746 func (cl *Client) runReceivedConn(c *connection) {
747 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
751 t, err := cl.receiveHandshakes(c)
754 "error receiving handshakes: %s", err,
758 "network", c.remoteAddr().Network(),
760 torrent.Add("error receiving handshake", 1)
762 cl.onBadAccept(c.remoteAddr())
767 torrent.Add("received handshake for unloaded torrent", 1)
769 cl.onBadAccept(c.remoteAddr())
773 torrent.Add("received handshake for loaded torrent", 1)
776 cl.runHandshookConn(c, t)
779 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
781 if c.PeerID == cl.peerID {
784 addr := c.conn.RemoteAddr().String()
785 cl.dopplegangerAddrs[addr] = struct{}{}
787 // Because the remote address is not necessarily the same as its
788 // client's torrent listen address, we won't record the remote address
789 // as a doppleganger. Instead, the initiator can record *us* as the
794 c.conn.SetWriteDeadline(time.Time{})
795 c.r = deadlineReader{c.conn, c.r}
796 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
797 if connIsIpv6(c.conn) {
798 torrent.Add("completed handshake over ipv6", 1)
800 if err := t.addConnection(c); err != nil {
801 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
804 defer t.dropConnection(c)
805 go c.writer(time.Minute)
806 cl.sendInitialMessages(c, t)
807 err := c.mainReadLoop()
808 if err != nil && cl.config.Debug {
809 log.Printf("error during connection main read loop: %s", err)
813 // See the order given in Transmission's tr_peerMsgsNew.
814 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
815 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
816 conn.Post(pp.Message{
818 ExtendedID: pp.HandshakeExtendedID,
819 ExtendedPayload: func() []byte {
820 msg := pp.ExtendedHandshakeMessage{
821 M: map[pp.ExtensionName]pp.ExtensionNumber{
822 pp.ExtensionNameMetadata: metadataExtendedId,
824 V: cl.config.ExtendedHandshakeClientVersion,
825 Reqq: 64, // TODO: Really?
826 YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
827 Encryption: !cl.config.DisableEncryption,
828 Port: cl.incomingPeerPort(),
829 MetadataSize: torrent.metadataSize(),
830 // TODO: We can figure these out specific to the socket
832 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
833 Ipv6: cl.config.PublicIp6.To16(),
835 if !cl.config.DisablePEX {
836 msg.M[pp.ExtensionNamePex] = pexExtendedId
838 return bencode.MustMarshal(msg)
843 if conn.fastEnabled() {
844 if torrent.haveAllPieces() {
845 conn.Post(pp.Message{Type: pp.HaveAll})
846 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
848 } else if !torrent.haveAnyPieces() {
849 conn.Post(pp.Message{Type: pp.HaveNone})
850 conn.sentHaves.Clear()
856 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
857 conn.Post(pp.Message{
864 func (cl *Client) dhtPort() (ret uint16) {
865 cl.eachDhtServer(func(s *dht.Server) {
866 ret = uint16(missinggo.AddrPort(s.Addr()))
871 func (cl *Client) haveDhtServer() (ret bool) {
872 cl.eachDhtServer(func(_ *dht.Server) {
878 // Process incoming ut_metadata message.
879 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
881 err := bencode.Unmarshal(payload, &d)
882 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
883 } else if err != nil {
884 return fmt.Errorf("error unmarshalling bencode: %s", err)
886 msgType, ok := d["msg_type"]
888 return errors.New("missing msg_type field")
892 case pp.DataMetadataExtensionMsgType:
893 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
894 if !c.requestedMetadataPiece(piece) {
895 return fmt.Errorf("got unexpected piece %d", piece)
897 c.metadataRequests[piece] = false
898 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
899 if begin < 0 || begin >= len(payload) {
900 return fmt.Errorf("data has bad offset in payload: %d", begin)
902 t.saveMetadataPiece(piece, payload[begin:])
903 c.lastUsefulChunkReceived = time.Now()
904 return t.maybeCompleteMetadata()
905 case pp.RequestMetadataExtensionMsgType:
906 if !t.haveMetadataPiece(piece) {
907 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
910 start := (1 << 14) * piece
911 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
913 case pp.RejectMetadataExtensionMsgType:
916 return errors.New("unknown msg_type value")
920 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
924 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
927 if _, ok := cl.ipBlockRange(ip); ok {
930 if _, ok := cl.badPeerIPs[ip.String()]; ok {
936 // Return a Torrent ready for insertion into a Client.
937 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
938 // use provided storage, if provided
939 storageClient := cl.defaultStorage
940 if specStorage != nil {
941 storageClient = storage.NewClient(specStorage)
947 peers: prioritizedPeers{
949 getPrio: func(p Peer) peerPriority {
950 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
953 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
955 halfOpen: make(map[string]Peer),
956 pieceStateChanges: pubsub.NewPubSub(),
958 storageOpener: storageClient,
959 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
961 networkingEnabled: true,
963 metadataChanged: sync.Cond{
966 duplicateRequestTimeout: 1 * time.Second,
968 t.logger = cl.logger.Clone().AddValue(t)
969 t.setChunkSize(defaultChunkSize)
973 // A file-like handle to some torrent data resource.
974 type Handle interface {
981 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
982 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
985 // Adds a torrent by InfoHash with a custom Storage implementation.
986 // If the torrent already exists then this Storage is ignored and the
987 // existing torrent returned with `new` set to `false`
988 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
991 t, ok := cl.torrents[infoHash]
997 t = cl.newTorrent(infoHash, specStorage)
998 cl.eachDhtServer(func(s *dht.Server) {
1001 cl.torrents[infoHash] = t
1002 cl.clearAcceptLimits()
1003 t.updateWantPeersEvent()
1004 // Tickle Client.waitAccept, new torrent may want conns.
1005 cl.event.Broadcast()
1009 // Add or merge a torrent spec. If the torrent is already present, the
1010 // trackers will be merged with the existing ones. If the Info isn't yet
1011 // known, it will be set. The display name is replaced if the new spec
1012 // provides one. Returns new if the torrent wasn't already in the client.
1013 // Note that any `Storage` defined on the spec will be ignored if the
1014 // torrent is already present (i.e. `new` return value is `false`)
1015 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1016 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1017 if spec.DisplayName != "" {
1018 t.SetDisplayName(spec.DisplayName)
1020 if spec.InfoBytes != nil {
1021 err = t.SetInfoBytes(spec.InfoBytes)
1027 defer cl.mu.Unlock()
1028 if spec.ChunkSize != 0 {
1029 t.setChunkSize(pp.Integer(spec.ChunkSize))
1031 t.addTrackers(spec.Trackers)
1036 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1037 t, ok := cl.torrents[infoHash]
1039 err = fmt.Errorf("no such torrent")
1046 delete(cl.torrents, infoHash)
1050 func (cl *Client) allTorrentsCompleted() bool {
1051 for _, t := range cl.torrents {
1055 if !t.haveAllPieces() {
1062 // Returns true when all torrents are completely downloaded and false if the
1063 // client is stopped before that.
1064 func (cl *Client) WaitAll() bool {
1066 defer cl.mu.Unlock()
1067 for !cl.allTorrentsCompleted() {
1068 if cl.closed.IsSet() {
1076 // Returns handles to all the torrents loaded in the Client.
1077 func (cl *Client) Torrents() []*Torrent {
1079 defer cl.mu.Unlock()
1080 return cl.torrentsAsSlice()
1083 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1084 for _, t := range cl.torrents {
1085 ret = append(ret, t)
1090 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1091 spec, err := TorrentSpecFromMagnetURI(uri)
1095 T, _, err = cl.AddTorrentSpec(spec)
1099 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1100 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1102 slices.MakeInto(&ss, mi.Nodes)
1107 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1108 mi, err := metainfo.LoadFromFile(filename)
1112 return cl.AddTorrent(mi)
1115 func (cl *Client) DhtServers() []*dht.Server {
1116 return cl.dhtServers
1119 func (cl *Client) AddDHTNodes(nodes []string) {
1120 for _, n := range nodes {
1121 hmp := missinggo.SplitHostMaybePort(n)
1122 ip := net.ParseIP(hmp.Host)
1124 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1127 ni := krpc.NodeInfo{
1128 Addr: krpc.NodeAddr{
1133 cl.eachDhtServer(func(s *dht.Server) {
1139 func (cl *Client) banPeerIP(ip net.IP) {
1140 if cl.badPeerIPs == nil {
1141 cl.badPeerIPs = make(map[string]struct{})
1143 cl.badPeerIPs[ip.String()] = struct{}{}
1146 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1152 PeerMaxRequests: 250,
1153 writeBuffer: new(bytes.Buffer),
1155 c.writerCond.L = &cl.mu
1156 c.setRW(connStatsReadWriter{nc, c})
1157 c.r = &rateLimitedReader{
1158 l: cl.config.DownloadRateLimiter,
1164 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1166 defer cl.mu.Unlock()
1174 Source: peerSourceDHTAnnouncePeer,
1178 func firstNotNil(ips ...net.IP) net.IP {
1179 for _, ip := range ips {
1187 func (cl *Client) eachListener(f func(socket) bool) {
1188 for _, s := range cl.conns {
1195 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1196 cl.eachListener(func(l socket) bool {
1203 func (cl *Client) publicIp(peer net.IP) net.IP {
1204 // TODO: Use BEP 10 to determine how peers are seeing us.
1205 if peer.To4() != nil {
1207 cl.config.PublicIp4,
1208 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1212 cl.config.PublicIp6,
1213 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1218 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1219 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1220 return f(missinggo.AddrIP(l.Addr()))
1224 // Our IP as a peer should see it.
1225 func (cl *Client) publicAddr(peer net.IP) ipPort {
1226 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1229 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1231 defer cl.mu.Unlock()
1232 cl.eachListener(func(l socket) bool {
1233 ret = append(ret, l.Addr())
1239 func (cl *Client) onBadAccept(addr net.Addr) {
1240 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1241 if cl.acceptLimiter == nil {
1242 cl.acceptLimiter = make(map[ipStr]int)
1244 cl.acceptLimiter[ipStr(ip.String())]++
1247 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1248 if ip4 := ip.To4(); ip4 != nil {
1249 return ip4.Mask(net.CIDRMask(24, 32))
1254 func (cl *Client) clearAcceptLimits() {
1255 cl.acceptLimiter = nil
1258 func (cl *Client) acceptLimitClearer() {
1261 case <-cl.closed.LockedChan(&cl.mu):
1263 case <-time.After(15 * time.Minute):
1265 cl.clearAcceptLimits()
1271 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1272 if cl.config.DisableAcceptRateLimiting {
1275 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0