19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo/bitmap"
23 "github.com/anacrolix/missinggo/perf"
24 "github.com/anacrolix/missinggo/pproffd"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/slices"
27 "github.com/anacrolix/missinggo/v2"
28 "github.com/anacrolix/missinggo/v2/conntrack"
29 "github.com/anacrolix/sync"
30 "github.com/davecgh/go-spew/spew"
31 "github.com/dustin/go-humanize"
32 "github.com/google/btree"
33 "golang.org/x/time/rate"
34 "golang.org/x/xerrors"
36 "github.com/anacrolix/torrent/bencode"
37 "github.com/anacrolix/torrent/iplist"
38 "github.com/anacrolix/torrent/metainfo"
39 "github.com/anacrolix/torrent/mse"
40 pp "github.com/anacrolix/torrent/peer_protocol"
41 "github.com/anacrolix/torrent/storage"
44 // Clients contain zero or more Torrents. A Client manages a blocklist, the
45 // TCP/UDP protocol ports, and DHT as desired.
47 // An aggregate of stats over all connections. First in struct to ensure
48 // 64-bit alignment of fields. See #262.
53 closed missinggo.Event
59 defaultStorage *storage.Client
62 dhtServers []*dht.Server
63 ipBlockList iplist.Ranger
64 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
65 extensionBytes pp.PeerExtensionBits
67 // Set of addresses that have our client ID. This intentionally will
68 // include ourselves if we end up trying to connect to our own address
69 // through legitimate channels.
70 dopplegangerAddrs map[string]struct{}
71 badPeerIPs map[string]struct{}
72 torrents map[InfoHash]*Torrent
74 acceptLimiter map[ipStr]int
75 dialRateLimiter *rate.Limiter
80 func (cl *Client) BadPeerIPs() []string {
83 return cl.badPeerIPsLocked()
// badPeerIPsLocked returns the keys of cl.badPeerIPs as a slice of strings.
// NOTE(review): the "Locked" suffix suggests the caller must already hold the
// client lock; no locking is visible here, so confirm against call sites.
86 func (cl *Client) badPeerIPsLocked() []string {
87 return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 func (cl *Client) PeerID() PeerID {
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
// writeDhtServerStatus writes a tab-indented, human-readable summary of the
// DHT server s to w: node counts (total, good and "banned", the latter taken
// from the BadNodes stat), the server ID in hex, the number of successful
// outbound announce_peer queries, and the number of outstanding transactions.
// Errors returned by the writes to w are not checked.
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
// Take one snapshot of the stats so all printed figures come from the same call.
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, &cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
156 const debugLogValue = log.Debug
158 func (cl *Client) debugLogFilter(m log.Msg) bool {
162 return !m.HasValue(debugLogValue)
// initLogger sets cl.logger to the logger from the client config, with the
// client itself attached as a value and cl.debugLogFilter installed as the
// message filter.
165 func (cl *Client) initLogger() {
166 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
// announceKey returns a key derived from the client's peer ID: bytes 16
// through 19 are read as a big-endian unsigned 32-bit integer and converted
// to int32.
169 func (cl *Client) announceKey() int32 {
170 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
173 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
175 cfg = NewDefaultClientConfig()
185 dopplegangerAddrs: make(map[string]struct{}),
186 torrents: make(map[metainfo.Hash]*Torrent),
187 dialRateLimiter: rate.NewLimiter(10, 10),
189 go cl.acceptLimitClearer()
197 cl.extensionBytes = defaultPeerExtensionBytes()
198 cl.event.L = cl.locker()
199 storageImpl := cfg.DefaultStorage
200 if storageImpl == nil {
201 // We'd use mmap but HFS+ doesn't support sparse files.
202 storageImpl = storage.NewFile(cfg.DataDir)
203 cl.onClose = append(cl.onClose, func() {
204 if err := storageImpl.Close(); err != nil {
205 cl.logger.Printf("error closing default storage: %s", err)
209 cl.defaultStorage = storage.NewClient(storageImpl)
210 if cfg.IPBlocklist != nil {
211 cl.ipBlockList = cfg.IPBlocklist
214 if cfg.PeerID != "" {
215 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
217 o := copy(cl.peerID[:], cfg.Bep20)
218 _, err = rand.Read(cl.peerID[o:])
220 panic("error generating peer id")
224 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
225 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
226 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
230 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
237 for _, s := range cl.conns {
238 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
239 go cl.acceptConnections(s)
245 for _, s := range cl.conns {
246 if pc, ok := s.(net.PacketConn); ok {
247 ds, err := cl.newDhtServer(pc)
251 cl.dhtServers = append(cl.dhtServers, ds)
259 func (cl *Client) firewallCallback(net.Addr) bool {
261 block := !cl.wantConns()
264 torrent.Add("connections firewalled", 1)
266 torrent.Add("connections not firewalled", 1)
271 func (cl *Client) enabledPeerNetworks() (ns []network) {
272 for _, n := range allPeerNetworks {
273 if peerNetworkEnabled(n, cl.config) {
280 func (cl *Client) listenOnNetwork(n network) bool {
281 if n.Ipv4 && cl.config.DisableIPv4 {
284 if n.Ipv6 && cl.config.DisableIPv6 {
287 if n.Tcp && cl.config.DisableTCP {
290 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
296 func (cl *Client) listenNetworks() (ns []network) {
297 for _, n := range allPeerNetworks {
298 if cl.listenOnNetwork(n) {
305 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
306 cfg := dht.ServerConfig{
307 IPBlocklist: cl.ipBlockList,
309 OnAnnouncePeer: cl.onDHTAnnouncePeer,
310 PublicIP: func() net.IP {
311 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
312 return cl.config.PublicIp6
314 return cl.config.PublicIp4
316 StartingNodes: cl.config.DhtStartingNodes,
317 ConnectionTracking: cl.config.ConnTracker,
318 OnQuery: cl.config.DHTOnQuery,
319 Logger: cl.logger.WithValues("dht", conn.LocalAddr().String()),
321 s, err = dht.NewServer(&cfg)
324 ts, err := s.Bootstrap()
326 cl.logger.Printf("error bootstrapping dht: %s", err)
328 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
334 func (cl *Client) Closed() <-chan struct{} {
340 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
341 for _, ds := range cl.dhtServers {
346 func (cl *Client) closeSockets() {
347 cl.eachListener(func(l socket) bool {
354 // Stops the client. All connections to peers are closed and all activity will
356 func (cl *Client) Close() {
360 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
362 for _, t := range cl.torrents {
365 for _, f := range cl.onClose {
371 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
372 if cl.ipBlockList == nil {
375 return cl.ipBlockList.Lookup(ip)
378 func (cl *Client) ipIsBlocked(ip net.IP) bool {
379 _, blocked := cl.ipBlockRange(ip)
383 func (cl *Client) wantConns() bool {
384 for _, t := range cl.torrents {
392 func (cl *Client) waitAccept() {
394 if cl.closed.IsSet() {
404 func (cl *Client) rejectAccepted(conn net.Conn) bool {
405 ra := conn.RemoteAddr()
406 rip := missinggo.AddrIP(ra)
407 if cl.config.DisableIPv4Peers && rip.To4() != nil {
410 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
413 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
416 if cl.rateLimitAccept(rip) {
419 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
422 func (cl *Client) acceptConnections(l net.Listener) {
424 conn, err := l.Accept()
425 torrent.Add("client listener accepts", 1)
426 conn = pproffd.WrapNetConn(conn)
428 closed := cl.closed.IsSet()
431 reject = cl.rejectAccepted(conn)
441 cl.logger.Printf("error accepting connection: %s", err)
446 torrent.Add("rejected accepted connections", 1)
449 go cl.incomingConnection(conn)
451 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
452 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
453 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
454 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
459 func (cl *Client) incomingConnection(nc net.Conn) {
461 if tc, ok := nc.(*net.TCPConn); ok {
464 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
465 c.Discovery = peerSourceIncoming
466 cl.runReceivedConn(c)
469 // Returns a handle to the given torrent, if it's present in the client.
470 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
473 t, ok = cl.torrents[ih]
// torrent returns the Torrent stored in cl.torrents under ih, or nil when the
// map has no entry for it. No separate presence flag is returned.
477 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
478 return cl.torrents[ih]
481 type dialResult struct {
486 func countDialResult(err error) {
488 torrent.Add("successful dials", 1)
490 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the nominal dial timeout max down as the number
// of pending peers grows relative to halfOpenLimit, and never returns less
// than minDialTimeout.
//
// The divisor is (pendingPeers+halfOpenLimit)/halfOpenLimit using integer
// division, so the result is max while pendingPeers < halfOpenLimit, max/2
// until pendingPeers reaches twice the limit, and so on.
//
// If halfOpenLimit is zero, or the divisor works out to zero, the division is
// skipped and max is used unscaled (still clamped to minDialTimeout) instead
// of panicking with an integer divide-by-zero.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	ret = max
	if halfOpenLimit != 0 {
		// Only divide when it is safe; every input that did not panic before
		// yields exactly the same value as before.
		if divisor := (pendingPeers + halfOpenLimit) / halfOpenLimit; divisor != 0 {
			ret = max / time.Duration(divisor)
		}
	}
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
502 // Returns whether an address is known to connect to a client with our own ID.
503 func (cl *Client) dopplegangerAddr(addr string) bool {
504 _, ok := cl.dopplegangerAddrs[addr]
508 // Returns a connection over UTP or TCP, whichever is first to connect.
509 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
511 t := perf.NewTimer(perf.CallerName(0))
514 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
516 t.Mark("returned conn over " + res.Network)
520 ctx, cancel := context.WithCancel(ctx)
521 // As soon as we return one connection, cancel the others.
524 resCh := make(chan dialResult, left)
528 cl.eachListener(func(s socket) bool {
530 network := s.Addr().Network()
531 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
535 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
538 cl.dialFromSocket(ctx, s, addr),
546 // Wait for a successful connection.
548 defer perf.ScopeTimer()()
549 for ; left > 0 && res.Conn == nil; left-- {
553 // There are still incomplete dials.
555 for ; left > 0; left-- {
556 conn := (<-resCh).Conn
563 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
565 //if res.Conn != nil {
566 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
568 // cl.logger.Printf("failed to dial %s", addr)
573 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
574 network := s.Addr().Network()
575 cte := cl.config.ConnTracker.Wait(
577 conntrack.Entry{network, s.Addr().String(), addr},
578 "dial torrent client",
581 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
582 // which dial errors allow us to forget the connection tracking entry handle.
583 if ctx.Err() != nil {
589 c, err := s.dial(ctx, addr)
590 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
591 // it now in case we close the connection forthwith.
592 if tc, ok := c.(*net.TCPConn); ok {
597 if err != nil && forgettableDialError(err) {
604 return closeWrapper{c, func() error {
// forgettableDialError reports whether err's message contains the text
// "no suitable address found". err must be non-nil: err.Error() is called
// unconditionally.
611 func forgettableDialError(err error) bool {
612 return strings.Contains(err.Error(), "no suitable address found")
615 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
616 if _, ok := t.halfOpen[addr]; !ok {
617 panic("invariant broken")
619 delete(t.halfOpen, addr)
623 // Performs initiator handshakes and returns a connection. Returns nil
624 // *connection if no connection for valid reasons.
625 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
626 c = cl.newConnection(nc, true, remoteAddr, network)
627 c.headerEncrypted = encryptHeader
628 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
630 dl, ok := ctx.Deadline()
634 err = nc.SetDeadline(dl)
638 err = cl.initiateHandshakes(c, t)
642 // Returns nil connection and nil error if no connection could be established
643 // for valid reasons.
644 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
645 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
648 return t.dialTimeout()
651 dr := cl.dialFirst(dialCtx, addr.String())
654 if dialCtx.Err() != nil {
655 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
657 return nil, errors.New("dial failed")
659 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
666 // Returns nil connection and nil error if no connection could be established
667 // for valid reasons.
668 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
669 torrent.Add("establish outgoing connection", 1)
670 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
671 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
673 torrent.Add("initiated conn with preferred header obfuscation", 1)
676 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
677 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
678 // We should have just tried with the preferred header obfuscation. If it was required,
679 // there's nothing else to try.
682 // Try again with encryption if we didn't earlier, or without if we did.
683 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
685 torrent.Add("initiated conn with fallback header obfuscation", 1)
687 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
691 // Called to dial out and run a connection. The addr we're given is already
692 // considered half-open.
693 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
694 cl.dialRateLimiter.Wait(context.Background())
695 c, err := cl.establishOutgoingConn(t, addr)
698 // Don't release lock between here and addConnection, unless it's for
700 cl.noLongerHalfOpen(t, addr.String())
703 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
710 cl.runHandshookConn(c, t)
713 // The port number for incoming peer connections. 0 if the client isn't
715 func (cl *Client) incomingPeerPort() int {
716 return cl.LocalPort()
719 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
720 if c.headerEncrypted {
723 rw, c.cryptoMethod, err = mse.InitiateHandshake(
730 cl.config.CryptoProvides,
734 return xerrors.Errorf("header obfuscation handshake: %w", err)
737 ih, err := cl.connBtHandshake(c, &t.infoHash)
739 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
741 if ih != t.infoHash {
742 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
747 // Calls f with any secret keys.
748 func (cl *Client) forSkeys(f func([]byte) bool) {
751 if false { // Emulate the bug from #114
753 for ih := range cl.torrents {
757 for range cl.torrents {
764 for ih := range cl.torrents {
771 // Do encryption and bittorrent handshakes as receiver.
772 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
773 defer perf.ScopeTimerErr(&err)()
775 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
777 if err == nil || err == mse.ErrNoSecretKeyMatch {
778 if c.headerEncrypted {
779 torrent.Add("handshakes received encrypted", 1)
781 torrent.Add("handshakes received unencrypted", 1)
784 torrent.Add("handshakes received with error while handling encryption", 1)
787 if err == mse.ErrNoSecretKeyMatch {
792 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
793 err = errors.New("connection not have required header obfuscation")
796 ih, err := cl.connBtHandshake(c, nil)
798 err = xerrors.Errorf("during bt handshake: %w", err)
807 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
808 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
813 c.PeerExtensionBytes = res.PeerExtensionBits
814 c.PeerID = res.PeerID
815 c.completedHandshake = time.Now()
819 func (cl *Client) runReceivedConn(c *connection) {
820 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
824 t, err := cl.receiveHandshakes(c)
827 "error receiving handshakes: %s", err,
831 "network", c.network,
833 torrent.Add("error receiving handshake", 1)
835 cl.onBadAccept(c.remoteAddr)
840 torrent.Add("received handshake for unloaded torrent", 1)
842 cl.onBadAccept(c.remoteAddr)
846 torrent.Add("received handshake for loaded torrent", 1)
849 cl.runHandshookConn(c, t)
852 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
854 if c.PeerID == cl.peerID {
857 addr := c.conn.RemoteAddr().String()
858 cl.dopplegangerAddrs[addr] = struct{}{}
860 // Because the remote address is not necessarily the same as its
861 // client's torrent listen address, we won't record the remote address
862 // as a doppleganger. Instead, the initiator can record *us* as the
867 c.conn.SetWriteDeadline(time.Time{})
868 c.r = deadlineReader{c.conn, c.r}
869 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
870 if connIsIpv6(c.conn) {
871 torrent.Add("completed handshake over ipv6", 1)
873 if err := t.addConnection(c); err != nil {
874 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
877 defer t.dropConnection(c)
878 go c.writer(time.Minute)
879 cl.sendInitialMessages(c, t)
880 err := c.mainReadLoop()
881 if err != nil && cl.config.Debug {
882 cl.logger.Printf("error during connection main read loop: %s", err)
886 // See the order given in Transmission's tr_peerMsgsNew.
887 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
888 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
889 conn.Post(pp.Message{
891 ExtendedID: pp.HandshakeExtendedID,
892 ExtendedPayload: func() []byte {
893 msg := pp.ExtendedHandshakeMessage{
894 M: map[pp.ExtensionName]pp.ExtensionNumber{
895 pp.ExtensionNameMetadata: metadataExtendedId,
897 V: cl.config.ExtendedHandshakeClientVersion,
898 Reqq: 64, // TODO: Really?
899 YourIp: pp.CompactIp(conn.remoteAddr.IP),
900 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
901 Port: cl.incomingPeerPort(),
902 MetadataSize: torrent.metadataSize(),
903 // TODO: We can figure these out specific to the socket
905 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
906 Ipv6: cl.config.PublicIp6.To16(),
908 if !cl.config.DisablePEX {
909 msg.M[pp.ExtensionNamePex] = pexExtendedId
911 return bencode.MustMarshal(msg)
916 if conn.fastEnabled() {
917 if torrent.haveAllPieces() {
918 conn.Post(pp.Message{Type: pp.HaveAll})
919 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
921 } else if !torrent.haveAnyPieces() {
922 conn.Post(pp.Message{Type: pp.HaveNone})
923 conn.sentHaves.Clear()
929 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
930 conn.Post(pp.Message{
937 func (cl *Client) dhtPort() (ret uint16) {
938 cl.eachDhtServer(func(s *dht.Server) {
939 ret = uint16(missinggo.AddrPort(s.Addr()))
944 func (cl *Client) haveDhtServer() (ret bool) {
945 cl.eachDhtServer(func(_ *dht.Server) {
951 // Process incoming ut_metadata message.
952 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
954 err := bencode.Unmarshal(payload, &d)
955 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
956 } else if err != nil {
957 return fmt.Errorf("error unmarshalling bencode: %s", err)
959 msgType, ok := d["msg_type"]
961 return errors.New("missing msg_type field")
965 case pp.DataMetadataExtensionMsgType:
966 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
967 if !c.requestedMetadataPiece(piece) {
968 return fmt.Errorf("got unexpected piece %d", piece)
970 c.metadataRequests[piece] = false
971 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
972 if begin < 0 || begin >= len(payload) {
973 return fmt.Errorf("data has bad offset in payload: %d", begin)
975 t.saveMetadataPiece(piece, payload[begin:])
976 c.lastUsefulChunkReceived = time.Now()
977 return t.maybeCompleteMetadata()
978 case pp.RequestMetadataExtensionMsgType:
979 if !t.haveMetadataPiece(piece) {
980 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
983 start := (1 << 14) * piece
984 c.logger.Printf("sending metadata piece %d", piece)
985 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
987 case pp.RejectMetadataExtensionMsgType:
990 return errors.New("unknown msg_type value")
994 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
998 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1001 if _, ok := cl.ipBlockRange(ip); ok {
1004 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1010 // Return a Torrent ready for insertion into a Client.
1011 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1012 // use provided storage, if provided
1013 storageClient := cl.defaultStorage
1014 if specStorage != nil {
1015 storageClient = storage.NewClient(specStorage)
1021 peers: prioritizedPeers{
1023 getPrio: func(p Peer) peerPriority {
1024 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1027 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1029 halfOpen: make(map[string]Peer),
1030 pieceStateChanges: pubsub.NewPubSub(),
1032 storageOpener: storageClient,
1033 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1035 networkingEnabled: true,
1037 metadataChanged: sync.Cond{
1040 duplicateRequestTimeout: 1 * time.Second,
1042 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1043 return fmt.Sprintf("%v: %s", t, m.Text())
1045 t.setChunkSize(defaultChunkSize)
1049 // A file-like handle to some torrent data resource.
1050 type Handle interface {
// AddTorrentInfoHash adds a torrent by info hash without a custom storage
// implementation. It delegates to AddTorrentInfoHashWithStorage with a nil
// storage argument and returns that call's results unchanged.
1057 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1058 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1061 // Adds a torrent by InfoHash with a custom Storage implementation.
1062 // If the torrent already exists then this Storage is ignored and the
1063 // existing torrent returned with `new` set to `false`
1064 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1067 t, ok := cl.torrents[infoHash]
1073 t = cl.newTorrent(infoHash, specStorage)
1074 cl.eachDhtServer(func(s *dht.Server) {
1075 go t.dhtAnnouncer(s)
1077 cl.torrents[infoHash] = t
1078 cl.clearAcceptLimits()
1079 t.updateWantPeersEvent()
1080 // Tickle Client.waitAccept, new torrent may want conns.
1081 cl.event.Broadcast()
1085 // Add or merge a torrent spec. If the torrent is already present, the
1086 // trackers will be merged with the existing ones. If the Info isn't yet
1087 // known, it will be set. The display name is replaced if the new spec
1088 // provides one. Returns new if the torrent wasn't already in the client.
1089 // Note that any `Storage` defined on the spec will be ignored if the
1090 // torrent is already present (i.e. `new` return value is `false`)
1091 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1092 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1093 if spec.DisplayName != "" {
1094 t.SetDisplayName(spec.DisplayName)
1096 if spec.InfoBytes != nil {
1097 err = t.SetInfoBytes(spec.InfoBytes)
1104 if spec.ChunkSize != 0 {
1105 t.setChunkSize(pp.Integer(spec.ChunkSize))
1107 t.addTrackers(spec.Trackers)
1112 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1113 t, ok := cl.torrents[infoHash]
1115 err = fmt.Errorf("no such torrent")
1122 delete(cl.torrents, infoHash)
1126 func (cl *Client) allTorrentsCompleted() bool {
1127 for _, t := range cl.torrents {
1131 if !t.haveAllPieces() {
1138 // Returns true when all torrents are completely downloaded and false if the
1139 // client is stopped before that.
1140 func (cl *Client) WaitAll() bool {
1143 for !cl.allTorrentsCompleted() {
1144 if cl.closed.IsSet() {
1152 // Returns handles to all the torrents loaded in the Client.
1153 func (cl *Client) Torrents() []*Torrent {
1156 return cl.torrentsAsSlice()
1159 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1160 for _, t := range cl.torrents {
1161 ret = append(ret, t)
1166 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1167 spec, err := TorrentSpecFromMagnetURI(uri)
1171 T, _, err = cl.AddTorrentSpec(spec)
1175 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1176 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1178 slices.MakeInto(&ss, mi.Nodes)
1183 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1184 mi, err := metainfo.LoadFromFile(filename)
1188 return cl.AddTorrent(mi)
// DhtServers returns the client's DHT servers. The client's own slice is
// returned directly rather than a copy.
1191 func (cl *Client) DhtServers() []*dht.Server {
1192 return cl.dhtServers
1195 func (cl *Client) AddDHTNodes(nodes []string) {
1196 for _, n := range nodes {
1197 hmp := missinggo.SplitHostMaybePort(n)
1198 ip := net.ParseIP(hmp.Host)
1200 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1203 ni := krpc.NodeInfo{
1204 Addr: krpc.NodeAddr{
1209 cl.eachDhtServer(func(s *dht.Server) {
// banPeerIP logs that ip is being banned and records its string form as a key
// in cl.badPeerIPs.
1215 func (cl *Client) banPeerIP(ip net.IP) {
1216 cl.logger.Printf("banning ip %v", ip)
// The map is created on first use; writing to a nil map would panic.
1217 if cl.badPeerIPs == nil {
1218 cl.badPeerIPs = make(map[string]struct{})
1220 cl.badPeerIPs[ip.String()] = struct{}{}
1223 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1229 PeerMaxRequests: 250,
1230 writeBuffer: new(bytes.Buffer),
1231 remoteAddr: remoteAddr,
1234 c.logger = cl.logger.WithValues(c,
1235 log.Debug, // I want messages to default to debug, and can set it here as it's only used by new code
1236 ).WithText(func(m log.Msg) string {
1237 return fmt.Sprintf("%v: %s", c, m.Text())
1239 c.writerCond.L = cl.locker()
1240 c.setRW(connStatsReadWriter{nc, c})
1241 c.r = &rateLimitedReader{
1242 l: cl.config.DownloadRateLimiter,
1245 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1249 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1259 Source: peerSourceDhtAnnouncePeer,
1263 func firstNotNil(ips ...net.IP) net.IP {
1264 for _, ip := range ips {
1272 func (cl *Client) eachListener(f func(socket) bool) {
1273 for _, s := range cl.conns {
1280 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1281 cl.eachListener(func(l socket) bool {
1288 func (cl *Client) publicIp(peer net.IP) net.IP {
1289 // TODO: Use BEP 10 to determine how peers are seeing us.
1290 if peer.To4() != nil {
1292 cl.config.PublicIp4,
1293 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1297 cl.config.PublicIp6,
1298 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1303 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1304 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1305 return f(missinggo.AddrIP(l.Addr()))
// publicAddr returns the address the given peer is expected to see us at: the
// IP chosen by publicIp for that peer, paired with the incoming peer port
// converted to uint16.
1310 func (cl *Client) publicAddr(peer net.IP) IpPort {
1311 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1314 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1317 cl.eachListener(func(l socket) bool {
1318 ret = append(ret, l.Addr())
// onBadAccept increments the accept-limiter count for addr's IP. The IP is
// first passed through maskIpForAcceptLimiting, and its string form is used
// as the map key.
1324 func (cl *Client) onBadAccept(addr IpPort) {
1325 ip := maskIpForAcceptLimiting(addr.IP)
// The map may be nil (clearAcceptLimits resets it to nil), so create it on demand.
1326 if cl.acceptLimiter == nil {
1327 cl.acceptLimiter = make(map[ipStr]int)
1329 cl.acceptLimiter[ipStr(ip.String())]++
1332 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1333 if ip4 := ip.To4(); ip4 != nil {
1334 return ip4.Mask(net.CIDRMask(24, 32))
// clearAcceptLimits discards all accept-limiter counts by resetting
// cl.acceptLimiter to nil.
1339 func (cl *Client) clearAcceptLimits() {
1340 cl.acceptLimiter = nil
1343 func (cl *Client) acceptLimitClearer() {
1346 case <-cl.closed.LockedChan(cl.locker()):
1348 case <-time.After(15 * time.Minute):
1350 cl.clearAcceptLimits()
1356 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1357 if cl.config.DisableAcceptRateLimiting {
1360 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1363 func (cl *Client) rLock() {
1367 func (cl *Client) rUnlock() {
1371 func (cl *Client) lock() {
1375 func (cl *Client) unlock() {
// locker returns a sync.Locker for the client, implemented by wrapping cl in
// a clientLocker value.
1379 func (cl *Client) locker() sync.Locker {
1380 return clientLocker{cl}
// String formats the client as "<TYPE POINTER>", applying the %T and %p verbs
// to the receiver.
1383 func (cl *Client) String() string {
1384 return fmt.Sprintf("<%[1]T %[1]p>", cl)
1387 type clientLocker struct {
1391 func (cl clientLocker) Lock() {
1395 func (cl clientLocker) Unlock() {