From 81b2fc3aa1f04bd483711149e6ad51a312ce3f2c Mon Sep 17 00:00:00 2001
From: Sergey Matveev
Date: Sat, 21 Mar 2020 10:29:14 +0300
Subject: [PATCH] Higher performance, two pass mode

---
 README  |  93 +++++++++------
 go.mod  |   1 +
 go.sum  |   2 +
 main.go | 361 ++++++++++++++++++++++++++++++++++++--------------------
 4 files changed, 294 insertions(+), 163 deletions(-)

diff --git a/README b/README
index b34dcd9..f8b2b8a 100644
--- a/README
+++ b/README
@@ -1,47 +1,53 @@
 sgodup -- file deduplication utility
 ====================================
 
-DESCRIPTION AND USAGE
+sgodup is a file deduplication utility. You supply two directories:
+the base one and one with possible duplicates; the utility determines
+duplicate files and replaces them with links. It is aimed at very high
+performance.
 
-sgodup is utility for duplicate files detection. You supply two
-directories: the base and one with possible duplicates, utility
-determines duplicate files and replaces them with the links. It
-is aimed to have very high performance.
+SINGLE PASS MODE
+================
 
-There are just few arguments:
+$ sgodup -basedir DIR -dupdir DIR -action ACTION \
+    [-minsize NNN] [-chmod NNN] [-fsync]
 
--basedir -- directory with files that are possible link targets
- -dupdir -- directory with possible duplicates, which are replaced
-           with the links to basedir's files
- -action -- * print: just print to stdout duplicate file path with
-              relative path to basedir's corresponding file
-            * symlink: create symbolic link with relative path to
-              basedir's corresponding file
-            * hardlink: create hard link instead
- -chmod -- if specified, then chmod files in basedir and dupdir
-           during scan phase. Octal representation is expected
- -fsync -- fsync directories where linking occurs
+basedir is a directory with "original" files that are possible link
+targets. dupdir is a directory with possible duplicates, which are to
+be replaced with links to basedir's files. It is safe to specify the
+same directory as both basedir and dupdir.
 
-There are three stages:
+This command performs three stages:
 
 * basedir directory scan: collect all *regular* file paths, sizes and
-  inodes. If -chmod is specified, then apply it at once. Empty files are
-  ignored
+  inodes. If -chmod is specified, then it is applied to them. Files
+  smaller than -minsize (1 byte by default) are not considered for
+  duplicate comparison
 * dupdir directory scan: same as above. If there is no basedir's file
-  with the same size, then skip dupdir's file (obviously it can not be
+  with the same size, then skip dupdir's one (obviously it cannot be a
   duplicate). Check that no basedir's files have the same inode, skip
-  dupdir's file otherwise, because it is already hardlinked
-* deduplication stage. For each dupdir file, find basedir file with the
-  same size and compare their contents, to determine if dupdir's one is
-  the duplicate. Perform specified action if so. There are two separate
-  queues and processing cycles:
-
-  * small files, up to 4 KiB (one disk sector): files are fully read and
-    compared in memory
-  * large files (everything else): read and compare first 4 KiB of files
-    in memory. If they are not equal, then this is not a duplicate.
-    Fully read each file's contents sequentially with 128 KiB chunks and
-    calculate BLAKE2b-512 digest otherwise
+  dupdir's file otherwise (it is already a hardlink)
+* deduplication stage. For each dupdir file, find a basedir file with
+  the same size and compare their contents to determine whether the
+  dupdir one is a duplicate. Perform the specified action if so.
+  Comparison is done the following way:
+  * read the first 4 KiB (one disk sector) of each file
+  * if that sector differs, then the files are not duplicates
+  * otherwise read each file's contents sequentially in 128 KiB chunks
+    and compare their BLAKE2b-512 digests
+
+Action can be the following:
+
+* print: print to stdout the duplicate file path with the relative
+  path to the corresponding basedir's file
+* symlink: create a symbolic link with the relative path to the
+  corresponding basedir's file
+* hardlink: create a hard link instead
+* ns: write to stdout a series of netstring-encoded pairs of a
+  duplicate's file path and its corresponding basedir's one. It is
+  used in two pass mode. Hint: that stream is highly compressible
+
+If -fsync is specified, then directories where linking occurs are
+fsynced.
 
 Progress is shown at each stage: how many files are counted/processed,
 total size of the files, how much space is deduplicated.
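
For illustration, the comparison strategy described in the hunk above can
be distilled into a standalone sketch. It is not part of the patch: the
firstSector/digest/sameContents helpers are hypothetical names, and for
brevity each file is re-read from the beginning, whereas main.go below
reuses the already read sector and hashes only the remainder:

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
        "io"
        "log"
        "os"

        "golang.org/x/crypto/blake2b"
    )

    const (
        sectorSize = 1 << 12 // first 4 KiB: one disk sector
        bufSize    = 1 << 17 // sequential reads in 128 KiB chunks
    )

    // firstSector reads up to the first 4 KiB of a file
    func firstSector(path string) ([]byte, error) {
        fd, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        defer fd.Close()
        buf := make([]byte, sectorSize)
        n, err := io.ReadFull(fd, buf)
        if !(err == nil || err == io.ErrUnexpectedEOF || err == io.EOF) {
            return nil, err
        }
        return buf[:n], nil
    }

    // digest hashes the whole file contents with BLAKE2b-512
    func digest(path string) ([]byte, error) {
        fd, err := os.Open(path)
        if err != nil {
            return nil, err
        }
        defer fd.Close()
        hasher, err := blake2b.New512(nil)
        if err != nil {
            return nil, err
        }
        if _, err = io.Copy(hasher, bufio.NewReaderSize(fd, bufSize)); err != nil {
            return nil, err
        }
        return hasher.Sum(nil), nil
    }

    // sameContents reports whether two equally sized files are duplicates
    func sameContents(a, b string) (bool, error) {
        sa, err := firstSector(a)
        if err != nil {
            return false, err
        }
        sb, err := firstSector(b)
        if err != nil {
            return false, err
        }
        if !bytes.Equal(sa, sb) {
            return false, nil // first sectors differ: not a duplicate
        }
        da, err := digest(a)
        if err != nil {
            return false, err
        }
        db, err := digest(b)
        if err != nil {
            return false, err
        }
        return bytes.Equal(da, db), nil
    }

    func main() {
        same, err := sameContents(os.Args[1], os.Args[2])
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(same)
    }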
@@ -58,9 +64,27 @@ total size of the files, how much space is deduplicated.
 
     [...]
     2020/03/20 11:17:20 321,123 files deduplicated
 
-It is safe to specify same directory as a basedir and dupdir.
+TWO PASS MODE
+=============
+
+$ sgodup -basedir DIR -dupdir DIR -action ns [-minsize NNN] [-chmod NNN] > state
+$ sgodup -action ACTION [-fsync] -ns state
+
+If you are dealing with a huge number of small files, then simultaneous
+reading (duplicate detection) and writing (duplicate linking) on the
+same disk can dramatically decrease performance. It is advisable to
+split the whole process into two stages: read-only duplicate detection
+and write-only duplicate linking.
+
+Start sgodup with "-action ns" and redirect its stdout to some
+temporary state file, storing the information about detected duplicates
+in it. Then start it again with the "-ns state" option to relink the
+files (the state file format is illustrated by the sketch below).
 
 SAFETY AND CONSISTENCY
+======================
+
+It has not been tested on 32-bit platforms and probably will not work
+on them correctly.
 
 POSIX has no ability to atomically replace a regular file with a
 symbolic/hard link. So the file is removed first, then the link is
 created. sgodup
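
For illustration, the state file produced by "-action ns" is just a
stream of alternating netstring pairs, so it can be inspected with a few
lines of Go on top of the same go.cypherpunks.ru/netstring/v2 module this
patch adds to go.mod. This sketch is not part of the patch; the "state"
filename and the fixed 1 KiB path buffer mirror main.go's own
assumptions:

    package main

    import (
        "bufio"
        "fmt"
        "io"
        "log"
        "os"

        "go.cypherpunks.ru/netstring/v2"
    )

    func main() {
        fd, err := os.Open("state")
        if err != nil {
            log.Fatal(err)
        }
        nsR := netstring.NewReader(bufio.NewReader(fd))
        // paths longer than 1 KiB would not fit, just like in main.go
        buf := make([]byte, 1<<10)
        for {
            // each duplicate is recorded as two consecutive netstrings:
            // the duplicate's path followed by its basedir counterpart
            dupLen, err := nsR.Next()
            if err != nil {
                if err == io.EOF {
                    break
                }
                log.Fatal(err)
            }
            if _, err = io.ReadFull(nsR, buf[:dupLen]); err != nil {
                log.Fatal(err)
            }
            dup := string(buf[:dupLen])
            origLen, err := nsR.Next()
            if err != nil {
                log.Fatal(err)
            }
            if _, err = io.ReadFull(nsR, buf[:origLen]); err != nil {
                log.Fatal(err)
            }
            fmt.Println(dup, "->", string(buf[:origLen]))
        }
    }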
@@ -77,6 +101,7 @@ There are no warranties and no defined behaviour if directories (and
 files within) that the utility is working with are modified.
 
 LICENCE
+=======
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
diff --git a/go.mod b/go.mod
index d87c9cc..42b833c 100644
--- a/go.mod
+++ b/go.mod
@@ -4,5 +4,6 @@ go 1.14
 
 require (
 	github.com/dustin/go-humanize v1.0.0
+	go.cypherpunks.ru/netstring/v2 v2.0.0
 	golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
 )
diff --git a/go.sum b/go.sum
index dc88a58..820ce5b 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,7 @@
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+go.cypherpunks.ru/netstring/v2 v2.0.0 h1:or1LDZO3fSd6iITGR3jJUfUrjvRgeNlUpEYI13qaRBk=
+go.cypherpunks.ru/netstring/v2 v2.0.0/go.mod h1:6YDx4gW414SmHdvSBMKbHaB2/7w9WZ04NQb7XIUV/pA=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
diff --git a/main.go b/main.go
index f4fbf6c..9c5382c 100644
--- a/main.go
+++ b/main.go
@@ -28,40 +28,71 @@ import (
 	"os"
 	"os/signal"
 	"path/filepath"
+	"runtime"
 	"strconv"
 	"sync"
 	"syscall"
 
+	"github.com/dustin/go-humanize"
+	"go.cypherpunks.ru/netstring/v2"
 	"golang.org/x/crypto/blake2b"
 )
 
 const (
-	SizeBoundary = 1 << 12 // 4 KiB sector size
-	BufSize      = 1 << 17 // ZFS default 128 KiB recordsize
+	Version    = "0.1.0"
+	SectorSize = 1 << 12 // 4 KiB sector size
+	BufSize    = 1 << 17 // ZFS default 128 KiB recordsize
+
+	ActPrint    = iota
+	ActNS       = iota
+	ActSymlink  = iota
+	ActHardlink = iota
 )
 
-var (
-	canExit sync.Mutex
+type Action int
 
+var (
+	canExit    sync.Mutex
+	nsW        *netstring.Writer
 	curDirPath string
 	curDirFd   *os.File
+	action     Action
+
+	baseDir  = flag.String("basedir", "", "Directory with original files")
+	dupDir   = flag.String("dupdir", "", "Directory with possible duplicates")
+	actionS  = flag.String("action", "", "print, ns, symlink, hardlink")
+	minSize  = flag.Int64("minsize", 1, "minimal file size")
+	chmod    = flag.String("chmod", "", "chmod files")
+	nsPath   = flag.String("ns", "", "link targets from netstring file")
+	fsync    = flag.Bool("fsync", false, "fsync directories?")
+	version  = flag.Bool("version", false, "Print version information")
+	warranty = flag.Bool("warranty", false, "Print warranty information")
 )
 
-func link(dup, orig, action string, fsync bool) {
+func link(dup, orig string) {
+	if action == ActNS {
+		if _, err := nsW.WriteChunk([]byte(dup)); err != nil {
+			log.Fatal(err)
+		}
+		if _, err := nsW.WriteChunk([]byte(orig)); err != nil {
+			log.Fatal(err)
+		}
+		return
+	}
 	tgt, err := filepath.Rel(dup, orig)
 	if err != nil {
 		log.Fatal(err)
 	}
 	tgt = tgt[3:]
-	if action == "print" {
-		fmt.Println(dup, tgt)
+	if action == ActPrint {
+		fmt.Println(dup, "->", tgt)
 		return
 	}
 	canExit.Lock()
 	if err = os.Remove(dup); err != nil {
 		log.Fatal(err)
 	}
-	if action == "symlink" {
+	if action == ActSymlink {
 		err = os.Symlink(tgt, dup)
 	} else {
 		err = os.Link(orig, dup)
@@ -69,7 +100,7 @@ func link(dup, orig, action string, fsync bool) {
 	if err != nil {
 		log.Fatal(err)
 	}
-	if fsync {
+	if *fsync {
 		dirPath := filepath.Dir(dup)
 		if dirPath != curDirPath {
 			curDirFd, err = os.Open(dirPath)
@@ -85,54 +116,144 @@
 	canExit.Unlock()
 }
 
+func signalHandler(progressStop func(), deduped *int) chan os.Signal {
+	termRequired := make(chan os.Signal, 1)
+	signal.Notify(termRequired, syscall.SIGTERM, syscall.SIGINT)
+	go func() {
+		<-termRequired
+		canExit.Lock()
+		progressStop()
+		log.Println(humanize.Comma(int64(*deduped)), "files deduplicated")
+		os.Exit(0)
+	}()
+	return termRequired
+}
+
 func main() {
-	var (
-		baseDir = flag.String("basedir", "", "Directory with original files")
-		dupDir  = flag.String("dupdir", "", "Directory with possible duplicates")
-		action  = flag.String("action", "", "print, symlink, hardlink")
-		doChmod = flag.String("chmod", "", "chmod files")
-		doFsync = flag.Bool("fsync", false, "fsync directories?")
-	)
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `sgodup -- file deduplication utility
+Copyright (C) 2020 Sergey Matveev
+License GPLv3: GNU GPL version 3
+This is free software: you are free to change and redistribute it.
+There is NO WARRANTY, to the extent permitted by law.
+
+Single pass mode:
+  %s -basedir DIR -dupdir DIR -action {print,ns,symlink,hardlink}
+     [-chmod XXX] [-minsize XXX] [-fsync]
+Two pass mode:
+  %s -basedir DIR -dupdir DIR -action ns [-chmod XXX] [-minsize XXX] > state
+  %s -action {print,symlink,hardlink} [-fsync] -ns state
+
+Options:
+`, os.Args[0], os.Args[0], os.Args[0])
+		flag.PrintDefaults()
+	}
 	flag.Parse()
+	if *version {
+		fmt.Println("sgodup version", Version, "built with", runtime.Version())
+		return
+	}
+	if *warranty {
+		fmt.Println(`This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.`)
+		return
+	}
+
+	var stdoutW *bufio.Writer
+	switch *actionS {
+	case "print":
+		action = ActPrint
+	case "ns":
+		action = ActNS
+		stdoutW = bufio.NewWriterSize(os.Stdout, BufSize)
+		nsW = netstring.NewWriter(stdoutW)
+	case "symlink":
+		action = ActSymlink
+	case "hardlink":
+		action = ActHardlink
+	default:
+		log.Fatalln("invalid action")
+	}
+
+	if *nsPath != "" {
+		if action == ActNS {
+			log.Fatalln("\"-action ns\" has no meaning with -ns")
+		}
+		fd, err := os.Open(*nsPath)
+		if err != nil {
+			log.Fatal(err)
+		}
+		nsR := netstring.NewReader(bufio.NewReaderSize(fd, BufSize))
+		pathDup := make([]byte, 1<<10)
+		pathOrig := make([]byte, 1<<10)
+		var pathDupLen, pathOrigLen uint64
+		files := 0
+		fullSize := int64(0)
+		progress := NewProgress(0, 0, &files, &fullSize, " linked", "")
+		termRequired := signalHandler(progress.Stop, &files)
+		for {
+			pathDupLen, err = nsR.Next()
+			if err != nil {
+				if err == io.EOF {
+					break
+				}
+				log.Fatal(err)
+			}
+			if _, err = io.ReadFull(nsR, pathDup[:pathDupLen]); err != nil {
+				log.Fatal(err)
+			}
+			pathOrigLen, err = nsR.Next()
+			if err != nil {
+				log.Fatal(err)
+			}
+			if _, err = io.ReadFull(nsR, pathOrig[:pathOrigLen]); err != nil {
+				log.Fatal(err)
+			}
+			link(string(pathDup[:pathDupLen]), string(pathOrig[:pathOrigLen]))
+			files++
+		}
+		termRequired <- syscall.SIGTERM
+		<-termRequired
+	}
+
 	if *baseDir == "" {
 		log.Fatalln("-basedir is required")
 	}
 	if *dupDir == "" {
 		log.Fatalln("-dupdir is required")
 	}
-	var chmod os.FileMode
-	if *doChmod != "" {
-		ch, err := strconv.ParseUint(*doChmod, 8, 16)
+	var doChmod os.FileMode
+	if *chmod != "" {
+		ch, err := strconv.ParseUint(*chmod, 8, 16)
 		if err != nil {
 			log.Fatal(err)
 		}
-		chmod = os.FileMode(ch)
-	}
-	if !(*action == "print" || *action == "symlink" || *action == "hardlink") {
-		log.Fatalln("choose action")
+		doChmod = os.FileMode(ch)
 	}
 
 	log.Println("processing basedir...")
 	size2fi := make(map[int64][]FileInode, 1<<10)
-	files := 0
-	filesSmall := 0
-	filesLarge := 0
+	var files int
 	var fullSize int64
 	progress := NewProgress(0, 0, &files, &fullSize, " scanned", " total")
 	for fi := range walk(*baseDir) {
-		if chmod > 0 {
-			if err := os.Chmod(fi.Path, chmod); err != nil {
+		if doChmod > 0 {
+			if err := os.Chmod(fi.Path, doChmod); err != nil {
 				log.Fatal(err)
 			}
 		}
-		if fi.Size == 0 {
+		if fi.Size < *minSize {
 			continue
 		}
-		if fi.Size <= SizeBoundary {
-			filesSmall++
-		} else {
-			filesLarge++
-		}
 		files++
 		fullSize += fi.Size
 		size2fi[fi.Size] = append(size2fi[fi.Size], fi)
@@ -140,137 +261,100 @@ func main() {
 	progress.Stop()
 
 	log.Println("processing dupdir...")
-	queueSmall := make(map[string][]string, filesSmall)
-	queueLarge := make(map[string][]string, filesLarge)
-	files = 0
-	fullSize = 0
+	queue := make([]FileInode, 0, files)
+	files, fullSize = 0, 0
 	progress = NewProgress(0, 0, &files, &fullSize, " scanned", " total")
 	for fi := range walk(*dupDir) {
-		if chmod > 0 {
-			if err := os.Chmod(fi.Path, chmod); err != nil {
+		if doChmod > 0 {
+			if err := os.Chmod(fi.Path, doChmod); err != nil {
 				log.Fatal(err)
 			}
 		}
-		if fi.Size == 0 {
+		if fi.Size < *minSize {
 			continue
 		}
 		origs, ok := size2fi[fi.Size]
 		if !ok {
 			continue
 		}
-		paths := make([]string, 0, len(origs))
+		candidates := 0
 		for _, orig := range origs {
 			if fi.Path == orig.Path || (fi.Dev == orig.Dev && fi.Ino == orig.Ino) {
 				continue
 			}
-			paths = append(paths, orig.Path)
+			candidates++
+		}
+		if candidates == 0 {
+			continue
 		}
 		files++
 		fullSize += fi.Size
-		if fi.Size <= SizeBoundary {
-			queueSmall[fi.Path] = paths
-		} else {
-			queueLarge[fi.Path] = paths
-		}
+		queue = append(queue, fi)
 	}
-	size2fi = nil
 	progress.Stop()
 
 	log.Println("deduplicating...")
 	progress = NewProgress(
-		files,
-		fullSize,
-		&files,
-		&fullSize,
+		files, fullSize,
+		&files, &fullSize,
 		" processed",
 		" deduplicated",
 	)
-	files = 0
-	fullSize = 0
-	deduped := 0
-	termRequired := make(chan os.Signal, 1)
-	signal.Notify(termRequired, syscall.SIGTERM, syscall.SIGINT)
-	go func() {
-		<-termRequired
-		canExit.Lock()
-		progress.Stop()
-		log.Println(deduped, "files deduplicated")
-		os.Exit(0)
-	}()
-	bufDup := make([]byte, SizeBoundary)
-	bufOrig := make([]byte, SizeBoundary)
-	seen := make(map[string]struct{}, len(queueSmall))
-	for dup, origs := range queueSmall {
-		files++
-		if _, ok := seen[dup]; ok {
-			continue
-		}
-		fdDup, err := os.Open(dup)
-		if err != nil {
-			log.Fatal(err)
-		}
-		sizeDup, err := io.ReadFull(fdDup, bufDup)
-		if !(err == nil || err == io.ErrUnexpectedEOF) {
-			log.Fatal(err)
-		}
-		if err = fdDup.Close(); err != nil {
-			log.Fatal(err)
-		}
-		for _, orig := range origs {
-			fdOrig, err := os.Open(orig)
-			if err != nil {
-				log.Fatal(err)
-			}
-			sizeOrig, err := io.ReadFull(fdOrig, bufOrig)
-			if !(err == nil || err == io.ErrUnexpectedEOF) {
-				log.Fatal(err)
-			}
-			if sizeOrig != sizeDup {
-				log.Fatalln(dup, orig, "unexpectedly different sizes")
-			}
-			if err = fdOrig.Close(); err != nil {
-				log.Fatal(err)
-			}
-			if bytes.Compare(bufDup[:sizeDup], bufOrig[:sizeOrig]) != 0 {
-				continue
-			}
-			link(dup, orig, *action, *doFsync)
-			seen[orig] = struct{}{}
-			deduped++
-			fullSize += int64(sizeDup)
-			break
-		}
-	}
-	queueSmall = nil
-
+	files, fullSize = 0, 0
+	bufDup := make([]byte, SectorSize)
+	bufOrig := make([]byte, SectorSize)
+	seenDup := make(map[string]struct{}, len(queue)/2)
+	seenOrig := make(map[string]struct{}, len(queue)/2)
 	hasher, err := blake2b.New512(nil)
 	if err != nil {
 		panic(err)
 	}
-	seen = make(map[string]struct{}, len(queueLarge))
-	var sizeDup int64
-	for dup, origs := range queueLarge {
+	rdDup := bufio.NewReaderSize(nil, BufSize)
+	rdOrig := bufio.NewReaderSize(nil, BufSize)
+	var deduped int
+	termRequired := signalHandler(progress.Stop, &deduped)
+	for _, fi := range queue {
 		files++
-		if _, ok := seen[dup]; ok {
+		if _, ok := seenOrig[fi.Path]; ok {
 			continue
 		}
-		fdDup, err := os.Open(dup)
+		fdDup, err := os.Open(fi.Path)
 		if err != nil {
 			log.Fatal(err)
 		}
-		if _, err := io.ReadFull(fdDup, bufDup); err != nil {
-			log.Fatal(err)
+		readDup, err := io.ReadFull(fdDup, bufDup)
+		if err != nil {
+			if err != io.ErrUnexpectedEOF {
+				log.Fatal(err)
+			}
+			if int64(readDup) != fi.Size {
+				log.Fatalln(fi.Path, "unexpected size", readDup, fi.Size)
+			}
 		}
 		var hashDup []byte
-		for _, orig := range origs {
-			fdOrig, err := os.Open(orig)
+		for _, orig := range size2fi[fi.Size] {
+			if fi.Path == orig.Path || (fi.Dev == orig.Dev && fi.Ino == orig.Ino) {
+				continue
+			}
+			if _, ok := seenDup[orig.Path]; ok {
+				continue
+			}
+			fdOrig, err := os.Open(orig.Path)
 			if err != nil {
 				log.Fatal(err)
 			}
-			if _, err = io.ReadFull(fdOrig, bufOrig); err != nil {
+			readOrig, err := io.ReadFull(fdOrig, bufOrig)
+			if !(err == nil || err == io.ErrUnexpectedEOF) {
 				log.Fatal(err)
 			}
-			if bytes.Compare(bufDup, bufOrig) != 0 {
+			if readOrig != readDup {
+				log.Fatalln(
+					fi.Path, orig.Path,
+					"unexpectedly different sizes",
+					readOrig, readDup,
+				)
+			}
+			if bytes.Compare(bufDup[:readDup], bufOrig[:readOrig]) != 0 {
 				if err = fdOrig.Close(); err != nil {
 					log.Fatal(err)
 				}
@@ -278,38 +362,57 @@
 			}
 			if hashDup == nil {
 				hasher.Reset()
-				if n, err := hasher.Write(bufDup); err != nil || n != len(bufDup) {
+				if n, err := hasher.Write(bufDup[:readDup]); err != nil || n != readDup {
 					log.Fatalln("can not write to hash", err)
 				}
-				sizeDup, err = io.Copy(hasher, bufio.NewReaderSize(fdDup, BufSize))
+				rdDup.Reset(fdDup)
+				n, err := io.Copy(hasher, rdDup)
 				if err != nil {
 					log.Fatal(err)
 				}
+				if int64(readDup)+n != fi.Size {
+					log.Fatalln(fi.Path, "unexpected size", int64(readDup)+n, fi.Size)
+				}
 				hashDup = hasher.Sum(nil)
 			}
 			hasher.Reset()
-			if n, err := hasher.Write(bufOrig); err != nil || n != len(bufOrig) {
+			if n, err := hasher.Write(bufOrig[:readOrig]); err != nil || n != readOrig {
 				log.Fatalln("can not write to hash", err)
 			}
-			if _, err := io.Copy(hasher, bufio.NewReaderSize(fdOrig, BufSize)); err != nil {
+			rdOrig.Reset(fdOrig)
+			n, err := io.Copy(hasher, rdOrig)
+			if err != nil {
 				log.Fatal(err)
 			}
+			if int64(readOrig)+n != fi.Size {
+				log.Fatalln(
+					fi.Path, orig.Path,
+					"unexpectedly different sizes",
+					int64(readOrig)+n, fi.Size,
+				)
+			}
 			if err = fdOrig.Close(); err != nil {
 				log.Fatal(err)
 			}
 			if bytes.Compare(hashDup, hasher.Sum(nil)) != 0 {
 				continue
 			}
-			link(dup, orig, *action, *doFsync)
-			seen[orig] = struct{}{}
+			link(fi.Path, orig.Path)
+			seenDup[fi.Path] = struct{}{}
+			seenOrig[orig.Path] = struct{}{}
+			fullSize += fi.Size
 			deduped++
-			fullSize += sizeDup
 			break
 		}
 		if err = fdDup.Close(); err != nil {
 			log.Fatal(err)
 		}
 	}
+	if action == ActNS {
+		if err = stdoutW.Flush(); err != nil {
+			log.Fatal(err)
+		}
+	}
 	termRequired <- syscall.SIGTERM
 	<-termRequired
 }
-- 
2.44.0
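
To make the SAFETY AND CONSISTENCY notes concrete, here is the essence of
the non-atomic replacement sequence that link() performs for the symlink
action. This sketch is not part of the patch, and fsyncDir is a
hypothetical helper showing directory fsync through an opened descriptor,
which is what the -fsync option is described to do:

    package main

    import (
        "log"
        "os"
        "path/filepath"
    )

    // fsyncDir opens a directory and fsyncs its descriptor
    func fsyncDir(path string) error {
        fd, err := os.Open(path)
        if err != nil {
            return err
        }
        defer fd.Close()
        return fd.Sync()
    }

    // replaceWithSymlink is NOT atomic: a crash between Remove and
    // Symlink loses the duplicate, which is why keeping the print/ns
    // output around until the run finishes is a good idea
    func replaceWithSymlink(dup, orig string) error {
        tgt, err := filepath.Rel(dup, orig)
        if err != nil {
            return err
        }
        // Rel is computed against the file itself rather than its
        // directory, so strip the leading "../", as main.go does
        tgt = tgt[3:]
        if err = os.Remove(dup); err != nil {
            return err
        }
        if err = os.Symlink(tgt, dup); err != nil {
            return err
        }
        return fsyncDir(filepath.Dir(dup))
    }

    // usage sketch: replace the first argument with a symlink to the second
    func main() {
        if err := replaceWithSymlink(os.Args[1], os.Args[2]); err != nil {
            log.Fatal(err)
        }
    }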