19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
47 // An aggregate of stats over all connections. First in struct to ensure
48 // 64-bit alignment of fields. See #262.
53 closed missinggo.Event
59 defaultStorage *storage.Client
62 dhtServers []*dht.Server
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, &cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
156 const debugLogValue = log.Debug
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
162 return !m.HasValue(debugLogValue)
165 func (cl *Client) initLogger() {
166 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
187 dialRateLimiter: rate.NewLimiter(10, 10),
189 go cl.acceptLimitClearer()
197 cl.extensionBytes = defaultPeerExtensionBytes()
198 cl.event.L = cl.locker()
199 storageImpl := cfg.DefaultStorage
200 if storageImpl == nil {
201 // We'd use mmap but HFS+ doesn't support sparse files.
202 storageImpl = storage.NewFile(cfg.DataDir)
203 cl.onClose = append(cl.onClose, func() {
204 if err := storageImpl.Close(); err != nil {
205 cl.logger.Printf("error closing default storage: %s", err)
209 cl.defaultStorage = storage.NewClient(storageImpl)
210 if cfg.IPBlocklist != nil {
211 cl.ipBlockList = cfg.IPBlocklist
214 if cfg.PeerID != "" {
215 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
217 o := copy(cl.peerID[:], cfg.Bep20)
218 _, err = rand.Read(cl.peerID[o:])
220 panic("error generating peer id")
224 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
230 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
237 for _, s := range cl.conns {
238 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239 go cl.acceptConnections(s)
245 for _, s := range cl.conns {
246 if pc, ok := s.(net.PacketConn); ok {
247 ds, err := cl.newDhtServer(pc)
251 cl.dhtServers = append(cl.dhtServers, ds)
259 func (cl *Client) firewallCallback(net.Addr) bool {
261 block := !cl.wantConns()
264 torrent.Add("connections firewalled", 1)
266 torrent.Add("connections not firewalled", 1)
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272 for _, n := range allPeerNetworks {
273 if peerNetworkEnabled(n, cl.config) {
280 func (cl *Client) listenOnNetwork(n network) bool {
281 if n.Ipv4 && cl.config.DisableIPv4 {
284 if n.Ipv6 && cl.config.DisableIPv6 {
287 if n.Tcp && cl.config.DisableTCP {
290 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
296 func (cl *Client) listenNetworks() (ns []network) {
297 for _, n := range allPeerNetworks {
298 if cl.listenOnNetwork(n) {
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306 cfg := dht.ServerConfig{
307 IPBlocklist: cl.ipBlockList,
309 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310 PublicIP: func() net.IP {
311 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312 return cl.config.PublicIp6
314 return cl.config.PublicIp4
316 StartingNodes: cl.config.DhtStartingNodes,
317 ConnectionTracking: cl.config.ConnTracker,
318 OnQuery: cl.config.DHTOnQuery,
320 s, err = dht.NewServer(&cfg)
323 ts, err := s.Bootstrap()
325 cl.logger.Printf("error bootstrapping dht: %s", err)
327 log.Fstr("%v: completed bootstrap", s).AddValues(s, ts).Log(cl.logger)
333 func (cl *Client) Closed() <-chan struct{} {
339 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
340 for _, ds := range cl.dhtServers {
345 func (cl *Client) closeSockets() {
346 cl.eachListener(func(l socket) bool {
353 // Stops the client. All connections to peers are closed and all activity will
355 func (cl *Client) Close() {
359 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
361 for _, t := range cl.torrents {
364 for _, f := range cl.onClose {
370 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
371 if cl.ipBlockList == nil {
374 return cl.ipBlockList.Lookup(ip)
377 func (cl *Client) ipIsBlocked(ip net.IP) bool {
378 _, blocked := cl.ipBlockRange(ip)
382 func (cl *Client) wantConns() bool {
383 for _, t := range cl.torrents {
391 func (cl *Client) waitAccept() {
393 if cl.closed.IsSet() {
403 func (cl *Client) rejectAccepted(conn net.Conn) bool {
404 ra := conn.RemoteAddr()
405 rip := missinggo.AddrIP(ra)
406 if cl.config.DisableIPv4Peers && rip.To4() != nil {
409 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
412 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
415 if cl.rateLimitAccept(rip) {
418 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
421 func (cl *Client) acceptConnections(l net.Listener) {
423 conn, err := l.Accept()
424 torrent.Add("client listener accepts", 1)
425 conn = pproffd.WrapNetConn(conn)
427 closed := cl.closed.IsSet()
430 reject = cl.rejectAccepted(conn)
440 cl.logger.Printf("error accepting connection: %s", err)
445 torrent.Add("rejected accepted connections", 1)
448 go cl.incomingConnection(conn)
450 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
451 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
452 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
453 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
458 func (cl *Client) incomingConnection(nc net.Conn) {
460 if tc, ok := nc.(*net.TCPConn); ok {
463 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
464 c.Discovery = peerSourceIncoming
465 cl.runReceivedConn(c)
468 // Returns a handle to the given torrent, if it's present in the client.
469 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
472 t, ok = cl.torrents[ih]
476 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
477 return cl.torrents[ih]
480 type dialResult struct {
485 func countDialResult(err error) {
487 torrent.Add("successful dials", 1)
489 torrent.Add("unsuccessful dials", 1)
493 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
494 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
495 if ret < minDialTimeout {
501 // Returns whether an address is known to connect to a client with our own ID.
502 func (cl *Client) dopplegangerAddr(addr string) bool {
503 _, ok := cl.dopplegangerAddrs[addr]
507 // Returns a connection over UTP or TCP, whichever is first to connect.
508 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
510 t := perf.NewTimer(perf.CallerName(0))
513 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
515 t.Mark("returned conn over " + res.Network)
519 ctx, cancel := context.WithCancel(ctx)
520 // As soon as we return one connection, cancel the others.
523 resCh := make(chan dialResult, left)
527 cl.eachListener(func(s socket) bool {
529 network := s.Addr().Network()
530 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
534 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
537 cl.dialFromSocket(ctx, s, addr),
545 // Wait for a successful connection.
547 defer perf.ScopeTimer()()
548 for ; left > 0 && res.Conn == nil; left-- {
552 // There are still incompleted dials.
554 for ; left > 0; left-- {
555 conn := (<-resCh).Conn
562 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
564 //if res.Conn != nil {
565 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
567 // cl.logger.Printf("failed to dial %s", addr)
572 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
573 network := s.Addr().Network()
574 cte := cl.config.ConnTracker.Wait(
576 conntrack.Entry{network, s.Addr().String(), addr},
577 "dial torrent client",
580 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
581 // which dial errors allow us to forget the connection tracking entry handle.
582 if ctx.Err() != nil {
588 c, err := s.dial(ctx, addr)
589 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
590 // it now in case we close the connection forthwith.
591 if tc, ok := c.(*net.TCPConn); ok {
596 if err != nil && forgettableDialError(err) {
603 return closeWrapper{c, func() error {
610 func forgettableDialError(err error) bool {
611 return strings.Contains(err.Error(), "no suitable address found")
614 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
615 if _, ok := t.halfOpen[addr]; !ok {
616 panic("invariant broken")
618 delete(t.halfOpen, addr)
622 // Performs initiator handshakes and returns a connection. Returns nil
623 // *connection if no connection for valid reasons.
624 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
625 c = cl.newConnection(nc, true, remoteAddr, network)
626 c.headerEncrypted = encryptHeader
627 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
629 dl, ok := ctx.Deadline()
633 err = nc.SetDeadline(dl)
637 err = cl.initiateHandshakes(c, t)
641 // Returns nil connection and nil error if no connection could be established
642 // for valid reasons.
643 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
644 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
647 return t.dialTimeout()
650 dr := cl.dialFirst(dialCtx, addr.String())
653 if dialCtx.Err() != nil {
654 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
656 return nil, errors.New("dial failed")
658 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
665 // Returns nil connection and nil error if no connection could be established
666 // for valid reasons.
667 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
668 torrent.Add("establish outgoing connection", 1)
669 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
670 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
672 torrent.Add("initiated conn with preferred header obfuscation", 1)
675 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
676 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
677 // We should have just tried with the preferred header obfuscation. If it was required,
678 // there's nothing else to try.
681 // Try again with encryption if we didn't earlier, or without if we did.
682 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
684 torrent.Add("initiated conn with fallback header obfuscation", 1)
686 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
690 // Called to dial out and run a connection. The addr we're given is already
691 // considered half-open.
692 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
693 cl.dialRateLimiter.Wait(context.Background())
694 c, err := cl.establishOutgoingConn(t, addr)
697 // Don't release lock between here and addConnection, unless it's for
699 cl.noLongerHalfOpen(t, addr.String())
702 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
708 cl.runHandshookConn(c, t)
711 // The port number for incoming peer connections. 0 if the client isn't
713 func (cl *Client) incomingPeerPort() int {
714 return cl.LocalPort()
717 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
718 if c.headerEncrypted {
721 rw, c.cryptoMethod, err = mse.InitiateHandshake(
728 cl.config.CryptoProvides,
732 return xerrors.Errorf("header obfuscation handshake: %w", err)
735 ih, err := cl.connBtHandshake(c, &t.infoHash)
737 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
739 if ih != t.infoHash {
740 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
745 // Calls f with any secret keys.
746 func (cl *Client) forSkeys(f func([]byte) bool) {
749 if false { // Emulate the bug from #114
751 for ih := range cl.torrents {
755 for range cl.torrents {
762 for ih := range cl.torrents {
769 // Do encryption and bittorrent handshakes as receiver.
770 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
771 defer perf.ScopeTimerErr(&err)()
773 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
775 if err == nil || err == mse.ErrNoSecretKeyMatch {
776 if c.headerEncrypted {
777 torrent.Add("handshakes received encrypted", 1)
779 torrent.Add("handshakes received unencrypted", 1)
782 torrent.Add("handshakes received with error while handling encryption", 1)
785 if err == mse.ErrNoSecretKeyMatch {
790 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
791 err = errors.New("connection not have required header obfuscation")
794 ih, err := cl.connBtHandshake(c, nil)
796 err = xerrors.Errorf("during bt handshake: %w", err)
805 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
806 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
811 c.PeerExtensionBytes = res.PeerExtensionBits
812 c.PeerID = res.PeerID
813 c.completedHandshake = time.Now()
817 func (cl *Client) runReceivedConn(c *connection) {
818 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
822 t, err := cl.receiveHandshakes(c)
825 "error receiving handshakes: %s", err,
829 "network", c.network,
831 torrent.Add("error receiving handshake", 1)
833 cl.onBadAccept(c.remoteAddr)
838 torrent.Add("received handshake for unloaded torrent", 1)
840 cl.onBadAccept(c.remoteAddr)
844 torrent.Add("received handshake for loaded torrent", 1)
847 cl.runHandshookConn(c, t)
850 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
852 if c.PeerID == cl.peerID {
855 addr := c.conn.RemoteAddr().String()
856 cl.dopplegangerAddrs[addr] = struct{}{}
858 // Because the remote address is not necessarily the same as its
859 // client's torrent listen address, we won't record the remote address
860 // as a doppleganger. Instead, the initiator can record *us* as the
865 c.conn.SetWriteDeadline(time.Time{})
866 c.r = deadlineReader{c.conn, c.r}
867 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
868 if connIsIpv6(c.conn) {
869 torrent.Add("completed handshake over ipv6", 1)
871 if err := t.addConnection(c); err != nil {
872 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
875 defer t.dropConnection(c)
876 go c.writer(time.Minute)
877 cl.sendInitialMessages(c, t)
878 err := c.mainReadLoop()
879 if err != nil && cl.config.Debug {
880 cl.logger.Printf("error during connection main read loop: %s", err)
884 // See the order given in Transmission's tr_peerMsgsNew.
885 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
886 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
887 conn.Post(pp.Message{
889 ExtendedID: pp.HandshakeExtendedID,
890 ExtendedPayload: func() []byte {
891 msg := pp.ExtendedHandshakeMessage{
892 M: map[pp.ExtensionName]pp.ExtensionNumber{
893 pp.ExtensionNameMetadata: metadataExtendedId,
895 V: cl.config.ExtendedHandshakeClientVersion,
896 Reqq: 64, // TODO: Really?
897 YourIp: pp.CompactIp(conn.remoteAddr.IP),
898 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
899 Port: cl.incomingPeerPort(),
900 MetadataSize: torrent.metadataSize(),
901 // TODO: We can figured these out specific to the socket
903 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
904 Ipv6: cl.config.PublicIp6.To16(),
906 if !cl.config.DisablePEX {
907 msg.M[pp.ExtensionNamePex] = pexExtendedId
909 return bencode.MustMarshal(msg)
914 if conn.fastEnabled() {
915 if torrent.haveAllPieces() {
916 conn.Post(pp.Message{Type: pp.HaveAll})
917 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
919 } else if !torrent.haveAnyPieces() {
920 conn.Post(pp.Message{Type: pp.HaveNone})
921 conn.sentHaves.Clear()
927 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
928 conn.Post(pp.Message{
935 func (cl *Client) dhtPort() (ret uint16) {
936 cl.eachDhtServer(func(s *dht.Server) {
937 ret = uint16(missinggo.AddrPort(s.Addr()))
942 func (cl *Client) haveDhtServer() (ret bool) {
943 cl.eachDhtServer(func(_ *dht.Server) {
949 // Process incoming ut_metadata message.
950 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
952 err := bencode.Unmarshal(payload, &d)
953 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
954 } else if err != nil {
955 return fmt.Errorf("error unmarshalling bencode: %s", err)
957 msgType, ok := d["msg_type"]
959 return errors.New("missing msg_type field")
963 case pp.DataMetadataExtensionMsgType:
964 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
965 if !c.requestedMetadataPiece(piece) {
966 return fmt.Errorf("got unexpected piece %d", piece)
968 c.metadataRequests[piece] = false
969 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
970 if begin < 0 || begin >= len(payload) {
971 return fmt.Errorf("data has bad offset in payload: %d", begin)
973 t.saveMetadataPiece(piece, payload[begin:])
974 c.lastUsefulChunkReceived = time.Now()
975 return t.maybeCompleteMetadata()
976 case pp.RequestMetadataExtensionMsgType:
977 if !t.haveMetadataPiece(piece) {
978 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
981 start := (1 << 14) * piece
982 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
984 case pp.RejectMetadataExtensionMsgType:
987 return errors.New("unknown msg_type value")
991 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
995 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
998 if _, ok := cl.ipBlockRange(ip); ok {
1001 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1007 // Return a Torrent ready for insertion into a Client.
1008 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1009 // use provided storage, if provided
1010 storageClient := cl.defaultStorage
1011 if specStorage != nil {
1012 storageClient = storage.NewClient(specStorage)
1018 peers: prioritizedPeers{
1020 getPrio: func(p Peer) peerPriority {
1021 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1024 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1026 halfOpen: make(map[string]Peer),
1027 pieceStateChanges: pubsub.NewPubSub(),
1029 storageOpener: storageClient,
1030 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1032 networkingEnabled: true,
1034 metadataChanged: sync.Cond{
1037 duplicateRequestTimeout: 1 * time.Second,
1039 t.logger = cl.logger.WithValues(t)
1040 t.setChunkSize(defaultChunkSize)
1044 // A file-like handle to some torrent data resource.
1045 type Handle interface {
1052 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1053 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1056 // Adds a torrent by InfoHash with a custom Storage implementation.
1057 // If the torrent already exists then this Storage is ignored and the
1058 // existing torrent returned with `new` set to `false`
1059 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1062 t, ok := cl.torrents[infoHash]
1068 t = cl.newTorrent(infoHash, specStorage)
1069 cl.eachDhtServer(func(s *dht.Server) {
1070 go t.dhtAnnouncer(s)
1072 cl.torrents[infoHash] = t
1073 cl.clearAcceptLimits()
1074 t.updateWantPeersEvent()
1075 // Tickle Client.waitAccept, new torrent may want conns.
1076 cl.event.Broadcast()
1080 // Add or merge a torrent spec. If the torrent is already present, the
1081 // trackers will be merged with the existing ones. If the Info isn't yet
1082 // known, it will be set. The display name is replaced if the new spec
1083 // provides one. Returns new if the torrent wasn't already in the client.
1084 // Note that any `Storage` defined on the spec will be ignored if the
1085 // torrent is already present (i.e. `new` return value is `true`)
1086 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1087 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1088 if spec.DisplayName != "" {
1089 t.SetDisplayName(spec.DisplayName)
1091 if spec.InfoBytes != nil {
1092 err = t.SetInfoBytes(spec.InfoBytes)
1099 if spec.ChunkSize != 0 {
1100 t.setChunkSize(pp.Integer(spec.ChunkSize))
1102 t.addTrackers(spec.Trackers)
1107 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1108 t, ok := cl.torrents[infoHash]
1110 err = fmt.Errorf("no such torrent")
1117 delete(cl.torrents, infoHash)
1121 func (cl *Client) allTorrentsCompleted() bool {
1122 for _, t := range cl.torrents {
1126 if !t.haveAllPieces() {
1133 // Returns true when all torrents are completely downloaded and false if the
1134 // client is stopped before that.
1135 func (cl *Client) WaitAll() bool {
1138 for !cl.allTorrentsCompleted() {
1139 if cl.closed.IsSet() {
1147 // Returns handles to all the torrents loaded in the Client.
1148 func (cl *Client) Torrents() []*Torrent {
1151 return cl.torrentsAsSlice()
1154 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1155 for _, t := range cl.torrents {
1156 ret = append(ret, t)
1161 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1162 spec, err := TorrentSpecFromMagnetURI(uri)
1166 T, _, err = cl.AddTorrentSpec(spec)
1170 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1171 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1173 slices.MakeInto(&ss, mi.Nodes)
1178 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1179 mi, err := metainfo.LoadFromFile(filename)
1183 return cl.AddTorrent(mi)
1186 func (cl *Client) DhtServers() []*dht.Server {
1187 return cl.dhtServers
1190 func (cl *Client) AddDHTNodes(nodes []string) {
1191 for _, n := range nodes {
1192 hmp := missinggo.SplitHostMaybePort(n)
1193 ip := net.ParseIP(hmp.Host)
1195 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1198 ni := krpc.NodeInfo{
1199 Addr: krpc.NodeAddr{
1204 cl.eachDhtServer(func(s *dht.Server) {
1210 func (cl *Client) banPeerIP(ip net.IP) {
1211 if cl.badPeerIPs == nil {
1212 cl.badPeerIPs = make(map[string]struct{})
1214 cl.badPeerIPs[ip.String()] = struct{}{}
1217 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1223 PeerMaxRequests: 250,
1224 writeBuffer: new(bytes.Buffer),
1225 remoteAddr: remoteAddr,
1228 c.writerCond.L = cl.locker()
1229 c.setRW(connStatsReadWriter{nc, c})
1230 c.r = &rateLimitedReader{
1231 l: cl.config.DownloadRateLimiter,
1237 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1247 Source: peerSourceDHTAnnouncePeer,
1251 func firstNotNil(ips ...net.IP) net.IP {
1252 for _, ip := range ips {
1260 func (cl *Client) eachListener(f func(socket) bool) {
1261 for _, s := range cl.conns {
1268 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1269 cl.eachListener(func(l socket) bool {
1276 func (cl *Client) publicIp(peer net.IP) net.IP {
1277 // TODO: Use BEP 10 to determine how peers are seeing us.
1278 if peer.To4() != nil {
1280 cl.config.PublicIp4,
1281 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1285 cl.config.PublicIp6,
1286 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1291 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1292 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1293 return f(missinggo.AddrIP(l.Addr()))
1297 // Our IP as a peer should see it.
1298 func (cl *Client) publicAddr(peer net.IP) IpPort {
1299 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1302 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1305 cl.eachListener(func(l socket) bool {
1306 ret = append(ret, l.Addr())
1312 func (cl *Client) onBadAccept(addr IpPort) {
1313 ip := maskIpForAcceptLimiting(addr.IP)
1314 if cl.acceptLimiter == nil {
1315 cl.acceptLimiter = make(map[ipStr]int)
1317 cl.acceptLimiter[ipStr(ip.String())]++
1320 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1321 if ip4 := ip.To4(); ip4 != nil {
1322 return ip4.Mask(net.CIDRMask(24, 32))
1327 func (cl *Client) clearAcceptLimits() {
1328 cl.acceptLimiter = nil
1331 func (cl *Client) acceptLimitClearer() {
1334 case <-cl.closed.LockedChan(cl.locker()):
1336 case <-time.After(15 * time.Minute):
1338 cl.clearAcceptLimits()
1344 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1345 if cl.config.DisableAcceptRateLimiting {
1348 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1351 func (cl *Client) rLock() {
1355 func (cl *Client) rUnlock() {
1359 func (cl *Client) lock() {
1363 func (cl *Client) unlock() {
1367 func (cl *Client) locker() sync.Locker {
1368 return clientLocker{cl}
1371 type clientLocker struct {
1375 func (cl clientLocker) Lock() {
1379 func (cl clientLocker) Unlock() {