19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/dht/v2/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/bitmap"
24 "github.com/anacrolix/missinggo/conntrack"
25 "github.com/anacrolix/missinggo/perf"
26 "github.com/anacrolix/missinggo/pproffd"
27 "github.com/anacrolix/missinggo/pubsub"
28 "github.com/anacrolix/missinggo/slices"
29 "github.com/anacrolix/sync"
30 "github.com/anacrolix/torrent/bencode"
31 "github.com/anacrolix/torrent/iplist"
32 "github.com/anacrolix/torrent/metainfo"
33 "github.com/anacrolix/torrent/mse"
34 pp "github.com/anacrolix/torrent/peer_protocol"
35 "github.com/anacrolix/torrent/storage"
36 "github.com/davecgh/go-spew/spew"
37 "github.com/dustin/go-humanize"
38 "github.com/google/btree"
39 "golang.org/x/time/rate"
40 "golang.org/x/xerrors"
43 // Clients contain zero or more Torrents. A Client manages a blocklist, the
44 // TCP/UDP protocol ports, and DHT as desired.
46 // An aggregate of stats over all connections. First in struct to ensure
47 // 64-bit alignment of fields. See #262.
52 closed missinggo.Event
58 defaultStorage *storage.Client
61 dhtServers []*dht.Server
62 ipBlockList iplist.Ranger
63 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
64 extensionBytes pp.PeerExtensionBits
66 // Set of addresses that have our client ID. This intentionally will
67 // include ourselves if we end up trying to connect to our own address
68 // through legitimate channels.
69 dopplegangerAddrs map[string]struct{}
70 badPeerIPs map[string]struct{}
71 torrents map[InfoHash]*Torrent
73 acceptLimiter map[ipStr]int
74 dialRateLimiter *rate.Limiter
79 func (cl *Client) BadPeerIPs() []string {
82 return cl.badPeerIPsLocked()
85 func (cl *Client) badPeerIPsLocked() []string {
86 return slices.FromMapKeys(cl.badPeerIPs).([]string)
89 func (cl *Client) PeerID() PeerID {
93 func (cl *Client) LocalPort() (port int) {
94 cl.eachListener(func(l socket) bool {
95 _port := missinggo.AddrPort(l.Addr())
101 } else if port != _port {
102 panic("mismatched ports")
109 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
110 dhtStats := s.Stats()
111 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
112 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
113 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
114 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
117 // Writes out a human readable status of the client, such as for writing to a
119 func (cl *Client) WriteStatus(_w io.Writer) {
122 w := bufio.NewWriter(_w)
124 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
125 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
126 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
127 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
128 cl.eachDhtServer(func(s *dht.Server) {
129 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
130 writeDhtServerStatus(w, s)
132 spew.Fdump(w, &cl.stats)
133 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136 return l.InfoHash().AsString() < r.InfoHash().AsString()
139 fmt.Fprint(w, "<unknown name>")
141 fmt.Fprint(w, t.name())
145 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147 w.WriteString("<missing metainfo>")
155 const debugLogValue = log.Debug
157 func (cl *Client) debugLogFilter(m log.Msg) bool {
161 return !m.HasValue(debugLogValue)
164 func (cl *Client) initLogger() {
165 cl.logger = cl.config.Logger.WithValues(cl).WithFilter(cl.debugLogFilter)
168 func (cl *Client) announceKey() int32 {
169 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
172 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
174 cfg = NewDefaultClientConfig()
184 dopplegangerAddrs: make(map[string]struct{}),
185 torrents: make(map[metainfo.Hash]*Torrent),
186 dialRateLimiter: rate.NewLimiter(10, 10),
188 go cl.acceptLimitClearer()
196 cl.extensionBytes = defaultPeerExtensionBytes()
197 cl.event.L = cl.locker()
198 storageImpl := cfg.DefaultStorage
199 if storageImpl == nil {
200 // We'd use mmap but HFS+ doesn't support sparse files.
201 storageImpl = storage.NewFile(cfg.DataDir)
202 cl.onClose = append(cl.onClose, func() {
203 if err := storageImpl.Close(); err != nil {
204 cl.logger.Printf("error closing default storage: %s", err)
208 cl.defaultStorage = storage.NewClient(storageImpl)
209 if cfg.IPBlocklist != nil {
210 cl.ipBlockList = cfg.IPBlocklist
213 if cfg.PeerID != "" {
214 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
216 o := copy(cl.peerID[:], cfg.Bep20)
217 _, err = rand.Read(cl.peerID[o:])
219 panic("error generating peer id")
223 if cl.config.HTTPProxy == nil && cl.config.ProxyURL != "" {
224 if fixedURL, err := url.Parse(cl.config.ProxyURL); err == nil {
225 cl.config.HTTPProxy = http.ProxyURL(fixedURL)
229 cl.conns, err = listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL, cl.firewallCallback)
236 for _, s := range cl.conns {
237 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
238 go cl.acceptConnections(s)
244 for _, s := range cl.conns {
245 if pc, ok := s.(net.PacketConn); ok {
246 ds, err := cl.newDhtServer(pc)
250 cl.dhtServers = append(cl.dhtServers, ds)
258 func (cl *Client) firewallCallback(net.Addr) bool {
260 block := !cl.wantConns()
263 torrent.Add("connections firewalled", 1)
265 torrent.Add("connections not firewalled", 1)
270 func (cl *Client) enabledPeerNetworks() (ns []network) {
271 for _, n := range allPeerNetworks {
272 if peerNetworkEnabled(n, cl.config) {
279 func (cl *Client) listenOnNetwork(n network) bool {
280 if n.Ipv4 && cl.config.DisableIPv4 {
283 if n.Ipv6 && cl.config.DisableIPv6 {
286 if n.Tcp && cl.config.DisableTCP {
289 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
295 func (cl *Client) listenNetworks() (ns []network) {
296 for _, n := range allPeerNetworks {
297 if cl.listenOnNetwork(n) {
304 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
305 cfg := dht.ServerConfig{
306 IPBlocklist: cl.ipBlockList,
308 OnAnnouncePeer: cl.onDHTAnnouncePeer,
309 PublicIP: func() net.IP {
310 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
311 return cl.config.PublicIp6
313 return cl.config.PublicIp4
315 StartingNodes: cl.config.DhtStartingNodes,
316 ConnectionTracking: cl.config.ConnTracker,
317 OnQuery: cl.config.DHTOnQuery,
319 s, err = dht.NewServer(&cfg)
322 ts, err := s.Bootstrap()
324 cl.logger.Printf("error bootstrapping dht: %s", err)
326 log.Str("completed bootstrap").AddValues(s, ts).Log(cl.logger)
332 func (cl *Client) Closed() <-chan struct{} {
338 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
339 for _, ds := range cl.dhtServers {
344 func (cl *Client) closeSockets() {
345 cl.eachListener(func(l socket) bool {
352 // Stops the client. All connections to peers are closed and all activity will
354 func (cl *Client) Close() {
358 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
360 for _, t := range cl.torrents {
363 for _, f := range cl.onClose {
369 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
370 if cl.ipBlockList == nil {
373 return cl.ipBlockList.Lookup(ip)
376 func (cl *Client) ipIsBlocked(ip net.IP) bool {
377 _, blocked := cl.ipBlockRange(ip)
381 func (cl *Client) wantConns() bool {
382 for _, t := range cl.torrents {
390 func (cl *Client) waitAccept() {
392 if cl.closed.IsSet() {
402 func (cl *Client) rejectAccepted(conn net.Conn) bool {
403 ra := conn.RemoteAddr()
404 rip := missinggo.AddrIP(ra)
405 if cl.config.DisableIPv4Peers && rip.To4() != nil {
408 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
411 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
414 if cl.rateLimitAccept(rip) {
417 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
420 func (cl *Client) acceptConnections(l net.Listener) {
422 conn, err := l.Accept()
423 torrent.Add("client listener accepts", 1)
424 conn = pproffd.WrapNetConn(conn)
426 closed := cl.closed.IsSet()
429 reject = cl.rejectAccepted(conn)
439 cl.logger.Printf("error accepting connection: %s", err)
444 torrent.Add("rejected accepted connections", 1)
447 go cl.incomingConnection(conn)
449 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
450 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
451 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
452 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
457 func (cl *Client) incomingConnection(nc net.Conn) {
459 if tc, ok := nc.(*net.TCPConn); ok {
462 c := cl.newConnection(nc, false, missinggo.IpPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
463 c.Discovery = peerSourceIncoming
464 cl.runReceivedConn(c)
467 // Returns a handle to the given torrent, if it's present in the client.
468 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
471 t, ok = cl.torrents[ih]
475 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
476 return cl.torrents[ih]
479 type dialResult struct {
484 func countDialResult(err error) {
486 torrent.Add("successful dials", 1)
488 torrent.Add("unsuccessful dials", 1)
492 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
493 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
494 if ret < minDialTimeout {
500 // Returns whether an address is known to connect to a client with our own ID.
501 func (cl *Client) dopplegangerAddr(addr string) bool {
502 _, ok := cl.dopplegangerAddrs[addr]
506 // Returns a connection over UTP or TCP, whichever is first to connect.
507 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
509 t := perf.NewTimer(perf.CallerName(0))
512 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
514 t.Mark("returned conn over " + res.Network)
518 ctx, cancel := context.WithCancel(ctx)
519 // As soon as we return one connection, cancel the others.
522 resCh := make(chan dialResult, left)
526 cl.eachListener(func(s socket) bool {
528 network := s.Addr().Network()
529 if !peerNetworkEnabled(parseNetworkString(network), cl.config) {
533 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
536 cl.dialFromSocket(ctx, s, addr),
544 // Wait for a successful connection.
546 defer perf.ScopeTimer()()
547 for ; left > 0 && res.Conn == nil; left-- {
551 // There are still incomplete dials.
553 for ; left > 0; left-- {
554 conn := (<-resCh).Conn
561 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
563 //if res.Conn != nil {
564 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
566 // cl.logger.Printf("failed to dial %s", addr)
571 func (cl *Client) dialFromSocket(ctx context.Context, s socket, addr string) net.Conn {
572 network := s.Addr().Network()
573 cte := cl.config.ConnTracker.Wait(
575 conntrack.Entry{network, s.Addr().String(), addr},
576 "dial torrent client",
579 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
580 // which dial errors allow us to forget the connection tracking entry handle.
581 if ctx.Err() != nil {
587 c, err := s.dial(ctx, addr)
588 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
589 // it now in case we close the connection forthwith.
590 if tc, ok := c.(*net.TCPConn); ok {
595 if err != nil && forgettableDialError(err) {
602 return closeWrapper{c, func() error {
609 func forgettableDialError(err error) bool {
610 return strings.Contains(err.Error(), "no suitable address found")
613 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
614 if _, ok := t.halfOpen[addr]; !ok {
615 panic("invariant broken")
617 delete(t.halfOpen, addr)
621 // Performs initiator handshakes and returns a connection. Returns a nil
622 // *connection if no connection could be established for valid reasons.
623 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr IpPort, network string) (c *connection, err error) {
624 c = cl.newConnection(nc, true, remoteAddr, network)
625 c.headerEncrypted = encryptHeader
626 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
628 dl, ok := ctx.Deadline()
632 err = nc.SetDeadline(dl)
636 err = cl.initiateHandshakes(c, t)
640 // Returns nil connection and nil error if no connection could be established
641 // for valid reasons.
642 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr IpPort, obfuscatedHeader bool) (*connection, error) {
643 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
646 return t.dialTimeout()
649 dr := cl.dialFirst(dialCtx, addr.String())
652 if dialCtx.Err() != nil {
653 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
655 return nil, errors.New("dial failed")
657 c, err := cl.handshakesConnection(context.Background(), nc, t, obfuscatedHeader, addr, dr.Network)
664 // Returns nil connection and nil error if no connection could be established
665 // for valid reasons.
666 func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection, err error) {
667 torrent.Add("establish outgoing connection", 1)
668 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
669 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
671 torrent.Add("initiated conn with preferred header obfuscation", 1)
674 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
675 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
676 // We should have just tried with the preferred header obfuscation. If it was required,
677 // there's nothing else to try.
680 // Try again with encryption if we didn't earlier, or without if we did.
681 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
683 torrent.Add("initiated conn with fallback header obfuscation", 1)
685 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
689 // Called to dial out and run a connection. The addr we're given is already
690 // considered half-open.
691 func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
692 cl.dialRateLimiter.Wait(context.Background())
693 c, err := cl.establishOutgoingConn(t, addr)
696 // Don't release lock between here and addConnection, unless it's for
698 cl.noLongerHalfOpen(t, addr.String())
701 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
707 cl.runHandshookConn(c, t)
710 // The port number for incoming peer connections. 0 if the client isn't
712 func (cl *Client) incomingPeerPort() int {
713 return cl.LocalPort()
716 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) error {
717 if c.headerEncrypted {
720 rw, c.cryptoMethod, err = mse.InitiateHandshake(
727 cl.config.CryptoProvides,
731 return xerrors.Errorf("header obfuscation handshake: %w", err)
734 ih, err := cl.connBtHandshake(c, &t.infoHash)
736 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
738 if ih != t.infoHash {
739 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
744 // Calls f with any secret keys.
745 func (cl *Client) forSkeys(f func([]byte) bool) {
748 if false { // Emulate the bug from #114
750 for ih := range cl.torrents {
754 for range cl.torrents {
761 for ih := range cl.torrents {
768 // Do encryption and bittorrent handshakes as receiver.
769 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
770 defer perf.ScopeTimerErr(&err)()
772 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
774 if err == nil || err == mse.ErrNoSecretKeyMatch {
775 if c.headerEncrypted {
776 torrent.Add("handshakes received encrypted", 1)
778 torrent.Add("handshakes received unencrypted", 1)
781 torrent.Add("handshakes received with error while handling encryption", 1)
784 if err == mse.ErrNoSecretKeyMatch {
789 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
790 err = errors.New("connection not have required header obfuscation")
793 ih, err := cl.connBtHandshake(c, nil)
795 err = xerrors.Errorf("during bt handshake: %w", err)
804 func (cl *Client) connBtHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
805 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
810 c.PeerExtensionBytes = res.PeerExtensionBits
811 c.PeerID = res.PeerID
812 c.completedHandshake = time.Now()
816 func (cl *Client) runReceivedConn(c *connection) {
817 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
821 t, err := cl.receiveHandshakes(c)
824 "error receiving handshakes: %s", err,
828 "network", c.network,
830 torrent.Add("error receiving handshake", 1)
832 cl.onBadAccept(c.remoteAddr)
837 torrent.Add("received handshake for unloaded torrent", 1)
839 cl.onBadAccept(c.remoteAddr)
843 torrent.Add("received handshake for loaded torrent", 1)
846 cl.runHandshookConn(c, t)
849 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
851 if c.PeerID == cl.peerID {
854 addr := c.conn.RemoteAddr().String()
855 cl.dopplegangerAddrs[addr] = struct{}{}
857 // Because the remote address is not necessarily the same as its
858 // client's torrent listen address, we won't record the remote address
859 // as a doppleganger. Instead, the initiator can record *us* as the
864 c.conn.SetWriteDeadline(time.Time{})
865 c.r = deadlineReader{c.conn, c.r}
866 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
867 if connIsIpv6(c.conn) {
868 torrent.Add("completed handshake over ipv6", 1)
870 if err := t.addConnection(c); err != nil {
871 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
874 defer t.dropConnection(c)
875 go c.writer(time.Minute)
876 cl.sendInitialMessages(c, t)
877 err := c.mainReadLoop()
878 if err != nil && cl.config.Debug {
879 cl.logger.Printf("error during connection main read loop: %s", err)
883 // See the order given in Transmission's tr_peerMsgsNew.
884 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
885 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
886 conn.Post(pp.Message{
888 ExtendedID: pp.HandshakeExtendedID,
889 ExtendedPayload: func() []byte {
890 msg := pp.ExtendedHandshakeMessage{
891 M: map[pp.ExtensionName]pp.ExtensionNumber{
892 pp.ExtensionNameMetadata: metadataExtendedId,
894 V: cl.config.ExtendedHandshakeClientVersion,
895 Reqq: 64, // TODO: Really?
896 YourIp: pp.CompactIp(conn.remoteAddr.IP),
897 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
898 Port: cl.incomingPeerPort(),
899 MetadataSize: torrent.metadataSize(),
900 // TODO: We can figure these out specific to the socket
902 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
903 Ipv6: cl.config.PublicIp6.To16(),
905 if !cl.config.DisablePEX {
906 msg.M[pp.ExtensionNamePex] = pexExtendedId
908 return bencode.MustMarshal(msg)
913 if conn.fastEnabled() {
914 if torrent.haveAllPieces() {
915 conn.Post(pp.Message{Type: pp.HaveAll})
916 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
918 } else if !torrent.haveAnyPieces() {
919 conn.Post(pp.Message{Type: pp.HaveNone})
920 conn.sentHaves.Clear()
926 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
927 conn.Post(pp.Message{
934 func (cl *Client) dhtPort() (ret uint16) {
935 cl.eachDhtServer(func(s *dht.Server) {
936 ret = uint16(missinggo.AddrPort(s.Addr()))
941 func (cl *Client) haveDhtServer() (ret bool) {
942 cl.eachDhtServer(func(_ *dht.Server) {
948 // Process incoming ut_metadata message.
949 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
951 err := bencode.Unmarshal(payload, &d)
952 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
953 } else if err != nil {
954 return fmt.Errorf("error unmarshalling bencode: %s", err)
956 msgType, ok := d["msg_type"]
958 return errors.New("missing msg_type field")
962 case pp.DataMetadataExtensionMsgType:
963 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
964 if !c.requestedMetadataPiece(piece) {
965 return fmt.Errorf("got unexpected piece %d", piece)
967 c.metadataRequests[piece] = false
968 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
969 if begin < 0 || begin >= len(payload) {
970 return fmt.Errorf("data has bad offset in payload: %d", begin)
972 t.saveMetadataPiece(piece, payload[begin:])
973 c.lastUsefulChunkReceived = time.Now()
974 return t.maybeCompleteMetadata()
975 case pp.RequestMetadataExtensionMsgType:
976 if !t.haveMetadataPiece(piece) {
977 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
980 start := (1 << 14) * piece
981 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
983 case pp.RejectMetadataExtensionMsgType:
986 return errors.New("unknown msg_type value")
990 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
994 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
997 if _, ok := cl.ipBlockRange(ip); ok {
1000 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1006 // Return a Torrent ready for insertion into a Client.
1007 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1008 // use provided storage, if provided
1009 storageClient := cl.defaultStorage
1010 if specStorage != nil {
1011 storageClient = storage.NewClient(specStorage)
1017 peers: prioritizedPeers{
1019 getPrio: func(p Peer) peerPriority {
1020 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
1023 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1025 halfOpen: make(map[string]Peer),
1026 pieceStateChanges: pubsub.NewPubSub(),
1028 storageOpener: storageClient,
1029 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1031 networkingEnabled: true,
1033 metadataChanged: sync.Cond{
1036 duplicateRequestTimeout: 1 * time.Second,
1038 t.logger = cl.logger.WithValues(t)
1039 t.setChunkSize(defaultChunkSize)
1043 // A file-like handle to some torrent data resource.
1044 type Handle interface {
1051 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1052 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1055 // Adds a torrent by InfoHash with a custom Storage implementation.
1056 // If the torrent already exists then this Storage is ignored and the
1057 // existing torrent is returned with `new` set to `false`.
1058 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1061 t, ok := cl.torrents[infoHash]
1067 t = cl.newTorrent(infoHash, specStorage)
1068 cl.eachDhtServer(func(s *dht.Server) {
1069 go t.dhtAnnouncer(s)
1071 cl.torrents[infoHash] = t
1072 cl.clearAcceptLimits()
1073 t.updateWantPeersEvent()
1074 // Tickle Client.waitAccept, new torrent may want conns.
1075 cl.event.Broadcast()
1079 // Add or merge a torrent spec. If the torrent is already present, the
1080 // trackers will be merged with the existing ones. If the Info isn't yet
1081 // known, it will be set. The display name is replaced if the new spec
1082 // provides one. Returns new if the torrent wasn't already in the client.
1083 // Note that any `Storage` defined on the spec will be ignored if the
1084 // torrent is already present (i.e. `new` return value is `false`).
1085 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1086 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1087 if spec.DisplayName != "" {
1088 t.SetDisplayName(spec.DisplayName)
1090 if spec.InfoBytes != nil {
1091 err = t.SetInfoBytes(spec.InfoBytes)
1098 if spec.ChunkSize != 0 {
1099 t.setChunkSize(pp.Integer(spec.ChunkSize))
1101 t.addTrackers(spec.Trackers)
1106 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1107 t, ok := cl.torrents[infoHash]
1109 err = fmt.Errorf("no such torrent")
1116 delete(cl.torrents, infoHash)
1120 func (cl *Client) allTorrentsCompleted() bool {
1121 for _, t := range cl.torrents {
1125 if !t.haveAllPieces() {
1132 // Returns true when all torrents are completely downloaded and false if the
1133 // client is stopped before that.
1134 func (cl *Client) WaitAll() bool {
1137 for !cl.allTorrentsCompleted() {
1138 if cl.closed.IsSet() {
1146 // Returns handles to all the torrents loaded in the Client.
1147 func (cl *Client) Torrents() []*Torrent {
1150 return cl.torrentsAsSlice()
1153 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1154 for _, t := range cl.torrents {
1155 ret = append(ret, t)
1160 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1161 spec, err := TorrentSpecFromMagnetURI(uri)
1165 T, _, err = cl.AddTorrentSpec(spec)
1169 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1170 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1172 slices.MakeInto(&ss, mi.Nodes)
1177 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1178 mi, err := metainfo.LoadFromFile(filename)
1182 return cl.AddTorrent(mi)
1185 func (cl *Client) DhtServers() []*dht.Server {
1186 return cl.dhtServers
1189 func (cl *Client) AddDHTNodes(nodes []string) {
1190 for _, n := range nodes {
1191 hmp := missinggo.SplitHostMaybePort(n)
1192 ip := net.ParseIP(hmp.Host)
1194 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1197 ni := krpc.NodeInfo{
1198 Addr: krpc.NodeAddr{
1203 cl.eachDhtServer(func(s *dht.Server) {
1209 func (cl *Client) banPeerIP(ip net.IP) {
1210 if cl.badPeerIPs == nil {
1211 cl.badPeerIPs = make(map[string]struct{})
1213 cl.badPeerIPs[ip.String()] = struct{}{}
1216 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr IpPort, network string) (c *connection) {
1222 PeerMaxRequests: 250,
1223 writeBuffer: new(bytes.Buffer),
1224 remoteAddr: remoteAddr,
1227 c.writerCond.L = cl.locker()
1228 c.setRW(connStatsReadWriter{nc, c})
1229 c.r = &rateLimitedReader{
1230 l: cl.config.DownloadRateLimiter,
1236 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1246 Source: peerSourceDHTAnnouncePeer,
1250 func firstNotNil(ips ...net.IP) net.IP {
1251 for _, ip := range ips {
1259 func (cl *Client) eachListener(f func(socket) bool) {
1260 for _, s := range cl.conns {
1267 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1268 cl.eachListener(func(l socket) bool {
1275 func (cl *Client) publicIp(peer net.IP) net.IP {
1276 // TODO: Use BEP 10 to determine how peers are seeing us.
1277 if peer.To4() != nil {
1279 cl.config.PublicIp4,
1280 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1284 cl.config.PublicIp6,
1285 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1290 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1291 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1292 return f(missinggo.AddrIP(l.Addr()))
1296 // Our IP and port as the given peer should see them.
1297 func (cl *Client) publicAddr(peer net.IP) IpPort {
1298 return IpPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1301 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1304 cl.eachListener(func(l socket) bool {
1305 ret = append(ret, l.Addr())
1311 func (cl *Client) onBadAccept(addr IpPort) {
1312 ip := maskIpForAcceptLimiting(addr.IP)
1313 if cl.acceptLimiter == nil {
1314 cl.acceptLimiter = make(map[ipStr]int)
1316 cl.acceptLimiter[ipStr(ip.String())]++
1319 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1320 if ip4 := ip.To4(); ip4 != nil {
1321 return ip4.Mask(net.CIDRMask(24, 32))
1326 func (cl *Client) clearAcceptLimits() {
1327 cl.acceptLimiter = nil
1330 func (cl *Client) acceptLimitClearer() {
1333 case <-cl.closed.LockedChan(cl.locker()):
1335 case <-time.After(15 * time.Minute):
1337 cl.clearAcceptLimits()
1343 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1344 if cl.config.DisableAcceptRateLimiting {
1347 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1350 func (cl *Client) rLock() {
1354 func (cl *Client) rUnlock() {
1358 func (cl *Client) lock() {
1362 func (cl *Client) unlock() {
1366 func (cl *Client) locker() sync.Locker {
1367 return clientLocker{cl}
1370 type clientLocker struct {
1374 func (cl clientLocker) Lock() {
1378 func (cl clientLocker) Unlock() {