19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/anacrolix/torrent/bencode"
31 "github.com/anacrolix/torrent/iplist"
32 "github.com/anacrolix/torrent/metainfo"
33 "github.com/anacrolix/torrent/mse"
34 pp "github.com/anacrolix/torrent/peer_protocol"
35 "github.com/anacrolix/torrent/storage"
36 "github.com/davecgh/go-spew/spew"
37 "github.com/dustin/go-humanize"
38 "github.com/google/btree"
39 "golang.org/x/time/rate"
42 // Clients contain zero or more Torrents. A Client manages a blocklist, the
43 // TCP/UDP protocol ports, and DHT as desired.
45 // An aggregate of stats over all connections. First in struct to ensure
46 // 64-bit alignment of fields. See #262.
51 closed missinggo.Event
57 defaultStorage *storage.Client
60 dhtServers []*dht.Server
61 ipBlockList iplist.Ranger
62 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
63 extensionBytes pp.PeerExtensionBits
65 // Set of addresses that have our client ID. This intentionally will
66 // include ourselves if we end up trying to connect to our own address
67 // through legitimate channels.
68 dopplegangerAddrs map[string]struct{}
69 badPeerIPs map[string]struct{}
70 torrents map[InfoHash]*Torrent
72 acceptLimiter map[ipStr]int
73 dialRateLimiter *rate.Limiter
78 func (cl *Client) BadPeerIPs() []string {
81 return cl.badPeerIPsLocked()
84 func (cl *Client) badPeerIPsLocked() []string {
85 return slices.FromMapKeys(cl.badPeerIPs).([]string)
88 func (cl *Client) PeerID() PeerID {
// torrentAddr is a string-backed address. It provides the Network and String
// methods required by net.Addr, so a plain address string can be passed where
// that interface is expected.
type torrentAddr string

// Network always returns the empty string: a torrentAddr carries no network
// name.
func (torrentAddr) Network() string { return "" }

// String returns the address text exactly as it was stored.
func (a torrentAddr) String() string { return string(a) }
98 func (cl *Client) LocalPort() (port int) {
99 cl.eachListener(func(l socket) bool {
100 _port := missinggo.AddrPort(l.Addr())
106 } else if port != _port {
107 panic("mismatched ports")
114 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
115 dhtStats := s.Stats()
116 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
117 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
118 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
119 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
122 // Writes out a human readable status of the client, such as for writing to a
124 func (cl *Client) WriteStatus(_w io.Writer) {
127 w := bufio.NewWriter(_w)
129 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
130 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
131 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
132 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
133 cl.eachDhtServer(func(s *dht.Server) {
134 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
135 writeDhtServerStatus(w, s)
137 spew.Fdump(w, cl.stats)
138 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
140 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
141 return l.InfoHash().AsString() < r.InfoHash().AsString()
144 fmt.Fprint(w, "<unknown name>")
146 fmt.Fprint(w, t.name())
150 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
152 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to debug-only log messages (via
// AddValue/AddValues). debugLogFilter looks it up in a message's Values()
// when cl.config.Debug is false.
160 const debugLogValue = "debug"
162 func (cl *Client) debugLogFilter(m *log.Msg) bool {
163 if !cl.config.Debug {
164 _, ok := m.Values()[debugLogValue]
170 func (cl *Client) initLogger() {
171 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
174 func (cl *Client) announceKey() int32 {
175 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
178 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
180 cfg = NewDefaultClientConfig()
189 dopplegangerAddrs: make(map[string]struct{}),
190 torrents: make(map[metainfo.Hash]*Torrent),
191 dialRateLimiter: rate.NewLimiter(10, 10),
193 go cl.acceptLimitClearer()
201 cl.extensionBytes = defaultPeerExtensionBytes()
202 cl.event.L = cl.locker()
203 storageImpl := cfg.DefaultStorage
204 if storageImpl == nil {
205 // We'd use mmap but HFS+ doesn't support sparse files.
206 storageImpl = storage.NewFile(cfg.DataDir)
207 cl.onClose = append(cl.onClose, func() {
208 if err := storageImpl.Close(); err != nil {
209 log.Printf("error closing default storage: %s", err)
213 cl.defaultStorage = storage.NewClient(storageImpl)
214 if cfg.IPBlocklist != nil {
215 cl.ipBlockList = cfg.IPBlocklist
218 if cfg.PeerID != "" {
219 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
221 o := copy(cl.peerID[:], cfg.Bep20)
222 _, err = rand.Read(cl.peerID[o:])
224 panic("error generating peer id")
228 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
229 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
230 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
234 cl.conns, err = listenAll(allPeerNetworks, cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
241 for _, s := range cl.conns {
242 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
243 go cl.acceptConnections(s)
249 for _, s := range cl.conns {
250 if pc, ok := s.(net.PacketConn); ok {
251 ds, err := cl.newDhtServer(pc)
255 cl.dhtServers = append(cl.dhtServers, ds)
263 func (cl *Client) firewallCallback(net.Addr) bool {
265 block := !cl.wantConns()
268 torrent.Add("connections firewalled", 1)
270 torrent.Add("connections not firewalled", 1)
275 func (cl *Client) enabledPeerNetworks() (ns []string) {
276 for _, n := range allPeerNetworks {
277 if peerNetworkEnabled(n, cl.config) {
284 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
285 cfg := dht.ServerConfig{
286 IPBlocklist: cl.ipBlockList,
288 OnAnnouncePeer: cl.onDHTAnnouncePeer,
289 PublicIP: func() net.IP {
290 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
291 return cl.config.PublicIp6
293 return cl.config.PublicIp4
295 StartingNodes: cl.config.DhtStartingNodes,
296 ConnectionTracking: cl.config.ConnTracker,
298 s, err = dht.NewServer(&cfg)
301 if _, err := s.Bootstrap(); err != nil {
302 log.Printf("error bootstrapping dht: %s", err)
309 func firstNonEmptyString(ss ...string) string {
310 for _, s := range ss {
318 func (cl *Client) Closed() <-chan struct{} {
324 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
325 for _, ds := range cl.dhtServers {
330 func (cl *Client) closeSockets() {
331 cl.eachListener(func(l socket) bool {
338 // Stops the client. All connections to peers are closed and all activity will
340 func (cl *Client) Close() {
344 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
346 for _, t := range cl.torrents {
349 for _, f := range cl.onClose {
355 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
356 if cl.ipBlockList == nil {
359 return cl.ipBlockList.Lookup(ip)
362 func (cl *Client) ipIsBlocked(ip net.IP) bool {
363 _, blocked := cl.ipBlockRange(ip)
367 func (cl *Client) wantConns() bool {
368 for _, t := range cl.torrents {
376 func (cl *Client) waitAccept() {
378 if cl.closed.IsSet() {
388 func (cl *Client) rejectAccepted(conn net.Conn) bool {
389 ra := conn.RemoteAddr()
390 rip := missinggo.AddrIP(ra)
391 if cl.config.DisableIPv4Peers && rip.To4() != nil {
394 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
397 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
400 if cl.rateLimitAccept(rip) {
403 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
406 func (cl *Client) acceptConnections(l net.Listener) {
408 conn, err := l.Accept()
409 conn = pproffd.WrapNetConn(conn)
411 closed := cl.closed.IsSet()
414 reject = cl.rejectAccepted(conn)
424 log.Printf("error accepting connection: %s", err)
429 torrent.Add("rejected accepted connections", 1)
432 go cl.incomingConnection(conn)
434 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
435 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
436 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
437 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
442 func (cl *Client) incomingConnection(nc net.Conn) {
444 if tc, ok := nc.(*net.TCPConn); ok {
447 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
448 c.Discovery = peerSourceIncoming
449 cl.runReceivedConn(c)
452 // Returns a handle to the given torrent, if it's present in the client.
453 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
456 t, ok = cl.torrents[ih]
460 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
461 return cl.torrents[ih]
464 type dialResult struct {
469 func countDialResult(err error) {
471 torrent.Add("successful dials", 1)
473 torrent.Add("unsuccessful dials", 1)
477 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
478 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
479 if ret < minDialTimeout {
485 // Returns whether an address is known to connect to a client with our own ID.
486 func (cl *Client) dopplegangerAddr(addr string) bool {
487 _, ok := cl.dopplegangerAddrs[addr]
491 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
493 case allowIpv4 && allowIpv6:
495 case allowIpv4 && !allowIpv6:
497 case !allowIpv4 && allowIpv6:
500 panic("unhandled ip network combination")
504 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
505 return sock.DialContext(ctx, "", addr)
// allPeerNetworks lists the network names handed to listenAll by NewClient
// and iterated by enabledPeerNetworks, which checks each entry with
// peerNetworkEnabled.
508 var allPeerNetworks = []string{"tcp4", "tcp6", "udp4", "udp6"}
510 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
511 c := func(s string) bool {
512 return strings.Contains(network, s)
515 if c("udp") || c("utp") {
519 if cfg.DisableTCP && c("tcp") {
522 if cfg.DisableIPv6 && c("6") {
528 // Returns a connection over UTP or TCP, whichever is first to connect.
529 func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
530 ctx, cancel := context.WithCancel(ctx)
531 // As soon as we return one connection, cancel the others.
534 resCh := make(chan dialResult, left)
538 cl.eachListener(func(s socket) bool {
539 network := s.Addr().Network()
540 if peerNetworkEnabled(network, cl.config) {
543 cte := cl.config.ConnTracker.Wait(
544 conntrack.Entry{network, s.Addr().String(), addr},
545 "dial torrent client")
546 c, err := s.dial(ctx, addr)
547 // This is a bit optimistic, but it looks non-trivial to thread
548 // this through the proxy code. Set it now in case we close the
549 // connection forthwith.
550 if tc, ok := c.(*net.TCPConn); ok {
554 dr := dialResult{c, network}
558 dr.Conn = closeWrapper{c, func() error {
571 // Wait for a successful connection.
573 defer perf.ScopeTimer()()
574 for ; left > 0 && res.Conn == nil; left-- {
578 // There are still incomplete dials.
580 for ; left > 0; left-- {
581 conn := (<-resCh).Conn
588 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
593 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
594 if _, ok := t.halfOpen[addr]; !ok {
595 panic("invariant broken")
597 delete(t.halfOpen, addr)
601 // Performs initiator handshakes and returns a connection. Returns a nil
602 // *connection if no connection could be established for valid reasons.
603 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
604 c = cl.newConnection(nc, true, remoteAddr, network)
605 c.headerEncrypted = encryptHeader
606 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
608 dl, ok := ctx.Deadline()
612 err = nc.SetDeadline(dl)
616 ok, err = cl.initiateHandshakes(c, t)
623 // Returns nil connection and nil error if no connection could be established
624 // for valid reasons.
625 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
626 dr := cl.dialFirst(ctx, addr.String())
632 if c == nil || err != nil {
636 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network)
639 // Returns nil connection and nil error if no connection could be established
640 // for valid reasons.
641 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
642 torrent.Add("establish outgoing connection", 1)
643 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
646 return t.dialTimeout()
649 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
650 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
655 torrent.Add("initiated conn with preferred header obfuscation", 1)
658 if cl.config.ForceEncryption {
659 // We should have just tried with an obfuscated header. A plaintext
660 // header can't result in an encrypted connection, so we're done.
661 if !obfuscatedHeaderFirst {
662 panic(cl.config.EncryptionPolicy)
666 // Try again with encryption if we didn't earlier, or without if we did.
667 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
669 torrent.Add("initiated conn with fallback header obfuscation", 1)
674 // Called to dial out and run a connection. The addr we're given is already
675 // considered half-open.
676 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
677 cl.dialRateLimiter.Wait(context.Background())
678 c, err := cl.establishOutgoingConn(t, addr)
681 // Don't release lock between here and addConnection, unless it's for
683 cl.noLongerHalfOpen(t, addr.String())
686 log.Printf("error establishing outgoing connection: %s", err)
695 cl.runHandshookConn(c, t)
698 // The port number for incoming peer connections. 0 if the client isn't
700 func (cl *Client) incomingPeerPort() int {
701 return cl.LocalPort()
704 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
705 if c.headerEncrypted {
707 rw, c.cryptoMethod, err = mse.InitiateHandshake(
714 func() mse.CryptoMethod {
716 case cl.config.ForceEncryption:
717 return mse.CryptoMethodRC4
718 case cl.config.DisableEncryption:
719 return mse.CryptoMethodPlaintext
721 return mse.AllSupportedCrypto
730 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
731 if ih != t.infoHash {
737 // Calls f with any secret keys.
738 func (cl *Client) forSkeys(f func([]byte) bool) {
741 for ih := range cl.torrents {
748 // Do encryption and bittorrent handshakes as receiver.
749 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
750 defer perf.ScopeTimerErr(&err)()
752 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
754 if err == nil || err == mse.ErrNoSecretKeyMatch {
755 if c.headerEncrypted {
756 torrent.Add("handshakes received encrypted", 1)
758 torrent.Add("handshakes received unencrypted", 1)
761 torrent.Add("handshakes received with error while handling encryption", 1)
764 if err == mse.ErrNoSecretKeyMatch {
769 if cl.config.ForceEncryption && !c.headerEncrypted {
770 err = errors.New("connection not encrypted")
773 ih, ok, err := cl.connBTHandshake(c, nil)
775 err = fmt.Errorf("error during bt handshake: %s", err)
787 // Returns !ok if handshake failed for valid reasons.
788 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
789 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
790 if err != nil || !ok {
794 c.PeerExtensionBytes = res.PeerExtensionBits
795 c.PeerID = res.PeerID
796 c.completedHandshake = time.Now()
800 func (cl *Client) runReceivedConn(c *connection) {
801 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
805 t, err := cl.receiveHandshakes(c)
808 "error receiving handshakes: %s", err,
812 "network", c.network,
814 torrent.Add("error receiving handshake", 1)
816 cl.onBadAccept(c.remoteAddr)
821 torrent.Add("received handshake for unloaded torrent", 1)
823 cl.onBadAccept(c.remoteAddr)
827 torrent.Add("received handshake for loaded torrent", 1)
830 cl.runHandshookConn(c, t)
833 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
835 if c.PeerID == cl.peerID {
838 addr := c.conn.RemoteAddr().String()
839 cl.dopplegangerAddrs[addr] = struct{}{}
841 // Because the remote address is not necessarily the same as its
842 // client's torrent listen address, we won't record the remote address
843 // as a doppleganger. Instead, the initiator can record *us* as the
848 c.conn.SetWriteDeadline(time.Time{})
849 c.r = deadlineReader{c.conn, c.r}
850 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
851 if connIsIpv6(c.conn) {
852 torrent.Add("completed handshake over ipv6", 1)
854 if err := t.addConnection(c); err != nil {
855 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
858 defer t.dropConnection(c)
859 go c.writer(time.Minute)
860 cl.sendInitialMessages(c, t)
861 err := c.mainReadLoop()
862 if err != nil && cl.config.Debug {
863 log.Printf("error during connection main read loop: %s", err)
867 // See the order given in Transmission's tr_peerMsgsNew.
868 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
869 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
870 conn.Post(pp.Message{
872 ExtendedID: pp.HandshakeExtendedID,
873 ExtendedPayload: func() []byte {
874 msg := pp.ExtendedHandshakeMessage{
875 M: map[pp.ExtensionName]pp.ExtensionNumber{
876 pp.ExtensionNameMetadata: metadataExtendedId,
878 V: cl.config.ExtendedHandshakeClientVersion,
879 Reqq: 64, // TODO: Really?
880 YourIp: pp.CompactIp(conn.remoteAddr.IP),
881 Encryption: !cl.config.DisableEncryption,
882 Port: cl.incomingPeerPort(),
883 MetadataSize: torrent.metadataSize(),
884 // TODO: We can figure these out specific to the socket
886 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
887 Ipv6: cl.config.PublicIp6.To16(),
889 if !cl.config.DisablePEX {
890 msg.M[pp.ExtensionNamePex] = pexExtendedId
892 return bencode.MustMarshal(msg)
897 if conn.fastEnabled() {
898 if torrent.haveAllPieces() {
899 conn.Post(pp.Message{Type: pp.HaveAll})
900 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
902 } else if !torrent.haveAnyPieces() {
903 conn.Post(pp.Message{Type: pp.HaveNone})
904 conn.sentHaves.Clear()
910 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
911 conn.Post(pp.Message{
918 func (cl *Client) dhtPort() (ret uint16) {
919 cl.eachDhtServer(func(s *dht.Server) {
920 ret = uint16(missinggo.AddrPort(s.Addr()))
925 func (cl *Client) haveDhtServer() (ret bool) {
926 cl.eachDhtServer(func(_ *dht.Server) {
932 // Process incoming ut_metadata message.
933 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
935 err := bencode.Unmarshal(payload, &d)
936 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
937 } else if err != nil {
938 return fmt.Errorf("error unmarshalling bencode: %s", err)
940 msgType, ok := d["msg_type"]
942 return errors.New("missing msg_type field")
946 case pp.DataMetadataExtensionMsgType:
947 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
948 if !c.requestedMetadataPiece(piece) {
949 return fmt.Errorf("got unexpected piece %d", piece)
951 c.metadataRequests[piece] = false
952 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
953 if begin < 0 || begin >= len(payload) {
954 return fmt.Errorf("data has bad offset in payload: %d", begin)
956 t.saveMetadataPiece(piece, payload[begin:])
957 c.lastUsefulChunkReceived = time.Now()
958 return t.maybeCompleteMetadata()
959 case pp.RequestMetadataExtensionMsgType:
960 if !t.haveMetadataPiece(piece) {
961 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
964 start := (1 << 14) * piece
965 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
967 case pp.RejectMetadataExtensionMsgType:
970 return errors.New("unknown msg_type value")
974 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
978 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
981 if _, ok := cl.ipBlockRange(ip); ok {
984 if _, ok := cl.badPeerIPs[ip.String()]; ok {
990 // Return a Torrent ready for insertion into a Client.
991 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
992 // use provided storage, if provided
993 storageClient := cl.defaultStorage
994 if specStorage != nil {
995 storageClient = storage.NewClient(specStorage)
1001 peers: prioritizedPeers{
1003 getPrio: func(p Peer) peerPriority {
1004 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1007 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1009 halfOpen: make(map[string]Peer),
1010 pieceStateChanges: pubsub.NewPubSub(),
1012 storageOpener: storageClient,
1013 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1015 networkingEnabled: true,
1017 metadataChanged: sync.Cond{
1020 duplicateRequestTimeout: 1 * time.Second,
1022 t.logger = cl.logger.Clone().AddValue(t)
1023 t.setChunkSize(defaultChunkSize)
1027 // A file-like handle to some torrent data resource.
1028 type Handle interface {
1035 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1036 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1039 // Adds a torrent by InfoHash with a custom Storage implementation.
1040 // If the torrent already exists then this Storage is ignored and the
1041 // existing torrent is returned with `new` set to `false`.
1042 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1045 t, ok := cl.torrents[infoHash]
1051 t = cl.newTorrent(infoHash, specStorage)
1052 cl.eachDhtServer(func(s *dht.Server) {
1053 go t.dhtAnnouncer(s)
1055 cl.torrents[infoHash] = t
1056 cl.clearAcceptLimits()
1057 t.updateWantPeersEvent()
1058 // Tickle Client.waitAccept, new torrent may want conns.
1059 cl.event.Broadcast()
1063 // Add or merge a torrent spec. If the torrent is already present, the
1064 // trackers will be merged with the existing ones. If the Info isn't yet
1065 // known, it will be set. The display name is replaced if the new spec
1066 // provides one. Returns new if the torrent wasn't already in the client.
1067 // Note that any `Storage` defined on the spec will be ignored if the
1068 // torrent is already present (i.e. the `new` return value is `false`).
1069 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1070 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1071 if spec.DisplayName != "" {
1072 t.SetDisplayName(spec.DisplayName)
1074 if spec.InfoBytes != nil {
1075 err = t.SetInfoBytes(spec.InfoBytes)
1082 if spec.ChunkSize != 0 {
1083 t.setChunkSize(pp.Integer(spec.ChunkSize))
1085 t.addTrackers(spec.Trackers)
1090 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1091 t, ok := cl.torrents[infoHash]
1093 err = fmt.Errorf("no such torrent")
1100 delete(cl.torrents, infoHash)
1104 func (cl *Client) allTorrentsCompleted() bool {
1105 for _, t := range cl.torrents {
1109 if !t.haveAllPieces() {
1116 // Returns true when all torrents are completely downloaded and false if the
1117 // client is stopped before that.
1118 func (cl *Client) WaitAll() bool {
1121 for !cl.allTorrentsCompleted() {
1122 if cl.closed.IsSet() {
1130 // Returns handles to all the torrents loaded in the Client.
1131 func (cl *Client) Torrents() []*Torrent {
1134 return cl.torrentsAsSlice()
1137 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1138 for _, t := range cl.torrents {
1139 ret = append(ret, t)
1144 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1145 spec, err := TorrentSpecFromMagnetURI(uri)
1149 T, _, err = cl.AddTorrentSpec(spec)
1153 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1154 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1156 slices.MakeInto(&ss, mi.Nodes)
1161 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1162 mi, err := metainfo.LoadFromFile(filename)
1166 return cl.AddTorrent(mi)
1169 func (cl *Client) DhtServers() []*dht.Server {
1170 return cl.dhtServers
1173 func (cl *Client) AddDHTNodes(nodes []string) {
1174 for _, n := range nodes {
1175 hmp := missinggo.SplitHostMaybePort(n)
1176 ip := net.ParseIP(hmp.Host)
1178 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1181 ni := krpc.NodeInfo{
1182 Addr: krpc.NodeAddr{
1187 cl.eachDhtServer(func(s *dht.Server) {
1193 func (cl *Client) banPeerIP(ip net.IP) {
1194 if cl.badPeerIPs == nil {
1195 cl.badPeerIPs = make(map[string]struct{})
1197 cl.badPeerIPs[ip.String()] = struct{}{}
1200 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1206 PeerMaxRequests: 250,
1207 writeBuffer: new(bytes.Buffer),
1208 remoteAddr: remoteAddr,
1211 c.writerCond.L = cl.locker()
1212 c.setRW(connStatsReadWriter{nc, c})
1213 c.r = &rateLimitedReader{
1214 l: cl.config.DownloadRateLimiter,
1220 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1230 Source: peerSourceDHTAnnouncePeer,
1234 func firstNotNil(ips ...net.IP) net.IP {
1235 for _, ip := range ips {
1243 func (cl *Client) eachListener(f func(socket) bool) {
1244 for _, s := range cl.conns {
1251 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1252 cl.eachListener(func(l socket) bool {
1259 func (cl *Client) publicIp(peer net.IP) net.IP {
1260 // TODO: Use BEP 10 to determine how peers are seeing us.
1261 if peer.To4() != nil {
1263 cl.config.PublicIp4,
1264 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1268 cl.config.PublicIp6,
1269 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1274 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1275 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1276 return f(missinggo.AddrIP(l.Addr()))
1280 // Our address (IP and port) as the given peer should see it.
1281 func (cl *Client) publicAddr(peer net.IP) IpPort {
1282 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1285 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1288 cl.eachListener(func(l socket) bool {
1289 ret = append(ret, l.Addr())
1295 func (cl *Client) onBadAccept(addr IpPort) {
1296 ip := maskIpForAcceptLimiting(addr.IP)
1297 if cl.acceptLimiter == nil {
1298 cl.acceptLimiter = make(map[ipStr]int)
1300 cl.acceptLimiter[ipStr(ip.String())]++
1303 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1304 if ip4 := ip.To4(); ip4 != nil {
1305 return ip4.Mask(net.CIDRMask(24, 32))
1310 func (cl *Client) clearAcceptLimits() {
1311 cl.acceptLimiter = nil
1314 func (cl *Client) acceptLimitClearer() {
1317 case <-cl.closed.LockedChan(cl.locker()):
1319 case <-time.After(15 * time.Minute):
1321 cl.clearAcceptLimits()
1327 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1328 if cl.config.DisableAcceptRateLimiting {
1331 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1334 func (cl *Client) rLock() {
1338 func (cl *Client) rUnlock() {
1342 func (cl *Client) lock() {
1346 func (cl *Client) unlock() {
1350 func (cl *Client) locker() sync.Locker {
1351 return clientLocker{cl}
1354 type clientLocker struct {
1358 func (cl clientLocker) Lock() {
1362 func (cl clientLocker) Unlock() {