17 "github.com/anacrolix/dht"
18 "github.com/anacrolix/dht/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
42 // An aggregate of stats over all connections. First in struct to ensure
43 // 64-bit alignment of fields. See #262.
48 closed missinggo.Event
54 defaultStorage *storage.Client
57 dhtServers []*dht.Server
58 ipBlockList iplist.Ranger
59 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
60 extensionBytes pp.PeerExtensionBits
62 // Set of addresses that have our client ID. This intentionally will
63 // include ourselves if we end up trying to connect to our own address
64 // through legitimate channels.
65 dopplegangerAddrs map[string]struct{}
66 badPeerIPs map[string]struct{}
67 torrents map[InfoHash]*Torrent
69 acceptLimiter map[ipStr]int
74 func (cl *Client) BadPeerIPs() []string {
77 return cl.badPeerIPsLocked()
80 func (cl *Client) badPeerIPsLocked() []string {
81 return slices.FromMapKeys(cl.badPeerIPs).([]string)
84 func (cl *Client) PeerID() PeerID {
88 type torrentAddr string
90 func (torrentAddr) Network() string { return "" }
92 func (me torrentAddr) String() string { return string(me) }
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
156 const debugLogValue = "debug"
158 func (cl *Client) debugLogFilter(m *log.Msg) bool {
159 if !cl.config.Debug {
160 _, ok := m.Values()[debugLogValue]
166 func (cl *Client) initLogger() {
167 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
197 cl.event.L = cl.locker()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 log.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
230 for _, s := range cl.conns {
231 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
232 go cl.acceptConnections(s)
238 for _, s := range cl.conns {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, ds)
252 func (cl *Client) enabledPeerNetworks() (ns []string) {
253 for _, n := range allPeerNetworks {
254 if peerNetworkEnabled(n, cl.config) {
261 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
262 cfg := dht.ServerConfig{
263 IPBlocklist: cl.ipBlockList,
265 OnAnnouncePeer: cl.onDHTAnnouncePeer,
266 PublicIP: func() net.IP {
267 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
268 return cl.config.PublicIp6
270 return cl.config.PublicIp4
272 StartingNodes: cl.config.DhtStartingNodes,
274 s, err = dht.NewServer(&cfg)
277 if _, err := s.Bootstrap(); err != nil {
278 log.Printf("error bootstrapping dht: %s", err)
285 func firstNonEmptyString(ss ...string) string {
286 for _, s := range ss {
294 func (cl *Client) Closed() <-chan struct{} {
300 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
301 for _, ds := range cl.dhtServers {
306 func (cl *Client) closeSockets() {
307 cl.eachListener(func(l socket) bool {
314 // Stops the client. All connections to peers are closed and all activity will
316 func (cl *Client) Close() {
320 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
322 for _, t := range cl.torrents {
325 for _, f := range cl.onClose {
331 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
332 if cl.ipBlockList == nil {
335 return cl.ipBlockList.Lookup(ip)
338 func (cl *Client) ipIsBlocked(ip net.IP) bool {
339 _, blocked := cl.ipBlockRange(ip)
343 func (cl *Client) waitAccept() {
345 for _, t := range cl.torrents {
350 if cl.closed.IsSet() {
357 func (cl *Client) rejectAccepted(conn net.Conn) bool {
358 ra := conn.RemoteAddr()
359 rip := missinggo.AddrIP(ra)
360 if cl.config.DisableIPv4Peers && rip.To4() != nil {
363 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
366 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
369 if cl.rateLimitAccept(rip) {
372 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
375 func (cl *Client) acceptConnections(l net.Listener) {
377 conn, err := l.Accept()
378 conn = pproffd.WrapNetConn(conn)
380 closed := cl.closed.IsSet()
383 reject = cl.rejectAccepted(conn)
393 log.Printf("error accepting connection: %s", err)
397 torrent.Add("rejected accepted connections", 1)
400 go cl.incomingConnection(conn)
402 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
403 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
404 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
405 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
410 func (cl *Client) incomingConnection(nc net.Conn) {
412 if tc, ok := nc.(*net.TCPConn); ok {
415 c := cl.newConnection(nc, false)
416 c.Discovery = peerSourceIncoming
417 cl.runReceivedConn(c)
420 // Returns a handle to the given torrent, if it's present in the client.
421 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
424 t, ok = cl.torrents[ih]
428 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
429 return cl.torrents[ih]
432 type dialResult struct {
436 func countDialResult(err error) {
438 torrent.Add("successful dials", 1)
440 torrent.Add("unsuccessful dials", 1)
444 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
445 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
446 if ret < minDialTimeout {
452 // Returns whether an address is known to connect to a client with our own ID.
453 func (cl *Client) dopplegangerAddr(addr string) bool {
454 _, ok := cl.dopplegangerAddrs[addr]
458 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
460 case allowIpv4 && allowIpv6:
462 case allowIpv4 && !allowIpv6:
464 case !allowIpv4 && allowIpv6:
467 panic("unhandled ip network combination")
471 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
472 return sock.DialContext(ctx, "", addr)
475 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
477 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
478 c := func(s string) bool {
479 return strings.Contains(network, s)
482 if c("udp") || c("utp") {
486 if cfg.DisableTCP && c("tcp") {
489 if cfg.DisableIPv6 && c("6") {
495 // Returns a connection over UTP or TCP, whichever is first to connect.
496 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
497 ctx, cancel := context.WithCancel(ctx)
498 // As soon as we return one connection, cancel the others.
501 resCh := make(chan dialResult, left)
502 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
505 c, err := f(ctx, addr)
506 // This is a bit optimistic, but it looks non-trivial to thread
507 // this through the proxy code. Set it now in case we close the
508 // connection forthwith.
509 if tc, ok := c.(*net.TCPConn); ok {
513 resCh <- dialResult{c}
519 cl.eachListener(func(s socket) bool {
520 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
527 // Wait for a successful connection.
529 defer perf.ScopeTimer()()
530 for ; left > 0 && res.Conn == nil; left-- {
534 // There are still incompleted dials.
536 for ; left > 0; left-- {
537 conn := (<-resCh).Conn
544 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
549 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
550 if _, ok := t.halfOpen[addr]; !ok {
551 panic("invariant broken")
553 delete(t.halfOpen, addr)
557 // Performs initiator handshakes and returns a connection. Returns nil
558 // *connection if no connection for valid reasons.
559 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
560 c = cl.newConnection(nc, true)
561 c.headerEncrypted = encryptHeader
562 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
564 dl, ok := ctx.Deadline()
568 err = nc.SetDeadline(dl)
572 ok, err = cl.initiateHandshakes(c, t)
579 // Returns nil connection and nil error if no connection could be established
580 // for valid reasons.
581 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
582 nc := cl.dialFirst(ctx, addr)
587 if c == nil || err != nil {
591 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
594 // Returns nil connection and nil error if no connection could be established
595 // for valid reasons.
596 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
597 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
600 return t.dialTimeout()
603 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
604 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
609 torrent.Add("initiated conn with preferred header obfuscation", 1)
612 if cl.config.ForceEncryption {
613 // We should have just tried with an obfuscated header. A plaintext
614 // header can't result in an encrypted connection, so we're done.
615 if !obfuscatedHeaderFirst {
616 panic(cl.config.EncryptionPolicy)
620 // Try again with encryption if we didn't earlier, or without if we did.
621 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
623 torrent.Add("initiated conn with fallback header obfuscation", 1)
628 // Called to dial out and run a connection. The addr we're given is already
629 // considered half-open.
630 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
631 c, err := cl.establishOutgoingConn(t, addr)
634 // Don't release lock between here and addConnection, unless it's for
636 cl.noLongerHalfOpen(t, addr)
639 log.Printf("error establishing outgoing connection: %s", err)
648 cl.runHandshookConn(c, t)
651 // The port number for incoming peer connections. 0 if the client isn't
653 func (cl *Client) incomingPeerPort() int {
654 return cl.LocalPort()
657 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
658 if c.headerEncrypted {
660 rw, c.cryptoMethod, err = mse.InitiateHandshake(
667 func() mse.CryptoMethod {
669 case cl.config.ForceEncryption:
670 return mse.CryptoMethodRC4
671 case cl.config.DisableEncryption:
672 return mse.CryptoMethodPlaintext
674 return mse.AllSupportedCrypto
683 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
684 if ih != t.infoHash {
690 // Calls f with any secret keys.
691 func (cl *Client) forSkeys(f func([]byte) bool) {
694 for ih := range cl.torrents {
701 // Do encryption and bittorrent handshakes as receiver.
702 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
703 defer perf.ScopeTimerErr(&err)()
705 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
708 if err == mse.ErrNoSecretKeyMatch {
713 if cl.config.ForceEncryption && !c.headerEncrypted {
714 err = errors.New("connection not encrypted")
717 ih, ok, err := cl.connBTHandshake(c, nil)
719 err = fmt.Errorf("error during bt handshake: %s", err)
731 // Returns !ok if handshake failed for valid reasons.
732 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
733 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
734 if err != nil || !ok {
738 c.PeerExtensionBytes = res.PeerExtensionBits
739 c.PeerID = res.PeerID
740 c.completedHandshake = time.Now()
744 func (cl *Client) runReceivedConn(c *connection) {
745 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
749 t, err := cl.receiveHandshakes(c)
752 "error receiving handshakes: %s", err,
756 "network", c.remoteAddr().Network(),
758 torrent.Add("error receiving handshake", 1)
760 cl.onBadAccept(c.remoteAddr())
765 torrent.Add("received handshake for unloaded torrent", 1)
767 cl.onBadAccept(c.remoteAddr())
771 torrent.Add("received handshake for loaded torrent", 1)
774 cl.runHandshookConn(c, t)
777 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
779 if c.PeerID == cl.peerID {
782 addr := c.conn.RemoteAddr().String()
783 cl.dopplegangerAddrs[addr] = struct{}{}
785 // Because the remote address is not necessarily the same as its
786 // client's torrent listen address, we won't record the remote address
787 // as a doppleganger. Instead, the initiator can record *us* as the
792 c.conn.SetWriteDeadline(time.Time{})
793 c.r = deadlineReader{c.conn, c.r}
794 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
795 if connIsIpv6(c.conn) {
796 torrent.Add("completed handshake over ipv6", 1)
798 if err := t.addConnection(c); err != nil {
799 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
802 defer t.dropConnection(c)
803 go c.writer(time.Minute)
804 cl.sendInitialMessages(c, t)
805 err := c.mainReadLoop()
806 if err != nil && cl.config.Debug {
807 log.Printf("error during connection main read loop: %s", err)
811 // See the order given in Transmission's tr_peerMsgsNew.
812 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
813 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
814 conn.Post(pp.Message{
816 ExtendedID: pp.HandshakeExtendedID,
817 ExtendedPayload: func() []byte {
818 msg := pp.ExtendedHandshakeMessage{
819 M: map[pp.ExtensionName]pp.ExtensionNumber{
820 pp.ExtensionNameMetadata: metadataExtendedId,
822 V: cl.config.ExtendedHandshakeClientVersion,
823 Reqq: 64, // TODO: Really?
824 YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
825 Encryption: !cl.config.DisableEncryption,
826 Port: cl.incomingPeerPort(),
827 MetadataSize: torrent.metadataSize(),
828 // TODO: We can figured these out specific to the socket
830 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
831 Ipv6: cl.config.PublicIp6.To16(),
833 if !cl.config.DisablePEX {
834 msg.M[pp.ExtensionNamePex] = pexExtendedId
836 return bencode.MustMarshal(msg)
841 if conn.fastEnabled() {
842 if torrent.haveAllPieces() {
843 conn.Post(pp.Message{Type: pp.HaveAll})
844 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
846 } else if !torrent.haveAnyPieces() {
847 conn.Post(pp.Message{Type: pp.HaveNone})
848 conn.sentHaves.Clear()
854 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
855 conn.Post(pp.Message{
862 func (cl *Client) dhtPort() (ret uint16) {
863 cl.eachDhtServer(func(s *dht.Server) {
864 ret = uint16(missinggo.AddrPort(s.Addr()))
869 func (cl *Client) haveDhtServer() (ret bool) {
870 cl.eachDhtServer(func(_ *dht.Server) {
876 // Process incoming ut_metadata message.
877 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
879 err := bencode.Unmarshal(payload, &d)
880 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
881 } else if err != nil {
882 return fmt.Errorf("error unmarshalling bencode: %s", err)
884 msgType, ok := d["msg_type"]
886 return errors.New("missing msg_type field")
890 case pp.DataMetadataExtensionMsgType:
891 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
892 if !c.requestedMetadataPiece(piece) {
893 return fmt.Errorf("got unexpected piece %d", piece)
895 c.metadataRequests[piece] = false
896 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
897 if begin < 0 || begin >= len(payload) {
898 return fmt.Errorf("data has bad offset in payload: %d", begin)
900 t.saveMetadataPiece(piece, payload[begin:])
901 c.lastUsefulChunkReceived = time.Now()
902 return t.maybeCompleteMetadata()
903 case pp.RequestMetadataExtensionMsgType:
904 if !t.haveMetadataPiece(piece) {
905 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
908 start := (1 << 14) * piece
909 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
911 case pp.RejectMetadataExtensionMsgType:
914 return errors.New("unknown msg_type value")
918 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
922 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
925 if _, ok := cl.ipBlockRange(ip); ok {
928 if _, ok := cl.badPeerIPs[ip.String()]; ok {
934 // Return a Torrent ready for insertion into a Client.
935 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
936 // use provided storage, if provided
937 storageClient := cl.defaultStorage
938 if specStorage != nil {
939 storageClient = storage.NewClient(specStorage)
945 peers: prioritizedPeers{
947 getPrio: func(p Peer) peerPriority {
948 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
951 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
953 halfOpen: make(map[string]Peer),
954 pieceStateChanges: pubsub.NewPubSub(),
956 storageOpener: storageClient,
957 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
959 networkingEnabled: true,
961 metadataChanged: sync.Cond{
964 duplicateRequestTimeout: 1 * time.Second,
966 t.logger = cl.logger.Clone().AddValue(t)
967 t.setChunkSize(defaultChunkSize)
971 // A file-like handle to some torrent data resource.
972 type Handle interface {
979 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
980 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
983 // Adds a torrent by InfoHash with a custom Storage implementation.
984 // If the torrent already exists then this Storage is ignored and the
985 // existing torrent returned with `new` set to `false`
986 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
989 t, ok := cl.torrents[infoHash]
995 t = cl.newTorrent(infoHash, specStorage)
996 cl.eachDhtServer(func(s *dht.Server) {
999 cl.torrents[infoHash] = t
1000 cl.clearAcceptLimits()
1001 t.updateWantPeersEvent()
1002 // Tickle Client.waitAccept, new torrent may want conns.
1003 cl.event.Broadcast()
1007 // Add or merge a torrent spec. If the torrent is already present, the
1008 // trackers will be merged with the existing ones. If the Info isn't yet
1009 // known, it will be set. The display name is replaced if the new spec
1010 // provides one. Returns new if the torrent wasn't already in the client.
1011 // Note that any `Storage` defined on the spec will be ignored if the
1012 // torrent is already present (i.e. `new` return value is `true`)
1013 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1014 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1015 if spec.DisplayName != "" {
1016 t.SetDisplayName(spec.DisplayName)
1018 if spec.InfoBytes != nil {
1019 err = t.SetInfoBytes(spec.InfoBytes)
1026 if spec.ChunkSize != 0 {
1027 t.setChunkSize(pp.Integer(spec.ChunkSize))
1029 t.addTrackers(spec.Trackers)
1034 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1035 t, ok := cl.torrents[infoHash]
1037 err = fmt.Errorf("no such torrent")
1044 delete(cl.torrents, infoHash)
1048 func (cl *Client) allTorrentsCompleted() bool {
1049 for _, t := range cl.torrents {
1053 if !t.haveAllPieces() {
1060 // Returns true when all torrents are completely downloaded and false if the
1061 // client is stopped before that.
1062 func (cl *Client) WaitAll() bool {
1065 for !cl.allTorrentsCompleted() {
1066 if cl.closed.IsSet() {
1074 // Returns handles to all the torrents loaded in the Client.
1075 func (cl *Client) Torrents() []*Torrent {
1078 return cl.torrentsAsSlice()
1081 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1082 for _, t := range cl.torrents {
1083 ret = append(ret, t)
1088 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1089 spec, err := TorrentSpecFromMagnetURI(uri)
1093 T, _, err = cl.AddTorrentSpec(spec)
1097 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1098 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1100 slices.MakeInto(&ss, mi.Nodes)
1105 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1106 mi, err := metainfo.LoadFromFile(filename)
1110 return cl.AddTorrent(mi)
1113 func (cl *Client) DhtServers() []*dht.Server {
1114 return cl.dhtServers
1117 func (cl *Client) AddDHTNodes(nodes []string) {
1118 for _, n := range nodes {
1119 hmp := missinggo.SplitHostMaybePort(n)
1120 ip := net.ParseIP(hmp.Host)
1122 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1125 ni := krpc.NodeInfo{
1126 Addr: krpc.NodeAddr{
1131 cl.eachDhtServer(func(s *dht.Server) {
1137 func (cl *Client) banPeerIP(ip net.IP) {
1138 if cl.badPeerIPs == nil {
1139 cl.badPeerIPs = make(map[string]struct{})
1141 cl.badPeerIPs[ip.String()] = struct{}{}
1144 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1150 PeerMaxRequests: 250,
1151 writeBuffer: new(bytes.Buffer),
1153 c.writerCond.L = cl.locker()
1154 c.setRW(connStatsReadWriter{nc, c})
1155 c.r = &rateLimitedReader{
1156 l: cl.config.DownloadRateLimiter,
1162 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1172 Source: peerSourceDHTAnnouncePeer,
1176 func firstNotNil(ips ...net.IP) net.IP {
1177 for _, ip := range ips {
1185 func (cl *Client) eachListener(f func(socket) bool) {
1186 for _, s := range cl.conns {
1193 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1194 cl.eachListener(func(l socket) bool {
1201 func (cl *Client) publicIp(peer net.IP) net.IP {
1202 // TODO: Use BEP 10 to determine how peers are seeing us.
1203 if peer.To4() != nil {
1205 cl.config.PublicIp4,
1206 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1210 cl.config.PublicIp6,
1211 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1216 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1217 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1218 return f(missinggo.AddrIP(l.Addr()))
1222 // Our IP as a peer should see it.
1223 func (cl *Client) publicAddr(peer net.IP) ipPort {
1224 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1227 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1230 cl.eachListener(func(l socket) bool {
1231 ret = append(ret, l.Addr())
1237 func (cl *Client) onBadAccept(addr net.Addr) {
1238 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1239 if cl.acceptLimiter == nil {
1240 cl.acceptLimiter = make(map[ipStr]int)
1242 cl.acceptLimiter[ipStr(ip.String())]++
1245 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1246 if ip4 := ip.To4(); ip4 != nil {
1247 return ip4.Mask(net.CIDRMask(24, 32))
1252 func (cl *Client) clearAcceptLimits() {
1253 cl.acceptLimiter = nil
1256 func (cl *Client) acceptLimitClearer() {
1259 case <-cl.closed.LockedChan(cl.locker()):
1261 case <-time.After(15 * time.Minute):
1263 cl.clearAcceptLimits()
1269 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1270 if cl.config.DisableAcceptRateLimiting {
1273 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1276 func (cl *Client) rLock() {
1280 func (cl *Client) rUnlock() {
1284 func (cl *Client) lock() {
1288 func (cl *Client) unlock() {
1292 func (cl *Client) locker() sync.Locker {
1293 return clientLocker{cl}
1296 type clientLocker struct {
1300 func (cl clientLocker) Lock() {
1304 func (cl clientLocker) Unlock() {