18 "github.com/anacrolix/dht"
19 "github.com/anacrolix/dht/krpc"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/missinggo/pproffd"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/anacrolix/sync"
26 "github.com/dustin/go-humanize"
27 "golang.org/x/time/rate"
29 "github.com/anacrolix/torrent/bencode"
30 "github.com/anacrolix/torrent/iplist"
31 "github.com/anacrolix/torrent/metainfo"
32 "github.com/anacrolix/torrent/mse"
33 pp "github.com/anacrolix/torrent/peer_protocol"
34 "github.com/anacrolix/torrent/storage"
37 // Clients contain zero or more Torrents. A Client manages a blocklist, the
38 // TCP/UDP protocol ports, and DHT as desired.
42 closed missinggo.Event
49 defaultStorage *storage.Client
51 tcpListener net.Listener
54 ipBlockList iplist.Ranger
55 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
56 extensionBytes peerExtensionBytes
57 // The net.Addr.String part that should be common to all active listeners.
59 uploadLimit *rate.Limiter
60 downloadLimit *rate.Limiter
62 // Set of addresses that have our client ID. This intentionally will
63 // include ourselves if we end up trying to connect to our own address
64 // through legitimate channels.
65 dopplegangerAddrs map[string]struct{}
66 badPeerIPs map[string]struct{}
67 torrents map[metainfo.Hash]*Torrent
70 func (cl *Client) BadPeerIPs() []string {
73 return cl.badPeerIPsLocked()
76 func (cl *Client) badPeerIPsLocked() []string {
77 return slices.FromMapKeys(cl.badPeerIPs).([]string)
80 func (cl *Client) IPBlockList() iplist.Ranger {
86 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
91 cl.dHT.SetIPBlockList(list)
95 func (cl *Client) PeerID() PeerID {
99 type torrentAddr string
101 func (torrentAddr) Network() string { return "" }
103 func (me torrentAddr) String() string { return string(me) }
105 func (cl *Client) ListenAddr() net.Addr {
106 if cl.listenAddr == "" {
109 return torrentAddr(cl.listenAddr)
112 // Writes out a human readable status of the client, such as for writing to a
114 func (cl *Client) WriteStatus(_w io.Writer) {
117 w := bufio.NewWriter(_w)
119 if addr := cl.ListenAddr(); addr != nil {
120 fmt.Fprintf(w, "Listening on %s\n", addr)
122 fmt.Fprintln(w, "Not listening!")
124 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
126 if dht := cl.DHT(); dht != nil {
127 dhtStats := dht.Stats()
128 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
129 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
130 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
131 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
132 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
156 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
157 return NewUtpSocket("udp"+networkSuffix, addr)
160 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
161 return net.Listen("tcp"+networkSuffix, addr)
164 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
166 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
170 listenedAddr = tcpL.Addr().String()
171 utpSock, err = listenUTP(networkSuffix, listenedAddr)
176 if !strings.Contains(err.Error(), "address already in use") {
182 // Listen to enabled protocols, ensuring ports match.
183 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
190 host, port, err = missinggo.ParseHostPort(addr)
195 // If both protocols are active, they need to have the same port.
196 return listenBothSameDynamicPort(networkSuffix, host)
205 tcpL, err = listenTCP(networkSuffix, addr)
214 listenedAddr = tcpL.Addr().String()
217 utpSock, err = listenUTP(networkSuffix, addr)
221 listenedAddr = utpSock.Addr().String()
226 const debugLogValue = "debug"
228 func (cl *Client) debugLogFilter(m *log.Msg) bool {
229 if !cl.config.Debug {
230 _, ok := m.Values()[debugLogValue]
236 func (cl *Client) initLogger() {
237 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
240 // Creates a new client.
241 func NewClient(cfg *Config) (cl *Client, err error) {
244 DHTConfig: dht.ServerConfig{
245 StartingNodes: dht.GlobalBootstrapAddrs,
260 halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
262 dopplegangerAddrs: make(map[string]struct{}),
263 torrents: make(map[metainfo.Hash]*Torrent),
272 if cfg.UploadRateLimiter == nil {
273 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
275 cl.uploadLimit = cfg.UploadRateLimiter
277 if cfg.DownloadRateLimiter == nil {
278 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
280 cl.downloadLimit = cfg.DownloadRateLimiter
282 cl.extensionBytes = defaultPeerExtensionBytes()
284 storageImpl := cfg.DefaultStorage
285 if storageImpl == nil {
286 // We'd use mmap but HFS+ doesn't support sparse files.
287 storageImpl = storage.NewFile(cfg.DataDir)
288 cl.onClose = append(cl.onClose, func() {
289 if err := storageImpl.Close(); err != nil {
290 log.Printf("error closing default storage: %s", err)
294 cl.defaultStorage = storage.NewClient(storageImpl)
295 if cfg.IPBlocklist != nil {
296 cl.ipBlockList = cfg.IPBlocklist
299 if cfg.PeerID != "" {
300 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
302 o := copy(cl.peerID[:], cfg.Bep20)
303 _, err = rand.Read(cl.peerID[o:])
305 panic("error generating peer id")
309 cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
310 !cl.config.DisableTCP,
311 !cl.config.DisableUTP,
313 if cl.config.DisableIPv6 {
319 cl.config.ListenAddr)
324 if cl.tcpListener != nil {
325 go cl.acceptConnections(cl.tcpListener, false)
327 if cl.utpSock != nil {
328 go cl.acceptConnections(cl.utpSock, true)
331 dhtCfg := cfg.DHTConfig
332 if dhtCfg.IPBlocklist == nil {
333 dhtCfg.IPBlocklist = cl.ipBlockList
335 if dhtCfg.Conn == nil {
336 if cl.utpSock != nil {
337 dhtCfg.Conn = cl.utpSock
339 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
345 if dhtCfg.OnAnnouncePeer == nil {
346 dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
348 cl.dHT, err = dht.NewServer(&dhtCfg)
353 if _, err := cl.dHT.Bootstrap(); err != nil {
354 log.Printf("error bootstrapping dht: %s", err)
362 func firstNonEmptyString(ss ...string) string {
363 for _, s := range ss {
371 func (cl *Client) Closed() <-chan struct{} {
377 // Stops the client. All connections to peers are closed and all activity will
379 func (cl *Client) Close() {
386 if cl.utpSock != nil {
389 if cl.tcpListener != nil {
390 cl.tcpListener.Close()
392 for _, t := range cl.torrents {
395 for _, f := range cl.onClose {
401 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
402 if cl.ipBlockList == nil {
405 return cl.ipBlockList.Lookup(ip)
408 func (cl *Client) waitAccept() {
410 for _, t := range cl.torrents {
415 if cl.closed.IsSet() {
422 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
428 conn, err := l.Accept()
429 conn = pproffd.WrapNetConn(conn)
431 if cl.closed.IsSet() {
439 // I think something harsher should happen here? Our accept
440 // routine just fucked off.
449 log.Printf("accepted connection from %s", conn.RemoteAddr())
451 reject := cl.badPeerIPPort(
452 missinggo.AddrIP(conn.RemoteAddr()),
453 missinggo.AddrPort(conn.RemoteAddr()))
456 log.Printf("rejecting connection from %s", conn.RemoteAddr())
462 go cl.incomingConnection(conn, utp)
466 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
468 if tc, ok := nc.(*net.TCPConn); ok {
471 c := cl.newConnection(nc)
472 c.Discovery = peerSourceIncoming
474 cl.runReceivedConn(c)
477 // Returns a handle to the given torrent, if it's present in the client.
478 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
481 t, ok = cl.torrents[ih]
485 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
486 return cl.torrents[ih]
489 type dialResult struct {
494 func countDialResult(err error) {
496 successfulDials.Add(1)
498 unsuccessfulDials.Add(1)
502 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
503 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
504 if ret < minDialTimeout {
510 // Returns whether an address is known to connect to a client with our own ID.
511 func (cl *Client) dopplegangerAddr(addr string) bool {
512 _, ok := cl.dopplegangerAddrs[addr]
516 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
518 // LocalAddr: cl.tcpListener.Addr(),
520 c, err = d.DialContext(ctx, "tcp", addr)
523 c.(*net.TCPConn).SetLinger(0)
525 c = pproffd.WrapNetConn(c)
529 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
530 c, err = cl.utpSock.DialContext(ctx, addr)
536 dialledFirstUtp = expvar.NewInt("dialledFirstUtp")
537 dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
540 // Returns a connection over UTP or TCP, whichever is first to connect.
541 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
542 ctx, cancel := context.WithCancel(ctx)
543 // As soon as we return one connection, cancel the others.
546 resCh := make(chan dialResult, left)
547 if !cl.config.DisableUTP {
550 c, _ := cl.dialUTP(ctx, addr)
551 resCh <- dialResult{c, true}
554 if !cl.config.DisableTCP {
557 c, _ := cl.dialTCP(ctx, addr)
558 resCh <- dialResult{c, false}
562 // Wait for a successful connection.
563 for ; left > 0 && res.Conn == nil; left-- {
567 // There are still incompleted dials.
569 for ; left > 0; left-- {
570 conn := (<-resCh).Conn
581 dialledFirstUtp.Add(1)
583 dialledFirstNotUtp.Add(1)
589 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
590 if _, ok := t.halfOpen[addr]; !ok {
591 panic("invariant broken")
593 delete(t.halfOpen, addr)
597 // Performs initiator handshakes and returns a connection. Returns nil
598 // *connection if no connection for valid reasons.
599 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
600 c = cl.newConnection(nc)
601 c.headerEncrypted = encryptHeader
603 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
605 dl, ok := ctx.Deadline()
609 err = nc.SetDeadline(dl)
613 ok, err = cl.initiateHandshakes(c, t)
621 initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
622 initiatedConnWithFallbackHeaderEncryption = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
625 // Returns nil connection and nil error if no connection could be established
626 // for valid reasons.
627 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
628 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
630 nc, utp := cl.dialFirst(ctx, addr)
634 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
635 c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
637 // log.Printf("error initiating connection handshakes: %s", err)
641 initiatedConnWithPreferredHeaderEncryption.Add(1)
645 if cl.config.ForceEncryption {
646 // We should have just tried with an obfuscated header. A plaintext
647 // header can't result in an encrypted connection, so we're done.
648 if !obfuscatedHeaderFirst {
649 panic(cl.config.EncryptionPolicy)
653 // Try again with encryption if we didn't earlier, or without if we did,
654 // using whichever protocol type worked last time.
656 nc, err = cl.dialUTP(ctx, addr)
658 nc, err = cl.dialTCP(ctx, addr)
661 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
664 c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
665 if err != nil || c == nil {
668 if err == nil && c != nil {
669 initiatedConnWithFallbackHeaderEncryption.Add(1)
674 // Called to dial out and run a connection. The addr we're given is already
675 // considered half-open.
676 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
677 c, err := cl.establishOutgoingConn(t, addr)
680 // Don't release lock between here and addConnection, unless it's for
682 cl.noLongerHalfOpen(t, addr)
685 log.Printf("error establishing outgoing connection: %s", err)
694 cl.runHandshookConn(c, t, true)
697 // The port number for incoming peer connections. 0 if the client isn't
699 func (cl *Client) incomingPeerPort() int {
700 if cl.listenAddr == "" {
703 _, port, err := missinggo.ParseHostPort(cl.listenAddr)
710 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
711 if c.headerEncrypted {
713 rw, err = mse.InitiateHandshake(
722 case cl.config.ForceEncryption:
723 return mse.CryptoMethodRC4
724 case cl.config.DisableEncryption:
725 return mse.CryptoMethodPlaintext
727 return mse.AllSupportedCrypto
736 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
737 if ih != t.infoHash {
743 // Calls f with any secret keys.
744 func (cl *Client) forSkeys(f func([]byte) bool) {
747 for ih := range cl.torrents {
754 // Do encryption and bittorrent handshakes as receiver.
755 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
757 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
760 if err == mse.ErrNoSecretKeyMatch {
765 if cl.config.ForceEncryption && !c.headerEncrypted {
766 err = errors.New("connection not encrypted")
769 ih, ok, err := cl.connBTHandshake(c, nil)
771 err = fmt.Errorf("error during bt handshake: %s", err)
783 // Returns !ok if handshake failed for valid reasons.
784 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
785 res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
786 if err != nil || !ok {
790 c.PeerExtensionBytes = res.peerExtensionBytes
791 c.PeerID = res.PeerID
792 c.completedHandshake = time.Now()
796 func (cl *Client) runReceivedConn(c *connection) {
797 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
801 t, err := cl.receiveHandshakes(c)
804 log.Printf("error receiving handshakes: %s", err)
813 cl.runHandshookConn(c, t, false)
816 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
817 t.reconcileHandshakeStats(c)
818 if c.PeerID == cl.peerID {
821 addr := c.conn.RemoteAddr().String()
822 cl.dopplegangerAddrs[addr] = struct{}{}
824 // Because the remote address is not necessarily the same as its
825 // client's torrent listen address, we won't record the remote address
826 // as a doppleganger. Instead, the initiator can record *us* as the
831 c.conn.SetWriteDeadline(time.Time{})
832 c.r = deadlineReader{c.conn, c.r}
833 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
834 if !t.addConnection(c, outgoing) {
837 defer t.dropConnection(c)
838 go c.writer(time.Minute)
839 cl.sendInitialMessages(c, t)
840 err := c.mainReadLoop()
841 if err != nil && cl.config.Debug {
842 log.Printf("error during connection main read loop: %s", err)
846 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
848 if conn.fastEnabled() {
849 if torrent.haveAllPieces() {
850 conn.Post(pp.Message{Type: pp.HaveAll})
851 conn.sentHaves.AddRange(0, conn.t.NumPieces())
853 } else if !torrent.haveAnyPieces() {
854 conn.Post(pp.Message{Type: pp.HaveNone})
855 conn.sentHaves.Clear()
861 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
862 conn.Post(pp.Message{
864 ExtendedID: pp.HandshakeExtendedID,
865 ExtendedPayload: func() []byte {
866 d := map[string]interface{}{
867 "m": func() (ret map[string]int) {
868 ret = make(map[string]int, 2)
869 ret["ut_metadata"] = metadataExtendedId
870 if !cl.config.DisablePEX {
871 ret["ut_pex"] = pexExtendedId
875 "v": cl.config.ExtendedHandshakeClientVersion,
876 // No upload queue is implemented yet.
879 if !cl.config.DisableEncryption {
882 if torrent.metadataSizeKnown() {
883 d["metadata_size"] = torrent.metadataSize()
885 if p := cl.incomingPeerPort(); p != 0 {
888 yourip, err := addrCompactIP(conn.remoteAddr())
890 log.Printf("error calculating yourip field value in extension handshake: %s", err)
894 // log.Printf("sending %v", d)
895 b, err := bencode.Marshal(d)
903 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
904 conn.Post(pp.Message{
906 Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
911 // Process incoming ut_metadata message.
912 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
914 err := bencode.Unmarshal(payload, &d)
916 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
918 msgType, ok := d["msg_type"]
920 return errors.New("missing msg_type field")
924 case pp.DataMetadataExtensionMsgType:
925 if !c.requestedMetadataPiece(piece) {
926 return fmt.Errorf("got unexpected piece %d", piece)
928 c.metadataRequests[piece] = false
929 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
930 if begin < 0 || begin >= len(payload) {
931 return fmt.Errorf("data has bad offset in payload: %d", begin)
933 t.saveMetadataPiece(piece, payload[begin:])
934 c.stats.ChunksReadUseful++
935 c.t.stats.ChunksReadUseful++
936 c.lastUsefulChunkReceived = time.Now()
937 return t.maybeCompleteMetadata()
938 case pp.RequestMetadataExtensionMsgType:
939 if !t.haveMetadataPiece(piece) {
940 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
943 start := (1 << 14) * piece
944 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
946 case pp.RejectMetadataExtensionMsgType:
949 return errors.New("unknown msg_type value")
953 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
957 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
960 if _, ok := cl.ipBlockRange(ip); ok {
963 if _, ok := cl.badPeerIPs[ip.String()]; ok {
969 // Return a Torrent ready for insertion into a Client.
970 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
971 // use provided storage, if provided
972 storageClient := cl.defaultStorage
973 if specStorage != nil {
974 storageClient = storage.NewClient(specStorage)
980 peers: make(map[peersKey]Peer),
981 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
983 halfOpen: make(map[string]Peer),
984 pieceStateChanges: pubsub.NewPubSub(),
986 storageOpener: storageClient,
987 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
989 networkingEnabled: true,
991 metadataChanged: sync.Cond{
995 t.logger = cl.logger.Clone().AddValue(t)
996 t.setChunkSize(defaultChunkSize)
1000 // A file-like handle to some torrent data resource.
1001 type Handle interface {
1008 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1009 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1012 // Adds a torrent by InfoHash with a custom Storage implementation.
1013 // If the torrent already exists then this Storage is ignored and the
1014 // existing torrent returned with `new` set to `false`
1015 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1017 defer cl.mu.Unlock()
1018 t, ok := cl.torrents[infoHash]
1023 t = cl.newTorrent(infoHash, specStorage)
1027 cl.torrents[infoHash] = t
1028 t.updateWantPeersEvent()
1029 // Tickle Client.waitAccept, new torrent may want conns.
1030 cl.event.Broadcast()
1034 // Add or merge a torrent spec. If the torrent is already present, the
1035 // trackers will be merged with the existing ones. If the Info isn't yet
1036 // known, it will be set. The display name is replaced if the new spec
1037 // provides one. Returns new if the torrent wasn't already in the client.
1038 // Note that any `Storage` defined on the spec will be ignored if the
1039 // torrent is already present (i.e. `new` return value is `true`)
1040 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1041 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1042 if spec.DisplayName != "" {
1043 t.SetDisplayName(spec.DisplayName)
1045 if spec.InfoBytes != nil {
1046 err = t.SetInfoBytes(spec.InfoBytes)
1052 defer cl.mu.Unlock()
1053 if spec.ChunkSize != 0 {
1054 t.setChunkSize(pp.Integer(spec.ChunkSize))
1056 t.addTrackers(spec.Trackers)
1061 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1062 t, ok := cl.torrents[infoHash]
1064 err = fmt.Errorf("no such torrent")
1071 delete(cl.torrents, infoHash)
1075 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1076 _url, err := url.Parse(announceURL)
1080 hmp := missinggo.SplitHostMaybePort(_url.Host)
1085 addr, err := net.ResolveIPAddr("ip", hmp.Host)
1090 _, blocked = cl.ipBlockRange(addr.IP)
1093 hmp.Host = addr.String()
1094 _url.Host = hmp.String()
1095 urlToUse = _url.String()
1099 func (cl *Client) allTorrentsCompleted() bool {
1100 for _, t := range cl.torrents {
1104 if !t.haveAllPieces() {
1111 // Returns true when all torrents are completely downloaded and false if the
1112 // client is stopped before that.
1113 func (cl *Client) WaitAll() bool {
1115 defer cl.mu.Unlock()
1116 for !cl.allTorrentsCompleted() {
1117 if cl.closed.IsSet() {
1125 // Returns handles to all the torrents loaded in the Client.
1126 func (cl *Client) Torrents() []*Torrent {
1128 defer cl.mu.Unlock()
1129 return cl.torrentsAsSlice()
1132 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1133 for _, t := range cl.torrents {
1134 ret = append(ret, t)
1139 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1140 spec, err := TorrentSpecFromMagnetURI(uri)
1144 T, _, err = cl.AddTorrentSpec(spec)
1148 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1149 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1151 slices.MakeInto(&ss, mi.Nodes)
1156 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1157 mi, err := metainfo.LoadFromFile(filename)
1161 return cl.AddTorrent(mi)
1164 func (cl *Client) DHT() *dht.Server {
1168 func (cl *Client) AddDHTNodes(nodes []string) {
1169 if cl.DHT() == nil {
1172 for _, n := range nodes {
1173 hmp := missinggo.SplitHostMaybePort(n)
1174 ip := net.ParseIP(hmp.Host)
1176 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1179 ni := krpc.NodeInfo{
1185 cl.DHT().AddNode(ni)
1189 func (cl *Client) banPeerIP(ip net.IP) {
1190 if cl.badPeerIPs == nil {
1191 cl.badPeerIPs = make(map[string]struct{})
1193 cl.badPeerIPs[ip.String()] = struct{}{}
1196 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1201 PeerMaxRequests: 250,
1202 writeBuffer: new(bytes.Buffer),
1204 c.writerCond.L = &cl.mu
1205 c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1206 c.r = &rateLimitedReader{
1207 l: cl.downloadLimit,
1213 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1215 defer cl.mu.Unlock()
1223 Source: peerSourceDHTAnnouncePeer,