17 "github.com/anacrolix/dht/v2"
18 "github.com/anacrolix/dht/v2/krpc"
19 "github.com/anacrolix/log"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/perf"
22 "github.com/anacrolix/missinggo/pubsub"
23 "github.com/anacrolix/missinggo/slices"
24 "github.com/anacrolix/missinggo/v2/pproffd"
25 "github.com/anacrolix/sync"
26 "github.com/anacrolix/torrent/tracker"
27 "github.com/anacrolix/torrent/webtorrent"
28 "github.com/davecgh/go-spew/spew"
29 "github.com/dustin/go-humanize"
30 "github.com/google/btree"
31 "github.com/pion/datachannel"
32 "golang.org/x/time/rate"
33 "golang.org/x/xerrors"
35 "github.com/anacrolix/missinggo/v2"
36 "github.com/anacrolix/missinggo/v2/conntrack"
38 "github.com/anacrolix/torrent/bencode"
39 "github.com/anacrolix/torrent/iplist"
40 "github.com/anacrolix/torrent/metainfo"
41 "github.com/anacrolix/torrent/mse"
42 pp "github.com/anacrolix/torrent/peer_protocol"
43 "github.com/anacrolix/torrent/storage"
46 // Clients contain zero or more Torrents. A Client manages a blocklist, the
47 // TCP/UDP protocol ports, and DHT as desired.
49 // An aggregate of stats over all connections. First in struct to ensure
50 // 64-bit alignment of fields. See #262.
55 closed missinggo.Event
61 defaultStorage *storage.Client
65 dhtServers []DhtServer
66 ipBlockList iplist.Ranger
68 // Set of addresses that have our client ID. This intentionally will
69 // include ourselves if we end up trying to connect to our own address
70 // through legitimate channels.
71 dopplegangerAddrs map[string]struct{}
72 badPeerIPs map[string]struct{}
73 torrents map[InfoHash]*Torrent
75 acceptLimiter map[ipStr]int
76 dialRateLimiter *rate.Limiter
78 websocketTrackers websocketTrackers
83 func (cl *Client) BadPeerIPs() []string {
86 return cl.badPeerIPsLocked()
89 func (cl *Client) badPeerIPsLocked() []string {
90 return slices.FromMapKeys(cl.badPeerIPs).([]string)
93 func (cl *Client) PeerID() PeerID {
97 // Returns the port number for the first listener that has one. No longer assumes that all port
98 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
100 func (cl *Client) LocalPort() (port int) {
101 cl.eachListener(func(l Listener) bool {
102 port = addrPortOrZero(l.Addr())
108 func writeDhtServerStatus(w io.Writer, s DhtServer) {
109 dhtStats := s.Stats()
110 fmt.Fprintf(w, " ID: %x\n", s.ID())
111 spew.Fdump(w, dhtStats)
114 // Writes out a human readable status of the client, such as for writing to a
116 func (cl *Client) WriteStatus(_w io.Writer) {
119 w := bufio.NewWriter(_w)
121 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
122 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
123 fmt.Fprintf(w, "Extension bits: %v\n", cl.config.Extensions)
124 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
125 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
126 cl.eachDhtServer(func(s DhtServer) {
127 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
128 writeDhtServerStatus(w, s)
130 spew.Fdump(w, &cl.stats)
131 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
133 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
134 return l.InfoHash().AsString() < r.InfoHash().AsString()
137 fmt.Fprint(w, "<unknown name>")
139 fmt.Fprint(w, t.name())
145 "%f%% of %d bytes (%s)",
146 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
148 humanize.Bytes(uint64(*t.length)))
150 w.WriteString("<missing metainfo>")
158 func (cl *Client) initLogger() {
159 cl.logger = cl.config.Logger.WithValues(cl)
160 if !cl.config.Debug {
161 cl.logger = cl.logger.FilterLevel(log.Info)
165 func (cl *Client) announceKey() int32 {
166 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
169 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
171 cfg = NewDefaultClientConfig()
181 dopplegangerAddrs: make(map[string]struct{}),
182 torrents: make(map[metainfo.Hash]*Torrent),
183 dialRateLimiter: rate.NewLimiter(10, 10),
185 go cl.acceptLimitClearer()
193 cl.event.L = cl.locker()
194 storageImpl := cfg.DefaultStorage
195 if storageImpl == nil {
196 // We'd use mmap by default but HFS+ doesn't support sparse files.
197 storageImplCloser := storage.NewFile(cfg.DataDir)
198 cl.onClose = append(cl.onClose, func() {
199 if err := storageImplCloser.Close(); err != nil {
200 cl.logger.Printf("error closing default storage: %s", err)
203 storageImpl = storageImplCloser
205 cl.defaultStorage = storage.NewClient(storageImpl)
206 if cfg.IPBlocklist != nil {
207 cl.ipBlockList = cfg.IPBlocklist
210 if cfg.PeerID != "" {
211 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
213 o := copy(cl.peerID[:], cfg.Bep20)
214 _, err = rand.Read(cl.peerID[o:])
216 panic("error generating peer id")
220 sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
228 for _, _s := range sockets {
229 s := _s // Go is fucking retarded.
230 cl.onClose = append(cl.onClose, func() { s.Close() })
231 if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
232 cl.dialers = append(cl.dialers, s)
233 cl.listeners = append(cl.listeners, s)
234 go cl.acceptConnections(s)
240 for _, s := range sockets {
241 if pc, ok := s.(net.PacketConn); ok {
242 ds, err := cl.newAnacrolixDhtServer(pc)
246 cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
247 cl.onClose = append(cl.onClose, func() { ds.Close() })
252 cl.websocketTrackers = websocketTrackers{
255 GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
258 t, ok := cl.torrents[infoHash]
260 return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
262 return t.announceRequest(event), nil
264 OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
267 t, ok := cl.torrents[dcc.InfoHash]
269 cl.logger.WithDefaultLevel(log.Warning).Printf(
270 "got webrtc conn for unloaded torrent with infohash %x",
276 go t.onWebRtcConn(dc, dcc)
283 func (cl *Client) AddDhtServer(d DhtServer) {
284 cl.dhtServers = append(cl.dhtServers, d)
287 // Adds a Dialer for outgoing connections. All Dialers are used when attempting to connect to a
288 // given address for any Torrent.
289 func (cl *Client) AddDialer(d Dialer) {
292 cl.dialers = append(cl.dialers, d)
293 for _, t := range cl.torrents {
298 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
300 func (cl *Client) AddListener(l Listener) {
301 cl.listeners = append(cl.listeners, l)
302 go cl.acceptConnections(l)
305 func (cl *Client) firewallCallback(net.Addr) bool {
307 block := !cl.wantConns()
310 torrent.Add("connections firewalled", 1)
312 torrent.Add("connections not firewalled", 1)
317 func (cl *Client) listenOnNetwork(n network) bool {
318 if n.Ipv4 && cl.config.DisableIPv4 {
321 if n.Ipv6 && cl.config.DisableIPv6 {
324 if n.Tcp && cl.config.DisableTCP {
327 if n.Udp && cl.config.DisableUTP && cl.config.NoDHT {
333 func (cl *Client) listenNetworks() (ns []network) {
334 for _, n := range allPeerNetworks {
335 if cl.listenOnNetwork(n) {
342 func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
343 cfg := dht.ServerConfig{
344 IPBlocklist: cl.ipBlockList,
346 OnAnnouncePeer: cl.onDHTAnnouncePeer,
347 PublicIP: func() net.IP {
348 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
349 return cl.config.PublicIp6
351 return cl.config.PublicIp4
353 StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
354 ConnectionTracking: cl.config.ConnTracker,
355 OnQuery: cl.config.DHTOnQuery,
356 Logger: cl.logger.WithText(func(m log.Msg) string {
357 return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
360 s, err = dht.NewServer(&cfg)
363 ts, err := s.Bootstrap()
365 cl.logger.Printf("error bootstrapping dht: %s", err)
367 log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
373 func (cl *Client) Closed() <-chan struct{} {
379 func (cl *Client) eachDhtServer(f func(DhtServer)) {
380 for _, ds := range cl.dhtServers {
385 // Stops the client. All connections to peers are closed and all activity will
387 func (cl *Client) Close() {
391 for _, t := range cl.torrents {
394 for i := range cl.onClose {
395 cl.onClose[len(cl.onClose)-1-i]()
400 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
401 if cl.ipBlockList == nil {
404 return cl.ipBlockList.Lookup(ip)
407 func (cl *Client) ipIsBlocked(ip net.IP) bool {
408 _, blocked := cl.ipBlockRange(ip)
412 func (cl *Client) wantConns() bool {
413 for _, t := range cl.torrents {
421 func (cl *Client) waitAccept() {
423 if cl.closed.IsSet() {
433 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
434 func (cl *Client) rejectAccepted(conn net.Conn) error {
435 ra := conn.RemoteAddr()
436 if rip := addrIpOrNil(ra); rip != nil {
437 if cl.config.DisableIPv4Peers && rip.To4() != nil {
438 return errors.New("ipv4 peers disabled")
440 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
441 return errors.New("ipv4 disabled")
444 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
445 return errors.New("ipv6 disabled")
447 if cl.rateLimitAccept(rip) {
448 return errors.New("source IP accepted rate limited")
450 if cl.badPeerIPPort(rip, missinggo.AddrPort(ra)) {
451 return errors.New("bad source addr")
457 func (cl *Client) acceptConnections(l net.Listener) {
459 conn, err := l.Accept()
460 torrent.Add("client listener accepts", 1)
461 conn = pproffd.WrapNetConn(conn)
463 closed := cl.closed.IsSet()
466 reject = cl.rejectAccepted(conn)
476 log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
481 torrent.Add("rejected accepted connections", 1)
482 log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
485 go cl.incomingConnection(conn)
487 log.Fmsg("accepted %q connection at %q from %q",
491 ).SetLevel(log.Debug).Log(cl.logger)
492 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
493 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
494 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
499 func regularConnString(nc net.Conn) string {
500 return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
503 func (cl *Client) incomingConnection(nc net.Conn) {
505 if tc, ok := nc.(*net.TCPConn); ok {
508 c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
509 regularConnString(nc))
510 c.Discovery = PeerSourceIncoming
511 cl.runReceivedConn(c)
514 // Returns a handle to the given torrent, if it's present in the client.
515 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
518 t, ok = cl.torrents[ih]
522 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
523 return cl.torrents[ih]
526 type dialResult struct {
531 func countDialResult(err error) {
533 torrent.Add("successful dials", 1)
535 torrent.Add("unsuccessful dials", 1)
539 func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
540 ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
541 if ret < minDialTimeout {
547 // Returns whether an address is known to connect to a client with our own ID.
548 func (cl *Client) dopplegangerAddr(addr string) bool {
549 _, ok := cl.dopplegangerAddrs[addr]
553 // Returns a connection over UTP or TCP, whichever is first to connect.
554 func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
556 t := perf.NewTimer(perf.CallerName(0))
559 t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
561 t.Mark("returned conn over " + res.Network)
565 ctx, cancel := context.WithCancel(ctx)
566 // As soon as we return one connection, cancel the others.
569 resCh := make(chan dialResult, left)
573 cl.eachDialer(func(s Dialer) bool {
576 //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
579 cl.dialFromSocket(ctx, s, addr),
580 s.LocalAddr().Network(),
587 // Wait for a successful connection.
589 defer perf.ScopeTimer()()
590 for ; left > 0 && res.Conn == nil; left-- {
594 // There are still incompleted dials.
596 for ; left > 0; left-- {
597 conn := (<-resCh).Conn
604 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
606 //if res.Conn != nil {
607 // cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
609 // cl.logger.Printf("failed to dial %s", addr)
614 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
615 network := s.LocalAddr().Network()
616 cte := cl.config.ConnTracker.Wait(
618 conntrack.Entry{network, s.LocalAddr().String(), addr},
619 "dial torrent client",
622 // Try to avoid committing to a dial if the context is complete as it's difficult to determine
623 // which dial errors allow us to forget the connection tracking entry handle.
624 if ctx.Err() != nil {
630 c, err := s.Dial(ctx, addr)
631 // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
632 // it now in case we close the connection forthwith.
633 if tc, ok := c.(*net.TCPConn); ok {
638 if err != nil && forgettableDialError(err) {
645 return closeWrapper{c, func() error {
652 func forgettableDialError(err error) bool {
653 return strings.Contains(err.Error(), "no suitable address found")
656 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
657 if _, ok := t.halfOpen[addr]; !ok {
658 panic("invariant broken")
660 delete(t.halfOpen, addr)
664 // Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
665 // for valid reasons.
666 func (cl *Client) initiateProtocolHandshakes(
670 outgoing, encryptHeader bool,
672 network, connString string,
674 c *PeerConn, err error,
676 c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
677 c.headerEncrypted = encryptHeader
678 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
680 dl, ok := ctx.Deadline()
684 err = nc.SetDeadline(dl)
688 err = cl.initiateHandshakes(c, t)
692 // Returns nil connection and nil error if no connection could be established for valid reasons.
693 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
694 dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
697 return t.dialTimeout()
700 dr := cl.dialFirst(dialCtx, addr.String())
703 if dialCtx.Err() != nil {
704 return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
706 return nil, errors.New("dial failed")
708 c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
715 // Returns nil connection and nil error if no connection could be established
716 // for valid reasons.
717 func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
718 torrent.Add("establish outgoing connection", 1)
719 obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
720 c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
722 torrent.Add("initiated conn with preferred header obfuscation", 1)
725 //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
726 if cl.config.HeaderObfuscationPolicy.RequirePreferred {
727 // We should have just tried with the preferred header obfuscation. If it was required,
728 // there's nothing else to try.
731 // Try again with encryption if we didn't earlier, or without if we did.
732 c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
734 torrent.Add("initiated conn with fallback header obfuscation", 1)
736 //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
740 // Called to dial out and run a connection. The addr we're given is already
741 // considered half-open.
742 func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
743 cl.dialRateLimiter.Wait(context.Background())
744 c, err := cl.establishOutgoingConn(t, addr)
747 // Don't release lock between here and addConnection, unless it's for
749 cl.noLongerHalfOpen(t, addr.String())
752 cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
759 t.runHandshookConnLoggingErr(c)
762 // The port number for incoming peer connections. 0 if the client isn't listening.
763 func (cl *Client) incomingPeerPort() int {
764 return cl.LocalPort()
767 func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
768 if c.headerEncrypted {
771 rw, c.cryptoMethod, err = mse.InitiateHandshake(
778 cl.config.CryptoProvides,
782 return xerrors.Errorf("header obfuscation handshake: %w", err)
785 ih, err := cl.connBtHandshake(c, &t.infoHash)
787 return xerrors.Errorf("bittorrent protocol handshake: %w", err)
789 if ih != t.infoHash {
790 return errors.New("bittorrent protocol handshake: peer infohash didn't match")
795 // Calls f with any secret keys.
796 func (cl *Client) forSkeys(f func([]byte) bool) {
799 if false { // Emulate the bug from #114
801 for ih := range cl.torrents {
805 for range cl.torrents {
812 for ih := range cl.torrents {
819 // Do encryption and bittorrent handshakes as receiver.
820 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
821 defer perf.ScopeTimerErr(&err)()
823 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
825 if err == nil || err == mse.ErrNoSecretKeyMatch {
826 if c.headerEncrypted {
827 torrent.Add("handshakes received encrypted", 1)
829 torrent.Add("handshakes received unencrypted", 1)
832 torrent.Add("handshakes received with error while handling encryption", 1)
835 if err == mse.ErrNoSecretKeyMatch {
840 if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
841 err = errors.New("connection not have required header obfuscation")
844 ih, err := cl.connBtHandshake(c, nil)
846 err = xerrors.Errorf("during bt handshake: %w", err)
855 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
856 res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
861 c.PeerExtensionBytes = res.PeerExtensionBits
862 c.PeerID = res.PeerID
863 c.completedHandshake = time.Now()
867 func (cl *Client) runReceivedConn(c *PeerConn) {
868 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
872 t, err := cl.receiveHandshakes(c)
875 "error receiving handshakes on %v: %s", c, err,
876 ).SetLevel(log.Debug).
878 "network", c.network,
880 torrent.Add("error receiving handshake", 1)
882 cl.onBadAccept(c.remoteAddr)
887 torrent.Add("received handshake for unloaded torrent", 1)
888 log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
890 cl.onBadAccept(c.remoteAddr)
894 torrent.Add("received handshake for loaded torrent", 1)
897 t.runHandshookConnLoggingErr(c)
900 // Client lock must be held before entering this.
901 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
903 if c.PeerID == cl.peerID {
906 addr := c.conn.RemoteAddr().String()
907 cl.dopplegangerAddrs[addr] = struct{}{}
909 // Because the remote address is not necessarily the same as its client's torrent listen
910 // address, we won't record the remote address as a doppleganger. Instead, the initiator
911 // can record *us* as the doppleganger.
913 return errors.New("local and remote peer ids are the same")
915 c.conn.SetWriteDeadline(time.Time{})
916 c.r = deadlineReader{c.conn, c.r}
917 completedHandshakeConnectionFlags.Add(c.ConnectionFlags(), 1)
918 if connIsIpv6(c.conn) {
919 torrent.Add("completed handshake over ipv6", 1)
921 if err := t.addConnection(c); err != nil {
922 return fmt.Errorf("adding connection: %w", err)
924 defer t.dropConnection(c)
925 go c.writer(time.Minute)
926 cl.sendInitialMessages(c, t)
927 err := c.mainReadLoop()
929 return fmt.Errorf("main read loop: %w", err)
934 // See the order given in Transmission's tr_peerMsgsNew.
935 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
936 if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
937 conn.post(pp.Message{
939 ExtendedID: pp.HandshakeExtendedID,
940 ExtendedPayload: func() []byte {
941 msg := pp.ExtendedHandshakeMessage{
942 M: map[pp.ExtensionName]pp.ExtensionNumber{
943 pp.ExtensionNameMetadata: metadataExtendedId,
945 V: cl.config.ExtendedHandshakeClientVersion,
946 Reqq: 64, // TODO: Really?
947 YourIp: pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
948 Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
949 Port: cl.incomingPeerPort(),
950 MetadataSize: torrent.metadataSize(),
951 // TODO: We can figured these out specific to the socket
953 Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
954 Ipv6: cl.config.PublicIp6.To16(),
956 if !cl.config.DisablePEX {
957 msg.M[pp.ExtensionNamePex] = pexExtendedId
959 return bencode.MustMarshal(msg)
964 if conn.fastEnabled() {
965 if torrent.haveAllPieces() {
966 conn.post(pp.Message{Type: pp.HaveAll})
967 conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
969 } else if !torrent.haveAnyPieces() {
970 conn.post(pp.Message{Type: pp.HaveNone})
971 conn.sentHaves.Clear()
977 if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
978 conn.post(pp.Message{
985 func (cl *Client) dhtPort() (ret uint16) {
986 cl.eachDhtServer(func(s DhtServer) {
987 ret = uint16(missinggo.AddrPort(s.Addr()))
992 func (cl *Client) haveDhtServer() (ret bool) {
993 cl.eachDhtServer(func(_ DhtServer) {
999 // Process incoming ut_metadata message.
1000 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
1001 var d map[string]int
1002 err := bencode.Unmarshal(payload, &d)
1003 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
1004 } else if err != nil {
1005 return fmt.Errorf("error unmarshalling bencode: %s", err)
1007 msgType, ok := d["msg_type"]
1009 return errors.New("missing msg_type field")
1013 case pp.DataMetadataExtensionMsgType:
1014 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
1015 if !c.requestedMetadataPiece(piece) {
1016 return fmt.Errorf("got unexpected piece %d", piece)
1018 c.metadataRequests[piece] = false
1019 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1020 if begin < 0 || begin >= len(payload) {
1021 return fmt.Errorf("data has bad offset in payload: %d", begin)
1023 t.saveMetadataPiece(piece, payload[begin:])
1024 c.lastUsefulChunkReceived = time.Now()
1025 return t.maybeCompleteMetadata()
1026 case pp.RequestMetadataExtensionMsgType:
1027 if !t.haveMetadataPiece(piece) {
1028 c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1031 start := (1 << 14) * piece
1032 c.logger.Printf("sending metadata piece %d", piece)
1033 c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1035 case pp.RejectMetadataExtensionMsgType:
1038 return errors.New("unknown msg_type value")
1042 func (cl *Client) badPeerAddr(addr net.Addr) bool {
1043 if ipa, ok := tryIpPortFromNetAddr(addr); ok {
1044 return cl.badPeerIPPort(ipa.IP, ipa.Port)
1049 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1053 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1056 if _, ok := cl.ipBlockRange(ip); ok {
1059 if _, ok := cl.badPeerIPs[ip.String()]; ok {
1065 // Return a Torrent ready for insertion into a Client.
1066 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1067 // use provided storage, if provided
1068 storageClient := cl.defaultStorage
1069 if specStorage != nil {
1070 storageClient = storage.NewClient(specStorage)
1076 peers: prioritizedPeers{
1078 getPrio: func(p PeerInfo) peerPriority {
1079 return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
1082 conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
1084 halfOpen: make(map[string]PeerInfo),
1085 pieceStateChanges: pubsub.NewPubSub(),
1087 storageOpener: storageClient,
1088 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
1090 networkingEnabled: true,
1091 metadataChanged: sync.Cond{
1094 webSeeds: make(map[string]*peer),
1096 t._pendingPieces.NewSet = priorityBitmapStableNewSet
1097 t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
1098 t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
1099 return fmt.Sprintf("%v: %s", t, m.Text())
1101 t.setChunkSize(defaultChunkSize)
1105 // A file-like handle to some torrent data resource.
1106 type Handle interface {
1113 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1114 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1117 // Adds a torrent by InfoHash with a custom Storage implementation.
1118 // If the torrent already exists then this Storage is ignored and the
1119 // existing torrent returned with `new` set to `false`
1120 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1123 t, ok := cl.torrents[infoHash]
1129 t = cl.newTorrent(infoHash, specStorage)
1130 cl.eachDhtServer(func(s DhtServer) {
1131 go t.dhtAnnouncer(s)
1133 cl.torrents[infoHash] = t
1134 cl.clearAcceptLimits()
1135 t.updateWantPeersEvent()
1136 // Tickle Client.waitAccept, new torrent may want conns.
1137 cl.event.Broadcast()
1141 // Add or merge a torrent spec. If the torrent is already present, the
1142 // trackers will be merged with the existing ones. If the Info isn't yet
1143 // known, it will be set. The display name is replaced if the new spec
1144 // provides one. Returns new if the torrent wasn't already in the client.
1145 // Note that any `Storage` defined on the spec will be ignored if the
1146 // torrent is already present (i.e. `new` return value is `true`)
1147 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1148 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1149 if spec.DisplayName != "" {
1150 t.SetDisplayName(spec.DisplayName)
1152 if spec.InfoBytes != nil {
1153 err = t.SetInfoBytes(spec.InfoBytes)
1158 cl.AddDHTNodes(spec.DhtNodes)
1161 for _, url := range spec.Webseeds {
1164 if spec.ChunkSize != 0 {
1165 t.setChunkSize(pp.Integer(spec.ChunkSize))
1167 t.addTrackers(spec.Trackers)
1172 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1173 t, ok := cl.torrents[infoHash]
1175 err = fmt.Errorf("no such torrent")
1182 delete(cl.torrents, infoHash)
1186 func (cl *Client) allTorrentsCompleted() bool {
1187 for _, t := range cl.torrents {
1191 if !t.haveAllPieces() {
1198 // Returns true when all torrents are completely downloaded and false if the
1199 // client is stopped before that.
1200 func (cl *Client) WaitAll() bool {
1203 for !cl.allTorrentsCompleted() {
1204 if cl.closed.IsSet() {
1212 // Returns handles to all the torrents loaded in the Client.
1213 func (cl *Client) Torrents() []*Torrent {
1216 return cl.torrentsAsSlice()
1219 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1220 for _, t := range cl.torrents {
1221 ret = append(ret, t)
1226 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1227 spec, err := TorrentSpecFromMagnetURI(uri)
1231 T, _, err = cl.AddTorrentSpec(spec)
1235 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1236 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1240 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1241 mi, err := metainfo.LoadFromFile(filename)
1245 return cl.AddTorrent(mi)
1248 func (cl *Client) DhtServers() []DhtServer {
1249 return cl.dhtServers
1252 func (cl *Client) AddDHTNodes(nodes []string) {
1253 for _, n := range nodes {
1254 hmp := missinggo.SplitHostMaybePort(n)
1255 ip := net.ParseIP(hmp.Host)
1257 cl.logger.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1260 ni := krpc.NodeInfo{
1261 Addr: krpc.NodeAddr{
1266 cl.eachDhtServer(func(s DhtServer) {
1272 func (cl *Client) banPeerIP(ip net.IP) {
1273 cl.logger.Printf("banning ip %v", ip)
1274 if cl.badPeerIPs == nil {
1275 cl.badPeerIPs = make(map[string]struct{})
1277 cl.badPeerIPs[ip.String()] = struct{}{}
1280 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
1286 PeerMaxRequests: 250,
1288 remoteAddr: remoteAddr,
1290 connString: connString,
1293 writeBuffer: new(bytes.Buffer),
1296 c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
1297 return fmt.Sprintf("%v: %s", c, m.Text())
1299 c.writerCond.L = cl.locker()
1300 c.setRW(connStatsReadWriter{nc, c})
1301 c.r = &rateLimitedReader{
1302 l: cl.config.DownloadRateLimiter,
1305 c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
1309 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) {
1316 t.addPeers([]PeerInfo{{
1317 Addr: ipPortAddr{ip, port},
1318 Source: PeerSourceDhtAnnouncePeer,
1322 func firstNotNil(ips ...net.IP) net.IP {
1323 for _, ip := range ips {
1331 func (cl *Client) eachDialer(f func(Dialer) bool) {
1332 for _, s := range cl.dialers {
1339 func (cl *Client) eachListener(f func(Listener) bool) {
1340 for _, s := range cl.listeners {
1347 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1348 cl.eachListener(func(l Listener) bool {
1355 func (cl *Client) publicIp(peer net.IP) net.IP {
1356 // TODO: Use BEP 10 to determine how peers are seeing us.
1357 if peer.To4() != nil {
1359 cl.config.PublicIp4,
1360 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1365 cl.config.PublicIp6,
1366 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1370 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1371 l := cl.findListener(
1372 func(l net.Listener) bool {
1373 return f(addrIpOrNil(l.Addr()))
1379 return addrIpOrNil(l.Addr())
1382 // Our IP as a peer should see it.
1383 func (cl *Client) publicAddr(peer net.IP) IpPort {
1384 return IpPort{IP: cl.publicIp(peer), Port: uint16(cl.incomingPeerPort())}
1387 // ListenAddrs addresses currently being listened to.
1388 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1391 cl.eachListener(func(l Listener) bool {
1392 ret = append(ret, l.Addr())
1398 func (cl *Client) onBadAccept(addr net.Addr) {
1399 ipa, ok := tryIpPortFromNetAddr(addr)
1403 ip := maskIpForAcceptLimiting(ipa.IP)
1404 if cl.acceptLimiter == nil {
1405 cl.acceptLimiter = make(map[ipStr]int)
1407 cl.acceptLimiter[ipStr(ip.String())]++
1410 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1411 if ip4 := ip.To4(); ip4 != nil {
1412 return ip4.Mask(net.CIDRMask(24, 32))
1417 func (cl *Client) clearAcceptLimits() {
1418 cl.acceptLimiter = nil
1421 func (cl *Client) acceptLimitClearer() {
1424 case <-cl.closed.LockedChan(cl.locker()):
1426 case <-time.After(15 * time.Minute):
1428 cl.clearAcceptLimits()
1434 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1435 if cl.config.DisableAcceptRateLimiting {
1438 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
1441 func (cl *Client) rLock() {
1445 func (cl *Client) rUnlock() {
1449 func (cl *Client) lock() {
1453 func (cl *Client) unlock() {
1457 func (cl *Client) locker() *lockWithDeferreds {
1461 func (cl *Client) String() string {
1462 return fmt.Sprintf("<%[1]T %[1]p>", cl)