17 "github.com/anacrolix/missinggo/perf"
19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
44 closed missinggo.Event
51 defaultStorage *storage.Client
54 dhtServers []*dht.Server
55 ipBlockList iplist.Ranger
56 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
57 extensionBytes peerExtensionBytes
59 // Set of addresses that have our client ID. This intentionally will
60 // include ourselves if we end up trying to connect to our own address
61 // through legitimate channels.
62 dopplegangerAddrs map[string]struct{}
63 badPeerIPs map[string]struct{}
64 torrents map[metainfo.Hash]*Torrent
65 // An aggregate of stats over all connections.
68 acceptLimiter map[ipStr]int
73 func (cl *Client) BadPeerIPs() []string {
76 return cl.badPeerIPsLocked()
79 func (cl *Client) badPeerIPsLocked() []string {
80 return slices.FromMapKeys(cl.badPeerIPs).([]string)
83 func (cl *Client) PeerID() PeerID {
// torrentAddr is an address held as a plain string. Its Network and String
// methods match the method set of net.Addr.
type torrentAddr string

// Network always reports an empty network name.
func (torrentAddr) Network() string {
	return ""
}

// String returns the address exactly as it was stored.
func (addr torrentAddr) String() string {
	return string(addr)
}
93 func (cl *Client) LocalPort() (port int) {
94 cl.eachListener(func(l socket) bool {
95 _port := missinggo.AddrPort(l.Addr())
101 } else if port != _port {
102 panic("mismatched ports")
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
117 // Writes out a human readable status of the client, such as for writing to a
119 func (cl *Client) WriteStatus(_w io.Writer) {
122 w := bufio.NewWriter(_w)
124 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s *dht.Server) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, cl.stats)
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
145 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to log messages to mark them as debug
// output. debugLogFilter looks for it among a message's values when the
// client's Debug option is off.
const (
	debugLogValue = "debug"
)
157 func (cl *Client) debugLogFilter(m *log.Msg) bool {
158 if !cl.config.Debug {
159 _, ok := m.Values()[debugLogValue]
165 func (cl *Client) initLogger() {
166 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
183 halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 log.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
230 for _, s := range cl.conns {
231 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
232 go cl.acceptConnections(s)
238 for _, s := range cl.conns {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, ds)
252 func (cl *Client) enabledPeerNetworks() (ns []string) {
253 for _, n := range allPeerNetworks {
254 if peerNetworkEnabled(n, cl.config) {
261 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
262 cfg := dht.ServerConfig{
263 IPBlocklist: cl.ipBlockList,
265 OnAnnouncePeer: cl.onDHTAnnouncePeer,
266 PublicIP: func() net.IP {
267 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
268 return cl.config.PublicIp6
270 return cl.config.PublicIp4
272 StartingNodes: cl.config.DhtStartingNodes,
274 s, err = dht.NewServer(&cfg)
277 if _, err := s.Bootstrap(); err != nil {
278 log.Printf("error bootstrapping dht: %s", err)
285 func firstNonEmptyString(ss ...string) string {
286 for _, s := range ss {
294 func (cl *Client) Closed() <-chan struct{} {
300 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
301 for _, ds := range cl.dhtServers {
306 func (cl *Client) closeSockets() {
307 cl.eachListener(func(l socket) bool {
314 // Stops the client. All connections to peers are closed and all activity will
316 func (cl *Client) Close() {
320 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
322 for _, t := range cl.torrents {
325 for _, f := range cl.onClose {
331 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
332 if cl.ipBlockList == nil {
335 return cl.ipBlockList.Lookup(ip)
338 func (cl *Client) ipIsBlocked(ip net.IP) bool {
339 _, blocked := cl.ipBlockRange(ip)
343 func (cl *Client) waitAccept() {
345 for _, t := range cl.torrents {
350 if cl.closed.IsSet() {
357 func (cl *Client) rejectAccepted(conn net.Conn) bool {
358 ra := conn.RemoteAddr()
359 rip := missinggo.AddrIP(ra)
360 if cl.config.DisableIPv4Peers && rip.To4() != nil {
363 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
366 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
369 if cl.rateLimitAccept(rip) {
372 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
375 func (cl *Client) acceptConnections(l net.Listener) {
377 conn, err := l.Accept()
378 conn = pproffd.WrapNetConn(conn)
380 closed := cl.closed.IsSet()
383 reject = cl.rejectAccepted(conn)
394 // I think something harsher should happen here? Our accept
395 // routine just fucked off.
400 torrent.Add("rejected accepted connections", 1)
403 go cl.incomingConnection(conn)
405 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
406 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
407 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
408 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
413 func (cl *Client) incomingConnection(nc net.Conn) {
415 if tc, ok := nc.(*net.TCPConn); ok {
418 c := cl.newConnection(nc, false)
419 c.Discovery = peerSourceIncoming
420 cl.runReceivedConn(c)
423 // Returns a handle to the given torrent, if it's present in the client.
424 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
427 t, ok = cl.torrents[ih]
431 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
432 return cl.torrents[ih]
435 type dialResult struct {
439 func countDialResult(err error) {
441 torrent.Add("successful dials", 1)
443 torrent.Add("unsuccessful dials", 1)
447 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
448 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
449 if ret < minDialTimeout {
455 // Returns whether an address is known to connect to a client with our own ID.
456 func (cl *Client) dopplegangerAddr(addr string) bool {
457 _, ok := cl.dopplegangerAddrs[addr]
461 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
463 case allowIpv4 && allowIpv6:
465 case allowIpv4 && !allowIpv6:
467 case !allowIpv4 && allowIpv6:
470 panic("unhandled ip network combination")
474 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
475 return sock.DialContext(ctx, "", addr)
// allPeerNetworks lists the network names considered for peer connections.
// enabledPeerNetworks filters this list through peerNetworkEnabled.
var allPeerNetworks = []string{
	"tcp4",
	"tcp6",
	"udp4",
	"udp6",
}
480 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
481 c := func(s string) bool {
482 return strings.Contains(network, s)
485 if c("udp") || c("utp") {
489 if cfg.DisableTCP && c("tcp") {
492 if cfg.DisableIPv6 && c("6") {
498 // Returns a connection over UTP or TCP, whichever is first to connect.
499 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
500 ctx, cancel := context.WithCancel(ctx)
501 // As soon as we return one connection, cancel the others.
504 resCh := make(chan dialResult, left)
505 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
508 c, err := f(ctx, addr)
509 // This is a bit optimistic, but it looks non-trivial to thread
510 // this through the proxy code. Set it now in case we close the
511 // connection forthwith.
512 if tc, ok := c.(*net.TCPConn); ok {
516 resCh <- dialResult{c}
522 cl.eachListener(func(s socket) bool {
523 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
530 // Wait for a successful connection.
532 defer perf.ScopeTimer()()
533 for ; left > 0 && res.Conn == nil; left-- {
537 // There are still incompleted dials.
539 for ; left > 0; left-- {
540 conn := (<-resCh).Conn
547 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
552 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
553 if _, ok := t.halfOpen[addr]; !ok {
554 panic("invariant broken")
556 delete(t.halfOpen, addr)
560 // Performs initiator handshakes and returns a connection. Returns nil
561 // *connection if no connection for valid reasons.
562 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
563 c = cl.newConnection(nc, true)
564 c.headerEncrypted = encryptHeader
565 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
567 dl, ok := ctx.Deadline()
571 err = nc.SetDeadline(dl)
575 ok, err = cl.initiateHandshakes(c, t)
582 // Returns nil connection and nil error if no connection could be established
583 // for valid reasons.
584 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
585 nc := cl.dialFirst(ctx, addr)
590 if c == nil || err != nil {
594 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
597 // Returns nil connection and nil error if no connection could be established
598 // for valid reasons.
599 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
600 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
602 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
603 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
608 torrent.Add("initiated conn with preferred header obfuscation", 1)
611 if cl.config.ForceEncryption {
612 // We should have just tried with an obfuscated header. A plaintext
613 // header can't result in an encrypted connection, so we're done.
614 if !obfuscatedHeaderFirst {
615 panic(cl.config.EncryptionPolicy)
619 // Try again with encryption if we didn't earlier, or without if we did.
620 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
622 torrent.Add("initiated conn with fallback header obfuscation", 1)
627 // Called to dial out and run a connection. The addr we're given is already
628 // considered half-open.
629 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
630 c, err := cl.establishOutgoingConn(t, addr)
633 // Don't release lock between here and addConnection, unless it's for
635 cl.noLongerHalfOpen(t, addr)
638 log.Printf("error establishing outgoing connection: %s", err)
647 cl.runHandshookConn(c, t)
650 // The port number for incoming peer connections. 0 if the client isn't
652 func (cl *Client) incomingPeerPort() int {
653 return cl.LocalPort()
656 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
657 if c.headerEncrypted {
659 rw, c.cryptoMethod, err = mse.InitiateHandshake(
666 func() mse.CryptoMethod {
668 case cl.config.ForceEncryption:
669 return mse.CryptoMethodRC4
670 case cl.config.DisableEncryption:
671 return mse.CryptoMethodPlaintext
673 return mse.AllSupportedCrypto
682 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
683 if ih != t.infoHash {
689 // Calls f with any secret keys.
690 func (cl *Client) forSkeys(f func([]byte) bool) {
693 for ih := range cl.torrents {
700 // Do encryption and bittorrent handshakes as receiver.
701 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
702 defer perf.ScopeTimerErr(&err)()
704 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
707 if err == mse.ErrNoSecretKeyMatch {
712 if cl.config.ForceEncryption && !c.headerEncrypted {
713 err = errors.New("connection not encrypted")
716 ih, ok, err := cl.connBTHandshake(c, nil)
718 err = fmt.Errorf("error during bt handshake: %s", err)
730 // Returns !ok if handshake failed for valid reasons.
731 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
732 res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
733 if err != nil || !ok {
737 c.PeerExtensionBytes = res.peerExtensionBytes
738 c.PeerID = res.PeerID
739 c.completedHandshake = time.Now()
743 func (cl *Client) runReceivedConn(c *connection) {
744 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
748 t, err := cl.receiveHandshakes(c)
751 "error receiving handshakes: %s", err,
755 "network", c.remoteAddr().Network(),
757 torrent.Add("error receiving handshake", 1)
759 cl.onBadAccept(c.remoteAddr())
764 torrent.Add("received handshake for unloaded torrent", 1)
766 cl.onBadAccept(c.remoteAddr())
770 torrent.Add("received handshake for loaded torrent", 1)
773 cl.runHandshookConn(c, t)
776 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
778 if c.PeerID == cl.peerID {
781 addr := c.conn.RemoteAddr().String()
782 cl.dopplegangerAddrs[addr] = struct{}{}
784 // Because the remote address is not necessarily the same as its
785 // client's torrent listen address, we won't record the remote address
786 // as a doppleganger. Instead, the initiator can record *us* as the
791 c.conn.SetWriteDeadline(time.Time{})
792 c.r = deadlineReader{c.conn, c.r}
793 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
794 if connIsIpv6(c.conn) {
795 torrent.Add("completed handshake over ipv6", 1)
797 if err := t.addConnection(c); err != nil {
798 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
801 defer t.dropConnection(c)
802 go c.writer(time.Minute)
803 cl.sendInitialMessages(c, t)
804 err := c.mainReadLoop()
805 if err != nil && cl.config.Debug {
806 log.Printf("error during connection main read loop: %s", err)
810 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
812 if conn.fastEnabled() {
813 if torrent.haveAllPieces() {
814 conn.Post(pp.Message{Type: pp.HaveAll})
815 conn.sentHaves.AddRange(0, conn.t.NumPieces())
817 } else if !torrent.haveAnyPieces() {
818 conn.Post(pp.Message{Type: pp.HaveNone})
819 conn.sentHaves.Clear()
825 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
826 conn.Post(pp.Message{
828 ExtendedID: pp.HandshakeExtendedID,
829 ExtendedPayload: func() []byte {
830 d := map[string]interface{}{
831 "m": func() (ret map[string]int) {
832 ret = make(map[string]int, 2)
833 ret["ut_metadata"] = metadataExtendedId
834 if !cl.config.DisablePEX {
835 ret["ut_pex"] = pexExtendedId
839 "v": cl.config.ExtendedHandshakeClientVersion,
840 // No upload queue is implemented yet.
843 if !cl.config.DisableEncryption {
846 if torrent.metadataSizeKnown() {
847 d["metadata_size"] = torrent.metadataSize()
849 if p := cl.incomingPeerPort(); p != 0 {
852 yourip, err := addrCompactIP(conn.remoteAddr())
854 log.Printf("error calculating yourip field value in extension handshake: %s", err)
858 // log.Printf("sending %v", d)
859 b, err := bencode.Marshal(d)
867 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
868 conn.Post(pp.Message{
875 func (cl *Client) dhtPort() (ret uint16) {
876 cl.eachDhtServer(func(s *dht.Server) {
877 ret = uint16(missinggo.AddrPort(s.Addr()))
882 func (cl *Client) haveDhtServer() (ret bool) {
883 cl.eachDhtServer(func(_ *dht.Server) {
889 // Process incoming ut_metadata message.
890 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
892 err := bencode.Unmarshal(payload, &d)
893 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
894 } else if err != nil {
895 return fmt.Errorf("error unmarshalling bencode: %s", err)
897 msgType, ok := d["msg_type"]
899 return errors.New("missing msg_type field")
903 case pp.DataMetadataExtensionMsgType:
904 if !c.requestedMetadataPiece(piece) {
905 return fmt.Errorf("got unexpected piece %d", piece)
907 c.metadataRequests[piece] = false
908 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
909 if begin < 0 || begin >= len(payload) {
910 return fmt.Errorf("data has bad offset in payload: %d", begin)
912 t.saveMetadataPiece(piece, payload[begin:])
913 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
914 c.lastUsefulChunkReceived = time.Now()
915 return t.maybeCompleteMetadata()
916 case pp.RequestMetadataExtensionMsgType:
917 if !t.haveMetadataPiece(piece) {
918 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
921 start := (1 << 14) * piece
922 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
924 case pp.RejectMetadataExtensionMsgType:
927 return errors.New("unknown msg_type value")
931 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
935 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
938 if _, ok := cl.ipBlockRange(ip); ok {
941 if _, ok := cl.badPeerIPs[ip.String()]; ok {
947 // Return a Torrent ready for insertion into a Client.
948 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
949 // use provided storage, if provided
950 storageClient := cl.defaultStorage
951 if specStorage != nil {
952 storageClient = storage.NewClient(specStorage)
958 peers: prioritizedPeers{
960 getPrio: func(p Peer) peerPriority {
961 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
964 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
966 halfOpen: make(map[string]Peer),
967 pieceStateChanges: pubsub.NewPubSub(),
969 storageOpener: storageClient,
970 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
972 networkingEnabled: true,
974 metadataChanged: sync.Cond{
978 t.logger = cl.logger.Clone().AddValue(t)
979 t.setChunkSize(defaultChunkSize)
983 // A file-like handle to some torrent data resource.
984 type Handle interface {
991 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
992 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
995 // Adds a torrent by InfoHash with a custom Storage implementation.
996 // If the torrent already exists then this Storage is ignored and the
997 // existing torrent returned with `new` set to `false`
998 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1000 defer cl.mu.Unlock()
1001 t, ok := cl.torrents[infoHash]
1007 t = cl.newTorrent(infoHash, specStorage)
1008 cl.eachDhtServer(func(s *dht.Server) {
1009 go t.dhtAnnouncer(s)
1011 cl.torrents[infoHash] = t
1012 cl.clearAcceptLimits()
1013 t.updateWantPeersEvent()
1014 // Tickle Client.waitAccept, new torrent may want conns.
1015 cl.event.Broadcast()
1019 // Add or merge a torrent spec. If the torrent is already present, the
1020 // trackers will be merged with the existing ones. If the Info isn't yet
1021 // known, it will be set. The display name is replaced if the new spec
1022 // provides one. Returns new if the torrent wasn't already in the client.
1023 // Note that any `Storage` defined on the spec will be ignored if the
1024 // torrent is already present (i.e. `new` return value is `false`).
1025 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1026 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1027 if spec.DisplayName != "" {
1028 t.SetDisplayName(spec.DisplayName)
1030 if spec.InfoBytes != nil {
1031 err = t.SetInfoBytes(spec.InfoBytes)
1037 defer cl.mu.Unlock()
1038 if spec.ChunkSize != 0 {
1039 t.setChunkSize(pp.Integer(spec.ChunkSize))
1041 t.addTrackers(spec.Trackers)
1046 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1047 t, ok := cl.torrents[infoHash]
1049 err = fmt.Errorf("no such torrent")
1056 delete(cl.torrents, infoHash)
1060 func (cl *Client) allTorrentsCompleted() bool {
1061 for _, t := range cl.torrents {
1065 if !t.haveAllPieces() {
1072 // Returns true when all torrents are completely downloaded and false if the
1073 // client is stopped before that.
1074 func (cl *Client) WaitAll() bool {
1076 defer cl.mu.Unlock()
1077 for !cl.allTorrentsCompleted() {
1078 if cl.closed.IsSet() {
1086 // Returns handles to all the torrents loaded in the Client.
1087 func (cl *Client) Torrents() []*Torrent {
1089 defer cl.mu.Unlock()
1090 return cl.torrentsAsSlice()
1093 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1094 for _, t := range cl.torrents {
1095 ret = append(ret, t)
1100 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1101 spec, err := TorrentSpecFromMagnetURI(uri)
1105 T, _, err = cl.AddTorrentSpec(spec)
1109 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1110 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1112 slices.MakeInto(&ss, mi.Nodes)
1117 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1118 mi, err := metainfo.LoadFromFile(filename)
1122 return cl.AddTorrent(mi)
1125 func (cl *Client) DhtServers() []*dht.Server {
1126 return cl.dhtServers
1129 func (cl *Client) AddDHTNodes(nodes []string) {
1130 for _, n := range nodes {
1131 hmp := missinggo.SplitHostMaybePort(n)
1132 ip := net.ParseIP(hmp.Host)
1134 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1137 ni := krpc.NodeInfo{
1138 Addr: krpc.NodeAddr{
1143 cl.eachDhtServer(func(s *dht.Server) {
1149 func (cl *Client) banPeerIP(ip net.IP) {
1150 if cl.badPeerIPs == nil {
1151 cl.badPeerIPs = make(map[string]struct{})
1153 cl.badPeerIPs[ip.String()] = struct{}{}
1156 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1162 PeerMaxRequests: 250,
1163 writeBuffer: new(bytes.Buffer),
1165 c.writerCond.L = &cl.mu
1166 c.setRW(connStatsReadWriter{nc, c})
1167 c.r = &rateLimitedReader{
1168 l: cl.config.DownloadRateLimiter,
1174 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1176 defer cl.mu.Unlock()
1184 Source: peerSourceDHTAnnouncePeer,
1188 func firstNotNil(ips ...net.IP) net.IP {
1189 for _, ip := range ips {
1197 func (cl *Client) eachListener(f func(socket) bool) {
1198 for _, s := range cl.conns {
1205 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1206 cl.eachListener(func(l socket) bool {
1213 func (cl *Client) publicIp(peer net.IP) net.IP {
1214 // TODO: Use BEP 10 to determine how peers are seeing us.
1215 if peer.To4() != nil {
1217 cl.config.PublicIp4,
1218 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1222 cl.config.PublicIp6,
1223 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1228 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1229 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1230 return f(missinggo.AddrIP(l.Addr()))
1234 // Our IP as a peer should see it.
1235 func (cl *Client) publicAddr(peer net.IP) ipPort {
1236 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1239 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1241 defer cl.mu.Unlock()
1242 cl.eachListener(func(l socket) bool {
1243 ret = append(ret, l.Addr())
1249 func (cl *Client) onBadAccept(addr net.Addr) {
1250 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1251 if cl.acceptLimiter == nil {
1252 cl.acceptLimiter = make(map[ipStr]int)
1254 cl.acceptLimiter[ipStr(ip.String())]++
1257 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1258 if ip4 := ip.To4(); ip4 != nil {
1259 return ip4.Mask(net.CIDRMask(24, 32))
1264 func (cl *Client) clearAcceptLimits() {
1265 cl.acceptLimiter = nil
1268 func (cl *Client) acceptLimitClearer() {
1271 case <-cl.closed.LockedChan(&cl.mu):
1273 case <-time.After(5 * time.Minute):
1275 cl.clearAcceptLimits()
1281 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1282 if cl.config.DisableAcceptRateLimiting {
1285 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] >= 3