17 "github.com/anacrolix/missinggo/perf"
19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
// closed is the close signal; the accept and wait paths test it with
// closed.IsSet().
44 closed missinggo.Event
// defaultStorage is the storage client used by newTorrent when a torrent is
// added without its own storage implementation.
51 defaultStorage *storage.Client
// dhtServers holds the DHT servers NewClient creates for listening sockets
// that are net.PacketConns.
54 dhtServers []*dht.Server
// ipBlockList is the optional blocklist consulted by ipBlockRange; it is only
// set when the config provides one.
55 ipBlockList iplist.Ranger
56 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
57 extensionBytes peerExtensionBytes
59 // Set of addresses that have our client ID. This intentionally will
60 // include ourselves if we end up trying to connect to our own address
61 // through legitimate channels.
62 dopplegangerAddrs map[string]struct{}
// badPeerIPs is the set of banned peer IPs keyed by ip.String(); banPeerIP
// allocates it lazily.
63 badPeerIPs map[string]struct{}
// torrents maps info hash to the Torrent loaded for it.
64 torrents map[metainfo.Hash]*Torrent
65 // An aggregate of stats over all connections.
// acceptLimiter counts bad accepts per masked remote IP (see
// maskIpForAcceptLimiting). onBadAccept allocates it lazily and
// acceptLimitClearer resets it to nil.
68 acceptLimiter map[ipStr]int
// BadPeerIPs returns the banned peer IPs in string form, as produced by
// badPeerIPsLocked.
// NOTE(review): the statements between the signature and the return are not
// visible here; presumably they take cl.mu — confirm.
73 func (cl *Client) BadPeerIPs() []string {
76 return cl.badPeerIPsLocked()
79 func (cl *Client) badPeerIPsLocked() []string {
80 return slices.FromMapKeys(cl.badPeerIPs).([]string)
83 func (cl *Client) PeerID() PeerID {
// torrentAddr is a plain address string that provides the Network and String
// methods of net.Addr.
type torrentAddr string

// Network always reports an empty network name.
func (a torrentAddr) Network() string { return "" }

// String returns the address exactly as stored.
func (a torrentAddr) String() string { return string(a) }
// LocalPort reports the port the client's listeners are bound to. It visits
// every listener through eachListener and panics with "mismatched ports" when
// a listener's port differs from the one already recorded.
// NOTE(review): the branch that records the first port is not visible here;
// the named result starts at 0, so 0 is expected when there are no listeners —
// confirm.
93 func (cl *Client) LocalPort() (port int) {
94 cl.eachListener(func(l socket) bool {
// Port of this listener's bound address.
95 _port := missinggo.AddrPort(l.Addr())
101 } else if port != _port {
102 panic("mismatched ports")
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
117 // Writes out a human readable status of the client, such as for writing to a
// status destination supplied by the caller. Output goes through a
// bufio.Writer wrapped around _w and covers the listen port, peer ID, announce
// key, number of banned IPs, one section per DHT server, a dump of the
// aggregate stats, and one entry per torrent ordered by info hash.
// NOTE(review): locking and the final flush of the buffered writer are not
// visible here — confirm both happen.
119 func (cl *Client) WriteStatus(_w io.Writer) {
122 w := bufio.NewWriter(_w)
124 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
// NOTE(review): this builds a slice of every banned IP just to count them;
// len(cl.badPeerIPs) would give the same number.
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s *dht.Server) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, cl.stats)
// NOTE(review): same as above — len(cl.torrents) avoids the temporary slice.
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
// Torrents are listed in ascending order of their info hash string.
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
// Completion percentage is 100*(1 - missing/total). NOTE(review): a total
// length of 0 makes this a floating-point division by zero (NaN/Inf output,
// no panic) — confirm whether that case can occur.
145 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147 w.WriteString("<missing metainfo>")
155 const debugLogValue = "debug"
// debugLogFilter is the predicate initLogger installs on the client's logger.
// When cl.config.Debug is off it checks whether the message carries
// debugLogValue among its values.
// NOTE(review): the return statements are not visible here; presumably
// debug-tagged messages are filtered out unless Debug is enabled — confirm.
157 func (cl *Client) debugLogFilter(m *log.Msg) bool {
158 if !cl.config.Debug {
159 _, ok := m.Values()[debugLogValue]
165 func (cl *Client) initLogger() {
166 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
183 halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 log.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
230 for _, s := range cl.conns {
231 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
232 go cl.acceptConnections(s)
238 for _, s := range cl.conns {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, ds)
// enabledPeerNetworks walks allPeerNetworks and tests each entry with
// peerNetworkEnabled against the client's config.
// NOTE(review): the statement that collects matching networks into ns is not
// visible here — presumably each enabled network is appended.
252 func (cl *Client) enabledPeerNetworks() (ns []string) {
253 for _, n := range allPeerNetworks {
254 if peerNetworkEnabled(n, cl.config) {
// newDhtServer builds a DHT server configuration from the client's settings,
// creates the server with dht.NewServer and bootstraps it.
// NOTE(review): how conn is handed to the config, the error check after
// dht.NewServer, and whether Bootstrap runs in a goroutine are not visible
// here.
261 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
262 cfg := dht.ServerConfig{
// The DHT server shares the client's IP blocklist.
263 IPBlocklist: cl.ipBlockList,
265 OnAnnouncePeer: cl.onDHTAnnouncePeer,
// Advertise the configured IPv6 address on IPv6 sockets when one is set;
// otherwise fall back to the configured IPv4 address.
266 PublicIP: func() net.IP {
267 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
268 return cl.config.PublicIp6
270 return cl.config.PublicIp4
272 StartingNodes: cl.config.DhtStartingNodes,
274 s, err = dht.NewServer(&cfg)
// A bootstrap failure is only logged.
277 if _, err := s.Bootstrap(); err != nil {
278 log.Printf("error bootstrapping dht: %s", err)
285 func firstNonEmptyString(ss ...string) string {
286 for _, s := range ss {
294 func (cl *Client) Closed() <-chan struct{} {
300 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
301 for _, ds := range cl.dhtServers {
306 func (cl *Client) closeSockets() {
307 cl.eachListener(func(l socket) bool {
314 // Stops the client. All connections to peers are closed and all activity will
316 func (cl *Client) Close() {
320 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
322 for _, t := range cl.torrents {
325 for _, f := range cl.onClose {
// ipBlockRange looks ip up in the client's blocklist and returns the matching
// range together with whether the IP is blocked.
// NOTE(review): the body of the nil check is not visible; presumably it
// returns the zero values (not blocked) when no blocklist is configured.
331 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
332 if cl.ipBlockList == nil {
335 return cl.ipBlockList.Lookup(ip)
338 func (cl *Client) ipIsBlocked(ip net.IP) bool {
339 _, blocked := cl.ipBlockRange(ip)
343 func (cl *Client) waitAccept() {
345 for _, t := range cl.torrents {
350 if cl.closed.IsSet() {
// rejectAccepted decides whether a freshly accepted connection should be
// refused, based on the remote IP: the IPv4/IPv6 disable switches in the
// config, the accept rate limiter, and finally badPeerIPPort.
// NOTE(review): the bodies of the if statements are not visible here;
// presumably each returns true (reject) — confirm.
357 func (cl *Client) rejectAccepted(conn net.Conn) bool {
358 ra := conn.RemoteAddr()
359 rip := missinggo.AddrIP(ra)
// To4 is non-nil for IPv4 addresses, including IPv4-mapped IPv6 forms.
360 if cl.config.DisableIPv4Peers && rip.To4() != nil {
// This check keys on the raw slice length, i.e. 4-byte IPv4 representations.
363 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
// A 16-byte address that is not an IPv4-mapped one.
366 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
369 if cl.rateLimitAccept(rip) {
372 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
375 func (cl *Client) acceptConnections(l net.Listener) {
377 conn, err := l.Accept()
378 conn = pproffd.WrapNetConn(conn)
380 closed := cl.closed.IsSet()
383 reject = cl.rejectAccepted(conn)
394 // I think something harsher should happen here? Our accept
395 // routine just fucked off.
400 torrent.Add("rejected accepted connections", 1)
403 go cl.incomingConnection(conn)
405 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
406 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
407 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
408 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
413 func (cl *Client) incomingConnection(nc net.Conn) {
415 if tc, ok := nc.(*net.TCPConn); ok {
418 c := cl.newConnection(nc, false)
419 c.Discovery = peerSourceIncoming
420 cl.runReceivedConn(c)
423 // Returns a handle to the given torrent, if it's present in the client.
// ok reports whether ih was found in cl.torrents.
// NOTE(review): the lines between the signature and the lookup are not
// visible; presumably cl.mu is held around the map read — confirm.
424 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
427 t, ok = cl.torrents[ih]
431 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
432 return cl.torrents[ih]
435 type dialResult struct {
439 func countDialResult(err error) {
441 torrent.Add("successful dials", 1)
443 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the dial timeout down as the backlog of peers
// waiting to be dialled grows. The result is max divided by
// (pendingPeers+halfOpenLimit)/halfOpenLimit (integer division), and is never
// less than minDialTimeout.
//
// A halfOpenLimit of zero, or a backlog that makes the divisor zero, would
// otherwise be an integer division by zero; in those cases no reduction is
// applied and max is used.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
			ret = max / time.Duration(divisor)
		}
	}
	// Never go below the floor the caller asked for.
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
455 // Returns whether an address is known to connect to a client with our own ID.
456 func (cl *Client) dopplegangerAddr(addr string) bool {
457 _, ok := cl.dopplegangerAddrs[addr]
461 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
463 case allowIpv4 && allowIpv6:
465 case allowIpv4 && !allowIpv6:
467 case !allowIpv4 && allowIpv6:
470 panic("unhandled ip network combination")
474 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
475 return sock.DialContext(ctx, "", addr)
478 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
// peerNetworkEnabled decides whether the named network (for example one of
// the entries in allPeerNetworks) may be used under cfg. Matching is by
// substring of the network name: "udp"/"utp", "tcp" and "6".
// NOTE(review): the bodies of the checks are not visible here; presumably the
// udp/utp branch consults a uTP-related config switch and each disabled case
// returns false — confirm.
480 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
// c reports whether the network name contains s.
481 c := func(s string) bool {
482 return strings.Contains(network, s)
485 if c("udp") || c("utp") {
489 if cfg.DisableTCP && c("tcp") {
492 if cfg.DisableIPv6 && c("6") {
498 // Returns a connection over UTP or TCP, whichever is first to connect.
499 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
500 ctx, cancel := context.WithCancel(ctx)
501 // As soon as we return one connection, cancel the others.
504 resCh := make(chan dialResult, left)
505 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
508 c, err := f(ctx, addr)
509 // This is a bit optimistic, but it looks non-trivial to thread
510 // this through the proxy code. Set it now in case we close the
511 // connection forthwith.
512 if tc, ok := c.(*net.TCPConn); ok {
516 resCh <- dialResult{c}
522 cl.eachListener(func(s socket) bool {
523 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
530 // Wait for a successful connection.
532 defer perf.ScopeTimer()()
533 for ; left > 0 && res.Conn == nil; left-- {
537 // There are still incompleted dials.
539 for ; left > 0; left-- {
540 conn := (<-resCh).Conn
547 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
// noLongerHalfOpen removes addr from t's half-open set. The address must be
// present: a missing entry panics with "invariant broken".
// NOTE(review): any statement following the delete is not visible here.
552 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
553 if _, ok := t.halfOpen[addr]; !ok {
554 panic("invariant broken")
556 delete(t.halfOpen, addr)
560 // Performs initiator handshakes and returns a connection. Returns nil
561 // *connection if no connection for valid reasons.
562 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
563 c = cl.newConnection(nc, true)
564 c.headerEncrypted = encryptHeader
565 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
567 dl, ok := ctx.Deadline()
571 err = nc.SetDeadline(dl)
575 ok, err = cl.initiateHandshakes(c, t)
582 // Returns nil connection and nil error if no connection could be established
583 // for valid reasons.
584 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
585 nc := cl.dialFirst(ctx, addr)
590 if c == nil || err != nil {
594 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
597 // Returns nil connection and nil error if no connection could be established
598 // for valid reasons.
599 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
600 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
602 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
603 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
608 torrent.Add("initiated conn with preferred header obfuscation", 1)
611 if cl.config.ForceEncryption {
612 // We should have just tried with an obfuscated header. A plaintext
613 // header can't result in an encrypted connection, so we're done.
614 if !obfuscatedHeaderFirst {
615 panic(cl.config.EncryptionPolicy)
619 // Try again with encryption if we didn't earlier, or without if we did.
620 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
622 torrent.Add("initiated conn with fallback header obfuscation", 1)
627 // Called to dial out and run a connection. The addr we're given is already
628 // considered half-open.
629 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
630 c, err := cl.establishOutgoingConn(t, addr)
633 // Don't release lock between here and addConnection, unless it's for
635 cl.noLongerHalfOpen(t, addr)
638 log.Printf("error establishing outgoing connection: %s", err)
647 cl.runHandshookConn(c, t)
650 // The port number for incoming peer connections. 0 if the client isn't
652 func (cl *Client) incomingPeerPort() int {
653 return cl.LocalPort()
656 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
657 if c.headerEncrypted {
659 rw, c.cryptoMethod, err = mse.InitiateHandshake(
666 func() mse.CryptoMethod {
668 case cl.config.ForceEncryption:
669 return mse.CryptoMethodRC4
670 case cl.config.DisableEncryption:
671 return mse.CryptoMethodPlaintext
673 return mse.AllSupportedCrypto
682 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
683 if ih != t.infoHash {
689 // Calls f with any secret keys.
690 func (cl *Client) forSkeys(f func([]byte) bool) {
693 for ih := range cl.torrents {
700 // Do encryption and bittorrent handshakes as receiver.
701 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
702 defer perf.ScopeTimerErr(&err)()
704 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
707 if err == mse.ErrNoSecretKeyMatch {
712 if cl.config.ForceEncryption && !c.headerEncrypted {
713 err = errors.New("connection not encrypted")
716 ih, ok, err := cl.connBTHandshake(c, nil)
718 err = fmt.Errorf("error during bt handshake: %s", err)
730 // Returns !ok if handshake failed for valid reasons.
731 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
732 res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
733 if err != nil || !ok {
737 c.PeerExtensionBytes = res.peerExtensionBytes
738 c.PeerID = res.PeerID
739 c.completedHandshake = time.Now()
743 func (cl *Client) runReceivedConn(c *connection) {
744 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
748 t, err := cl.receiveHandshakes(c)
751 "error receiving handshakes: %s", err,
755 "network", c.remoteAddr().Network(),
757 torrent.Add("error receiving handshake", 1)
759 cl.onBadAccept(c.remoteAddr())
764 torrent.Add("received handshake for unloaded torrent", 1)
766 cl.onBadAccept(c.remoteAddr())
770 torrent.Add("received handshake for loaded torrent", 1)
773 cl.runHandshookConn(c, t)
776 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
778 if c.PeerID == cl.peerID {
781 addr := c.conn.RemoteAddr().String()
782 cl.dopplegangerAddrs[addr] = struct{}{}
784 // Because the remote address is not necessarily the same as its
785 // client's torrent listen address, we won't record the remote address
786 // as a doppleganger. Instead, the initiator can record *us* as the
791 c.conn.SetWriteDeadline(time.Time{})
792 c.r = deadlineReader{c.conn, c.r}
793 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
794 if connIsIpv6(c.conn) {
795 torrent.Add("completed handshake over ipv6", 1)
797 if err := t.addConnection(c); err != nil {
798 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
801 defer t.dropConnection(c)
802 go c.writer(time.Minute)
803 cl.sendInitialMessages(c, t)
804 err := c.mainReadLoop()
805 if err != nil && cl.config.Debug {
806 log.Printf("error during connection main read loop: %s", err)
810 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
812 if conn.fastEnabled() {
813 if torrent.haveAllPieces() {
814 conn.Post(pp.Message{Type: pp.HaveAll})
815 conn.sentHaves.AddRange(0, conn.t.NumPieces())
817 } else if !torrent.haveAnyPieces() {
818 conn.Post(pp.Message{Type: pp.HaveNone})
819 conn.sentHaves.Clear()
825 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
826 conn.Post(pp.Message{
828 ExtendedID: pp.HandshakeExtendedID,
829 ExtendedPayload: func() []byte {
830 d := map[string]interface{}{
831 "m": func() (ret map[string]int) {
832 ret = make(map[string]int, 2)
833 ret["ut_metadata"] = metadataExtendedId
834 if !cl.config.DisablePEX {
835 ret["ut_pex"] = pexExtendedId
839 "v": cl.config.ExtendedHandshakeClientVersion,
840 // No upload queue is implemented yet.
843 if !cl.config.DisableEncryption {
846 if torrent.metadataSizeKnown() {
847 d["metadata_size"] = torrent.metadataSize()
849 if p := cl.incomingPeerPort(); p != 0 {
852 yourip, err := addrCompactIP(conn.remoteAddr())
854 log.Printf("error calculating yourip field value in extension handshake: %s", err)
858 // log.Printf("sending %v", d)
859 b, err := bencode.Marshal(d)
867 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
868 conn.Post(pp.Message{
875 func (cl *Client) dhtPort() (ret uint16) {
876 cl.eachDhtServer(func(s *dht.Server) {
877 ret = uint16(missinggo.AddrPort(s.Addr()))
882 func (cl *Client) haveDhtServer() (ret bool) {
883 cl.eachDhtServer(func(_ *dht.Server) {
889 // Process incoming ut_metadata message.
// The payload is a bencoded dictionary, optionally followed by raw piece data.
// The message is dispatched on its "msg_type" value: data, request or reject.
// NOTE(review): the declaration of d, the extraction of piece and the switch
// header are not visible here.
890 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
892 err := bencode.Unmarshal(payload, &d)
// Trailing bytes after the dictionary are expected (they carry piece data), so
// that particular error is deliberately ignored.
893 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
894 } else if err != nil {
895 return fmt.Errorf("error unmarshalling bencode: %s", err)
897 msgType, ok := d["msg_type"]
899 return errors.New("missing msg_type field")
903 case pp.DataMetadataExtensionMsgType:
// Only accept pieces we actually asked this connection for.
904 if !c.requestedMetadataPiece(piece) {
905 return fmt.Errorf("got unexpected piece %d", piece)
907 c.metadataRequests[piece] = false
// The piece data sits at the end of the payload; its size is derived from the
// advertised total_size and the piece index.
908 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
909 if begin < 0 || begin >= len(payload) {
910 return fmt.Errorf("data has bad offset in payload: %d", begin)
912 t.saveMetadataPiece(piece, payload[begin:])
913 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
914 c.lastUsefulChunkReceived = time.Now()
915 return t.maybeCompleteMetadata()
916 case pp.RequestMetadataExtensionMsgType:
// Reject requests for pieces we do not have.
917 if !t.haveMetadataPiece(piece) {
918 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
// Metadata pieces are 16 KiB (1 << 14 bytes) apart.
// NOTE(review): the slice bounds below rely on haveMetadataPiece having
// validated piece against len(t.metadataBytes) — confirm, since piece comes
// from the remote peer.
921 start := (1 << 14) * piece
922 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
924 case pp.RejectMetadataExtensionMsgType:
927 return errors.New("unknown msg_type value")
// badPeerIPPort checks a peer endpoint against three sources: the set of
// addresses known to be ourselves (dopplegangerAddr, keyed by "host:port"),
// the IP blocklist, and the set of banned peer IPs.
// NOTE(review): the statements before the first check and the bodies of the
// checks are not visible here; presumably each match returns true — confirm.
931 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
935 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
938 if _, ok := cl.ipBlockRange(ip); ok {
941 if _, ok := cl.badPeerIPs[ip.String()]; ok {
947 // Return a Torrent ready for insertion into a Client.
948 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
949 // use provided storage, if provided
950 storageClient := cl.defaultStorage
951 if specStorage != nil {
952 storageClient = storage.NewClient(specStorage)
958 peers: prioritizedPeers{
960 getPrio: func(p Peer) peerPriority {
961 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
964 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
966 halfOpen: make(map[string]Peer),
967 pieceStateChanges: pubsub.NewPubSub(),
969 storageOpener: storageClient,
970 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
972 networkingEnabled: true,
974 metadataChanged: sync.Cond{
978 t.logger = cl.logger.Clone().AddValue(t)
979 t.setChunkSize(defaultChunkSize)
983 // A file-like handle to some torrent data resource.
984 type Handle interface {
991 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
992 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
995 // Adds a torrent by InfoHash with a custom Storage implementation.
996 // If the torrent already exists then this Storage is ignored and the
997 // existing torrent returned with `new` set to `false`
998 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1000 defer cl.mu.Unlock()
1001 t, ok := cl.torrents[infoHash]
1006 t = cl.newTorrent(infoHash, specStorage)
1007 cl.eachDhtServer(func(s *dht.Server) {
1008 go t.dhtAnnouncer(s)
1010 cl.torrents[infoHash] = t
1011 t.updateWantPeersEvent()
1012 // Tickle Client.waitAccept, new torrent may want conns.
1013 cl.event.Broadcast()
1017 // Add or merge a torrent spec. If the torrent is already present, the
1018 // trackers will be merged with the existing ones. If the Info isn't yet
1019 // known, it will be set. The display name is replaced if the new spec
1020 // provides one. Returns new if the torrent wasn't already in the client.
1021 // Note that any `Storage` defined on the spec will be ignored if the
1022 // torrent is already present (i.e. `new` return value is `false`)
1023 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1024 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1025 if spec.DisplayName != "" {
1026 t.SetDisplayName(spec.DisplayName)
1028 if spec.InfoBytes != nil {
1029 err = t.SetInfoBytes(spec.InfoBytes)
1035 defer cl.mu.Unlock()
1036 if spec.ChunkSize != 0 {
1037 t.setChunkSize(pp.Integer(spec.ChunkSize))
1039 t.addTrackers(spec.Trackers)
1044 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1045 t, ok := cl.torrents[infoHash]
1047 err = fmt.Errorf("no such torrent")
1054 delete(cl.torrents, infoHash)
1058 func (cl *Client) allTorrentsCompleted() bool {
1059 for _, t := range cl.torrents {
1063 if !t.haveAllPieces() {
1070 // Returns true when all torrents are completely downloaded and false if the
1071 // client is stopped before that.
1072 func (cl *Client) WaitAll() bool {
1074 defer cl.mu.Unlock()
1075 for !cl.allTorrentsCompleted() {
1076 if cl.closed.IsSet() {
1084 // Returns handles to all the torrents loaded in the Client.
// The slice is built by torrentsAsSlice on each call; cl.mu is released when
// the function returns.
// NOTE(review): the matching lock acquisition is not visible here.
1085 func (cl *Client) Torrents() []*Torrent {
1087 defer cl.mu.Unlock()
1088 return cl.torrentsAsSlice()
1091 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1092 for _, t := range cl.torrents {
1093 ret = append(ret, t)
1098 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1099 spec, err := TorrentSpecFromMagnetURI(uri)
1103 T, _, err = cl.AddTorrentSpec(spec)
1107 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1108 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1110 slices.MakeInto(&ss, mi.Nodes)
1115 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1116 mi, err := metainfo.LoadFromFile(filename)
1120 return cl.AddTorrent(mi)
1123 func (cl *Client) DhtServers() []*dht.Server {
1124 return cl.dhtServers
1127 func (cl *Client) AddDHTNodes(nodes []string) {
1128 for _, n := range nodes {
1129 hmp := missinggo.SplitHostMaybePort(n)
1130 ip := net.ParseIP(hmp.Host)
1132 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1135 ni := krpc.NodeInfo{
1136 Addr: krpc.NodeAddr{
1141 cl.eachDhtServer(func(s *dht.Server) {
1147 func (cl *Client) banPeerIP(ip net.IP) {
1148 if cl.badPeerIPs == nil {
1149 cl.badPeerIPs = make(map[string]struct{})
1151 cl.badPeerIPs[ip.String()] = struct{}{}
1154 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1160 PeerMaxRequests: 250,
1161 writeBuffer: new(bytes.Buffer),
1163 c.writerCond.L = &cl.mu
1164 c.setRW(connStatsReadWriter{nc, c})
1165 c.r = &rateLimitedReader{
1166 l: cl.config.DownloadRateLimiter,
1172 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1174 defer cl.mu.Unlock()
1182 Source: peerSourceDHTAnnouncePeer,
1186 func firstNotNil(ips ...net.IP) net.IP {
1187 for _, ip := range ips {
1195 func (cl *Client) eachListener(f func(socket) bool) {
1196 for _, s := range cl.conns {
1203 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1204 cl.eachListener(func(l socket) bool {
1211 func (cl *Client) publicIp(peer net.IP) net.IP {
1212 // TODO: Use BEP 10 to determine how peers are seeing us.
1213 if peer.To4() != nil {
1215 cl.config.PublicIp4,
1216 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1220 cl.config.PublicIp6,
1221 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1226 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1227 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1228 return f(missinggo.AddrIP(l.Addr()))
1232 // Our IP as a peer should see it.
1233 func (cl *Client) publicAddr(peer net.IP) ipPort {
1234 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1237 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1239 defer cl.mu.Unlock()
1240 cl.eachListener(func(l socket) bool {
1241 ret = append(ret, l.Addr())
1247 func (cl *Client) onBadAccept(addr net.Addr) {
1248 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1249 if cl.acceptLimiter == nil {
1250 cl.acceptLimiter = make(map[ipStr]int)
1252 cl.acceptLimiter[ipStr(ip.String())]++
// maskIpForAcceptLimiting maps an IP to the key used for accept rate limiting.
// IPv4 addresses (including IPv4-mapped IPv6 forms) are reduced to their /24
// network, so all hosts in the same /24 share one counter.
//
// NOTE(review): the non-IPv4 path is returned unchanged here, which means each
// IPv6 address is limited individually — confirm that is the intended policy.
func maskIpForAcceptLimiting(ip net.IP) net.IP {
	if ip4 := ip.To4(); ip4 != nil {
		return ip4.Mask(net.CIDRMask(24, 32))
	}
	return ip
}
// acceptLimitClearer is started as a goroutine by NewClient. It waits for
// either the client's closed event or a 15 minute timer, and resets
// cl.acceptLimiter to nil so the bad-accept counters start over.
// NOTE(review): the enclosing loop/select, the return on close and the locking
// around the reset are not visible here — confirm.
1262 func (cl *Client) acceptLimitClearer() {
1265 case <-cl.closed.LockedChan(&cl.mu):
1267 case <-time.After(15 * time.Minute):
1269 cl.acceptLimiter = nil
// rateLimitAccept reports whether the masked form of ip (see
// maskIpForAcceptLimiting) has at least 10 bad accepts recorded in
// cl.acceptLimiter. Reading a nil map is safe and yields 0.
// NOTE(review): a statement between the signature and the return is not
// visible here.
1275 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1277 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] >= 10