17 "github.com/anacrolix/dht"
18 "github.com/anacrolix/dht/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
42 // An aggregate of stats over all connections. First in struct to ensure
43 // 64-bit alignment of fields. See #262.
48 closed missinggo.Event
54 defaultStorage *storage.Client
57 dhtServers []*dht.Server
58 ipBlockList iplist.Ranger
59 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
60 extensionBytes pp.PeerExtensionBits
62 // Set of addresses that have our client ID. This intentionally will
63 // include ourselves if we end up trying to connect to our own address
64 // through legitimate channels.
65 dopplegangerAddrs map[string]struct{}
66 badPeerIPs map[string]struct{}
67 torrents map[InfoHash]*Torrent
69 acceptLimiter map[ipStr]int
74 func (cl *Client) BadPeerIPs() []string {
77 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string.
// It performs no locking of its own; the "Locked" suffix suggests the caller
// is expected to hold the client lock already — TODO confirm against callers.
80 func (cl *Client) badPeerIPsLocked() []string {
// FromMapKeys yields an interface value, hence the assertion to []string.
81 return slices.FromMapKeys(cl.badPeerIPs).([]string)
84 func (cl *Client) PeerID() PeerID {
// torrentAddr is a string-backed address type. Its Network and String
// methods give it the method set of net.Addr.
type torrentAddr string

// Network always reports an empty network name; a torrentAddr carries only
// the address text.
func (torrentAddr) Network() string {
	return ""
}

// String returns the underlying address text unchanged.
func (addr torrentAddr) String() string {
	text := string(addr)
	return text
}
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
// writeDhtServerStatus writes a tab-indented, human-readable summary of the
// DHT server s to w: node counts, the server ID in hex, the number of
// successful outbound announce_peer queries, and the number of outstanding
// transactions. The results of the Fprintf calls are discarded, so write
// errors are not reported to the caller.
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
// Take one snapshot of the stats so all lines below describe the same moment.
111 dhtStats := s.Stats()
// NOTE(review): the label says "banned" but the value printed is BadNodes —
// confirm the two mean the same thing in the dht package.
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
// debugLogValue is the marker value attached to debug-only log messages.
// debugLogFilter looks it up in a message's values when cl.config.Debug is
// off, and call sites tag messages with it via AddValue/AddValues —
// presumably so those messages can be suppressed outside debug mode.
156 const debugLogValue = "debug"
158 func (cl *Client) debugLogFilter(m *log.Msg) bool {
159 if !cl.config.Debug {
160 _, ok := m.Values()[debugLogValue]
// initLogger sets cl.logger to a clone of log.Default that carries the
// client itself as a value and passes every message through
// cl.debugLogFilter.
166 func (cl *Client) initLogger() {
167 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
// announceKey derives the client's announce key from its peer ID: bytes 16
// through 19 of cl.peerID are read as a big-endian uint32 and converted to
// int32, so the key is fixed for as long as the peer ID is.
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
197 cl.event.L = cl.locker()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 log.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
230 for _, s := range cl.conns {
231 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
232 go cl.acceptConnections(s)
238 for _, s := range cl.conns {
239 if pc, ok := s.(net.PacketConn); ok {
240 ds, err := cl.newDhtServer(pc)
244 cl.dhtServers = append(cl.dhtServers, ds)
252 func (cl *Client) firewallCallback(net.Addr) bool {
254 block := !cl.wantConns()
257 torrent.Add("connections firewalled", 1)
259 torrent.Add("connections not firewalled", 1)
264 func (cl *Client) enabledPeerNetworks() (ns []string) {
265 for _, n := range allPeerNetworks {
266 if peerNetworkEnabled(n, cl.config) {
273 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
274 cfg := dht.ServerConfig{
275 IPBlocklist: cl.ipBlockList,
277 OnAnnouncePeer: cl.onDHTAnnouncePeer,
278 PublicIP: func() net.IP {
279 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
280 return cl.config.PublicIp6
282 return cl.config.PublicIp4
284 StartingNodes: cl.config.DhtStartingNodes,
286 s, err = dht.NewServer(&cfg)
289 if _, err := s.Bootstrap(); err != nil {
290 log.Printf("error bootstrapping dht: %s", err)
297 func firstNonEmptyString(ss ...string) string {
298 for _, s := range ss {
306 func (cl *Client) Closed() <-chan struct{} {
312 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
313 for _, ds := range cl.dhtServers {
318 func (cl *Client) closeSockets() {
319 cl.eachListener(func(l socket) bool {
326 // Stops the client. All connections to peers are closed and all activity will
328 func (cl *Client) Close() {
332 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
334 for _, t := range cl.torrents {
337 for _, f := range cl.onClose {
343 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
344 if cl.ipBlockList == nil {
347 return cl.ipBlockList.Lookup(ip)
350 func (cl *Client) ipIsBlocked(ip net.IP) bool {
351 _, blocked := cl.ipBlockRange(ip)
355 func (cl *Client) wantConns() bool {
356 for _, t := range cl.torrents {
364 func (cl *Client) waitAccept() {
366 if cl.closed.IsSet() {
376 func (cl *Client) rejectAccepted(conn net.Conn) bool {
377 ra := conn.RemoteAddr()
378 rip := missinggo.AddrIP(ra)
379 if cl.config.DisableIPv4Peers && rip.To4() != nil {
382 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
385 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
388 if cl.rateLimitAccept(rip) {
391 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
394 func (cl *Client) acceptConnections(l net.Listener) {
396 conn, err := l.Accept()
397 conn = pproffd.WrapNetConn(conn)
399 closed := cl.closed.IsSet()
402 reject = cl.rejectAccepted(conn)
412 log.Printf("error accepting connection: %s", err)
417 torrent.Add("rejected accepted connections", 1)
420 go cl.incomingConnection(conn)
422 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
423 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
424 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
425 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
430 func (cl *Client) incomingConnection(nc net.Conn) {
432 if tc, ok := nc.(*net.TCPConn); ok {
435 c := cl.newConnection(nc, false)
436 c.Discovery = peerSourceIncoming
437 cl.runReceivedConn(c)
440 // Returns a handle to the given torrent, if it's present in the client.
441 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
444 t, ok = cl.torrents[ih]
// torrent returns the Torrent registered under ih in cl.torrents, or nil when
// there is no entry. It does no locking of its own — presumably the caller
// already holds the client lock; verify against callers.
448 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
449 return cl.torrents[ih]
452 type dialResult struct {
456 func countDialResult(err error) {
458 torrent.Add("successful dials", 1)
460 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shrinks the dial timeout as more peers are pending: max
// is divided by (pendingPeers+halfOpenLimit)/halfOpenLimit, computed with
// integer division, and the result is then compared against minDialTimeout.
// NOTE(review): a halfOpenLimit of 0 makes the divisor expression below
// divide by zero, which panics at run time — confirm callers never pass 0.
464 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
// With pendingPeers >= 0 and halfOpenLimit > 0 the integer quotient is >= 1.
465 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
// Check the scaled value against the caller-supplied floor.
466 if ret < minDialTimeout {
472 // Returns whether an address is known to connect to a client with our own ID.
473 func (cl *Client) dopplegangerAddr(addr string) bool {
474 _, ok := cl.dopplegangerAddrs[addr]
478 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
480 case allowIpv4 && allowIpv6:
482 case allowIpv4 && !allowIpv6:
484 case !allowIpv4 && allowIpv6:
487 panic("unhandled ip network combination")
// dialUTP dials addr through the given uTP socket, passing ctx along and an
// empty network string, and returns whatever sock.DialContext returns.
491 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
492 return sock.DialContext(ctx, "", addr)
// allPeerNetworks lists the candidate network names for peer connections.
// enabledPeerNetworks iterates it and tests each entry with
// peerNetworkEnabled.
var allPeerNetworks = []string{
	"tcp4",
	"tcp6",
	"udp4",
	"udp6",
}
497 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
498 c := func(s string) bool {
499 return strings.Contains(network, s)
502 if c("udp") || c("utp") {
506 if cfg.DisableTCP && c("tcp") {
509 if cfg.DisableIPv6 && c("6") {
515 // Returns a connection over UTP or TCP, whichever is first to connect.
516 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
517 ctx, cancel := context.WithCancel(ctx)
518 // As soon as we return one connection, cancel the others.
521 resCh := make(chan dialResult, left)
522 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
525 c, err := f(ctx, addr)
526 // This is a bit optimistic, but it looks non-trivial to thread
527 // this through the proxy code. Set it now in case we close the
528 // connection forthwith.
529 if tc, ok := c.(*net.TCPConn); ok {
533 resCh <- dialResult{c}
539 cl.eachListener(func(s socket) bool {
540 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
547 // Wait for a successful connection.
549 defer perf.ScopeTimer()()
550 for ; left > 0 && res.Conn == nil; left-- {
554 // There are still incomplete dials.
556 for ; left > 0; left-- {
557 conn := (<-resCh).Conn
564 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
569 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
570 if _, ok := t.halfOpen[addr]; !ok {
571 panic("invariant broken")
573 delete(t.halfOpen, addr)
577 // Performs initiator handshakes and returns a connection. Returns nil
578 // *connection if no connection for valid reasons.
579 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
580 c = cl.newConnection(nc, true)
581 c.headerEncrypted = encryptHeader
582 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
584 dl, ok := ctx.Deadline()
588 err = nc.SetDeadline(dl)
592 ok, err = cl.initiateHandshakes(c, t)
599 // Returns nil connection and nil error if no connection could be established
600 // for valid reasons.
601 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
602 nc := cl.dialFirst(ctx, addr)
607 if c == nil || err != nil {
611 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
614 // Returns nil connection and nil error if no connection could be established
615 // for valid reasons.
616 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
617 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
620 return t.dialTimeout()
623 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
624 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
629 torrent.Add("initiated conn with preferred header obfuscation", 1)
632 if cl.config.ForceEncryption {
633 // We should have just tried with an obfuscated header. A plaintext
634 // header can't result in an encrypted connection, so we're done.
635 if !obfuscatedHeaderFirst {
636 panic(cl.config.EncryptionPolicy)
640 // Try again with encryption if we didn't earlier, or without if we did.
641 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
643 torrent.Add("initiated conn with fallback header obfuscation", 1)
648 // Called to dial out and run a connection. The addr we're given is already
649 // considered half-open.
650 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
651 c, err := cl.establishOutgoingConn(t, addr)
654 // Don't release lock between here and addConnection, unless it's for
656 cl.noLongerHalfOpen(t, addr)
659 log.Printf("error establishing outgoing connection: %s", err)
668 cl.runHandshookConn(c, t)
671 // The port number for incoming peer connections. 0 if the client isn't
673 func (cl *Client) incomingPeerPort() int {
// Thin alias: the value comes straight from LocalPort.
674 return cl.LocalPort()
677 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
678 if c.headerEncrypted {
680 rw, c.cryptoMethod, err = mse.InitiateHandshake(
687 func() mse.CryptoMethod {
689 case cl.config.ForceEncryption:
690 return mse.CryptoMethodRC4
691 case cl.config.DisableEncryption:
692 return mse.CryptoMethodPlaintext
694 return mse.AllSupportedCrypto
703 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
704 if ih != t.infoHash {
710 // Calls f with any secret keys.
711 func (cl *Client) forSkeys(f func([]byte) bool) {
714 for ih := range cl.torrents {
721 // Do encryption and bittorrent handshakes as receiver.
722 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
723 defer perf.ScopeTimerErr(&err)()
725 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
727 if err == nil || err == mse.ErrNoSecretKeyMatch {
728 if c.headerEncrypted {
729 torrent.Add("handshakes received encrypted", 1)
731 torrent.Add("handshakes received unencrypted", 1)
734 torrent.Add("handshakes received with error while handling encryption", 1)
737 if err == mse.ErrNoSecretKeyMatch {
742 if cl.config.ForceEncryption && !c.headerEncrypted {
743 err = errors.New("connection not encrypted")
746 ih, ok, err := cl.connBTHandshake(c, nil)
748 err = fmt.Errorf("error during bt handshake: %s", err)
760 // Returns !ok if handshake failed for valid reasons.
761 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
762 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
763 if err != nil || !ok {
767 c.PeerExtensionBytes = res.PeerExtensionBits
768 c.PeerID = res.PeerID
769 c.completedHandshake = time.Now()
773 func (cl *Client) runReceivedConn(c *connection) {
774 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
778 t, err := cl.receiveHandshakes(c)
781 "error receiving handshakes: %s", err,
785 "network", c.remoteAddr().Network(),
787 torrent.Add("error receiving handshake", 1)
789 cl.onBadAccept(c.remoteAddr())
794 torrent.Add("received handshake for unloaded torrent", 1)
796 cl.onBadAccept(c.remoteAddr())
800 torrent.Add("received handshake for loaded torrent", 1)
803 cl.runHandshookConn(c, t)
806 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
808 if c.PeerID == cl.peerID {
811 addr := c.conn.RemoteAddr().String()
812 cl.dopplegangerAddrs[addr] = struct{}{}
814 // Because the remote address is not necessarily the same as its
815 // client's torrent listen address, we won't record the remote address
816 // as a doppleganger. Instead, the initiator can record *us* as the
821 c.conn.SetWriteDeadline(time.Time{})
822 c.r = deadlineReader{c.conn, c.r}
823 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
824 if connIsIpv6(c.conn) {
825 torrent.Add("completed handshake over ipv6", 1)
827 if err := t.addConnection(c); err != nil {
828 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
831 defer t.dropConnection(c)
832 go c.writer(time.Minute)
833 cl.sendInitialMessages(c, t)
834 err := c.mainReadLoop()
835 if err != nil && cl.config.Debug {
836 log.Printf("error during connection main read loop: %s", err)
840 // See the order given in Transmission's tr_peerMsgsNew.
841 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
842 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
843 conn.Post(pp.Message{
845 ExtendedID: pp.HandshakeExtendedID,
846 ExtendedPayload: func() []byte {
847 msg := pp.ExtendedHandshakeMessage{
848 M: map[pp.ExtensionName]pp.ExtensionNumber{
849 pp.ExtensionNameMetadata: metadataExtendedId,
851 V: cl.config.ExtendedHandshakeClientVersion,
852 Reqq: 64, // TODO: Really?
853 YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
854 Encryption: !cl.config.DisableEncryption,
855 Port: cl.incomingPeerPort(),
856 MetadataSize: torrent.metadataSize(),
857 // TODO: We can figure these out specific to the socket
859 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
860 Ipv6: cl.config.PublicIp6.To16(),
862 if !cl.config.DisablePEX {
863 msg.M[pp.ExtensionNamePex] = pexExtendedId
865 return bencode.MustMarshal(msg)
870 if conn.fastEnabled() {
871 if torrent.haveAllPieces() {
872 conn.Post(pp.Message{Type: pp.HaveAll})
873 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
875 } else if !torrent.haveAnyPieces() {
876 conn.Post(pp.Message{Type: pp.HaveNone})
877 conn.sentHaves.Clear()
883 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
884 conn.Post(pp.Message{
891 func (cl *Client) dhtPort() (ret uint16) {
892 cl.eachDhtServer(func(s *dht.Server) {
893 ret = uint16(missinggo.AddrPort(s.Addr()))
898 func (cl *Client) haveDhtServer() (ret bool) {
899 cl.eachDhtServer(func(_ *dht.Server) {
905 // Process incoming ut_metadata message.
906 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
908 err := bencode.Unmarshal(payload, &d)
909 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
910 } else if err != nil {
911 return fmt.Errorf("error unmarshalling bencode: %s", err)
913 msgType, ok := d["msg_type"]
915 return errors.New("missing msg_type field")
919 case pp.DataMetadataExtensionMsgType:
920 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
921 if !c.requestedMetadataPiece(piece) {
922 return fmt.Errorf("got unexpected piece %d", piece)
924 c.metadataRequests[piece] = false
925 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
926 if begin < 0 || begin >= len(payload) {
927 return fmt.Errorf("data has bad offset in payload: %d", begin)
929 t.saveMetadataPiece(piece, payload[begin:])
930 c.lastUsefulChunkReceived = time.Now()
931 return t.maybeCompleteMetadata()
932 case pp.RequestMetadataExtensionMsgType:
933 if !t.haveMetadataPiece(piece) {
934 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
937 start := (1 << 14) * piece
938 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
940 case pp.RejectMetadataExtensionMsgType:
943 return errors.New("unknown msg_type value")
947 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
951 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
954 if _, ok := cl.ipBlockRange(ip); ok {
957 if _, ok := cl.badPeerIPs[ip.String()]; ok {
963 // Return a Torrent ready for insertion into a Client.
964 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
965 // use provided storage, if provided
966 storageClient := cl.defaultStorage
967 if specStorage != nil {
968 storageClient = storage.NewClient(specStorage)
974 peers: prioritizedPeers{
976 getPrio: func(p Peer) peerPriority {
977 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
980 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
982 halfOpen: make(map[string]Peer),
983 pieceStateChanges: pubsub.NewPubSub(),
985 storageOpener: storageClient,
986 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
988 networkingEnabled: true,
990 metadataChanged: sync.Cond{
993 duplicateRequestTimeout: 1 * time.Second,
995 t.logger = cl.logger.Clone().AddValue(t)
996 t.setChunkSize(defaultChunkSize)
1000 // A file-like handle to some torrent data resource.
1001 type Handle interface {
// AddTorrentInfoHash adds a torrent identified only by its info hash,
// without a custom storage implementation. It forwards to
// AddTorrentInfoHashWithStorage with a nil storage, in which case newTorrent
// keeps the client's default storage. The results are those of
// AddTorrentInfoHashWithStorage: the torrent handle and whether it is new.
1008 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1009 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1012 // Adds a torrent by InfoHash with a custom Storage implementation.
1013 // If the torrent already exists then this Storage is ignored and the
1014 // existing torrent returned with `new` set to `false`
1015 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1018 t, ok := cl.torrents[infoHash]
1024 t = cl.newTorrent(infoHash, specStorage)
1025 cl.eachDhtServer(func(s *dht.Server) {
1026 go t.dhtAnnouncer(s)
1028 cl.torrents[infoHash] = t
1029 cl.clearAcceptLimits()
1030 t.updateWantPeersEvent()
1031 // Tickle Client.waitAccept, new torrent may want conns.
1032 cl.event.Broadcast()
1036 // Add or merge a torrent spec. If the torrent is already present, the
1037 // trackers will be merged with the existing ones. If the Info isn't yet
1038 // known, it will be set. The display name is replaced if the new spec
1039 // provides one. Returns new if the torrent wasn't already in the client.
1040 // Note that any `Storage` defined on the spec will be ignored if the
1041 // torrent is already present (i.e. `new` return value is `false`)
1042 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1043 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1044 if spec.DisplayName != "" {
1045 t.SetDisplayName(spec.DisplayName)
1047 if spec.InfoBytes != nil {
1048 err = t.SetInfoBytes(spec.InfoBytes)
1055 if spec.ChunkSize != 0 {
1056 t.setChunkSize(pp.Integer(spec.ChunkSize))
1058 t.addTrackers(spec.Trackers)
1063 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1064 t, ok := cl.torrents[infoHash]
1066 err = fmt.Errorf("no such torrent")
1073 delete(cl.torrents, infoHash)
1077 func (cl *Client) allTorrentsCompleted() bool {
1078 for _, t := range cl.torrents {
1082 if !t.haveAllPieces() {
1089 // Returns true when all torrents are completely downloaded and false if the
1090 // client is stopped before that.
1091 func (cl *Client) WaitAll() bool {
1094 for !cl.allTorrentsCompleted() {
1095 if cl.closed.IsSet() {
1103 // Returns handles to all the torrents loaded in the Client.
1104 func (cl *Client) Torrents() []*Torrent {
1107 return cl.torrentsAsSlice()
1110 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1111 for _, t := range cl.torrents {
1112 ret = append(ret, t)
1117 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1118 spec, err := TorrentSpecFromMagnetURI(uri)
1122 T, _, err = cl.AddTorrentSpec(spec)
1126 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1127 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1129 slices.MakeInto(&ss, mi.Nodes)
1134 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1135 mi, err := metainfo.LoadFromFile(filename)
1139 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The slice returned is
// cl.dhtServers itself rather than a copy, so callers share its backing
// array with the client and should not modify it.
1142 func (cl *Client) DhtServers() []*dht.Server {
1143 return cl.dhtServers
1146 func (cl *Client) AddDHTNodes(nodes []string) {
1147 for _, n := range nodes {
1148 hmp := missinggo.SplitHostMaybePort(n)
1149 ip := net.ParseIP(hmp.Host)
1151 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1154 ni := krpc.NodeInfo{
1155 Addr: krpc.NodeAddr{
1160 cl.eachDhtServer(func(s *dht.Server) {
1166 func (cl *Client) banPeerIP(ip net.IP) {
1167 if cl.badPeerIPs == nil {
1168 cl.badPeerIPs = make(map[string]struct{})
1170 cl.badPeerIPs[ip.String()] = struct{}{}
1173 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1179 PeerMaxRequests: 250,
1180 writeBuffer: new(bytes.Buffer),
1182 c.writerCond.L = cl.locker()
1183 c.setRW(connStatsReadWriter{nc, c})
1184 c.r = &rateLimitedReader{
1185 l: cl.config.DownloadRateLimiter,
1191 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1201 Source: peerSourceDHTAnnouncePeer,
1205 func firstNotNil(ips ...net.IP) net.IP {
1206 for _, ip := range ips {
1214 func (cl *Client) eachListener(f func(socket) bool) {
1215 for _, s := range cl.conns {
1222 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1223 cl.eachListener(func(l socket) bool {
1230 func (cl *Client) publicIp(peer net.IP) net.IP {
1231 // TODO: Use BEP 10 to determine how peers are seeing us.
1232 if peer.To4() != nil {
1234 cl.config.PublicIp4,
1235 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1239 cl.config.PublicIp6,
1240 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1245 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1246 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1247 return f(missinggo.AddrIP(l.Addr()))
1251 // Our IP as a peer should see it.
// The result pairs cl.publicIp(peer) with cl.incomingPeerPort(); the port is
// converted from int to uint16 without a range check.
1252 func (cl *Client) publicAddr(peer net.IP) ipPort {
1253 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1256 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1259 cl.eachListener(func(l socket) bool {
1260 ret = append(ret, l.Addr())
1266 func (cl *Client) onBadAccept(addr net.Addr) {
1267 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1268 if cl.acceptLimiter == nil {
1269 cl.acceptLimiter = make(map[ipStr]int)
1271 cl.acceptLimiter[ipStr(ip.String())]++
1274 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1275 if ip4 := ip.To4(); ip4 != nil {
1276 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits forgets every accept-limiter count by dropping the map.
// onBadAccept re-creates the map the next time it records an entry, and
// rateLimitAccept's lookup on a nil map simply reads zero.
1281 func (cl *Client) clearAcceptLimits() {
1282 cl.acceptLimiter = nil
1285 func (cl *Client) acceptLimitClearer() {
1288 case <-cl.closed.LockedChan(cl.locker()):
1290 case <-time.After(15 * time.Minute):
1292 cl.clearAcceptLimits()
1298 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1299 if cl.config.DisableAcceptRateLimiting {
1302 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1305 func (cl *Client) rLock() {
1309 func (cl *Client) rUnlock() {
1313 func (cl *Client) lock() {
1317 func (cl *Client) unlock() {
// locker returns a sync.Locker view of the client, implemented by wrapping cl
// in a clientLocker. It is what gets installed as the L of condition
// variables such as cl.event and a connection's writerCond.
1321 func (cl *Client) locker() sync.Locker {
1322 return clientLocker{cl}
1325 type clientLocker struct {
1329 func (cl clientLocker) Lock() {
1333 func (cl clientLocker) Unlock() {