17 "github.com/anacrolix/missinggo/perf"
19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/dht/krpc"
21 "github.com/anacrolix/log"
22 "github.com/anacrolix/missinggo"
23 "github.com/anacrolix/missinggo/pproffd"
24 "github.com/anacrolix/missinggo/pubsub"
25 "github.com/anacrolix/missinggo/slices"
26 "github.com/anacrolix/sync"
27 "github.com/davecgh/go-spew/spew"
28 "github.com/dustin/go-humanize"
29 "github.com/google/btree"
31 "github.com/anacrolix/torrent/bencode"
32 "github.com/anacrolix/torrent/iplist"
33 "github.com/anacrolix/torrent/metainfo"
34 "github.com/anacrolix/torrent/mse"
35 pp "github.com/anacrolix/torrent/peer_protocol"
36 "github.com/anacrolix/torrent/storage"
39 // Clients contain zero or more Torrents. A Client manages a blocklist, the
40 // TCP/UDP protocol ports, and DHT as desired.
42 // An aggregate of stats over all connections. First in struct to ensure
43 // 64-bit alignment of fields. See #262.
47 closed missinggo.Event
54 defaultStorage *storage.Client
57 dhtServers []*dht.Server
58 ipBlockList iplist.Ranger
59 // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
60 extensionBytes pp.PeerExtensionBits
62 // Set of addresses that have our client ID. This intentionally will
63 // include ourselves if we end up trying to connect to our own address
64 // through legitimate channels.
65 dopplegangerAddrs map[string]struct{}
66 badPeerIPs map[string]struct{}
67 torrents map[metainfo.Hash]*Torrent
69 acceptLimiter map[ipStr]int
74 func (cl *Client) BadPeerIPs() []string {
77 return cl.badPeerIPsLocked()
80 func (cl *Client) badPeerIPsLocked() []string {
81 return slices.FromMapKeys(cl.badPeerIPs).([]string)
84 func (cl *Client) PeerID() PeerID {
// torrentAddr is an address held as a plain string. It provides the Network
// and String methods, which is the method set of net.Addr.
type torrentAddr string

// Network always reports an empty network name.
func (torrentAddr) Network() string {
	return ""
}

// String returns the address text unchanged.
func (a torrentAddr) String() string {
	return string(a)
}
94 func (cl *Client) LocalPort() (port int) {
95 cl.eachListener(func(l socket) bool {
96 _port := missinggo.AddrPort(l.Addr())
102 } else if port != _port {
103 panic("mismatched ports")
110 func writeDhtServerStatus(w io.Writer, s *dht.Server) {
111 dhtStats := s.Stats()
112 fmt.Fprintf(w, "\t# Nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
113 fmt.Fprintf(w, "\tServer ID: %x\n", s.ID())
114 fmt.Fprintf(w, "\tAnnounces: %d\n", dhtStats.SuccessfulOutboundAnnouncePeerQueries)
115 fmt.Fprintf(w, "\tOutstanding transactions: %d\n", dhtStats.OutstandingTransactions)
118 // Writes out a human readable status of the client, such as for writing to a
120 func (cl *Client) WriteStatus(_w io.Writer) {
122 defer cl.mu.RUnlock()
123 w := bufio.NewWriter(_w)
125 fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
126 fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
127 fmt.Fprintf(w, "Announce key: %x\n", cl.announceKey())
128 fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
129 cl.eachDhtServer(func(s *dht.Server) {
130 fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
131 writeDhtServerStatus(w, s)
133 spew.Fdump(w, cl.stats)
134 fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
136 for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137 return l.InfoHash().AsString() < r.InfoHash().AsString()
140 fmt.Fprint(w, "<unknown name>")
142 fmt.Fprint(w, t.name())
146 fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
148 w.WriteString("<missing metainfo>")
// debugLogValue is the value attached to log messages that are only of
// interest when debugging; debugLogFilter looks for it on each message.
const (
	debugLogValue = "debug"
)
158 func (cl *Client) debugLogFilter(m *log.Msg) bool {
159 if !cl.config.Debug {
160 _, ok := m.Values()[debugLogValue]
166 func (cl *Client) initLogger() {
167 cl.logger = log.Default.Clone().AddValue(cl).AddFilter(log.NewFilter(cl.debugLogFilter))
170 func (cl *Client) announceKey() int32 {
171 return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
174 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
176 cfg = NewDefaultClientConfig()
184 halfOpenLimit: cfg.HalfOpenConnsPerTorrent,
186 dopplegangerAddrs: make(map[string]struct{}),
187 torrents: make(map[metainfo.Hash]*Torrent),
189 go cl.acceptLimitClearer()
197 cl.extensionBytes = defaultPeerExtensionBytes()
199 storageImpl := cfg.DefaultStorage
200 if storageImpl == nil {
201 // We'd use mmap but HFS+ doesn't support sparse files.
202 storageImpl = storage.NewFile(cfg.DataDir)
203 cl.onClose = append(cl.onClose, func() {
204 if err := storageImpl.Close(); err != nil {
205 log.Printf("error closing default storage: %s", err)
209 cl.defaultStorage = storage.NewClient(storageImpl)
210 if cfg.IPBlocklist != nil {
211 cl.ipBlockList = cfg.IPBlocklist
214 if cfg.PeerID != "" {
215 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
217 o := copy(cl.peerID[:], cfg.Bep20)
218 _, err = rand.Read(cl.peerID[o:])
220 panic("error generating peer id")
224 cl.conns, err = listenAll(cl.enabledPeerNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.config.ProxyURL)
231 for _, s := range cl.conns {
232 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
233 go cl.acceptConnections(s)
239 for _, s := range cl.conns {
240 if pc, ok := s.(net.PacketConn); ok {
241 ds, err := cl.newDhtServer(pc)
245 cl.dhtServers = append(cl.dhtServers, ds)
253 func (cl *Client) enabledPeerNetworks() (ns []string) {
254 for _, n := range allPeerNetworks {
255 if peerNetworkEnabled(n, cl.config) {
262 func (cl *Client) newDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
263 cfg := dht.ServerConfig{
264 IPBlocklist: cl.ipBlockList,
266 OnAnnouncePeer: cl.onDHTAnnouncePeer,
267 PublicIP: func() net.IP {
268 if connIsIpv6(conn) && cl.config.PublicIp6 != nil {
269 return cl.config.PublicIp6
271 return cl.config.PublicIp4
273 StartingNodes: cl.config.DhtStartingNodes,
275 s, err = dht.NewServer(&cfg)
278 if _, err := s.Bootstrap(); err != nil {
279 log.Printf("error bootstrapping dht: %s", err)
286 func firstNonEmptyString(ss ...string) string {
287 for _, s := range ss {
295 func (cl *Client) Closed() <-chan struct{} {
301 func (cl *Client) eachDhtServer(f func(*dht.Server)) {
302 for _, ds := range cl.dhtServers {
307 func (cl *Client) closeSockets() {
308 cl.eachListener(func(l socket) bool {
315 // Stops the client. All connections to peers are closed and all activity will
317 func (cl *Client) Close() {
321 cl.eachDhtServer(func(s *dht.Server) { s.Close() })
323 for _, t := range cl.torrents {
326 for _, f := range cl.onClose {
332 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
333 if cl.ipBlockList == nil {
336 return cl.ipBlockList.Lookup(ip)
339 func (cl *Client) ipIsBlocked(ip net.IP) bool {
340 _, blocked := cl.ipBlockRange(ip)
344 func (cl *Client) waitAccept() {
346 for _, t := range cl.torrents {
351 if cl.closed.IsSet() {
358 func (cl *Client) rejectAccepted(conn net.Conn) bool {
359 ra := conn.RemoteAddr()
360 rip := missinggo.AddrIP(ra)
361 if cl.config.DisableIPv4Peers && rip.To4() != nil {
364 if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
367 if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
370 if cl.rateLimitAccept(rip) {
373 return cl.badPeerIPPort(rip, missinggo.AddrPort(ra))
376 func (cl *Client) acceptConnections(l net.Listener) {
378 conn, err := l.Accept()
379 conn = pproffd.WrapNetConn(conn)
381 closed := cl.closed.IsSet()
384 reject = cl.rejectAccepted(conn)
// I think something harsher should happen here? Our accept
// routine has just stopped running.
401 torrent.Add("rejected accepted connections", 1)
404 go cl.incomingConnection(conn)
406 log.Fmsg("accepted %s connection from %s", conn.RemoteAddr().Network(), conn.RemoteAddr()).AddValue(debugLogValue).Log(cl.logger)
407 torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(missinggo.AddrIP(conn.RemoteAddr()))), 1)
408 torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
409 torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
414 func (cl *Client) incomingConnection(nc net.Conn) {
416 if tc, ok := nc.(*net.TCPConn); ok {
419 c := cl.newConnection(nc, false)
420 c.Discovery = peerSourceIncoming
421 cl.runReceivedConn(c)
424 // Returns a handle to the given torrent, if it's present in the client.
425 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
428 t, ok = cl.torrents[ih]
432 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
433 return cl.torrents[ih]
436 type dialResult struct {
440 func countDialResult(err error) {
442 torrent.Add("successful dials", 1)
444 torrent.Add("unsuccessful dials", 1)
// reducedDialTimeout scales the dial timeout down as the backlog of pending
// peers grows relative to the half-open connection limit.
//
// The result is max divided by (pendingPeers+halfOpenLimit)/halfOpenLimit
// (integer division), and is never smaller than minDialTimeout.
//
// A divisor below 1 is treated as 1, so max is returned (subject to the
// minDialTimeout floor) instead of panicking with an integer divide by zero.
// That covers a non-positive halfOpenLimit and a negative pendingPeers.
func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
	divisor := 1
	if halfOpenLimit > 0 {
		if d := (pendingPeers + halfOpenLimit) / halfOpenLimit; d > 1 {
			divisor = d
		}
	}
	ret = max / time.Duration(divisor)
	if ret < minDialTimeout {
		ret = minDialTimeout
	}
	return ret
}
456 // Returns whether an address is known to connect to a client with our own ID.
457 func (cl *Client) dopplegangerAddr(addr string) bool {
458 _, ok := cl.dopplegangerAddrs[addr]
462 func ipNetworkSuffix(allowIpv4, allowIpv6 bool) string {
464 case allowIpv4 && allowIpv6:
466 case allowIpv4 && !allowIpv6:
468 case !allowIpv4 && allowIpv6:
471 panic("unhandled ip network combination")
475 func dialUTP(ctx context.Context, addr string, sock utpSocket) (c net.Conn, err error) {
476 return sock.DialContext(ctx, "", addr)
// allPeerNetworks is the full list of candidate network names: TCP and UDP,
// each in its IPv4 and IPv6 form. enabledPeerNetworks filters this list
// against the client configuration.
var allPeerNetworks = []string{
	"tcp4",
	"tcp6",
	"udp4",
	"udp6",
}
481 func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
482 c := func(s string) bool {
483 return strings.Contains(network, s)
486 if c("udp") || c("utp") {
490 if cfg.DisableTCP && c("tcp") {
493 if cfg.DisableIPv6 && c("6") {
499 // Returns a connection over UTP or TCP, whichever is first to connect.
500 func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
501 ctx, cancel := context.WithCancel(ctx)
502 // As soon as we return one connection, cancel the others.
505 resCh := make(chan dialResult, left)
506 dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
509 c, err := f(ctx, addr)
510 // This is a bit optimistic, but it looks non-trivial to thread
511 // this through the proxy code. Set it now in case we close the
512 // connection forthwith.
513 if tc, ok := c.(*net.TCPConn); ok {
517 resCh <- dialResult{c}
523 cl.eachListener(func(s socket) bool {
524 if peerNetworkEnabled(s.Addr().Network(), cl.config) {
531 // Wait for a successful connection.
533 defer perf.ScopeTimer()()
534 for ; left > 0 && res.Conn == nil; left-- {
// There are still incomplete dials.
540 for ; left > 0; left-- {
541 conn := (<-resCh).Conn
548 go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
553 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
554 if _, ok := t.halfOpen[addr]; !ok {
555 panic("invariant broken")
557 delete(t.halfOpen, addr)
561 // Performs initiator handshakes and returns a connection. Returns nil
562 // *connection if no connection for valid reasons.
563 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
564 c = cl.newConnection(nc, true)
565 c.headerEncrypted = encryptHeader
566 ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
568 dl, ok := ctx.Deadline()
572 err = nc.SetDeadline(dl)
576 ok, err = cl.initiateHandshakes(c, t)
583 // Returns nil connection and nil error if no connection could be established
584 // for valid reasons.
585 func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
586 nc := cl.dialFirst(ctx, addr)
591 if c == nil || err != nil {
595 return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
598 // Returns nil connection and nil error if no connection could be established
599 // for valid reasons.
600 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
601 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
603 obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
604 c, err = cl.establishOutgoingConnEx(t, addr, ctx, obfuscatedHeaderFirst)
609 torrent.Add("initiated conn with preferred header obfuscation", 1)
612 if cl.config.ForceEncryption {
613 // We should have just tried with an obfuscated header. A plaintext
614 // header can't result in an encrypted connection, so we're done.
615 if !obfuscatedHeaderFirst {
616 panic(cl.config.EncryptionPolicy)
620 // Try again with encryption if we didn't earlier, or without if we did.
621 c, err = cl.establishOutgoingConnEx(t, addr, ctx, !obfuscatedHeaderFirst)
623 torrent.Add("initiated conn with fallback header obfuscation", 1)
628 // Called to dial out and run a connection. The addr we're given is already
629 // considered half-open.
630 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
631 c, err := cl.establishOutgoingConn(t, addr)
634 // Don't release lock between here and addConnection, unless it's for
636 cl.noLongerHalfOpen(t, addr)
639 log.Printf("error establishing outgoing connection: %s", err)
648 cl.runHandshookConn(c, t)
651 // The port number for incoming peer connections. 0 if the client isn't
653 func (cl *Client) incomingPeerPort() int {
654 return cl.LocalPort()
657 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
658 if c.headerEncrypted {
660 rw, c.cryptoMethod, err = mse.InitiateHandshake(
667 func() mse.CryptoMethod {
669 case cl.config.ForceEncryption:
670 return mse.CryptoMethodRC4
671 case cl.config.DisableEncryption:
672 return mse.CryptoMethodPlaintext
674 return mse.AllSupportedCrypto
683 ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
684 if ih != t.infoHash {
690 // Calls f with any secret keys.
691 func (cl *Client) forSkeys(f func([]byte) bool) {
694 for ih := range cl.torrents {
701 // Do encryption and bittorrent handshakes as receiver.
702 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
703 defer perf.ScopeTimerErr(&err)()
705 rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
708 if err == mse.ErrNoSecretKeyMatch {
713 if cl.config.ForceEncryption && !c.headerEncrypted {
714 err = errors.New("connection not encrypted")
717 ih, ok, err := cl.connBTHandshake(c, nil)
719 err = fmt.Errorf("error during bt handshake: %s", err)
731 // Returns !ok if handshake failed for valid reasons.
732 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
733 res, ok, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
734 if err != nil || !ok {
738 c.PeerExtensionBytes = res.PeerExtensionBits
739 c.PeerID = res.PeerID
740 c.completedHandshake = time.Now()
744 func (cl *Client) runReceivedConn(c *connection) {
745 err := c.conn.SetDeadline(time.Now().Add(cl.config.HandshakesTimeout))
749 t, err := cl.receiveHandshakes(c)
752 "error receiving handshakes: %s", err,
756 "network", c.remoteAddr().Network(),
758 torrent.Add("error receiving handshake", 1)
760 cl.onBadAccept(c.remoteAddr())
765 torrent.Add("received handshake for unloaded torrent", 1)
767 cl.onBadAccept(c.remoteAddr())
771 torrent.Add("received handshake for loaded torrent", 1)
774 cl.runHandshookConn(c, t)
777 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
779 if c.PeerID == cl.peerID {
782 addr := c.conn.RemoteAddr().String()
783 cl.dopplegangerAddrs[addr] = struct{}{}
785 // Because the remote address is not necessarily the same as its
786 // client's torrent listen address, we won't record the remote address
787 // as a doppleganger. Instead, the initiator can record *us* as the
792 c.conn.SetWriteDeadline(time.Time{})
793 c.r = deadlineReader{c.conn, c.r}
794 completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
795 if connIsIpv6(c.conn) {
796 torrent.Add("completed handshake over ipv6", 1)
798 if err := t.addConnection(c); err != nil {
799 log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
802 defer t.dropConnection(c)
803 go c.writer(time.Minute)
804 cl.sendInitialMessages(c, t)
805 err := c.mainReadLoop()
806 if err != nil && cl.config.Debug {
807 log.Printf("error during connection main read loop: %s", err)
811 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
813 if conn.fastEnabled() {
814 if torrent.haveAllPieces() {
815 conn.Post(pp.Message{Type: pp.HaveAll})
816 conn.sentHaves.AddRange(0, conn.t.NumPieces())
818 } else if !torrent.haveAnyPieces() {
819 conn.Post(pp.Message{Type: pp.HaveNone})
820 conn.sentHaves.Clear()
826 if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
827 conn.Post(pp.Message{
829 ExtendedID: pp.HandshakeExtendedID,
830 ExtendedPayload: func() []byte {
831 d := map[string]interface{}{
832 "m": func() (ret map[string]int) {
833 ret = make(map[string]int, 2)
834 ret["ut_metadata"] = metadataExtendedId
835 if !cl.config.DisablePEX {
836 ret["ut_pex"] = pexExtendedId
840 "v": cl.config.ExtendedHandshakeClientVersion,
841 // No upload queue is implemented yet.
844 if !cl.config.DisableEncryption {
847 if torrent.metadataSizeKnown() {
848 d["metadata_size"] = torrent.metadataSize()
850 if p := cl.incomingPeerPort(); p != 0 {
853 yourip, err := addrCompactIP(conn.remoteAddr())
855 log.Printf("error calculating yourip field value in extension handshake: %s", err)
859 // log.Printf("sending %v", d)
860 b, err := bencode.Marshal(d)
868 if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.haveDhtServer() {
869 conn.Post(pp.Message{
876 func (cl *Client) dhtPort() (ret uint16) {
877 cl.eachDhtServer(func(s *dht.Server) {
878 ret = uint16(missinggo.AddrPort(s.Addr()))
883 func (cl *Client) haveDhtServer() (ret bool) {
884 cl.eachDhtServer(func(_ *dht.Server) {
890 // Process incoming ut_metadata message.
891 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
893 err := bencode.Unmarshal(payload, &d)
894 if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
895 } else if err != nil {
896 return fmt.Errorf("error unmarshalling bencode: %s", err)
898 msgType, ok := d["msg_type"]
900 return errors.New("missing msg_type field")
904 case pp.DataMetadataExtensionMsgType:
905 if !c.requestedMetadataPiece(piece) {
906 return fmt.Errorf("got unexpected piece %d", piece)
908 c.metadataRequests[piece] = false
909 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
910 if begin < 0 || begin >= len(payload) {
911 return fmt.Errorf("data has bad offset in payload: %d", begin)
913 t.saveMetadataPiece(piece, payload[begin:])
914 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
915 c.lastUsefulChunkReceived = time.Now()
916 return t.maybeCompleteMetadata()
917 case pp.RequestMetadataExtensionMsgType:
918 if !t.haveMetadataPiece(piece) {
919 c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
922 start := (1 << 14) * piece
923 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
925 case pp.RejectMetadataExtensionMsgType:
928 return errors.New("unknown msg_type value")
932 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
936 if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
939 if _, ok := cl.ipBlockRange(ip); ok {
942 if _, ok := cl.badPeerIPs[ip.String()]; ok {
948 // Return a Torrent ready for insertion into a Client.
949 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
950 // use provided storage, if provided
951 storageClient := cl.defaultStorage
952 if specStorage != nil {
953 storageClient = storage.NewClient(specStorage)
959 peers: prioritizedPeers{
961 getPrio: func(p Peer) peerPriority {
962 return bep40PriorityIgnoreError(cl.publicAddr(p.IP), p.addr())
965 conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
967 halfOpen: make(map[string]Peer),
968 pieceStateChanges: pubsub.NewPubSub(),
970 storageOpener: storageClient,
971 maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
973 networkingEnabled: true,
975 metadataChanged: sync.Cond{
978 duplicateRequestTimeout: 1 * time.Second,
980 t.logger = cl.logger.Clone().AddValue(t)
981 t.setChunkSize(defaultChunkSize)
985 // A file-like handle to some torrent data resource.
986 type Handle interface {
993 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
994 return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
997 // Adds a torrent by InfoHash with a custom Storage implementation.
998 // If the torrent already exists then this Storage is ignored and the
999 // existing torrent returned with `new` set to `false`
1000 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1002 defer cl.mu.Unlock()
1003 t, ok := cl.torrents[infoHash]
1009 t = cl.newTorrent(infoHash, specStorage)
1010 cl.eachDhtServer(func(s *dht.Server) {
1011 go t.dhtAnnouncer(s)
1013 cl.torrents[infoHash] = t
1014 cl.clearAcceptLimits()
1015 t.updateWantPeersEvent()
1016 // Tickle Client.waitAccept, new torrent may want conns.
1017 cl.event.Broadcast()
1021 // Add or merge a torrent spec. If the torrent is already present, the
1022 // trackers will be merged with the existing ones. If the Info isn't yet
1023 // known, it will be set. The display name is replaced if the new spec
1024 // provides one. Returns new if the torrent wasn't already in the client.
// Note that any `Storage` defined on the spec will be ignored if the
// torrent is already present (i.e. `new` return value is `false`).
1027 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1028 t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1029 if spec.DisplayName != "" {
1030 t.SetDisplayName(spec.DisplayName)
1032 if spec.InfoBytes != nil {
1033 err = t.SetInfoBytes(spec.InfoBytes)
1039 defer cl.mu.Unlock()
1040 if spec.ChunkSize != 0 {
1041 t.setChunkSize(pp.Integer(spec.ChunkSize))
1043 t.addTrackers(spec.Trackers)
1048 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1049 t, ok := cl.torrents[infoHash]
1051 err = fmt.Errorf("no such torrent")
1058 delete(cl.torrents, infoHash)
1062 func (cl *Client) allTorrentsCompleted() bool {
1063 for _, t := range cl.torrents {
1067 if !t.haveAllPieces() {
1074 // Returns true when all torrents are completely downloaded and false if the
1075 // client is stopped before that.
1076 func (cl *Client) WaitAll() bool {
1078 defer cl.mu.Unlock()
1079 for !cl.allTorrentsCompleted() {
1080 if cl.closed.IsSet() {
1088 // Returns handles to all the torrents loaded in the Client.
1089 func (cl *Client) Torrents() []*Torrent {
1091 defer cl.mu.Unlock()
1092 return cl.torrentsAsSlice()
1095 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1096 for _, t := range cl.torrents {
1097 ret = append(ret, t)
1102 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1103 spec, err := TorrentSpecFromMagnetURI(uri)
1107 T, _, err = cl.AddTorrentSpec(spec)
1111 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1112 T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1114 slices.MakeInto(&ss, mi.Nodes)
1119 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1120 mi, err := metainfo.LoadFromFile(filename)
1124 return cl.AddTorrent(mi)
1127 func (cl *Client) DhtServers() []*dht.Server {
1128 return cl.dhtServers
1131 func (cl *Client) AddDHTNodes(nodes []string) {
1132 for _, n := range nodes {
1133 hmp := missinggo.SplitHostMaybePort(n)
1134 ip := net.ParseIP(hmp.Host)
1136 log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1139 ni := krpc.NodeInfo{
1140 Addr: krpc.NodeAddr{
1145 cl.eachDhtServer(func(s *dht.Server) {
1151 func (cl *Client) banPeerIP(ip net.IP) {
1152 if cl.badPeerIPs == nil {
1153 cl.badPeerIPs = make(map[string]struct{})
1155 cl.badPeerIPs[ip.String()] = struct{}{}
1158 func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
1164 PeerMaxRequests: 250,
1165 writeBuffer: new(bytes.Buffer),
1167 c.writerCond.L = &cl.mu
1168 c.setRW(connStatsReadWriter{nc, c})
1169 c.r = &rateLimitedReader{
1170 l: cl.config.DownloadRateLimiter,
1176 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1178 defer cl.mu.Unlock()
1186 Source: peerSourceDHTAnnouncePeer,
1190 func firstNotNil(ips ...net.IP) net.IP {
1191 for _, ip := range ips {
1199 func (cl *Client) eachListener(f func(socket) bool) {
1200 for _, s := range cl.conns {
1207 func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
1208 cl.eachListener(func(l socket) bool {
1215 func (cl *Client) publicIp(peer net.IP) net.IP {
1216 // TODO: Use BEP 10 to determine how peers are seeing us.
1217 if peer.To4() != nil {
1219 cl.config.PublicIp4,
1220 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() != nil }),
1224 cl.config.PublicIp6,
1225 cl.findListenerIp(func(ip net.IP) bool { return ip.To4() == nil }),
1230 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
1231 return missinggo.AddrIP(cl.findListener(func(l net.Listener) bool {
1232 return f(missinggo.AddrIP(l.Addr()))
1236 // Our IP as a peer should see it.
1237 func (cl *Client) publicAddr(peer net.IP) ipPort {
1238 return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
1241 func (cl *Client) ListenAddrs() (ret []net.Addr) {
1243 defer cl.mu.Unlock()
1244 cl.eachListener(func(l socket) bool {
1245 ret = append(ret, l.Addr())
1251 func (cl *Client) onBadAccept(addr net.Addr) {
1252 ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
1253 if cl.acceptLimiter == nil {
1254 cl.acceptLimiter = make(map[ipStr]int)
1256 cl.acceptLimiter[ipStr(ip.String())]++
1259 func maskIpForAcceptLimiting(ip net.IP) net.IP {
1260 if ip4 := ip.To4(); ip4 != nil {
1261 return ip4.Mask(net.CIDRMask(24, 32))
1266 func (cl *Client) clearAcceptLimits() {
1267 cl.acceptLimiter = nil
1270 func (cl *Client) acceptLimitClearer() {
1273 case <-cl.closed.LockedChan(&cl.mu):
1275 case <-time.After(15 * time.Minute):
1277 cl.clearAcceptLimits()
1283 func (cl *Client) rateLimitAccept(ip net.IP) bool {
1284 if cl.config.DisableAcceptRateLimiting {
1287 return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0