19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/anacrolix/torrent/bencode"
31 "github.com/anacrolix/torrent/iplist"
32 "github.com/anacrolix/torrent/metainfo"
33 "github.com/anacrolix/torrent/mse"
34 pp "github.com/anacrolix/torrent/peer_protocol"
35 "github.com/anacrolix/torrent/storage"
36 "github.com/davecgh/go-spew/spew"
37 "github.com/dustin/go-humanize"
38 "github.com/google/btree"
39 "golang.org/x/time/rate"
40 "golang.org/x/xerrors"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
61 dhtServers []*dht.Server
62 ipBlockList iplist.Ranger
63 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
64 extensionBytes pp.PeerExtensionBits
66 // Set of addresses that have our client ID. This intentionally will
67 // include ourselves if we end up trying to connect to our own address
68 // through legitimate channels.
69 dopplegangerAddrs map[string]struct{}
70 badPeerIPs map[string]struct{}
71 torrents map[InfoHash]*Torrent
73 acceptLimiter map[ipStr]int
74 dialRateLimiter *rate.Limiter
79 func (cl *Client) BadPeerIPs() []string {
82 return cl.badPeerIPsLocked()
85 func (cl *Client) badPeerIPsLocked() []string {
86 return slices.FromMapKeys(cl.badPeerIPs).([]string)
89 func (cl *Client) PeerID() PeerID {
93 func (cl *Client) LocalPort() (port int) {
94 cl.eachListener(func(l socket) bool {
95 _port := missinggo.AddrPort(l.Addr())
101 } else if port != _port {
102 panic("mismatched ports")
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
117 // Writes out a human readable status of the client, such as for writing to a
119 func (cl *Client) WriteStatus(_w io.Writer) {
122 w := bufio.NewWriter(_w)
124 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s *dht.Server) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, cl.stats)
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
145 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147 w.WriteString("<missing metainfo>")
155 const debugLogValue = log.Debug
157 func (cl *Client) debugLogFilter(m *log.Msg) bool {
158 if !cl.config.Debug {
159 _, ok := m.Values()[debugLogValue]
165 func (cl *Client) initLogger() {
166 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
184 dopplegangerAddrs: make(map[string]struct{}),
185 torrents: make(map[metainfo.Hash]*Torrent),
186 dialRateLimiter: rate.NewLimiter(10, 10),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
197 cl.event.L = cl.locker()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 cl.logger.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
224 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
225 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
229 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
236 for _, s := range cl.conns {
237 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
238 go cl.acceptConnections(s)
244 for _, s := range cl.conns {
245 if pc, ok := s.(net.PacketConn); ok {
246 ds, err := cl.newDhtServer(pc)
250 cl.dhtServers = append(cl.dhtServers, ds)
258 func (cl *Client) firewallCallback(net.Addr) bool {
260 block := !cl.wantConns()
263 torrent.Add("connections firewalled", 1)
265 torrent.Add("connections not firewalled", 1)
270 func (cl *Client) enabledPeerNetworks() (ns []network) {
271 for _, n := range allPeerNetworks {
272 if peerNetworkEnabled(n, cl.config) {
279 func (cl *Client) listenOnNetwork(n network) bool {
280 if n.Ipv4 && cl.config.DisableIPv4 {
283 if n.Ipv6 && cl.config.DisableIPv6 {
286 if n.Tcp && cl.config.DisableTCP {
289 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
295 func (cl *Client) listenNetworks() (ns []network) {
296 for _, n := range allPeerNetworks {
297 if cl.listenOnNetwork(n) {
304 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
305 cfg := dht.ServerConfig{
306 IPBlocklist: cl.ipBlockList,
308 OnAnnouncePeer: cl.onDHTAnnouncePeer,
309 PublicIP: func() net.IP {
310 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
311 return cl.config.PublicIp6
313 return cl.config.PublicIp4
315 StartingNodes: cl.config.DhtStartingNodes,
316 ConnectionTracking: cl.config.ConnTracker,
317 OnQuery: cl.config.DHTOnQuery,
319 s, err = dht.NewServer(&cfg)
322 ts, err := s.Bootstrap()
324 cl.logger.Printf("error bootstrapping dht: %s", err)
326 log.Str("completed bootstrap").AddValues(s, ts).Log(cl.logger)
332 func (cl *Client) Closed() <-chan struct{} {
338 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
339 for _, ds := range cl.dhtServers {
344 func (cl *Client) closeSockets() {
345 cl.eachListener(func(l socket) bool {
352 // Stops the client. All connections to peers are closed and all activity will
354 func (cl *Client) Close() {
358 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
360 for _, t := range cl.torrents {
363 for _, f := range cl.onClose {
369 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
370 if cl.ipBlockList == nil {
373 return cl.ipBlockList.Lookup(ip)
376 func (cl *Client) ipIsBlocked(ip net.IP) bool {
377 _, blocked := cl.ipBlockRange(ip)
381 func (cl *Client) wantConns() bool {
382 for _, t := range cl.torrents {
390 func (cl *Client) waitAccept() {
392 if cl.closed.IsSet() {
402 func (cl *Client) rejectAccepted(conn net.Conn) bool {
403 ra := conn.RemoteAddr()
404 rip := missinggo.AddrIP(ra)
405 if cl.config.DisableIPv4Peers && rip.To4() != nil {
408 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
414 if cl.rateLimitAccept(rip) {
417 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
420 func (cl *Client) acceptConnections(l net.Listener) {
422 conn, err := l.Accept()
423 torrent.Add("client listener accepts", 1)
424 conn = pproffd.WrapNetConn(conn)
426 closed := cl.closed.IsSet()
429 reject = cl.rejectAccepted(conn)
439 cl.logger.Printf("error accepting connection: %s", err)
444 torrent.Add("rejected accepted connections", 1)
447 go cl.incomingConnection(conn)
449 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
450 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
451 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
452 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
457 func (cl *Client) incomingConnection(nc net.Conn) {
459 if tc, ok := nc.(*net.TCPConn); ok {
462 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
463 c.Discovery = peerSourceIncoming
464 cl.runReceivedConn(c)
467 // Returns a handle to the given torrent, if it's present in the client.
468 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
471 t, ok = cl.torrents[ih]
475 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
476 return cl.torrents[ih]
479 type dialResult struct {
484 func countDialResult(err error) {
486 torrent.Add("successful dials", 1)
488 torrent.Add("unsuccessful dials", 1)
492 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
493 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
494 if ret < minDialTimeout {
500 // Returns whether an address is known to connect to a client with our own ID.
501 func (cl *Client) dopplegangerAddr(addr string) bool {
502 _, ok := cl.dopplegangerAddrs[addr]
506 // Returns a connection over UTP or TCP, whichever is first to connect.
507 func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
508 ctx, cancel := context.WithCancel(ctx)
509 // As soon as we return one connection, cancel the others.
512 resCh := make(chan dialResult, left)
516 cl.eachListener(func(s socket) bool {
518 network := s.Addr().Network()
519 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
525 cl.dialFromSocket(ctx, s, addr),
534 // Wait for a successful connection.
536 defer perf.ScopeTimer()()
537 for ; left > 0 && res.Conn == nil; left-- {
541 // There are still incompleted dials.
543 for ; left > 0; left-- {
544 conn := (<-resCh).Conn
551 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
556 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
557 network := s.Addr().Network()
558 cte := cl.config.ConnTracker.Wait(
560 conntrack.Entry{network, s.Addr().String(), addr},
561 "dial torrent client",
564 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
565 // which dial errors allow us to forget the connection tracking entry handle.
566 if ctx.Err() != nil {
572 c, err := s.dial(ctx, addr)
573 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
574 // it now in case we close the connection forthwith.
575 if tc, ok := c.(*net.TCPConn); ok {
580 if err != nil && forgettableDialError(err) {
587 return closeWrapper{c, func() error {
594 func forgettableDialError(err error) bool {
595 return strings.Contains(err.Error(), "no suitable address found")
598 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
599 if _, ok := t.halfOpen[addr]; !ok {
600 panic("invariant broken")
602 delete(t.halfOpen, addr)
606 // Performs initiator handshakes and returns a connection. Returns nil
607 // *connection if no connection for valid reasons.
608 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
609 c = cl.newConnection(nc, true, remoteAddr, network)
610 c.headerEncrypted = encryptHeader
611 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
613 dl, ok := ctx.Deadline()
617 err = nc.SetDeadline(dl)
621 err = cl.initiateHandshakes(c, t)
625 // Returns nil connection and nil error if no connection could be established
626 // for valid reasons.
627 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
628 ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
631 return t.dialTimeout()
634 dr := cl.dialFirst(ctx, addr.String())
637 if ctx.Err() != nil {
638 return nil, ctx.Err()
640 return nil, errors.New("dial failed")
642 c, err := cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network)
649 // Returns nil connection and nil error if no connection could be established
650 // for valid reasons.
651 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
652 torrent.Add("establish outgoing connection", 1)
653 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
654 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
656 torrent.Add("initiated conn with preferred header obfuscation", 1)
659 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
660 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
661 // We should have just tried with the preferred header obfuscation. If it was required,
662 // there's nothing else to try.
665 // Try again with encryption if we didn't earlier, or without if we did.
666 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
668 torrent.Add("initiated conn with fallback header obfuscation", 1)
670 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
674 // Called to dial out and run a connection. The addr we're given is already
675 // considered half-open.
676 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
677 cl.dialRateLimiter.Wait(context.Background())
678 c, err := cl.establishOutgoingConn(t, addr)
681 // Don't release lock between here and addConnection, unless it's for
683 cl.noLongerHalfOpen(t, addr.String())
686 cl.logger.Printf("error establishing outgoing connection: %s", err)
692 cl.runHandshookConn(c, t)
695 // The port number for incoming peer connections. 0 if the client isn't
697 func (cl *Client) incomingPeerPort() int {
698 return cl.LocalPort()
701 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
702 if c.headerEncrypted {
705 rw, c.cryptoMethod, err = mse.InitiateHandshake(
712 cl.config.CryptoProvides,
716 return xerrors.Errorf("header obfuscation handshake: %w", err)
719 ih, err := cl.connBtHandshake(c, &t.infoHash)
721 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
723 if ih != t.infoHash {
724 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
729 // Calls f with any secret keys.
730 func (cl *Client) forSkeys(f func([]byte) bool) {
733 if false { // Emulate the bug from #114
735 for ih := range cl.torrents {
739 for range cl.torrents {
746 for ih := range cl.torrents {
753 // Do encryption and bittorrent handshakes as receiver.
754 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
755 defer perf.ScopeTimerErr(&err)()
757 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
759 if err == nil || err == mse.ErrNoSecretKeyMatch {
760 if c.headerEncrypted {
761 torrent.Add("handshakes received encrypted", 1)
763 torrent.Add("handshakes received unencrypted", 1)
766 torrent.Add("handshakes received with error while handling encryption", 1)
769 if err == mse.ErrNoSecretKeyMatch {
774 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
775 err = errors.New("connection not have required header obfuscation")
778 ih, err := cl.connBtHandshake(c, nil)
780 err = xerrors.Errorf("during bt handshake: %w", err)
789 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
790 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
795 c.PeerExtensionBytes = res.PeerExtensionBits
796 c.PeerID = res.PeerID
797 c.completedHandshake = time.Now()
801 func (cl *Client) runReceivedConn(c *connection) {
802 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
806 t, err := cl.receiveHandshakes(c)
809 "error receiving handshakes: %s", err,
813 "network", c.network,
815 torrent.Add("error receiving handshake", 1)
817 cl.onBadAccept(c.remoteAddr)
822 torrent.Add("received handshake for unloaded torrent", 1)
824 cl.onBadAccept(c.remoteAddr)
828 torrent.Add("received handshake for loaded torrent", 1)
831 cl.runHandshookConn(c, t)
834 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
836 if c.PeerID == cl.peerID {
839 addr := c.conn.RemoteAddr().String()
840 cl.dopplegangerAddrs[addr] = struct{}{}
842 // Because the remote address is not necessarily the same as its
843 // client's torrent listen address, we won't record the remote address
844 // as a doppleganger. Instead, the initiator can record *us* as the
849 c.conn.SetWriteDeadline(time.Time{})
850 c.r = deadlineReader{c.conn, c.r}
851 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
852 if connIsIpv6(c.conn) {
853 torrent.Add("completed handshake over ipv6", 1)
855 if err := t.addConnection(c); err != nil {
856 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
859 defer t.dropConnection(c)
860 go c.writer(time.Minute)
861 cl.sendInitialMessages(c, t)
862 err := c.mainReadLoop()
863 if err != nil && cl.config.Debug {
864 cl.logger.Printf("error during connection main read loop: %s", err)
868 // See the order given in Transmission's tr_peerMsgsNew.
869 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
870 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
871 conn.Post(pp.Message{
873 ExtendedID: pp.HandshakeExtendedID,
874 ExtendedPayload: func() []byte {
875 msg := pp.ExtendedHandshakeMessage{
876 M: map[pp.ExtensionName]pp.ExtensionNumber{
877 pp.ExtensionNameMetadata: metadataExtendedId,
879 V: cl.config.ExtendedHandshakeClientVersion,
880 Reqq: 64, // TODO: Really?
881 YourIp: pp.CompactIp(conn.remoteAddr.IP),
882 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
883 Port: cl.incomingPeerPort(),
884 MetadataSize: torrent.metadataSize(),
885 // TODO: We can figured these out specific to the socket
887 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
888 Ipv6: cl.config.PublicIp6.To16(),
890 if !cl.config.DisablePEX {
891 msg.M[pp.ExtensionNamePex] = pexExtendedId
893 return bencode.MustMarshal(msg)
898 if conn.fastEnabled() {
899 if torrent.haveAllPieces() {
900 conn.Post(pp.Message{Type: pp.HaveAll})
901 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
903 } else if !torrent.haveAnyPieces() {
904 conn.Post(pp.Message{Type: pp.HaveNone})
905 conn.sentHaves.Clear()
911 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
912 conn.Post(pp.Message{
919 func (cl *Client) dhtPort() (ret uint16) {
920 cl.eachDhtServer(func(s *dht.Server) {
921 ret = uint16(missinggo.AddrPort(s.Addr()))
926 func (cl *Client) haveDhtServer() (ret bool) {
927 cl.eachDhtServer(func(_ *dht.Server) {
933 // Process incoming ut_metadata message.
934 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
936 err := bencode.Unmarshal(payload, &d)
937 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
938 } else if err != nil {
939 return fmt.Errorf("error unmarshalling bencode: %s", err)
941 msgType, ok := d["msg_type"]
943 return errors.New("missing msg_type field")
947 case pp.DataMetadataExtensionMsgType:
948 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
949 if !c.requestedMetadataPiece(piece) {
950 return fmt.Errorf("got unexpected piece %d", piece)
952 c.metadataRequests[piece] = false
953 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
954 if begin < 0 || begin >= len(payload) {
955 return fmt.Errorf("data has bad offset in payload: %d", begin)
957 t.saveMetadataPiece(piece, payload[begin:])
958 c.lastUsefulChunkReceived = time.Now()
959 return t.maybeCompleteMetadata()
960 case pp.RequestMetadataExtensionMsgType:
961 if !t.haveMetadataPiece(piece) {
962 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
965 start := (1 << 14) * piece
966 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
968 case pp.RejectMetadataExtensionMsgType:
971 return errors.New("unknown msg_type value")
975 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
979 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
982 if _, ok := cl.ipBlockRange(ip); ok {
985 if _, ok := cl.badPeerIPs[ip.String()]; ok {
991 // Return a Torrent ready for insertion into a Client.
992 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
993 // use provided storage, if provided
994 storageClient := cl.defaultStorage
995 if specStorage != nil {
996 storageClient = storage.NewClient(specStorage)
1002 peers: prioritizedPeers{
1004 getPrio: func(p Peer) peerPriority {
1005 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1008 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1010 halfOpen: make(map[string]Peer),
1011 pieceStateChanges: pubsub.NewPubSub(),
1013 storageOpener: storageClient,
1014 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1016 networkingEnabled: true,
1018 metadataChanged: sync.Cond{
1021 duplicateRequestTimeout: 1 * time.Second,
1023 t.logger = cl.logger.Clone().AddValue(t)
1024 t.setChunkSize(defaultChunkSize)
1028 // A file-like handle to some torrent data resource.
1029 type Handle interface {
1036 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1037 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1040 // Adds a torrent by InfoHash with a custom Storage implementation.
1041 // If the torrent already exists then this Storage is ignored and the
1042 // existing torrent returned with `new` set to `false`
1043 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1046 t, ok := cl.torrents[infoHash]
1052 t = cl.newTorrent(infoHash, specStorage)
1053 cl.eachDhtServer(func(s *dht.Server) {
1054 go t.dhtAnnouncer(s)
1056 cl.torrents[infoHash] = t
1057 cl.clearAcceptLimits()
1058 t.updateWantPeersEvent()
1059 // Tickle Client.waitAccept, new torrent may want conns.
1060 cl.event.Broadcast()
1064 // Add or merge a torrent spec. If the torrent is already present, the
1065 // trackers will be merged with the existing ones. If the Info isn't yet
1066 // known, it will be set. The display name is replaced if the new spec
1067 // provides one. Returns new if the torrent wasn't already in the client.
1068 // Note that any `Storage` defined on the spec will be ignored if the
1069 // torrent is already present (i.e. `new` return value is `true`)
1070 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1071 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1072 if spec.DisplayName != "" {
1073 t.SetDisplayName(spec.DisplayName)
1075 if spec.InfoBytes != nil {
1076 err = t.SetInfoBytes(spec.InfoBytes)
1083 if spec.ChunkSize != 0 {
1084 t.setChunkSize(pp.Integer(spec.ChunkSize))
1086 t.addTrackers(spec.Trackers)
1091 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1092 t, ok := cl.torrents[infoHash]
1094 err = fmt.Errorf("no such torrent")
1101 delete(cl.torrents, infoHash)
1105 func (cl *Client) allTorrentsCompleted() bool {
1106 for _, t := range cl.torrents {
1110 if !t.haveAllPieces() {
1117 // Returns true when all torrents are completely downloaded and false if the
1118 // client is stopped before that.
1119 func (cl *Client) WaitAll() bool {
1122 for !cl.allTorrentsCompleted() {
1123 if cl.closed.IsSet() {
1131 // Returns handles to all the torrents loaded in the Client.
1132 func (cl *Client) Torrents() []*Torrent {
1135 return cl.torrentsAsSlice()
1138 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1139 for _, t := range cl.torrents {
1140 ret = append(ret, t)
1145 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1146 spec, err := TorrentSpecFromMagnetURI(uri)
1150 T, _, err = cl.AddTorrentSpec(spec)
1154 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1155 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1157 slices.MakeInto(&ss, mi.Nodes)
1162 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1163 mi, err := metainfo.LoadFromFile(filename)
1167 return cl.AddTorrent(mi)
1170 func (cl *Client) DhtServers() []*dht.Server {
1171 return cl.dhtServers
1174 func (cl *Client) AddDHTNodes(nodes []string) {
1175 for _, n := range nodes {
1176 hmp := missinggo.SplitHostMaybePort(n)
1177 ip := net.ParseIP(hmp.Host)
1179 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1182 ni := krpc.NodeInfo{
1183 Addr: krpc.NodeAddr{
1188 cl.eachDhtServer(func(s *dht.Server) {
1194 func (cl *Client) banPeerIP(ip net.IP) {
1195 if cl.badPeerIPs == nil {
1196 cl.badPeerIPs = make(map[string]struct{})
1198 cl.badPeerIPs[ip.String()] = struct{}{}
1201 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1207 PeerMaxRequests: 250,
1208 writeBuffer: new(bytes.Buffer),
1209 remoteAddr: remoteAddr,
1212 c.writerCond.L = cl.locker()
1213 c.setRW(connStatsReadWriter{nc, c})
1214 c.r = &rateLimitedReader{
1215 l: cl.config.DownloadRateLimiter,
1221 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1231 Source: peerSourceDHTAnnouncePeer,
1235 func firstNotNil(ips ...net.IP) net.IP {
1236 for _, ip := range ips {
1244 func (cl *Client) eachListener(f func(socket) bool) {
1245 for _, s := range cl.conns {
1252 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1253 cl.eachListener(func(l socket) bool {
1260 func (cl *Client) publicIp(peer net.IP) net.IP {
1261 // TODO: Use BEP 10 to determine how peers are seeing us.
1262 if peer.To4() != nil {
1264 cl.config.PublicIp4,
1265 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1269 cl.config.PublicIp6,
1270 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1275 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1276 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1277 return f(missinggo.AddrIP(l.Addr()))
1281 // Our IP as a peer should see it.
1282 func (cl *Client) publicAddr(peer net.IP) IpPort {
1283 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1286 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1289 cl.eachListener(func(l socket) bool {
1290 ret = append(ret, l.Addr())
1296 func (cl *Client) onBadAccept(addr IpPort) {
1297 ip := maskIpForAcceptLimiting(addr.IP)
1298 if cl.acceptLimiter == nil {
1299 cl.acceptLimiter = make(map[ipStr]int)
1301 cl.acceptLimiter[ipStr(ip.String())]++
1304 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1305 if ip4 := ip.To4(); ip4 != nil {
1306 return ip4.Mask(net.CIDRMask(24, 32))
1311 func (cl *Client) clearAcceptLimits() {
1312 cl.acceptLimiter = nil
1315 func (cl *Client) acceptLimitClearer() {
1318 case <-cl.closed.LockedChan(cl.locker()):
1320 case <-time.After(15 * time.Minute):
1322 cl.clearAcceptLimits()
1328 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1329 if cl.config.DisableAcceptRateLimiting {
1332 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1335 func (cl *Client) rLock() {
1339 func (cl *Client) rUnlock() {
1343 func (cl *Client) lock() {
1347 func (cl *Client) unlock() {
1351 func (cl *Client) locker() sync.Locker {
1352 return clientLocker{cl}
1355 type clientLocker struct {
1359 func (cl clientLocker) Lock() {
1363 func (cl clientLocker) Unlock() {