19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
47 // An aggregate of stats over all connections. First in struct to ensure
48 // 64-bit alignment of fields. See #262.
53 closed missinggo.Event
59 defaultStorage *storage.Client
62 dhtServers []*dht.Server
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a []string, built via
// slices.FromMapKeys and a type assertion on its result.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock; nothing in this block takes it — confirm against callers.
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
// writeDhtServerStatus writes a four-line, tab-indented summary of the DHT
// server s to w. All counters come from a single s.Stats() snapshot: the node
// counts (total, good, and BadNodes, which is printed under the label
// "banned"), the successful outbound announce-peer queries, and the number of
// outstanding transactions. The server ID from s.ID() is printed in hex.
// Errors returned by the writes are not checked.
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, &cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
156 const debugLogValue = log.Debug
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
162 return !m.HasValue(debugLogValue)
// initLogger derives cl.logger from the configured logger (cl.config.Logger),
// attaching cl itself as a log value and installing cl.debugLogFilter as the
// message filter.
165 func (cl *Client) initLogger() {
166 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey derives a 32-bit key from the client's peer ID: bytes 16 through
// 19 of cl.peerID are read as a big-endian uint32 and converted to int32.
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
187 dialRateLimiter: rate.NewLimiter(10, 10),
189 go cl.acceptLimitClearer()
197 cl.extensionBytes = defaultPeerExtensionBytes()
198 cl.event.L = cl.locker()
199 storageImpl := cfg.DefaultStorage
200 if storageImpl == nil {
201 // We'd use mmap but HFS+ doesn't support sparse files.
202 storageImpl = storage.NewFile(cfg.DataDir)
203 cl.onClose = append(cl.onClose, func() {
204 if err := storageImpl.Close(); err != nil {
205 cl.logger.Printf("error closing default storage: %s", err)
209 cl.defaultStorage = storage.NewClient(storageImpl)
210 if cfg.IPBlocklist != nil {
211 cl.ipBlockList = cfg.IPBlocklist
214 if cfg.PeerID != "" {
215 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
217 o := copy(cl.peerID[:], cfg.Bep20)
218 _, err = rand.Read(cl.peerID[o:])
220 panic("error generating peer id")
224 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
230 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
237 for _, s := range cl.conns {
238 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239 go cl.acceptConnections(s)
245 for _, s := range cl.conns {
246 if pc, ok := s.(net.PacketConn); ok {
247 ds, err := cl.newDhtServer(pc)
251 cl.dhtServers = append(cl.dhtServers, ds)
259 func (cl *Client) firewallCallback(net.Addr) bool {
261 block := !cl.wantConns()
264 torrent.Add("connections firewalled", 1)
266 torrent.Add("connections not firewalled", 1)
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272 for _, n := range allPeerNetworks {
273 if peerNetworkEnabled(n, cl.config) {
280 func (cl *Client) listenOnNetwork(n network) bool {
281 if n.Ipv4 && cl.config.DisableIPv4 {
284 if n.Ipv6 && cl.config.DisableIPv6 {
287 if n.Tcp && cl.config.DisableTCP {
290 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
296 func (cl *Client) listenNetworks() (ns []network) {
297 for _, n := range allPeerNetworks {
298 if cl.listenOnNetwork(n) {
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306 cfg := dht.ServerConfig{
307 IPBlocklist: cl.ipBlockList,
309 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310 PublicIP: func() net.IP {
311 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312 return cl.config.PublicIp6
314 return cl.config.PublicIp4
316 StartingNodes: cl.config.DhtStartingNodes,
317 ConnectionTracking: cl.config.ConnTracker,
318 OnQuery: cl.config.DHTOnQuery,
319 Logger: cl.logger.WithValues("dht", conn.LocalAddr().String()),
321 s, err = dht.NewServer(&cfg)
324 ts, err := s.Bootstrap()
326 cl.logger.Printf("error bootstrapping dht: %s", err)
328 log.Fstr("%v: completed bootstrap", s).AddValues(s, ts).Log(cl.logger)
334 func (cl *Client) Closed() <-chan struct{} {
340 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
341 for _, ds := range cl.dhtServers {
346 func (cl *Client) closeSockets() {
347 cl.eachListener(func(l socket) bool {
354 // Stops the client. All connections to peers are closed and all activity will
356 func (cl *Client) Close() {
360 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
362 for _, t := range cl.torrents {
365 for _, f := range cl.onClose {
371 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
372 if cl.ipBlockList == nil {
375 return cl.ipBlockList.Lookup(ip)
378 func (cl *Client) ipIsBlocked(ip net.IP) bool {
379 _, blocked := cl.ipBlockRange(ip)
383 func (cl *Client) wantConns() bool {
384 for _, t := range cl.torrents {
392 func (cl *Client) waitAccept() {
394 if cl.closed.IsSet() {
404 func (cl *Client) rejectAccepted(conn net.Conn) bool {
405 ra := conn.RemoteAddr()
406 rip := missinggo.AddrIP(ra)
407 if cl.config.DisableIPv4Peers && rip.To4() != nil {
410 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
413 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
416 if cl.rateLimitAccept(rip) {
419 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
422 func (cl *Client) acceptConnections(l net.Listener) {
424 conn, err := l.Accept()
425 torrent.Add("client listener accepts", 1)
426 conn = pproffd.WrapNetConn(conn)
428 closed := cl.closed.IsSet()
431 reject = cl.rejectAccepted(conn)
441 cl.logger.Printf("error accepting connection: %s", err)
446 torrent.Add("rejected accepted connections", 1)
449 go cl.incomingConnection(conn)
451 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
452 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
453 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
454 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
459 func (cl *Client) incomingConnection(nc net.Conn) {
461 if tc, ok := nc.(*net.TCPConn); ok {
464 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
465 c.Discovery = peerSourceIncoming
466 cl.runReceivedConn(c)
469 // Returns a handle to the given torrent, if it's present in the client.
470 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
473 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored under ih in cl.torrents, or nil when the
// map has no entry for ih. It is a bare map read with no locking of its own.
// NOTE(review): presumably callers hold the client lock — confirm.
477 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
478 return cl.torrents[ih]
481 type dialResult struct {
486 func countDialResult(err error) {
488 torrent.Add("successful dials", 1)
490 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout shortens the dial timeout as the backlog of pending peers
// grows relative to the half-open connection limit.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), and is never less than minDialTimeout. With no backlog
// the divisor is 1 and max is returned unchanged (subject to the lower bound).
//
// A divisor below 1 — which happens when halfOpenLimit is not positive or
// pendingPeers is negative — is treated as 1, so the function returns max
// rather than panicking with an integer divide by zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	divisor := 1
	// Guard the inner division: halfOpenLimit == 0 would panic at run time.
	if halfOpenLimit > 0 {
		divisor = (pendingPeers + halfOpenLimit) / halfOpenLimit
	}
	// Guard the outer division: a zero (or negative) quotient is meaningless
	// as a scale-down factor, so fall back to "no reduction".
	if divisor < 1 {
		divisor = 1
	}
	ret = max / time.Duration(divisor)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
502 // Returns whether an address is known to connect to a client with our own ID.
503 func (cl *Client) dopplegangerAddr(addr string) bool {
504 _, ok := cl.dopplegangerAddrs[addr]
508 // Returns a connection over UTP or TCP, whichever is first to connect.
509 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
511 t := perf.NewTimer(perf.CallerName(0))
514 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
516 t.Mark("returned conn over " + res.Network)
520 ctx, cancel := context.WithCancel(ctx)
521 // As soon as we return one connection, cancel the others.
524 resCh := make(chan dialResult, left)
528 cl.eachListener(func(s socket) bool {
530 network := s.Addr().Network()
531 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
535 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
538 cl.dialFromSocket(ctx, s, addr),
546 // Wait for a successful connection.
548 defer perf.ScopeTimer()()
549 for ; left > 0 && res.Conn == nil; left-- {
553 // There are still incomplete dials.
555 for ; left > 0; left-- {
556 conn := (<-resCh).Conn
563 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
565 //if res.Conn != nil {
566 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
568 // cl.logger.Printf("failed to dial %s", addr)
573 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
574 network := s.Addr().Network()
575 cte := cl.config.ConnTracker.Wait(
577 conntrack.Entry{network, s.Addr().String(), addr},
578 "dial torrent client",
581 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
582 // which dial errors allow us to forget the connection tracking entry handle.
583 if ctx.Err() != nil {
589 c, err := s.dial(ctx, addr)
590 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
591 // it now in case we close the connection forthwith.
592 if tc, ok := c.(*net.TCPConn); ok {
597 if err != nil && forgettableDialError(err) {
604 return closeWrapper{c, func() error {
// forgettableDialError reports whether err is a dial failure recognised as
// safe to "forget" (see its use after dialing, where the connection-tracking
// entry is dropped for such errors). Recognition is by substring match on the
// error text: "no suitable address found".
//
// A nil error is never forgettable; the function returns false for it instead
// of dereferencing it.
func forgettableDialError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "no suitable address found")
}
615 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
616 if _, ok := t.halfOpen[addr]; !ok {
617 panic("invariant broken")
619 delete(t.halfOpen, addr)
623 // Performs initiator handshakes and returns a connection. Returns nil
624 // *connection if no connection for valid reasons.
625 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
626 c = cl.newConnection(nc, true, remoteAddr, network)
627 c.headerEncrypted = encryptHeader
628 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
630 dl, ok := ctx.Deadline()
634 err = nc.SetDeadline(dl)
638 err = cl.initiateHandshakes(c, t)
642 // Returns nil connection and nil error if no connection could be established
643 // for valid reasons.
644 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
645 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
648 return t.dialTimeout()
651 dr := cl.dialFirst(dialCtx, addr.String())
654 if dialCtx.Err() != nil {
655 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
657 return nil, errors.New("dial failed")
659 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
666 // Returns nil connection and nil error if no connection could be established
667 // for valid reasons.
668 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
669 torrent.Add("establish outgoing connection", 1)
670 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
671 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
673 torrent.Add("initiated conn with preferred header obfuscation", 1)
676 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
677 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
678 // We should have just tried with the preferred header obfuscation. If it was required,
679 // there's nothing else to try.
682 // Try again with encryption if we didn't earlier, or without if we did.
683 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
685 torrent.Add("initiated conn with fallback header obfuscation", 1)
687 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
691 // Called to dial out and run a connection. The addr we're given is already
692 // considered half-open.
693 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
694 cl.dialRateLimiter.Wait(context.Background())
695 c, err := cl.establishOutgoingConn(t, addr)
698 // Don't release lock between here and addConnection, unless it's for
700 cl.noLongerHalfOpen(t, addr.String())
703 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
709 cl.runHandshookConn(c, t)
712 // The port number for incoming peer connections. 0 if the client isn't
714 func (cl *Client) incomingPeerPort() int {
715 return cl.LocalPort()
718 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
719 if c.headerEncrypted {
722 rw, c.cryptoMethod, err = mse.InitiateHandshake(
729 cl.config.CryptoProvides,
733 return xerrors.Errorf("header obfuscation handshake: %w", err)
736 ih, err := cl.connBtHandshake(c, &t.infoHash)
738 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
740 if ih != t.infoHash {
741 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
746 // Calls f with any secret keys.
747 func (cl *Client) forSkeys(f func([]byte) bool) {
750 if false { // Emulate the bug from #114
752 for ih := range cl.torrents {
756 for range cl.torrents {
763 for ih := range cl.torrents {
770 // Do encryption and bittorrent handshakes as receiver.
771 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
772 defer perf.ScopeTimerErr(&err)()
774 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
776 if err == nil || err == mse.ErrNoSecretKeyMatch {
777 if c.headerEncrypted {
778 torrent.Add("handshakes received encrypted", 1)
780 torrent.Add("handshakes received unencrypted", 1)
783 torrent.Add("handshakes received with error while handling encryption", 1)
786 if err == mse.ErrNoSecretKeyMatch {
791 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
792 err = errors.New("connection not have required header obfuscation")
795 ih, err := cl.connBtHandshake(c, nil)
797 err = xerrors.Errorf("during bt handshake: %w", err)
806 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
807 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
812 c.PeerExtensionBytes = res.PeerExtensionBits
813 c.PeerID = res.PeerID
814 c.completedHandshake = time.Now()
818 func (cl *Client) runReceivedConn(c *connection) {
819 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
823 t, err := cl.receiveHandshakes(c)
826 "error receiving handshakes: %s", err,
830 "network", c.network,
832 torrent.Add("error receiving handshake", 1)
834 cl.onBadAccept(c.remoteAddr)
839 torrent.Add("received handshake for unloaded torrent", 1)
841 cl.onBadAccept(c.remoteAddr)
845 torrent.Add("received handshake for loaded torrent", 1)
848 cl.runHandshookConn(c, t)
851 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
853 if c.PeerID == cl.peerID {
856 addr := c.conn.RemoteAddr().String()
857 cl.dopplegangerAddrs[addr] = struct{}{}
859 // Because the remote address is not necessarily the same as its
860 // client's torrent listen address, we won't record the remote address
861 // as a doppleganger. Instead, the initiator can record *us* as the
866 c.conn.SetWriteDeadline(time.Time{})
867 c.r = deadlineReader{c.conn, c.r}
868 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
869 if connIsIpv6(c.conn) {
870 torrent.Add("completed handshake over ipv6", 1)
872 if err := t.addConnection(c); err != nil {
873 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
876 defer t.dropConnection(c)
877 go c.writer(time.Minute)
878 cl.sendInitialMessages(c, t)
879 err := c.mainReadLoop()
880 if err != nil && cl.config.Debug {
881 cl.logger.Printf("error during connection main read loop: %s", err)
885 // See the order given in Transmission's tr_peerMsgsNew.
886 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
887 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
888 conn.Post(pp.Message{
890 ExtendedID: pp.HandshakeExtendedID,
891 ExtendedPayload: func() []byte {
892 msg := pp.ExtendedHandshakeMessage{
893 M: map[pp.ExtensionName]pp.ExtensionNumber{
894 pp.ExtensionNameMetadata: metadataExtendedId,
896 V: cl.config.ExtendedHandshakeClientVersion,
897 Reqq: 64, // TODO: Really?
898 YourIp: pp.CompactIp(conn.remoteAddr.IP),
899 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
900 Port: cl.incomingPeerPort(),
901 MetadataSize: torrent.metadataSize(),
902 // TODO: We can figure these out specific to the socket
904 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
905 Ipv6: cl.config.PublicIp6.To16(),
907 if !cl.config.DisablePEX {
908 msg.M[pp.ExtensionNamePex] = pexExtendedId
910 return bencode.MustMarshal(msg)
915 if conn.fastEnabled() {
916 if torrent.haveAllPieces() {
917 conn.Post(pp.Message{Type: pp.HaveAll})
918 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
920 } else if !torrent.haveAnyPieces() {
921 conn.Post(pp.Message{Type: pp.HaveNone})
922 conn.sentHaves.Clear()
928 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
929 conn.Post(pp.Message{
936 func (cl *Client) dhtPort() (ret uint16) {
937 cl.eachDhtServer(func(s *dht.Server) {
938 ret = uint16(missinggo.AddrPort(s.Addr()))
943 func (cl *Client) haveDhtServer() (ret bool) {
944 cl.eachDhtServer(func(_ *dht.Server) {
950 // Process incoming ut_metadata message.
951 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
953 err := bencode.Unmarshal(payload, &d)
954 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
955 } else if err != nil {
956 return fmt.Errorf("error unmarshalling bencode: %s", err)
958 msgType, ok := d["msg_type"]
960 return errors.New("missing msg_type field")
964 case pp.DataMetadataExtensionMsgType:
965 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
966 if !c.requestedMetadataPiece(piece) {
967 return fmt.Errorf("got unexpected piece %d", piece)
969 c.metadataRequests[piece] = false
970 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
971 if begin < 0 || begin >= len(payload) {
972 return fmt.Errorf("data has bad offset in payload: %d", begin)
974 t.saveMetadataPiece(piece, payload[begin:])
975 c.lastUsefulChunkReceived = time.Now()
976 return t.maybeCompleteMetadata()
977 case pp.RequestMetadataExtensionMsgType:
978 if !t.haveMetadataPiece(piece) {
979 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
982 start := (1 << 14) * piece
983 c.logger.Printf("sending metadata piece %d", piece)
984 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
986 case pp.RejectMetadataExtensionMsgType:
989 return errors.New("unknown msg_type value")
993 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
997 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1000 if _, ok := cl.ipBlockRange(ip); ok {
1003 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1009 // Return a Torrent ready for insertion into a Client.
1010 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1011 // use provided storage, if provided
1012 storageClient := cl.defaultStorage
1013 if specStorage != nil {
1014 storageClient = storage.NewClient(specStorage)
1020 peers: prioritizedPeers{
1022 getPrio: func(p Peer) peerPriority {
1023 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1026 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1028 halfOpen: make(map[string]Peer),
1029 pieceStateChanges: pubsub.NewPubSub(),
1031 storageOpener: storageClient,
1032 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1034 networkingEnabled: true,
1036 metadataChanged: sync.Cond{
1039 duplicateRequestTimeout: 1 * time.Second,
1041 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1042 return fmt.Sprintf("%v: %s", t, m.Text())
1044 t.setChunkSize(defaultChunkSize)
1048 // A file-like handle to some torrent data resource.
1049 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without a custom storage
// implementation. It calls AddTorrentInfoHashWithStorage with a nil storage
// and returns that call's results (t, new) unchanged.
1056 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1057 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1060 // Adds a torrent by InfoHash with a custom Storage implementation.
1061 // If the torrent already exists then this Storage is ignored and the
1062 // existing torrent returned with `new` set to `false`
1063 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1066 t, ok := cl.torrents[infoHash]
1072 t = cl.newTorrent(infoHash, specStorage)
1073 cl.eachDhtServer(func(s *dht.Server) {
1074 go t.dhtAnnouncer(s)
1076 cl.torrents[infoHash] = t
1077 cl.clearAcceptLimits()
1078 t.updateWantPeersEvent()
1079 // Tickle Client.waitAccept, new torrent may want conns.
1080 cl.event.Broadcast()
1084 // Add or merge a torrent spec. If the torrent is already present, the
1085 // trackers will be merged with the existing ones. If the Info isn't yet
1086 // known, it will be set. The display name is replaced if the new spec
1087 // provides one. Returns new if the torrent wasn't already in the client.
1088 // Note that any `Storage` defined on the spec will be ignored if the
1089 // torrent is already present (i.e. `new` return value is `false`).
1090 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1091 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1092 if spec.DisplayName != "" {
1093 t.SetDisplayName(spec.DisplayName)
1095 if spec.InfoBytes != nil {
1096 err = t.SetInfoBytes(spec.InfoBytes)
1103 if spec.ChunkSize != 0 {
1104 t.setChunkSize(pp.Integer(spec.ChunkSize))
1106 t.addTrackers(spec.Trackers)
1111 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1112 t, ok := cl.torrents[infoHash]
1114 err = fmt.Errorf("no such torrent")
1121 delete(cl.torrents, infoHash)
1125 func (cl *Client) allTorrentsCompleted() bool {
1126 for _, t := range cl.torrents {
1130 if !t.haveAllPieces() {
1137 // Returns true when all torrents are completely downloaded and false if the
1138 // client is stopped before that.
1139 func (cl *Client) WaitAll() bool {
1142 for !cl.allTorrentsCompleted() {
1143 if cl.closed.IsSet() {
1151 // Returns handles to all the torrents loaded in the Client.
1152 func (cl *Client) Torrents() []*Torrent {
1155 return cl.torrentsAsSlice()
1158 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1159 for _, t := range cl.torrents {
1160 ret = append(ret, t)
1165 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1166 spec, err := TorrentSpecFromMagnetURI(uri)
1170 T, _, err = cl.AddTorrentSpec(spec)
1174 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1175 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1177 slices.MakeInto(&ss, mi.Nodes)
1182 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1183 mi, err := metainfo.LoadFromFile(filename)
1187 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The returned slice is
// cl.dhtServers itself, not a copy, so it shares its backing array with the
// client.
1190 func (cl *Client) DhtServers() []*dht.Server {
1191 return cl.dhtServers
1194 func (cl *Client) AddDHTNodes(nodes []string) {
1195 for _, n := range nodes {
1196 hmp := missinggo.SplitHostMaybePort(n)
1197 ip := net.ParseIP(hmp.Host)
1199 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1202 ni := krpc.NodeInfo{
1203 Addr: krpc.NodeAddr{
1208 cl.eachDhtServer(func(s *dht.Server) {
// banPeerIP records ip in cl.badPeerIPs, keyed by its string form
// (ip.String()). The map is allocated on first use, so a nil map is fine on
// entry.
1214 func (cl *Client) banPeerIP(ip net.IP) {
1215 if cl.badPeerIPs == nil {
1216 cl.badPeerIPs = make(map[string]struct{})
1218 cl.badPeerIPs[ip.String()] = struct{}{}
1221 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1227 PeerMaxRequests: 250,
1228 writeBuffer: new(bytes.Buffer),
1229 remoteAddr: remoteAddr,
1232 c.logger = cl.logger.WithValues(c,
1233 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1234 ).WithText(func(m log.Msg) string {
1235 return fmt.Sprintf("%v: %s", c, m.Text())
1237 c.writerCond.L = cl.locker()
1238 c.setRW(connStatsReadWriter{nc, c})
1239 c.r = &rateLimitedReader{
1240 l: cl.config.DownloadRateLimiter,
1243 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1247 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1257 Source: peerSourceDhtAnnouncePeer,
1261 func firstNotNil(ips ...net.IP) net.IP {
1262 for _, ip := range ips {
1270 func (cl *Client) eachListener(f func(socket) bool) {
1271 for _, s := range cl.conns {
1278 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1279 cl.eachListener(func(l socket) bool {
1286 func (cl *Client) publicIp(peer net.IP) net.IP {
1287 // TODO: Use BEP 10 to determine how peers are seeing us.
1288 if peer.To4() != nil {
1290 cl.config.PublicIp4,
1291 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1295 cl.config.PublicIp6,
1296 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1301 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1302 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1303 return f(missinggo.AddrIP(l.Addr()))
1307 // Our address (IP and port) as the given peer should see it.
1308 func (cl *Client) publicAddr(peer net.IP) IpPort {
1309 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1312 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1315 cl.eachListener(func(l socket) bool {
1316 ret = append(ret, l.Addr())
// onBadAccept counts a bad accepted connection against addr's IP. The IP is
// first passed through maskIpForAcceptLimiting, and the counter for the masked
// address (keyed by its string form) in cl.acceptLimiter is incremented. The
// map is allocated on first use.
1322 func (cl *Client) onBadAccept(addr IpPort) {
1323 ip := maskIpForAcceptLimiting(addr.IP)
1324 if cl.acceptLimiter == nil {
1325 cl.acceptLimiter = make(map[ipStr]int)
1327 cl.acceptLimiter[ipStr(ip.String())]++
1330 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1331 if ip4 := ip.To4(); ip4 != nil {
1332 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards every recorded accept-limit count by setting
// cl.acceptLimiter to nil.
1337 func (cl *Client) clearAcceptLimits() {
1338 cl.acceptLimiter = nil
1341 func (cl *Client) acceptLimitClearer() {
1344 case <-cl.closed.LockedChan(cl.locker()):
1346 case <-time.After(15 * time.Minute):
1348 cl.clearAcceptLimits()
1354 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1355 if cl.config.DisableAcceptRateLimiting {
1358 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1361 func (cl *Client) rLock() {
1365 func (cl *Client) rUnlock() {
1369 func (cl *Client) lock() {
1373 func (cl *Client) unlock() {
// locker returns a sync.Locker for this client, implemented as a clientLocker
// value wrapping cl.
1377 func (cl *Client) locker() sync.Locker {
1378 return clientLocker{cl}
// String formats the client as "<TYPE POINTER>": the Go type of cl (%T)
// followed by its pointer value (%p), both taken from the same argument.
1381 func (cl *Client) String() string {
1382 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1385 type clientLocker struct {
1389 func (cl clientLocker) Lock() {
1393 func (cl clientLocker) Unlock() {