17 "github.com/anacrolix/missinggo/perf"
19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
30 "golang.org/x/time/rate"
32 "github.com/anacrolix/torrent/bencode"
33 "github.com/anacrolix/torrent/iplist"
34 "github.com/anacrolix/torrent/metainfo"
35 "github.com/anacrolix/torrent/mse"
36 pp "github.com/anacrolix/torrent/peer_protocol"
37 "github.com/anacrolix/torrent/storage"
40 // Clients contain zero or more Torrents. A Client manages a blocklist, the
41 // TCP/UDP protocol ports, and DHT as desired.
45 closed missinggo.Event
52 defaultStorage *storage.Client
55 dhtServers []*dht.Server
56 ipBlockList iplist.Ranger
57 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
58 extensionBytes peerExtensionBytes
59 uploadLimit *rate.Limiter
60 downloadLimit *rate.Limiter
62 // Set of addresses that have our client ID. This intentionally will
63 // include ourselves if we end up trying to connect to our own address
64 // through legitimate channels.
65 dopplegangerAddrs map[string]struct{}
66 badPeerIPs map[string]struct{}
67 torrents map[metainfo.Hash]*Torrent
68 // An aggregate of stats over all connections.
71 acceptLimiter map[ipStr]int
76 func (cl *Client) BadPeerIPs() []string {
79 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of the cl.badPeerIPs map as a []string,
// built with slices.FromMapKeys and a type assertion.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock; no locking is done here — confirm against callers.
82 func (cl *Client) badPeerIPsLocked() []string {
83 return slices.FromMapKeys(cl.badPeerIPs).([]string)
86 func (cl *Client) PeerID() PeerID {
// torrentAddr is a string-backed address type. Its two methods, Network and
// String, are the methods required by the net.Addr interface.
90 type torrentAddr string
// Network always returns the empty string; no network name is recorded.
92 func (torrentAddr) Network() string { return "" }
// String returns the address text exactly as stored.
94 func (me torrentAddr) String() string { return string(me) }
96 func (cl *Client) LocalPort() (port int) {
97 cl.eachListener(func(l socket) bool {
98 _port := missinggo.AddrPort(l.Addr())
104 } else if port != _port {
105 panic("mismatched ports")
// writeDhtServerStatus writes a tab-indented, human-readable summary of the
// DHT server s to w, using a single snapshot taken from s.Stats().
// Errors returned by the writes to w are not checked.
112 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
113 dhtStats := s.Stats()
// Total, good and bad node counts; BadNodes is labelled "banned" in the output.
114 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
// The server ID is printed in hexadecimal.
115 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
// "Announces" is the SuccessfulOutboundAnnouncePeerQueries counter.
116 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
117 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
120 // Writes out a human readable status of the client, such as for writing to a
122 func (cl *Client) WriteStatus(_w io.Writer) {
125 w := bufio.NewWriter(_w)
127 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
128 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
129 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
130 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
131 cl.eachDhtServer(func(s *dht.Server) {
132 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
133 writeDhtServerStatus(w, s)
135 spew.Fdump(w, cl.stats)
136 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
138 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
139 return l.InfoHash().AsString() < r.InfoHash().AsString()
142 fmt.Fprint(w, "<unknown name>")
144 fmt.Fprint(w, t.name())
148 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
150 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to log messages (via AddValue or
// AddValues) to tag them as debug output. debugLogFilter looks this value up
// in a message's values when cl.config.Debug is off.
158 const debugLogValue = "debug"
160 func (cl *Client) debugLogFilter(m *log.Msg) bool {
161 if !cl.config.Debug {
162 _, ok := m.Values()[debugLogValue]
// initLogger sets cl.logger to a clone of log.Default that carries the Client
// itself as a value and has cl.debugLogFilter installed as a filter.
168 func (cl *Client) initLogger() {
169 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
// announceKey derives a key from the client's peer ID: bytes 16 through 19 of
// cl.peerID are read as a big-endian uint32 and converted to int32.
172 func (cl *Client) announceKey() int32 {
// The uint32-to-int32 conversion keeps the bit pattern, so values of 2^31 and
// above come out negative.
173 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
176 func NewClient(cfg *Config) (cl *Client, err error) {
188 halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
190 dopplegangerAddrs: make(map[string]struct{}),
191 torrents: make(map[metainfo.Hash]*Torrent),
193 go cl.acceptLimitClearer()
201 if cfg.UploadRateLimiter == nil {
202 cl.uploadLimit = unlimited
204 cl.uploadLimit = cfg.UploadRateLimiter
206 if cfg.DownloadRateLimiter == nil {
207 cl.downloadLimit = unlimited
209 cl.downloadLimit = cfg.DownloadRateLimiter
211 cl.extensionBytes = defaultPeerExtensionBytes()
213 storageImpl := cfg.DefaultStorage
214 if storageImpl == nil {
215 // We'd use mmap but HFS+ doesn't support sparse files.
216 storageImpl = storage.NewFile(cfg.DataDir)
217 cl.onClose = append(cl.onClose, func() {
218 if err := storageImpl.Close(); err != nil {
219 log.Printf("error closing default storage: %s", err)
223 cl.defaultStorage = storage.NewClient(storageImpl)
224 if cfg.IPBlocklist != nil {
225 cl.ipBlockList = cfg.IPBlocklist
228 if cfg.PeerID != "" {
229 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
231 o := copy(cl.peerID[:], cfg.Bep20)
232 _, err = rand.Read(cl.peerID[o:])
234 panic("error generating peer id")
238 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
245 for _, s := range cl.conns {
246 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
247 go cl.acceptConnections(s)
253 for _, s := range cl.conns {
254 if pc, ok := s.(net.PacketConn); ok {
255 ds, err := cl.newDhtServer(pc)
259 cl.dhtServers = append(cl.dhtServers, ds)
267 func (cl *Client) enabledPeerNetworks() (ns []string) {
268 for _, n := range allPeerNetworks {
269 if peerNetworkEnabled(n, cl.config) {
276 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
277 cfg := dht.ServerConfig{
278 IPBlocklist: cl.ipBlockList,
280 OnAnnouncePeer: cl.onDHTAnnouncePeer,
281 PublicIP: func() net.IP {
282 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
283 return cl.config.PublicIp6
285 return cl.config.PublicIp4
287 StartingNodes: cl.config.DhtStartingNodes,
289 s, err = dht.NewServer(&cfg)
292 if _, err := s.Bootstrap(); err != nil {
293 log.Printf("error bootstrapping dht: %s", err)
300 func firstNonEmptyString(ss ...string) string {
301 for _, s := range ss {
309 func (cl *Client) Closed() <-chan struct{} {
315 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
316 for _, ds := range cl.dhtServers {
321 func (cl *Client) closeSockets() {
322 cl.eachListener(func(l socket) bool {
329 // Stops the client. All connections to peers are closed and all activity will
331 func (cl *Client) Close() {
335 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
337 for _, t := range cl.torrents {
340 for _, f := range cl.onClose {
346 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
347 if cl.ipBlockList == nil {
350 return cl.ipBlockList.Lookup(ip)
353 func (cl *Client) ipIsBlocked(ip net.IP) bool {
354 _, blocked := cl.ipBlockRange(ip)
358 func (cl *Client) waitAccept() {
360 for _, t := range cl.torrents {
365 if cl.closed.IsSet() {
372 func (cl *Client) rejectAccepted(conn net.Conn) bool {
373 ra := conn.RemoteAddr()
374 rip := missinggo.AddrIP(ra)
375 if cl.config.DisableIPv4Peers && rip.To4() != nil {
378 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
381 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
384 if cl.rateLimitAccept(rip) {
387 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
390 func (cl *Client) acceptConnections(l net.Listener) {
392 conn, err := l.Accept()
393 conn = pproffd.WrapNetConn(conn)
395 closed := cl.closed.IsSet()
398 reject = cl.rejectAccepted(conn)
409 // I think something harsher should happen here? Our accept
410 // routine has just exited.
415 torrent.Add("rejected accepted connections", 1)
418 go cl.incomingConnection(conn)
420 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
421 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
422 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
423 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
428 func (cl *Client) incomingConnection(nc net.Conn) {
430 if tc, ok := nc.(*net.TCPConn); ok {
433 c := cl.newConnection(nc, false)
434 c.Discovery = peerSourceIncoming
435 cl.runReceivedConn(c)
438 // Returns a handle to the given torrent, if it's present in the client.
439 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
442 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored in cl.torrents under ih, or nil if the
// map has no entry for that info hash.
// NOTE(review): no locking is done here; presumably the caller holds the
// client lock — confirm against callers.
446 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
447 return cl.torrents[ih]
450 type dialResult struct {
454 func countDialResult(err error) {
456 torrent.Add("successful dials", 1)
458 torrent.Add("unsuccessful dials", 1)
462 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
463 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
464 if ret < minDialTimeout {
470 // Returns whether an address is known to connect to a client with our own ID.
471 func (cl *Client) dopplegangerAddr(addr string) bool {
472 _, ok := cl.dopplegangerAddrs[addr]
476 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
478 case allowIpv4 && allowIpv6:
480 case allowIpv4 && !allowIpv6:
482 case !allowIpv4 && allowIpv6:
485 panic("unhandled ip network combination")
// dialUTP dials addr through sock.DialContext, passing ctx and an empty
// network string, and returns the resulting connection and error unchanged.
489 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
490 return sock.DialContext(ctx, "", addr)
// allPeerNetworks lists the network names that enabledPeerNetworks iterates
// over and tests with peerNetworkEnabled: TCP and UDP, each for IPv4 and IPv6.
493 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
495 func peerNetworkEnabled(network string, cfg Config) bool {
496 c := func(s string) bool {
497 return strings.Contains(network, s)
500 if c("udp") || c("utp") {
504 if cfg.DisableTCP && c("tcp") {
507 if cfg.DisableIPv6 && c("6") {
513 // Returns a connection over UTP or TCP, whichever is first to connect.
514 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
515 ctx, cancel := context.WithCancel(ctx)
516 // As soon as we return one connection, cancel the others.
519 resCh := make(chan dialResult, left)
520 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
523 c, err := f(ctx, addr)
524 // This is a bit optimistic, but it looks non-trivial to thread
525 // this through the proxy code. Set it now in case we close the
526 // connection forthwith.
527 if tc, ok := c.(*net.TCPConn); ok {
531 resCh <- dialResult{c}
537 cl.eachListener(func(s socket) bool {
538 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
545 // Wait for a successful connection.
547 defer perf.ScopeTimer()()
548 for ; left > 0 && res.Conn == nil; left-- {
552 // There are still incomplete dials.
554 for ; left > 0; left-- {
555 conn := (<-resCh).Conn
562 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
567 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
568 if _, ok := t.halfOpen[addr]; !ok {
569 panic("invariant broken")
571 delete(t.halfOpen, addr)
575 // Performs initiator handshakes and returns a connection. Returns nil
576 // *connection if no connection for valid reasons.
577 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
578 c = cl.newConnection(nc, true)
579 c.headerEncrypted = encryptHeader
580 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
582 dl, ok := ctx.Deadline()
586 err = nc.SetDeadline(dl)
590 ok, err = cl.initiateHandshakes(c, t)
597 // Returns nil connection and nil error if no connection could be established
598 // for valid reasons.
599 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
600 nc := cl.dialFirst(ctx, addr)
605 if c == nil || err != nil {
609 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
612 // Returns nil connection and nil error if no connection could be established
613 // for valid reasons.
614 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
615 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
617 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
618 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
623 torrent.Add("initiated conn with preferred header obfuscation", 1)
626 if cl.config.ForceEncryption {
627 // We should have just tried with an obfuscated header. A plaintext
628 // header can't result in an encrypted connection, so we're done.
629 if !obfuscatedHeaderFirst {
630 panic(cl.config.EncryptionPolicy)
634 // Try again with encryption if we didn't earlier, or without if we did.
635 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
637 torrent.Add("initiated conn with fallback header obfuscation", 1)
642 // Called to dial out and run a connection. The addr we're given is already
643 // considered half-open.
644 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
645 c, err := cl.establishOutgoingConn(t, addr)
648 // Don't release lock between here and addConnection, unless it's for
650 cl.noLongerHalfOpen(t, addr)
653 log.Printf("error establishing outgoing connection: %s", err)
662 cl.runHandshookConn(c, t)
665 // The port number for incoming peer connections. 0 if the client isn't
667 func (cl *Client) incomingPeerPort() int {
// Delegates to LocalPort: the incoming peer port is whatever LocalPort reports.
668 return cl.LocalPort()
671 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
672 if c.headerEncrypted {
674 rw, c.cryptoMethod, err = mse.InitiateHandshake(
681 func() mse.CryptoMethod {
683 case cl.config.ForceEncryption:
684 return mse.CryptoMethodRC4
685 case cl.config.DisableEncryption:
686 return mse.CryptoMethodPlaintext
688 return mse.AllSupportedCrypto
697 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
698 if ih != t.infoHash {
704 // Calls f with any secret keys.
705 func (cl *Client) forSkeys(f func([]byte) bool) {
708 for ih := range cl.torrents {
715 // Do encryption and bittorrent handshakes as receiver.
716 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
717 defer perf.ScopeTimerErr(&err)()
719 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
722 if err == mse.ErrNoSecretKeyMatch {
727 if cl.config.ForceEncryption && !c.headerEncrypted {
728 err = errors.New("connection not encrypted")
731 ih, ok, err := cl.connBTHandshake(c, nil)
733 err = fmt.Errorf("error during bt handshake: %s", err)
745 // Returns !ok if handshake failed for valid reasons.
746 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
747 res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
748 if err != nil || !ok {
752 c.PeerExtensionBytes = res.peerExtensionBytes
753 c.PeerID = res.PeerID
754 c.completedHandshake = time.Now()
758 func (cl *Client) runReceivedConn(c *connection) {
759 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
763 t, err := cl.receiveHandshakes(c)
766 "error receiving handshakes: %s", err,
770 "network", c.remoteAddr().Network(),
772 torrent.Add("error receiving handshake", 1)
774 cl.onBadAccept(c.remoteAddr())
779 torrent.Add("received handshake for unloaded torrent", 1)
781 cl.onBadAccept(c.remoteAddr())
785 torrent.Add("received handshake for loaded torrent", 1)
788 cl.runHandshookConn(c, t)
791 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
793 if c.PeerID == cl.peerID {
796 addr := c.conn.RemoteAddr().String()
797 cl.dopplegangerAddrs[addr] = struct{}{}
799 // Because the remote address is not necessarily the same as its
800 // client's torrent listen address, we won't record the remote address
801 // as a doppleganger. Instead, the initiator can record *us* as the
806 c.conn.SetWriteDeadline(time.Time{})
807 c.r = deadlineReader{c.conn, c.r}
808 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
809 if connIsIpv6(c.conn) {
810 torrent.Add("completed handshake over ipv6", 1)
812 if err := t.addConnection(c); err != nil {
813 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
816 defer t.dropConnection(c)
817 go c.writer(time.Minute)
818 cl.sendInitialMessages(c, t)
819 err := c.mainReadLoop()
820 if err != nil && cl.config.Debug {
821 log.Printf("error during connection main read loop: %s", err)
825 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
827 if conn.fastEnabled() {
828 if torrent.haveAllPieces() {
829 conn.Post(pp.Message{Type: pp.HaveAll})
830 conn.sentHaves.AddRange(0, conn.t.NumPieces())
832 } else if !torrent.haveAnyPieces() {
833 conn.Post(pp.Message{Type: pp.HaveNone})
834 conn.sentHaves.Clear()
840 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
841 conn.Post(pp.Message{
843 ExtendedID: pp.HandshakeExtendedID,
844 ExtendedPayload: func() []byte {
845 d := map[string]interface{}{
846 "m": func() (ret map[string]int) {
847 ret = make(map[string]int, 2)
848 ret["ut_metadata"] = metadataExtendedId
849 if !cl.config.DisablePEX {
850 ret["ut_pex"] = pexExtendedId
854 "v": cl.config.ExtendedHandshakeClientVersion,
855 // No upload queue is implemented yet.
858 if !cl.config.DisableEncryption {
861 if torrent.metadataSizeKnown() {
862 d["metadata_size"] = torrent.metadataSize()
864 if p := cl.incomingPeerPort(); p != 0 {
867 yourip, err := addrCompactIP(conn.remoteAddr())
869 log.Printf("error calculating yourip field value in extension handshake: %s", err)
873 // log.Printf("sending %v", d)
874 b, err := bencode.Marshal(d)
882 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
883 conn.Post(pp.Message{
890 func (cl *Client) dhtPort() (ret uint16) {
891 cl.eachDhtServer(func(s *dht.Server) {
892 ret = uint16(missinggo.AddrPort(s.Addr()))
897 func (cl *Client) haveDhtServer() (ret bool) {
898 cl.eachDhtServer(func(_ *dht.Server) {
904 // Process incoming ut_metadata message.
905 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
907 err := bencode.Unmarshal(payload, &d)
908 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
909 } else if err != nil {
910 return fmt.Errorf("error unmarshalling bencode: %s", err)
912 msgType, ok := d["msg_type"]
914 return errors.New("missing msg_type field")
918 case pp.DataMetadataExtensionMsgType:
919 if !c.requestedMetadataPiece(piece) {
920 return fmt.Errorf("got unexpected piece %d", piece)
922 c.metadataRequests[piece] = false
923 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
924 if begin < 0 || begin >= len(payload) {
925 return fmt.Errorf("data has bad offset in payload: %d", begin)
927 t.saveMetadataPiece(piece, payload[begin:])
928 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
929 c.lastUsefulChunkReceived = time.Now()
930 return t.maybeCompleteMetadata()
931 case pp.RequestMetadataExtensionMsgType:
932 if !t.haveMetadataPiece(piece) {
933 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
936 start := (1 << 14) * piece
937 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
939 case pp.RejectMetadataExtensionMsgType:
942 return errors.New("unknown msg_type value")
946 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
950 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
953 if _, ok := cl.ipBlockRange(ip); ok {
956 if _, ok := cl.badPeerIPs[ip.String()]; ok {
962 // Return a Torrent ready for insertion into a Client.
963 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
964 // use provided storage, if provided
965 storageClient := cl.defaultStorage
966 if specStorage != nil {
967 storageClient = storage.NewClient(specStorage)
973 peers: prioritizedPeers{
975 getPrio: func(p Peer) peerPriority {
976 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
979 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
981 halfOpen: make(map[string]Peer),
982 pieceStateChanges: pubsub.NewPubSub(),
984 storageOpener: storageClient,
985 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
987 networkingEnabled: true,
989 metadataChanged: sync.Cond{
993 t.logger = cl.logger.Clone().AddValue(t)
994 t.setChunkSize(defaultChunkSize)
998 // A file-like handle to some torrent data resource.
999 type Handle interface {
// AddTorrentInfoHash adds a torrent identified only by its info hash. It
// calls AddTorrentInfoHashWithStorage with a nil storage implementation and
// returns that call's results unchanged: the torrent handle and whether it
// was newly added. With a nil storage, newTorrent uses cl.defaultStorage.
1006 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1007 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1010 // Adds a torrent by InfoHash with a custom Storage implementation.
1011 // If the torrent already exists then this Storage is ignored and the
1012 // existing torrent returned with `new` set to `false`
1013 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1015 defer cl.mu.Unlock()
1016 t, ok := cl.torrents[infoHash]
1021 t = cl.newTorrent(infoHash, specStorage)
1022 cl.eachDhtServer(func(s *dht.Server) {
1023 go t.dhtAnnouncer(s)
1025 cl.torrents[infoHash] = t
1026 t.updateWantPeersEvent()
1027 // Tickle Client.waitAccept, new torrent may want conns.
1028 cl.event.Broadcast()
1032 // Add or merge a torrent spec. If the torrent is already present, the
1033 // trackers will be merged with the existing ones. If the Info isn't yet
1034 // known, it will be set. The display name is replaced if the new spec
1035 // provides one. Returns new if the torrent wasn't already in the client.
1036 // Note that any `Storage` defined on the spec will be ignored if the
1037 // torrent is already present (i.e. `new` return value is `false`)
1038 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1039 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1040 if spec.DisplayName != "" {
1041 t.SetDisplayName(spec.DisplayName)
1043 if spec.InfoBytes != nil {
1044 err = t.SetInfoBytes(spec.InfoBytes)
1050 defer cl.mu.Unlock()
1051 if spec.ChunkSize != 0 {
1052 t.setChunkSize(pp.Integer(spec.ChunkSize))
1054 t.addTrackers(spec.Trackers)
1059 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1060 t, ok := cl.torrents[infoHash]
1062 err = fmt.Errorf("no such torrent")
1069 delete(cl.torrents, infoHash)
1073 func (cl *Client) allTorrentsCompleted() bool {
1074 for _, t := range cl.torrents {
1078 if !t.haveAllPieces() {
1085 // Returns true when all torrents are completely downloaded and false if the
1086 // client is stopped before that.
1087 func (cl *Client) WaitAll() bool {
1089 defer cl.mu.Unlock()
1090 for !cl.allTorrentsCompleted() {
1091 if cl.closed.IsSet() {
1099 // Returns handles to all the torrents loaded in the Client.
1100 func (cl *Client) Torrents() []*Torrent {
1102 defer cl.mu.Unlock()
1103 return cl.torrentsAsSlice()
1106 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1107 for _, t := range cl.torrents {
1108 ret = append(ret, t)
1113 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1114 spec, err := TorrentSpecFromMagnetURI(uri)
1118 T, _, err = cl.AddTorrentSpec(spec)
1122 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1123 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1125 slices.MakeInto(&ss, mi.Nodes)
1130 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1131 mi, err := metainfo.LoadFromFile(filename)
1135 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The result is the
// cl.dhtServers slice itself, not a copy, so callers should treat it as
// read-only.
1138 func (cl *Client) DhtServers() []*dht.Server {
1139 return cl.dhtServers
1142 func (cl *Client) AddDHTNodes(nodes []string) {
1143 for _, n := range nodes {
1144 hmp := missinggo.SplitHostMaybePort(n)
1145 ip := net.ParseIP(hmp.Host)
1147 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1150 ni := krpc.NodeInfo{
1151 Addr: krpc.NodeAddr{
1156 cl.eachDhtServer(func(s *dht.Server) {
// banPeerIP records ip in cl.badPeerIPs, keyed by its string form.
// NOTE(review): no locking is done here; presumably the caller holds the
// client lock — confirm against callers.
1162 func (cl *Client) banPeerIP(ip net.IP) {
// The map is created lazily on first use; writing to a nil map would panic.
1163 if cl.badPeerIPs == nil {
1164 cl.badPeerIPs = make(map[string]struct{})
1166 cl.badPeerIPs[ip.String()] = struct{}{}
1169 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1175 PeerMaxRequests: 250,
1176 writeBuffer: new(bytes.Buffer),
1178 c.writerCond.L = &cl.mu
1179 c.setRW(connStatsReadWriter{nc, c})
1180 c.r = &rateLimitedReader{
1181 l: cl.downloadLimit,
1187 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1189 defer cl.mu.Unlock()
1197 Source: peerSourceDHTAnnouncePeer,
1201 func firstNotNil(ips ...net.IP) net.IP {
1202 for _, ip := range ips {
1210 func (cl *Client) eachListener(f func(socket) bool) {
1211 for _, s := range cl.conns {
1218 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1219 cl.eachListener(func(l socket) bool {
1226 func (cl *Client) publicIp(peer net.IP) net.IP {
1227 // TODO: Use BEP 10 to determine how peers are seeing us.
1228 if peer.To4() != nil {
1230 cl.config.PublicIp4,
1231 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1235 cl.config.PublicIp6,
1236 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1241 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1242 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1243 return f(missinggo.AddrIP(l.Addr()))
1247 // publicAddr returns our address as the given peer should see it: the IP
// chosen by cl.publicIp(peer), paired with cl.incomingPeerPort() converted to
// uint16 for the ipPort value.
1248 func (cl *Client) publicAddr(peer net.IP) ipPort {
1249 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1252 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1254 defer cl.mu.Unlock()
1255 cl.eachListener(func(l socket) bool {
1256 ret = append(ret, l.Addr())
// onBadAccept counts a bad accepted connection against the remote address.
// The counter in cl.acceptLimiter is keyed by the string form of the address's
// IP after it has passed through maskIpForAcceptLimiting. rateLimitAccept
// reads the same counters, and acceptLimitClearer resets the map to nil.
// NOTE(review): no locking is done here; presumably the caller holds the
// client lock — confirm against callers.
1262 func (cl *Client) onBadAccept(addr net.Addr) {
1263 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
// The map may be nil, either never created or reset by acceptLimitClearer, so
// create it before incrementing.
1264 if cl.acceptLimiter == nil {
1265 cl.acceptLimiter = make(map[ipStr]int)
1267 cl.acceptLimiter[ipStr(ip.String())]++
1270 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1271 if ip4 := ip.To4(); ip4 != nil {
1272 return ip4.Mask(net.CIDRMask(24, 32))
1277 func (cl *Client) acceptLimitClearer() {
1280 case <-cl.closed.LockedChan(&cl.mu):
1282 case <-time.After(15 * time.Minute):
1284 cl.acceptLimiter = nil
1290 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1292 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] >= 10