From 9a9dcf25560408bab7d7595b62140ed7725bd96c Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Thu, 20 Apr 2023 10:42:38 +0300 Subject: [PATCH] Initial draft client --- README | 1 + cmd/btrtrc/README | 25 +++ cmd/btrtrc/USAGE | 73 +++++++ cmd/btrtrc/colour.go | 25 +++ cmd/btrtrc/fifos.go | 435 ++++++++++++++++++++++++++++++++++++++++++ cmd/btrtrc/main.go | 96 ++++++++++ cmd/btrtrc/pc.go | 68 +++++++ cmd/btrtrc/seed-feed | 12 ++ cmd/btrtrc/sort.go | 52 +++++ cmd/btrtrc/status.go | 49 +++++ cmd/btrtrc/txstats.go | 74 +++++++ cmd/btrtrc/verify.go | 54 ++++++ peer.go | 22 +++ storage/file-piece.go | 2 +- storage/file.go | 119 ++++++++---- storage/shortener.go | 30 +++ 16 files changed, 1103 insertions(+), 34 deletions(-) create mode 120000 README create mode 100644 cmd/btrtrc/README create mode 100644 cmd/btrtrc/USAGE create mode 100644 cmd/btrtrc/colour.go create mode 100644 cmd/btrtrc/fifos.go create mode 100644 cmd/btrtrc/main.go create mode 100644 cmd/btrtrc/pc.go create mode 100755 cmd/btrtrc/seed-feed create mode 100644 cmd/btrtrc/sort.go create mode 100644 cmd/btrtrc/status.go create mode 100644 cmd/btrtrc/txstats.go create mode 100644 cmd/btrtrc/verify.go create mode 100644 storage/shortener.go diff --git a/README b/README new file mode 120000 index 00000000..a4787965 --- /dev/null +++ b/README @@ -0,0 +1 @@ +cmd/btrtrc/README \ No newline at end of file diff --git a/cmd/btrtrc/README b/cmd/btrtrc/README new file mode 100644 index 00000000..86f1c25d --- /dev/null +++ b/cmd/btrtrc/README @@ -0,0 +1,25 @@ +btrtrc -- better BitTorrent client + +This is a fork of https://github.com/anacrolix/torrent BitTorrent +library with own cmd/btrtrc client implementation. Comparing to +cmd/torrent it has much less configuration options, mainly hardcoding +the most of them. But what advantages does it have? + +* Optimized file-based storage: + * linearized I/O operations prevent creation of huge quantity of threads + * cached file descriptors save a lot of syscalls +* Shortened long filenames in file-based storage +* Ability to specify both IPv4 and IPv6 addresses to announce +* Ability to specify DHT bootstrap nodes +* Dynamic addition and removing of the torrents +* Much richer and nicer coloured status output +* Ability to calculate seed ratio, by remembering outgoing traffic amount +* Simpler piece completion database per each torrent in separate files +* Ability to sequentially create/verify torrent's piece completion database +* Optional commit disables any WebTorrent/WebRTC/WebSocket usage, + because those technologies suck + +Look at USAGE file further. + +btrtrc is free software: see the file LICENSE in the root of the +repository for copying conditions. diff --git a/cmd/btrtrc/USAGE b/cmd/btrtrc/USAGE new file mode 100644 index 00000000..8800d9b3 --- /dev/null +++ b/cmd/btrtrc/USAGE @@ -0,0 +1,73 @@ +You have to specify correct -bind, -4 and -6 addresses. If -4/-6 is +empty, then it won't be announced. + +Each second the current time, overall downloaded/uploaded traffic +amount, number of active peers and current download/upload speed in +KiB/sec will be shown. + +fifos subdirectory will be created with following FIFO files: + +* fifos/add -- expects newline delimited paths to .torrent files or + magnet: links +* fifos/del -- expects torrent's info hashes to remove torrents from the + client +* fifos/list -- prints coloured output of all registered torrents in the + client. It shows: + * info hash (that you can use in all other FIFOs) + * name (name of the file or root directory as a rule) + * total size + * completion percentage + * seed ratio + * download/upload speeds in KiB/sec + * number of: total/pending/active/seeder peers + * (optionally) estimated completion time +* fifos/dht -- prints DHT server statistics +* fifos/files/HASH -- prints torrent's file list with completion ratio +* fifos/peers/HASH -- prints torrent's connected peers information: + * peer's ID + * status flags, that are concatenated string of: + i -- am interested + c -- am chocking + - + E -- RC4 encryption + e -- header encryption + Tr -- tracker + I -- incoming + Hg -- DHT get_peers + Ha -- DHT announce_peer + X -- PEX + M -- direct (through magnet:) + U -- UTP + - + i -- he interested + c -- he chocking + * number of completed pieces + * download/upload speeds in KiB/sec during current session + * amount of downloaded/uploaded traffic during current session + * remote address with port + * client's name +* fifos/top-seed -- list torrents sorted by total transfer data amount + +For each torrent, corresponding .torrent file will be created. +Additional symbolic link with torrent's name will lead to HASH.torrent. +HASH.bf file is piece completion database. HASH.tx contains overall +outgoing payload traffic amount and it is updated each 10sec. + +If you massively add a bunch of unverified torrents, then +github.com/anacrolix/torrent will deal with them in parallel. That means +hash verification is done in random order from HDD's point of view, that +is rather slow. If you are going to seed many torrents, then it is +highly advisable to pre-verify them in advance, by using -verify option +with path to all .torrent files. + +Example usage for starting torrents seeding: + + $ cat > seedfile < fifos/add + $ cat fifos/list diff --git a/cmd/btrtrc/colour.go b/cmd/btrtrc/colour.go new file mode 100644 index 00000000..eb664e1a --- /dev/null +++ b/cmd/btrtrc/colour.go @@ -0,0 +1,25 @@ +package main + +import ( + "bytes" + + "golang.org/x/term" +) + +var ( + Blue string + Green string + Magenta string + Red string + Reset string +) + +func init() { + var b bytes.Buffer + t := term.NewTerminal(&b, "") + Blue = string(t.Escape.Blue) + Green = string(t.Escape.Green) + Magenta = string(t.Escape.Magenta) + Red = string(t.Escape.Red) + Reset = string(t.Escape.Reset) +} diff --git a/cmd/btrtrc/fifos.go b/cmd/btrtrc/fifos.go new file mode 100644 index 00000000..c723be04 --- /dev/null +++ b/cmd/btrtrc/fifos.go @@ -0,0 +1,435 @@ +package main + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "log" + "os" + "path" + "sort" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/anacrolix/dht/v2" + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" + "github.com/anacrolix/torrent/types/infohash" + "github.com/dustin/go-humanize" +) + +const ( + MaxListNameWidth = 40 + PeersDir = "peers" + FilesDir = "files" +) + +type TorrentStat struct { + stats torrent.ConnStats + rxSpeed int64 + txSpeed int64 +} + +var ( + FIFOsDir = "fifos" + TorrentStats = map[metainfo.Hash]TorrentStat{} + TorrentStatsM sync.RWMutex + Torrents []metainfo.Hash + TorrentsM sync.RWMutex +) + +func recreateFIFO(pth string) { + os.Remove(pth) + if err := syscall.Mkfifo(pth, 0666); err != nil { + log.Fatalln(err) + } +} + +func shortenName(name string) string { + s := []rune(name) + if len(s) > MaxListNameWidth { + s = s[:MaxListNameWidth] + } + return string(s) +} + +func fifoList(c *torrent.Client) { + pth := path.Join(FIFOsDir, "list") + recreateFIFO(pth) + for { + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + continue + } + ts := make([]*torrent.Torrent, 0, len(Torrents)) + TorrentsM.RLock() + for _, h := range Torrents { + t, _ := c.Torrent(h) + if t == nil || t.Info() == nil { + fmt.Fprintf(fd, "%s not ready\n", t.Name()) + continue + } + ts = append(ts, t) + } + TorrentsM.RUnlock() + for _, t := range ts { + stats := t.Stats() + done := t.BytesCompleted() * 100 / t.Length() + percColour := Red + if done == 100 { + percColour = Green + } + tx := stats.BytesWrittenData.Int64() + tx += TxStats[t.InfoHash()] + ratio := float64(tx) / float64(t.Length()) + TorrentStatsM.RLock() + prev := TorrentStats[t.InfoHash()] + TorrentStatsM.RUnlock() + var eta string + if done < 100 && prev.rxSpeed > 0 { + etaRaw := time.Duration((t.Length() - t.BytesCompleted()) / prev.rxSpeed) + etaRaw *= time.Second + eta = etaRaw.String() + } + fmt.Fprintf(fd, + "%s%s%s %s%40s%s %8s %s%3d%%%s %4.1f %s%d%s/%s%d%s %d/%d/%d/%d %s\n", + Blue, t.InfoHash().HexString(), Reset, + Green, shortenName(t.Name()), Reset, + humanize.IBytes(uint64(t.Length())), + percColour, done, Reset, + ratio, + Green, prev.rxSpeed/1024, Reset, + Magenta, prev.txSpeed/1024, Reset, + stats.TotalPeers, + stats.PendingPeers, + stats.ActivePeers, + stats.ConnectedSeeders, + eta, + ) + } + fd.Close() + time.Sleep(time.Second) + } +} + +func mustParseInt(s string) int { + i, err := strconv.Atoi(s) + if err != nil { + log.Fatalln(err) + } + return i +} + +func fifoPeerList(t *torrent.Torrent) { + pth := path.Join(FIFOsDir, PeersDir, t.InfoHash().HexString()) + recreateFIFO(pth) + for { + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + if os.IsNotExist(err) { + break + } + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + continue + } + pcs := t.PeerConns() + sort.Sort(ByPeerID(pcs)) + for _, pc := range pcs { + cols := strings.Split(pc.CompletedString(), "/") + done := (mustParseInt(cols[0]) * 100) / mustParseInt(cols[1]) + doneColour := Red + if done == 100 { + doneColour = Green + } + stats := pc.Peer.Stats() + fmt.Fprintf(fd, + "%s%s%s %10s %s%3d%%%s %s%d%s/%s%d%s %s / %s | %s%s%s %q\n", + Blue, hex.EncodeToString(pc.PeerID[:]), Reset, + pc.StatusFlags(), + doneColour, done, Reset, + Green, int(pc.DownloadRate()/1024), Reset, + Magenta, int(pc.UploadRate()/1024), Reset, + humanize.IBytes(uint64(stats.BytesReadData.Int64())), + humanize.IBytes(uint64(stats.BytesWrittenData.Int64())), + Green, pc.RemoteAddr, Reset, + pc.PeerClientName, + ) + } + fd.Close() + time.Sleep(time.Second) + } +} + +func fifoFileList(t *torrent.Torrent) { + pth := path.Join(FIFOsDir, FilesDir, t.InfoHash().HexString()) + recreateFIFO(pth) + for { + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + if os.IsNotExist(err) { + break + } + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + continue + } + for n, f := range t.Files() { + var done int64 + if f.Length() > 0 { + done = (f.BytesCompleted() * 100) / f.Length() + } + percColour := Green + if done < 100 { + percColour = Red + } + fmt.Fprintf(fd, + "%5d %8s %3d%% | %s%s%s\n", + n, humanize.IBytes(uint64(f.Length())), done, + percColour, f.Path(), Reset, + ) + } + fd.Close() + time.Sleep(time.Second) + } +} + +func fifoDHTList(c *torrent.Client) { + pth := path.Join(FIFOsDir, "dht") + recreateFIFO(pth) + for { + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + if os.IsNotExist(err) { + break + } + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + continue + } + for _, s := range c.DhtServers() { + stats := s.Stats().(dht.ServerStats) + fmt.Fprintf( + fd, "%s%s%s all:%d good:%d await:%d succ:%d bad:%d\n", + Green, s.Addr().String(), Reset, + stats.Nodes, + stats.GoodNodes, + stats.OutstandingTransactions, + stats.SuccessfulOutboundAnnouncePeerQueries, + stats.BadNodes, + ) + } + fd.Close() + time.Sleep(time.Second) + } +} + +type topTorrent struct { + infoHash metainfo.Hash + name string + tx int64 + ratio float64 +} + +func fifoTopSeed(c *torrent.Client) { + pth := path.Join(FIFOsDir, "top-seed") + recreateFIFO(pth) + for { + fd, err := os.OpenFile(pth, os.O_WRONLY|os.O_APPEND, os.FileMode(0666)) + if err != nil { + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + continue + } + var ts []*topTorrent + for _, t := range c.Torrents() { + if t.Info() == nil { + continue + } + stats := t.Stats() + top := topTorrent{ + infoHash: t.InfoHash(), + name: t.Name(), + tx: stats.BytesWrittenData.Int64() + TxStats[t.InfoHash()], + } + top.ratio = float64(top.tx) / float64(t.Length()) + ts = append(ts, &top) + } + sort.Sort(ByTxTraffic(ts)) + for _, t := range ts { + fmt.Fprintf(fd, + "%s%s%s %s%40s%s %s%4.1f%s %s\n", + Blue, t.infoHash.HexString(), Reset, + Green, shortenName(t.name), Reset, + Magenta, t.ratio, Reset, + humanize.IBytes(uint64(t.tx)), + ) + } + fd.Close() + time.Sleep(time.Second) + } +} + +type stringAddr string + +func (stringAddr) Network() string { return "" } +func (me stringAddr) String() string { return string(me) } + +func resolveTestPeers(addrs []string) (ret []torrent.PeerInfo) { + for _, ta := range addrs { + ret = append(ret, torrent.PeerInfo{Addr: stringAddr(ta)}) + } + return +} + +func readLinesFromFIFO(pth string) []string { + fd, err := os.OpenFile(pth, os.O_RDONLY, os.FileMode(0666)) + if err != nil { + log.Println("OpenFile:", pth, err) + time.Sleep(time.Second) + return nil + } + var lines []string + scanner := bufio.NewScanner(fd) + for scanner.Scan() { + t := scanner.Text() + if len(t) > 0 { + lines = append(lines, t) + } + } + fd.Close() + return lines +} + +func saveTorrent(t *torrent.Torrent) error { + pth := storage.PathShortener(t.Name()) + TorrentExt + if _, err := os.Stat(pth); err == nil { + return nil + } + var b bytes.Buffer + t.Metainfo().Write(&b) + return os.WriteFile(pth, b.Bytes(), 0666) +} + +func fifoAdd(c *torrent.Client) { + pth := path.Join(FIFOsDir, "add") + recreateFIFO(pth) + for { + for _, what := range readLinesFromFIFO(pth) { + cols := strings.Fields(what) + what = cols[0] + var t *torrent.Torrent + var err error + if strings.HasPrefix(what, "magnet:") { + t, err = c.AddMagnet(what) + if err != nil { + log.Println("AddMagnet:", what, err) + continue + } + } else { + metaInfo, err := metainfo.LoadFromFile(what) + if err != nil { + log.Println("LoadFromFile:", what, err) + continue + } + t, err = c.AddTorrent(metaInfo) + if err != nil { + log.Println("AddTorrent:", what, err) + continue + } + } + if len(cols) > 1 { + t.AddPeers(resolveTestPeers(cols[1:])) + } + TorrentsM.Lock() + for _, h := range Torrents { + if h.HexString() == t.InfoHash().HexString() { + goto OldOne + } + } + Torrents = append(Torrents, t.InfoHash()) + OldOne: + TorrentsM.Unlock() + go fifoPeerList(t) + go fifoFileList(t) + log.Println("added:", t.InfoHash().HexString(), t.Name()) + go func() { + <-t.GotInfo() + if err = saveTorrent(t); err != nil { + log.Println("saveTorrent:", err) + } + txStatsLoad(t.InfoHash()) + t.DownloadAll() + }() + } + time.Sleep(time.Second) + } +} + +func fifoDel(c *torrent.Client) { + pth := path.Join(FIFOsDir, "del") + recreateFIFO(pth) + for { + for _, what := range readLinesFromFIFO(pth) { + raw, err := hex.DecodeString(what) + if err != nil { + log.Println(err) + continue + } + if len(raw) != infohash.Size { + log.Println("bad length") + continue + } + var i infohash.T + copy(i[:], raw) + TorrentsM.Lock() + for n, h := range Torrents { + if h.HexString() == i.HexString() { + Torrents = append(Torrents[:n], Torrents[n+1:]...) + break + } + } + TorrentsM.Unlock() + t, ok := c.Torrent(i) + if !ok { + log.Println("no such torrent", what) + continue + } + txStatsDump(t) + txStatsDel(t.InfoHash()) + t.Drop() + for _, where := range []string{"files", "peers"} { + pth := path.Join(where, t.InfoHash().HexString()) + os.Remove(pth) + fd, err := os.Open(pth) + if err == nil { + fd.Close() + } + } + log.Println("deleted:", what, t.Name()) + } + time.Sleep(time.Second) + } +} + +func fifosPrepare() { + os.MkdirAll(path.Join(FIFOsDir, PeersDir), 0777) + os.MkdirAll(path.Join(FIFOsDir, FilesDir), 0777) +} + +func fifosCleanup() { + os.Remove(path.Join(FIFOsDir, "list")) + os.Remove(path.Join(FIFOsDir, "dht")) + os.Remove(path.Join(FIFOsDir, "add")) + os.Remove(path.Join(FIFOsDir, "del")) + os.Remove(path.Join(FIFOsDir, "top-seed")) + os.RemoveAll(path.Join(FIFOsDir, PeersDir)) + os.RemoveAll(path.Join(FIFOsDir, FilesDir)) +} diff --git a/cmd/btrtrc/main.go b/cmd/btrtrc/main.go new file mode 100644 index 00000000..96d9c3fb --- /dev/null +++ b/cmd/btrtrc/main.go @@ -0,0 +1,96 @@ +package main + +import ( + "flag" + "log" + "net" + "os" + "os/signal" + "strings" + "sync" + "syscall" + + "github.com/anacrolix/dht/v2" + analog "github.com/anacrolix/log" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/storage" +) + +const TorrentExt = ".torrent" + +var ( + Cancel = make(chan struct{}) + Jobs sync.WaitGroup +) + +func main() { + log.SetFlags(log.Ldate | log.Ltime) + fifosDir := flag.String("fifos", "fifos", "Path to fifos/") + dhtBoot := flag.String("dht", "dht.cypherpunks.ru:8991", "Comma-separated list of DHT bootstrap nodes") + addr := flag.String("bind", "[::]:6881", "Address to bind to") + pub4 := flag.String("4", "", "External IPv4 address") + pub6 := flag.String("6", "", "External IPv6 address") + debug := flag.Bool("debug", false, "Enable debug messages") + noDHT := flag.Bool("nodht", false, "Disable DHT") + verify := flag.Bool("verify", false, "Force verification of provided torrents") + flag.Parse() + + FIFOsDir = *fifosDir + dht.DefaultGlobalBootstrapHostPorts = strings.Split(*dhtBoot, ",") + cc := torrent.NewDefaultClientConfig() + cc.Debug = *debug + cc.DisableAcceptRateLimiting = true + cc.NoDefaultPortForwarding = true + cc.DisableWebtorrent = true + cc.Logger = analog.Default.WithNames("main", "client") + cc.HTTPUserAgent = "btrtrc/0.1.0" + cc.ExtendedHandshakeClientVersion = "btrtrc/0.1.0" + cc.DefaultStorage = storage.NewFileWithCompletion(".", NewBFPieceCompletion()) + if *verify { + doVerify(cc, flag.Args()) + return + } + cc.Seed = true + if *pub4 == "" { + cc.DisableIPv4 = true + } else { + cc.PublicIp4 = net.ParseIP(*pub4).To4() + } + if *pub6 == "" { + cc.DisableIPv6 = true + } else { + cc.PublicIp6 = net.ParseIP(*pub6).To16() + } + cc.NoDHT = *noDHT + cc.SetListenAddr(*addr) + client, err := torrent.NewClient(cc) + if err != nil { + log.Fatalln("torrent.NewClient:", err) + } + defer client.Close() + + needsShutdown := make(chan os.Signal) + signal.Notify(needsShutdown, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-needsShutdown + close(Cancel) + client.Close() + }() + + fifosPrepare() + log.Println("started", client.PublicIPs()) + Jobs.Add(1) + go overallStatus(client) + go fifoList(client) + go fifoTopSeed(client) + go fifoDHTList(client) + go fifoAdd(client) + go fifoDel(client) + Jobs.Add(1) + go txStatsDumper(client) + <-client.Closed() + Jobs.Wait() + fifosCleanup() + log.Println("finished") +} diff --git a/cmd/btrtrc/pc.go b/cmd/btrtrc/pc.go new file mode 100644 index 00000000..56e174a4 --- /dev/null +++ b/cmd/btrtrc/pc.go @@ -0,0 +1,68 @@ +package main + +import ( + "os" + "sync" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" +) + +type BFPieceCompletion struct { + sync.Mutex +} + +func NewBFPieceCompletion() *BFPieceCompletion { + return &BFPieceCompletion{} +} + +func (self *BFPieceCompletion) Get(pk metainfo.PieceKey) (c storage.Completion, rerr error) { + self.Lock() + defer self.Unlock() + fd, err := os.OpenFile(pk.InfoHash.HexString()+".bf", os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + rerr = err + return + } + defer fd.Close() + b := []byte{0} + _, err = fd.ReadAt(b, int64(pk.Index)) + if err != nil { + return + } + c.Ok = true + switch string(b) { + case "0": + c.Complete = false + case "1": + c.Complete = true + default: + c.Ok = false + } + return +} + +func (self *BFPieceCompletion) Set(pk metainfo.PieceKey, complete bool) error { + if c, err := self.Get(pk); err == nil && c.Ok && c.Complete == complete { + return nil + } + self.Lock() + defer self.Unlock() + fd, err := os.OpenFile(pk.InfoHash.HexString()+".bf", os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return err + } + defer fd.Close() + b := []byte{'1'} + if !complete { + b[0] = '0' + } + if _, err = fd.WriteAt(b, int64(pk.Index)); err != nil { + return err + } + return nil +} + +func (self *BFPieceCompletion) Close() error { + return nil +} diff --git a/cmd/btrtrc/seed-feed b/cmd/btrtrc/seed-feed new file mode 100755 index 00000000..c286dfe9 --- /dev/null +++ b/cmd/btrtrc/seed-feed @@ -0,0 +1,12 @@ +#!/bin/sh -e +# Feed seed-file if daemon is restarted + +while : ; do + cur=`stat -f %m fifos/add || :` + if [ "$cur" != "$prev" ] ; then + date + grep -v "^#" seed > fifos/add + prev=`stat -f %m fifos/add || :` + fi + sleep 60 +done diff --git a/cmd/btrtrc/sort.go b/cmd/btrtrc/sort.go new file mode 100644 index 00000000..3837d1f3 --- /dev/null +++ b/cmd/btrtrc/sort.go @@ -0,0 +1,52 @@ +package main + +import ( + "encoding/hex" + + "github.com/anacrolix/torrent" +) + +type ByInfoHash []*torrent.Torrent + +func (a ByInfoHash) Len() int { + return len(a) +} + +func (a ByInfoHash) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByInfoHash) Less(i, j int) bool { + return a[i].InfoHash().HexString() < a[j].InfoHash().HexString() +} + +type ByTxTraffic []*topTorrent + +func (a ByTxTraffic) Len() int { + return len(a) +} + +func (a ByTxTraffic) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByTxTraffic) Less(i, j int) bool { + if a[i].tx == a[j].tx { + return a[i].infoHash.HexString() < a[j].infoHash.HexString() + } + return a[i].tx < a[j].tx +} + +type ByPeerID []*torrent.PeerConn + +func (a ByPeerID) Len() int { + return len(a) +} + +func (a ByPeerID) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByPeerID) Less(i, j int) bool { + return hex.EncodeToString(a[i].PeerID[:]) < hex.EncodeToString(a[j].PeerID[:]) +} diff --git a/cmd/btrtrc/status.go b/cmd/btrtrc/status.go new file mode 100644 index 00000000..8cb0cbc4 --- /dev/null +++ b/cmd/btrtrc/status.go @@ -0,0 +1,49 @@ +package main + +import ( + "log" + "time" + + "github.com/anacrolix/torrent" + "github.com/dustin/go-humanize" +) + +func overallStatus(c *torrent.Client) { + tick := time.Tick(time.Second) + var prev torrent.ConnStats + for { + select { + case <-Cancel: + Jobs.Done() + return + case <-tick: + } + stats := c.ConnStats() + var peers int + for _, t := range c.Torrents() { + if t.Info() == nil { + continue + } + tStats := t.Stats() + cur := tStats.Copy() + TorrentStatsM.Lock() + prev := TorrentStats[t.InfoHash()].stats + TorrentStats[t.InfoHash()] = TorrentStat{ + stats: cur, + rxSpeed: cur.BytesReadData.Int64() - prev.BytesReadData.Int64(), + txSpeed: cur.BytesWrittenData.Int64() - prev.BytesWrittenData.Int64(), + } + TorrentStatsM.Unlock() + peers += tStats.ActivePeers + } + log.Printf( + "%s / %s | %d | %s%d%s / %s%d%s", + humanize.IBytes(uint64(stats.BytesRead.Int64())), + humanize.IBytes(uint64(stats.BytesWritten.Int64())), + peers, + Green, (stats.BytesRead.Int64()-prev.BytesRead.Int64())/1024, Reset, + Magenta, (stats.BytesWritten.Int64()-prev.BytesWritten.Int64())/1024, Reset, + ) + prev = stats + } +} diff --git a/cmd/btrtrc/txstats.go b/cmd/btrtrc/txstats.go new file mode 100644 index 00000000..f05a4b8f --- /dev/null +++ b/cmd/btrtrc/txstats.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" + "os" + "strconv" + "sync" + "time" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" +) + +const TxExt = ".tx" + +var ( + TxStats = map[metainfo.Hash]int64{} + TxStatsM sync.Mutex +) + +func txStatsLoad(h metainfo.Hash) { + pth := h.HexString() + TxExt + data, err := os.ReadFile(pth) + if err != nil { + return + } + v, err := strconv.ParseInt(string(data[:len(data)-1]), 10, 64) + if err != nil { + log.Println("ParseInt:", pth, err) + return + } + TxStatsM.Lock() + TxStats[h] = v + TxStatsM.Unlock() +} + +func txStatsDel(h metainfo.Hash) { + TxStatsM.Lock() + delete(TxStats, h) + TxStatsM.Unlock() +} + +func txStatsDump(t *torrent.Torrent) { + stats := t.Stats() + TxStatsM.Lock() + s := stats.BytesWrittenData.Int64() + TxStats[t.InfoHash()] + pth := t.InfoHash().HexString() + TxExt + if err := os.WriteFile(pth, []byte(fmt.Sprintf("%d\n", s)), 0666); err != nil { + log.Println("WriteFile:", pth, err) + } + TxStatsM.Unlock() +} + +func txStatsDumpAll(c *torrent.Client) { + for _, t := range c.Torrents() { + if t.Info() != nil { + txStatsDump(t) + } + } +} + +func txStatsDumper(c *torrent.Client) { + tick := time.Tick(10 * time.Second) + for { + txStatsDumpAll(c) + select { + case <-Cancel: + Jobs.Done() + return + case <-tick: + } + } +} diff --git a/cmd/btrtrc/verify.go b/cmd/btrtrc/verify.go new file mode 100644 index 00000000..b2a8e6ff --- /dev/null +++ b/cmd/btrtrc/verify.go @@ -0,0 +1,54 @@ +package main + +import ( + "fmt" + "log" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" +) + +func doVerify(cc *torrent.ClientConfig, pths []string) { + cc.DisableTrackers = true + cc.NoDHT = true + cc.NoUpload = true + cc.DisableUTP = true + cc.DisableTCP = true + cc.DisableIPv6 = true + cc.DisableIPv4 = true + cc.AcceptPeerConnections = false + cc.DisableWebseeds = true + client, err := torrent.NewClient(cc) + if err != nil { + log.Fatalln("torrent.NewClient:", err) + } + for _, pth := range pths { + metaInfo, err := metainfo.LoadFromFile(pth) + if err != nil { + log.Fatalln("LoadFromFile:", err) + } + t, err := client.AddTorrent(metaInfo) + if err != nil { + log.Fatalln("AddTorrent:", err) + } + <-t.GotInfo() + if err = saveTorrent(t); err != nil { + log.Println("saveTorrent:", err) + } + go func() { + sub := t.SubscribePieceStateChanges() + defer sub.Close() + var last int + for piece := range sub.Values { + if piece.Hashing && piece.Index > last { + fmt.Printf("\r%s: %d / %d", pth, piece.Index, t.NumPieces()) + last = piece.Index + } + } + }() + t.VerifyData() + fmt.Printf("\n") + } + client.Close() + <-client.Closed() +} diff --git a/peer.go b/peer.go index d5ed19e5..abf45d4b 100644 --- a/peer.go +++ b/peer.go @@ -239,6 +239,10 @@ func (cn *Peer) statusFlags() (ret string) { return } +func (cn *Peer) StatusFlags() string { + return cn.statusFlags() +} + func (cn *Peer) downloadRate() float64 { num := cn._stats.BytesReadUsefulData.Int64() if num == 0 { @@ -254,6 +258,16 @@ func (cn *Peer) DownloadRate() float64 { return cn.downloadRate() } +func (cn *Peer) UploadRate() float64 { + cn.locker().RLock() + defer cn.locker().RUnlock() + num := cn._stats.BytesWrittenData.Int64() + if num == 0 { + return 0 + } + return float64(num) / time.Now().Sub(cn.completedHandshake).Seconds() +} + func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) { var last Option[pieceIndex] var count int @@ -532,6 +546,14 @@ func (cn *Peer) allStats(f func(*ConnStats)) { } } +func (cn *Peer) Stats() *ConnStats { + return cn.stats() +} + +func (cn *Peer) CompletedString() string { + return cn.completedString() +} + func (cn *Peer) readBytes(n int64) { cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) } diff --git a/storage/file-piece.go b/storage/file-piece.go index 47772017..4a04fec5 100644 --- a/storage/file-piece.go +++ b/storage/file-piece.go @@ -33,7 +33,7 @@ func (fs *filePieceImpl) Completion() Completion { if c.Complete { // If it's allegedly complete, check that its constituent files have the necessary length. for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) { - s, err := os.Stat(fs.files[fi.fileIndex].path) + s, err := os.Stat(PathShortener(fs.files[fi.fileIndex].path)) if err != nil || s.Size() < fi.length { verified = false break diff --git a/storage/file.go b/storage/file.go index b8739647..6871cad3 100644 --- a/storage/file.go +++ b/storage/file.go @@ -5,6 +5,8 @@ import ( "io" "os" "path/filepath" + "sync" + "time" "github.com/anacrolix/missinggo/v2" @@ -13,6 +15,45 @@ import ( "github.com/anacrolix/torrent/segments" ) +const fdCacheAliveTime = 10 + +type fdCacheEntry struct { + last int64 + fd *os.File + sync.Mutex +} + +var ( + fdRCache = map[string]*fdCacheEntry{} + fdRCacheM sync.Mutex + fdWCache = map[string]*fdCacheEntry{} + fdWCacheM sync.Mutex + fdMkdirAllCache = map[string]struct{}{} + fdCacheCleanerM sync.Once +) + +func fdCacheCleaner() { + cleaner := func(c map[string]*fdCacheEntry, m *sync.Mutex) { + now := time.Now().Unix() + m.Lock() + for k, v := range c { + if now-v.last > fdCacheAliveTime { + go func() { + v.Lock() + v.fd.Close() + v.Unlock() + }() + } + delete(c, k) + } + m.Unlock() + } + for range time.Tick(fdCacheAliveTime * time.Second) { + cleaner(fdRCache, &fdRCacheM) + cleaner(fdWCache, &fdWCacheM) + } +} + // File-based storage for torrents, that isn't yet bound to a particular torrent. type fileClientImpl struct { opts NewFileClientOpts @@ -48,6 +89,7 @@ func NewFileOpts(opts NewFileClientOpts) ClientImplCloser { if opts.PieceCompletion == nil { opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir) } + fdCacheCleanerM.Do(func() { go fdCacheCleaner() }) return fileClientImpl{opts} } @@ -73,7 +115,7 @@ func (fs fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash length: fileInfo.Length, } if f.length == 0 { - err = CreateNativeZeroLengthFile(f.path) + err = CreateNativeZeroLengthFile(PathShortener(f.path)) if err != nil { err = fmt.Errorf("creating zero length file: %w", err) return @@ -142,30 +184,32 @@ type fileTorrentImplIO struct { // Returns EOF on short or missing file. func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { - f, err := os.Open(file.path) - if os.IsNotExist(err) { - // File missing is treated the same as a short file. - err = io.EOF - return - } - if err != nil { - return + fdRCacheM.Lock() + pth := PathShortener(file.path) + centry := fdRCache[pth] + if centry == nil { + var fd *os.File + fd, err = os.Open(pth) + if os.IsNotExist(err) { + // File missing is treated the same as a short file. + err = io.EOF + } + if err != nil { + fdRCacheM.Unlock() + return + } + centry = &fdCacheEntry{fd: fd} + fdRCache[pth] = centry } - defer f.Close() + fdRCacheM.Unlock() // Limit the read to within the expected bounds of this file. if int64(len(b)) > file.length-off { b = b[:file.length-off] } - for off < file.length && len(b) != 0 { - n1, err1 := f.ReadAt(b, off) - b = b[n1:] - n += n1 - off += int64(n1) - if n1 == 0 { - err = err1 - break - } - } + centry.Lock() + centry.last = time.Now().Unix() + n, err = centry.fd.ReadAt(b, off) + centry.Unlock() return } @@ -185,24 +229,33 @@ func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) { } func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) { - // log.Printf("write at %v: %v bytes", off, len(p)) fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { - name := fst.fts.files[i].path - os.MkdirAll(filepath.Dir(name), 0o777) - var f *os.File - f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) - if err != nil { - return false + name := PathShortener(fst.fts.files[i].path) + _, ok := fdMkdirAllCache[filepath.Dir(name)] + if !ok { + os.MkdirAll(filepath.Dir(name), 0o777) + fdMkdirAllCache[filepath.Dir(name)] = struct{}{} + } + fdWCacheM.Lock() + centry := fdWCache[name] + if centry == nil { + var fd *os.File + fd, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666) + if err != nil { + fdWCacheM.Unlock() + return false + } + centry = &fdCacheEntry{fd: fd} + fdWCache[name] = centry } + fdWCacheM.Unlock() var n1 int - n1, err = f.WriteAt(p[:e.Length], e.Start) - // log.Printf("%v %v wrote %v: %v", i, e, n1, err) - closeErr := f.Close() + centry.Lock() + centry.last = time.Now().Unix() + n1, err = centry.fd.WriteAt(p[:e.Length], e.Start) + centry.Unlock() n += n1 p = p[n1:] - if err == nil { - err = closeErr - } if err == nil && int64(n1) != e.Length { err = io.ErrShortWrite } diff --git a/storage/shortener.go b/storage/shortener.go new file mode 100644 index 00000000..e3ed5df9 --- /dev/null +++ b/storage/shortener.go @@ -0,0 +1,30 @@ +package storage + +import ( + "crypto/sha1" + "encoding/hex" + "path" + "strings" + "unicode/utf8" +) + +const MaxFilenameLen = 200 + +func PathShortener(pth string) string { + parts := strings.Split(pth, "/") + for i, part := range parts { + if len(part) <= MaxFilenameLen { + continue + } + n := 0 + var short []rune + for (n < len(part)) && (len(string(short)) <= MaxFilenameLen) { + r, w := utf8.DecodeRuneInString(part[n:]) + n += w + short = append(short, r) + } + h := sha1.Sum([]byte(part)) + parts[i] = string(short[:len(short)-1]) + "-" + hex.EncodeToString(h[:]) + } + return path.Join(parts...) +} -- 2.44.0