19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/anacrolix/torrent/bencode"
31 "github.com/anacrolix/torrent/iplist"
32 "github.com/anacrolix/torrent/metainfo"
33 "github.com/anacrolix/torrent/mse"
34 pp "github.com/anacrolix/torrent/peer_protocol"
35 "github.com/anacrolix/torrent/storage"
36 "github.com/davecgh/go-spew/spew"
37 "github.com/dustin/go-humanize"
38 "github.com/google/btree"
39 "golang.org/x/time/rate"
42 // Clients contain zero or more Torrents. A Client manages a blocklist, the
43 // TCP/UDP protocol ports, and DHT as desired.
45 // An aggregate of stats over all connections. First in struct to ensure
46 // 64-bit alignment of fields. See #262.
51 closed missinggo.Event
57 defaultStorage *storage.Client
60 dhtServers []*dht.Server
61 ipBlockList iplist.Ranger
62 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
63 extensionBytes pp.PeerExtensionBits
65 // Set of addresses that have our client ID. This intentionally will
66 // include ourselves if we end up trying to connect to our own address
67 // through legitimate channels.
68 dopplegangerAddrs map[string]struct{}
69 badPeerIPs map[string]struct{}
70 torrents map[InfoHash]*Torrent
72 acceptLimiter map[ipStr]int
73 dialRateLimiter *rate.Limiter
78 func (cl *Client) BadPeerIPs() []string {
81 return cl.badPeerIPsLocked()
84 func (cl *Client) badPeerIPsLocked() []string {
85 return slices.FromMapKeys(cl.badPeerIPs).([]string)
88 func (cl *Client) PeerID() PeerID {
92 func (cl *Client) LocalPort() (port int) {
93 cl.eachListener(func(l socket) bool {
94 _port := missinggo.AddrPort(l.Addr())
100 } else if port != _port {
101 panic("mismatched ports")
108 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
109 dhtStats := s.Stats()
110 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
111 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
112 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
113 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
116 // Writes out a human readable status of the client, such as for writing to a status page.
118 func (cl *Client) WriteStatus(_w io.Writer) {
121 w := bufio.NewWriter(_w)
123 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
124 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
126 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
127 cl.eachDhtServer(func(s *dht.Server) {
128 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
129 writeDhtServerStatus(w, s)
131 spew.Fdump(w, cl.stats)
132 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
135 return l.InfoHash().AsString() < r.InfoHash().AsString()
138 fmt.Fprint(w, "<unknown name>")
140 fmt.Fprint(w, t.name())
144 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
146 w.WriteString("<missing metainfo>")
154 const debugLogValue = "debug"
156 func (cl *Client) debugLogFilter(m *log.Msg) bool {
157 if !cl.config.Debug {
158 _, ok := m.Values()[debugLogValue]
164 func (cl *Client) initLogger() {
165 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
168 func (cl *Client) announceKey() int32 {
169 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
172 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
174 cfg = NewDefaultClientConfig()
183 dopplegangerAddrs: make(map[string]struct{}),
184 torrents: make(map[metainfo.Hash]*Torrent),
185 dialRateLimiter: rate.NewLimiter(10, 10),
187 go cl.acceptLimitClearer()
195 cl.extensionBytes = defaultPeerExtensionBytes()
196 cl.event.L = cl.locker()
197 storageImpl := cfg.DefaultStorage
198 if storageImpl == nil {
199 // We'd use mmap but HFS+ doesn't support sparse files.
200 storageImpl = storage.NewFile(cfg.DataDir)
201 cl.onClose = append(cl.onClose, func() {
202 if err := storageImpl.Close(); err != nil {
203 log.Printf("error closing default storage: %s", err)
207 cl.defaultStorage = storage.NewClient(storageImpl)
208 if cfg.IPBlocklist != nil {
209 cl.ipBlockList = cfg.IPBlocklist
212 if cfg.PeerID != "" {
213 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
215 o := copy(cl.peerID[:], cfg.Bep20)
216 _, err = rand.Read(cl.peerID[o:])
218 panic("error generating peer id")
222 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
223 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
224 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
228 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
235 for _, s := range cl.conns {
236 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
237 go cl.acceptConnections(s)
243 for _, s := range cl.conns {
244 if pc, ok := s.(net.PacketConn); ok {
245 ds, err := cl.newDhtServer(pc)
249 cl.dhtServers = append(cl.dhtServers, ds)
257 func (cl *Client) firewallCallback(net.Addr) bool {
259 block := !cl.wantConns()
262 torrent.Add("connections firewalled", 1)
264 torrent.Add("connections not firewalled", 1)
269 func (cl *Client) enabledPeerNetworks() (ns []network) {
270 for _, n := range allPeerNetworks {
271 if peerNetworkEnabled(n, cl.config) {
278 func (cl *Client) listenOnNetwork(n network) bool {
279 if n.Ipv4 && cl.config.DisableIPv4 {
282 if n.Ipv6 && cl.config.DisableIPv6 {
285 if n.Tcp && cl.config.DisableTCP {
288 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
294 func (cl *Client) listenNetworks() (ns []network) {
295 for _, n := range allPeerNetworks {
296 if cl.listenOnNetwork(n) {
303 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
304 cfg := dht.ServerConfig{
305 IPBlocklist: cl.ipBlockList,
307 OnAnnouncePeer: cl.onDHTAnnouncePeer,
308 PublicIP: func() net.IP {
309 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
310 return cl.config.PublicIp6
312 return cl.config.PublicIp4
314 StartingNodes: cl.config.DhtStartingNodes,
315 ConnectionTracking: cl.config.ConnTracker,
317 s, err = dht.NewServer(&cfg)
320 ts, err := s.Bootstrap()
322 log.Printf("error bootstrapping dht: %s", err)
324 log.Printf("%s: dht bootstrap: %v", s, ts)
330 func (cl *Client) Closed() <-chan struct{} {
336 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
337 for _, ds := range cl.dhtServers {
342 func (cl *Client) closeSockets() {
343 cl.eachListener(func(l socket) bool {
350 // Stops the client. All connections to peers are closed and all activity will come to a halt.
352 func (cl *Client) Close() {
356 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
358 for _, t := range cl.torrents {
361 for _, f := range cl.onClose {
367 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
368 if cl.ipBlockList == nil {
371 return cl.ipBlockList.Lookup(ip)
374 func (cl *Client) ipIsBlocked(ip net.IP) bool {
375 _, blocked := cl.ipBlockRange(ip)
379 func (cl *Client) wantConns() bool {
380 for _, t := range cl.torrents {
388 func (cl *Client) waitAccept() {
390 if cl.closed.IsSet() {
400 func (cl *Client) rejectAccepted(conn net.Conn) bool {
401 ra := conn.RemoteAddr()
402 rip := missinggo.AddrIP(ra)
403 if cl.config.DisableIPv4Peers && rip.To4() != nil {
406 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
409 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
412 if cl.rateLimitAccept(rip) {
415 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
418 func (cl *Client) acceptConnections(l net.Listener) {
420 conn, err := l.Accept()
421 conn = pproffd.WrapNetConn(conn)
423 closed := cl.closed.IsSet()
426 reject = cl.rejectAccepted(conn)
436 log.Printf("error accepting connection: %s", err)
441 torrent.Add("rejected accepted connections", 1)
444 go cl.incomingConnection(conn)
446 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
447 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
448 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
449 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
454 func (cl *Client) incomingConnection(nc net.Conn) {
456 if tc, ok := nc.(*net.TCPConn); ok {
459 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
460 c.Discovery = peerSourceIncoming
461 cl.runReceivedConn(c)
464 // Returns a handle to the given torrent, if it's present in the client.
465 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
468 t, ok = cl.torrents[ih]
472 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
473 return cl.torrents[ih]
476 type dialResult struct {
481 func countDialResult(err error) {
483 torrent.Add("successful dials", 1)
485 torrent.Add("unsuccessful dials", 1)
489 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
490 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
491 if ret < minDialTimeout {
497 // Returns whether an address is known to connect to a client with our own ID.
498 func (cl *Client) dopplegangerAddr(addr string) bool {
499 _, ok := cl.dopplegangerAddrs[addr]
503 // Returns a connection over UTP or TCP, whichever is first to connect.
504 func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
505 ctx, cancel := context.WithCancel(ctx)
506 // As soon as we return one connection, cancel the others.
509 resCh := make(chan dialResult, left)
513 cl.eachListener(func(s socket) bool {
514 network := s.Addr().Network()
515 if peerNetworkEnabled(parseNetworkString(network), cl.config) {
518 cte := cl.config.ConnTracker.Wait(
519 conntrack.Entry{network, s.Addr().String(), addr},
520 "dial torrent client",
523 c, err := s.dial(ctx, addr)
524 // This is a bit optimistic, but it looks non-trivial to thread
525 // this through the proxy code. Set it now in case we close the
526 // connection forthwith.
527 if tc, ok := c.(*net.TCPConn); ok {
531 dr := dialResult{c, network}
535 dr.Conn = closeWrapper{c, func() error {
548 // Wait for a successful connection.
550 defer perf.ScopeTimer()()
551 for ; left > 0 && res.Conn == nil; left-- {
555 // There are still incomplete dials.
557 for ; left > 0; left-- {
558 conn := (<-resCh).Conn
565 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
570 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
571 if _, ok := t.halfOpen[addr]; !ok {
572 panic("invariant broken")
574 delete(t.halfOpen, addr)
578 // Performs initiator handshakes and returns a connection. Returns nil
579 // *connection if no connection could be established for valid reasons.
580 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
581 c = cl.newConnection(nc, true, remoteAddr, network)
582 c.headerEncrypted = encryptHeader
583 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
585 dl, ok := ctx.Deadline()
589 err = nc.SetDeadline(dl)
593 ok, err = cl.initiateHandshakes(c, t)
600 // Returns nil connection and nil error if no connection could be established
601 // for valid reasons.
602 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
603 dr := cl.dialFirst(ctx, addr.String())
609 if c == nil || err != nil {
613 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network)
616 // Returns nil connection and nil error if no connection could be established
617 // for valid reasons.
618 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
619 torrent.Add("establish outgoing connection", 1)
620 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
623 return t.dialTimeout()
626 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
627 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
632 torrent.Add("initiated conn with preferred header obfuscation", 1)
635 if cl.config.ForceEncryption {
636 // We should have just tried with an obfuscated header. A plaintext
637 // header can't result in an encrypted connection, so we're done.
638 if !obfuscatedHeaderFirst {
639 panic(cl.config.EncryptionPolicy)
643 // Try again with encryption if we didn't earlier, or without if we did.
644 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
646 torrent.Add("initiated conn with fallback header obfuscation", 1)
651 // Called to dial out and run a connection. The addr we're given is already
652 // considered half-open.
653 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
654 cl.dialRateLimiter.Wait(context.Background())
655 c, err := cl.establishOutgoingConn(t, addr)
658 // Don't release lock between here and addConnection, unless it's for
660 cl.noLongerHalfOpen(t, addr.String())
663 log.Printf("error establishing outgoing connection: %s", err)
672 cl.runHandshookConn(c, t)
675 // The port number for incoming peer connections. 0 if the client isn't listening.
677 func (cl *Client) incomingPeerPort() int {
678 return cl.LocalPort()
681 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
682 if c.headerEncrypted {
684 rw, c.cryptoMethod, err = mse.InitiateHandshake(
691 func() mse.CryptoMethod {
693 case cl.config.ForceEncryption:
694 return mse.CryptoMethodRC4
695 case cl.config.DisableEncryption:
696 return mse.CryptoMethodPlaintext
698 return mse.AllSupportedCrypto
707 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
708 if ih != t.infoHash {
714 // Calls f with any secret keys.
715 func (cl *Client) forSkeys(f func([]byte) bool) {
718 for ih := range cl.torrents {
725 // Do encryption and bittorrent handshakes as receiver.
726 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
727 defer perf.ScopeTimerErr(&err)()
729 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
731 if err == nil || err == mse.ErrNoSecretKeyMatch {
732 if c.headerEncrypted {
733 torrent.Add("handshakes received encrypted", 1)
735 torrent.Add("handshakes received unencrypted", 1)
738 torrent.Add("handshakes received with error while handling encryption", 1)
741 if err == mse.ErrNoSecretKeyMatch {
746 if cl.config.ForceEncryption && !c.headerEncrypted {
747 err = errors.New("connection not encrypted")
750 ih, ok, err := cl.connBTHandshake(c, nil)
752 err = fmt.Errorf("error during bt handshake: %s", err)
764 // Returns !ok if handshake failed for valid reasons.
765 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
766 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
767 if err != nil || !ok {
771 c.PeerExtensionBytes = res.PeerExtensionBits
772 c.PeerID = res.PeerID
773 c.completedHandshake = time.Now()
777 func (cl *Client) runReceivedConn(c *connection) {
778 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
782 t, err := cl.receiveHandshakes(c)
785 "error receiving handshakes: %s", err,
789 "network", c.network,
791 torrent.Add("error receiving handshake", 1)
793 cl.onBadAccept(c.remoteAddr)
798 torrent.Add("received handshake for unloaded torrent", 1)
800 cl.onBadAccept(c.remoteAddr)
804 torrent.Add("received handshake for loaded torrent", 1)
807 cl.runHandshookConn(c, t)
810 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
812 if c.PeerID == cl.peerID {
815 addr := c.conn.RemoteAddr().String()
816 cl.dopplegangerAddrs[addr] = struct{}{}
818 // Because the remote address is not necessarily the same as its
819 // client's torrent listen address, we won't record the remote address
820 // as a doppleganger. Instead, the initiator can record *us* as the doppleganger.
825 c.conn.SetWriteDeadline(time.Time{})
826 c.r = deadlineReader{c.conn, c.r}
827 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
828 if connIsIpv6(c.conn) {
829 torrent.Add("completed handshake over ipv6", 1)
831 if err := t.addConnection(c); err != nil {
832 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
835 defer t.dropConnection(c)
836 go c.writer(time.Minute)
837 cl.sendInitialMessages(c, t)
838 err := c.mainReadLoop()
839 if err != nil && cl.config.Debug {
840 log.Printf("error during connection main read loop: %s", err)
844 // See the order given in Transmission's tr_peerMsgsNew.
845 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
846 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
847 conn.Post(pp.Message{
849 ExtendedID: pp.HandshakeExtendedID,
850 ExtendedPayload: func() []byte {
851 msg := pp.ExtendedHandshakeMessage{
852 M: map[pp.ExtensionName]pp.ExtensionNumber{
853 pp.ExtensionNameMetadata: metadataExtendedId,
855 V: cl.config.ExtendedHandshakeClientVersion,
856 Reqq: 64, // TODO: Really?
857 YourIp: pp.CompactIp(conn.remoteAddr.IP),
858 Encryption: !cl.config.DisableEncryption,
859 Port: cl.incomingPeerPort(),
860 MetadataSize: torrent.metadataSize(),
861 // TODO: We can figure these out specific to the socket
863 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
864 Ipv6: cl.config.PublicIp6.To16(),
866 if !cl.config.DisablePEX {
867 msg.M[pp.ExtensionNamePex] = pexExtendedId
869 return bencode.MustMarshal(msg)
874 if conn.fastEnabled() {
875 if torrent.haveAllPieces() {
876 conn.Post(pp.Message{Type: pp.HaveAll})
877 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
879 } else if !torrent.haveAnyPieces() {
880 conn.Post(pp.Message{Type: pp.HaveNone})
881 conn.sentHaves.Clear()
887 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
888 conn.Post(pp.Message{
895 func (cl *Client) dhtPort() (ret uint16) {
896 cl.eachDhtServer(func(s *dht.Server) {
897 ret = uint16(missinggo.AddrPort(s.Addr()))
902 func (cl *Client) haveDhtServer() (ret bool) {
903 cl.eachDhtServer(func(_ *dht.Server) {
909 // Process incoming ut_metadata message.
910 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
912 err := bencode.Unmarshal(payload, &d)
913 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
914 } else if err != nil {
915 return fmt.Errorf("error unmarshalling bencode: %s", err)
917 msgType, ok := d["msg_type"]
919 return errors.New("missing msg_type field")
923 case pp.DataMetadataExtensionMsgType:
924 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
925 if !c.requestedMetadataPiece(piece) {
926 return fmt.Errorf("got unexpected piece %d", piece)
928 c.metadataRequests[piece] = false
929 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
930 if begin < 0 || begin >= len(payload) {
931 return fmt.Errorf("data has bad offset in payload: %d", begin)
933 t.saveMetadataPiece(piece, payload[begin:])
934 c.lastUsefulChunkReceived = time.Now()
935 return t.maybeCompleteMetadata()
936 case pp.RequestMetadataExtensionMsgType:
937 if !t.haveMetadataPiece(piece) {
938 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
941 start := (1 << 14) * piece
942 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
944 case pp.RejectMetadataExtensionMsgType:
947 return errors.New("unknown msg_type value")
951 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
955 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
958 if _, ok := cl.ipBlockRange(ip); ok {
961 if _, ok := cl.badPeerIPs[ip.String()]; ok {
967 // Return a Torrent ready for insertion into a Client.
968 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
969 // use provided storage, if provided
970 storageClient := cl.defaultStorage
971 if specStorage != nil {
972 storageClient = storage.NewClient(specStorage)
978 peers: prioritizedPeers{
980 getPrio: func(p Peer) peerPriority {
981 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
984 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
986 halfOpen: make(map[string]Peer),
987 pieceStateChanges: pubsub.NewPubSub(),
989 storageOpener: storageClient,
990 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
992 networkingEnabled: true,
994 metadataChanged: sync.Cond{
997 duplicateRequestTimeout: 1 * time.Second,
999 t.logger = cl.logger.Clone().AddValue(t)
1000 t.setChunkSize(defaultChunkSize)
1004 // A file-like handle to some torrent data resource.
1005 type Handle interface {
1012 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1013 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1016 // Adds a torrent by InfoHash with a custom Storage implementation.
1017 // If the torrent already exists then this Storage is ignored and the
1018 // existing torrent returned with `new` set to `false`
1019 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1022 t, ok := cl.torrents[infoHash]
1028 t = cl.newTorrent(infoHash, specStorage)
1029 cl.eachDhtServer(func(s *dht.Server) {
1030 go t.dhtAnnouncer(s)
1032 cl.torrents[infoHash] = t
1033 cl.clearAcceptLimits()
1034 t.updateWantPeersEvent()
1035 // Tickle Client.waitAccept, new torrent may want conns.
1036 cl.event.Broadcast()
1040 // Add or merge a torrent spec. If the torrent is already present, the
1041 // trackers will be merged with the existing ones. If the Info isn't yet
1042 // known, it will be set. The display name is replaced if the new spec
1043 // provides one. Returns new if the torrent wasn't already in the client.
1044 // Note that any `Storage` defined on the spec will be ignored if the
1045 // torrent is already present (i.e. `new` return value is `false`).
1046 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1047 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1048 if spec.DisplayName != "" {
1049 t.SetDisplayName(spec.DisplayName)
1051 if spec.InfoBytes != nil {
1052 err = t.SetInfoBytes(spec.InfoBytes)
1059 if spec.ChunkSize != 0 {
1060 t.setChunkSize(pp.Integer(spec.ChunkSize))
1062 t.addTrackers(spec.Trackers)
1067 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1068 t, ok := cl.torrents[infoHash]
1070 err = fmt.Errorf("no such torrent")
1077 delete(cl.torrents, infoHash)
1081 func (cl *Client) allTorrentsCompleted() bool {
1082 for _, t := range cl.torrents {
1086 if !t.haveAllPieces() {
1093 // Returns true when all torrents are completely downloaded and false if the
1094 // client is stopped before that.
1095 func (cl *Client) WaitAll() bool {
1098 for !cl.allTorrentsCompleted() {
1099 if cl.closed.IsSet() {
1107 // Returns handles to all the torrents loaded in the Client.
1108 func (cl *Client) Torrents() []*Torrent {
1111 return cl.torrentsAsSlice()
1114 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1115 for _, t := range cl.torrents {
1116 ret = append(ret, t)
1121 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1122 spec, err := TorrentSpecFromMagnetURI(uri)
1126 T, _, err = cl.AddTorrentSpec(spec)
1130 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1131 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1133 slices.MakeInto(&ss, mi.Nodes)
1138 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1139 mi, err := metainfo.LoadFromFile(filename)
1143 return cl.AddTorrent(mi)
1146 func (cl *Client) DhtServers() []*dht.Server {
1147 return cl.dhtServers
1150 func (cl *Client) AddDHTNodes(nodes []string) {
1151 for _, n := range nodes {
1152 hmp := missinggo.SplitHostMaybePort(n)
1153 ip := net.ParseIP(hmp.Host)
1155 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1158 ni := krpc.NodeInfo{
1159 Addr: krpc.NodeAddr{
1164 cl.eachDhtServer(func(s *dht.Server) {
1170 func (cl *Client) banPeerIP(ip net.IP) {
1171 if cl.badPeerIPs == nil {
1172 cl.badPeerIPs = make(map[string]struct{})
1174 cl.badPeerIPs[ip.String()] = struct{}{}
1177 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1183 PeerMaxRequests: 250,
1184 writeBuffer: new(bytes.Buffer),
1185 remoteAddr: remoteAddr,
1188 c.writerCond.L = cl.locker()
1189 c.setRW(connStatsReadWriter{nc, c})
1190 c.r = &rateLimitedReader{
1191 l: cl.config.DownloadRateLimiter,
1197 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1207 Source: peerSourceDHTAnnouncePeer,
1211 func firstNotNil(ips ...net.IP) net.IP {
1212 for _, ip := range ips {
1220 func (cl *Client) eachListener(f func(socket) bool) {
1221 for _, s := range cl.conns {
1228 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1229 cl.eachListener(func(l socket) bool {
1236 func (cl *Client) publicIp(peer net.IP) net.IP {
1237 // TODO: Use BEP 10 to determine how peers are seeing us.
1238 if peer.To4() != nil {
1240 cl.config.PublicIp4,
1241 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1245 cl.config.PublicIp6,
1246 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1251 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1252 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1253 return f(missinggo.AddrIP(l.Addr()))
1257 // Our IP as a peer should see it.
1258 func (cl *Client) publicAddr(peer net.IP) IpPort {
1259 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1262 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1265 cl.eachListener(func(l socket) bool {
1266 ret = append(ret, l.Addr())
1272 func (cl *Client) onBadAccept(addr IpPort) {
1273 ip := maskIpForAcceptLimiting(addr.IP)
1274 if cl.acceptLimiter == nil {
1275 cl.acceptLimiter = make(map[ipStr]int)
1277 cl.acceptLimiter[ipStr(ip.String())]++
1280 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1281 if ip4 := ip.To4(); ip4 != nil {
1282 return ip4.Mask(net.CIDRMask(24, 32))
1287 func (cl *Client) clearAcceptLimits() {
1288 cl.acceptLimiter = nil
1291 func (cl *Client) acceptLimitClearer() {
1294 case <-cl.closed.LockedChan(cl.locker()):
1296 case <-time.After(15 * time.Minute):
1298 cl.clearAcceptLimits()
1304 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1305 if cl.config.DisableAcceptRateLimiting {
1308 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1311 func (cl *Client) rLock() {
1315 func (cl *Client) rUnlock() {
1319 func (cl *Client) lock() {
1323 func (cl *Client) unlock() {
1327 func (cl *Client) locker() sync.Locker {
1328 return clientLocker{cl}
1331 type clientLocker struct {
1335 func (cl clientLocker) Lock() {
1339 func (cl clientLocker) Unlock() {