Sergey Matveev's repositories - syncer.git/blob - syncer.go
Raise copyright years
[syncer.git] / syncer.go
1 /*
2 syncer -- stateful file/device data syncer.
3 Copyright (C) 2015-2023 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 // Stateful file/device data syncer.
19 package main
20
21 import (
22         "bytes"
23         "encoding/binary"
24         "flag"
25         "io"
26         "io/ioutil"
27         "log"
28         "os"
29         "runtime"
30
31         "lukechampine.com/blake3"
32 )
33
// HashSize is the length, in bytes, of one per-block hash (256 bits).
const HashSize = 32
35
// Command-line options.
var (
	// srcPath is the file or device that is read.
	srcPath = flag.String("src", "/dev/da0", "Path to source disk")
	// dstPath is the file or device that is written; it has no default.
	dstPath = flag.String("dst", "", "Path to destination disk")
	// statePath is where the per-block hashes are kept between runs.
	statePath = flag.String("state", "state.bin", "Path to statefile")
	// blkSize is the block size in KiB; the default is 2048 KiB (2 MiB).
	blkSize = flag.Int64("blk", 2<<10, "Block size (KiB)")
)
42
// SyncEvent is the result of hashing one source block, handed from a
// hashing goroutine to the writer.
//
// Field order matters: values are built with positional literals.
type SyncEvent struct {
	// i is the zero-based block index.
	i int64
	// buf is the pooled read buffer, returned to the pool by the writer.
	// data is the slice of buf to write to the destination, or nil when
	// the block hash matches the saved state.
	buf, data []byte
}
48
49 func main() {
50         flag.Parse()
51         bs := *blkSize * int64(1<<10)
52
53         if *dstPath == "" {
54                 log.Fatalln("Not destination is specified")
55         }
56
57         // Open source, calculate number of blocks
58         var size int64
59         src, err := os.Open(*srcPath)
60         if err != nil {
61                 log.Fatalln("Unable to open src:", err)
62         }
63         defer src.Close()
64         fi, err := src.Stat()
65         if err != nil {
66                 log.Fatalln("Unable to read src stat:", err)
67         }
68         if fi.Mode()&os.ModeDevice == os.ModeDevice {
69                 size, err = src.Seek(0, 2)
70                 if err != nil {
71                         log.Fatalln("Unable to seek src:", err)
72                 }
73                 src.Seek(0, 0)
74         } else {
75                 size = fi.Size()
76         }
77         blocks := size / bs
78         if size%bs != 0 {
79                 blocks++
80         }
81         log.Println(blocks, bs, "byte blocks")
82
83         // Open destination
84         dst, err := os.OpenFile(*dstPath, os.O_WRONLY|os.O_CREATE, 0600)
85         if err != nil {
86                 log.Fatalln("Unable to open dst:", err)
87         }
88         defer dst.Close()
89
90         // Check if we already have statefile and read the state
91         state := make([]byte, HashSize*blocks)
92         var i int64
93         var tmp []byte
94         if _, err := os.Stat(*statePath); err == nil {
95                 log.Println("State file found")
96                 stateFile, err := os.Open(*statePath)
97                 if err != nil {
98                         log.Fatalln("Unable to read statefile:", err)
99                 }
100
101                 // Check previously used size and block size
102                 tmp = make([]byte, 8)
103                 n, err := stateFile.Read(tmp)
104                 if err != nil || n != 8 {
105                         log.Fatalln("Invalid statefile")
106                 }
107                 prevSize := int64(binary.BigEndian.Uint64(tmp))
108                 if size != prevSize {
109                         log.Fatalln(
110                                 "Size differs with state file:",
111                                 prevSize, "instead of", size,
112                         )
113                 }
114                 tmp = make([]byte, 8)
115                 n, err = stateFile.Read(tmp)
116                 if err != nil || n != 8 {
117                         log.Fatalln("Invalid statefile")
118                 }
119                 prevBs := int64(binary.BigEndian.Uint64(tmp))
120                 if bs != prevBs {
121                         log.Fatalln(
122                                 "Blocksize differs with state file:",
123                                 prevBs, "instead of", bs,
124                         )
125                 }
126
127                 n, err = stateFile.Read(state)
128                 if err != nil || n != len(state) {
129                         log.Fatalln("Corrupted statefile")
130                 }
131                 stateFile.Close()
132         }
133         stateFile, err := ioutil.TempFile(".", "syncer")
134         if err != nil {
135                 log.Fatalln("Unable to create temporary file:", err)
136         }
137         tmp = make([]byte, 8)
138         binary.BigEndian.PutUint64(tmp, uint64(size))
139         stateFile.Write(tmp)
140         tmp = make([]byte, 8)
141         binary.BigEndian.PutUint64(tmp, uint64(bs))
142         stateFile.Write(tmp)
143
144         // Create buffers and event channel
145         workers := runtime.NumCPU()
146         log.Println(workers, "workers")
147         bufs := make(chan []byte, workers)
148         for i := 0; i < workers; i++ {
149                 bufs <- make([]byte, int(bs))
150         }
151         syncs := make(chan chan SyncEvent, workers)
152
153         // Writer
154         prn := NewPrinter(blocks)
155         finished := make(chan struct{})
156         go func() {
157                 var event SyncEvent
158                 for sync := range syncs {
159                         event = <-sync
160                         if event.data != nil {
161                                 dst.Seek(event.i*bs, 0)
162                                 dst.Write(event.data)
163                         }
164                         bufs <- event.buf
165                         <-sync
166                 }
167                 close(finished)
168         }()
169
170         // Reader
171         for i = 0; i < blocks; i++ {
172                 buf := <-bufs
173                 n, err := src.Read(buf)
174                 if err != nil {
175                         if err != io.EOF {
176                                 log.Fatalln("Error during src read:", err)
177                         }
178                         break
179                 }
180                 sync := make(chan SyncEvent)
181                 syncs <- sync
182                 go func(i int64) {
183                         sum := blake3.Sum256(buf[:n])
184                         sumState := state[i*HashSize : i*HashSize+HashSize]
185                         if bytes.Compare(sumState, sum[:]) != 0 {
186                                 sync <- SyncEvent{i, buf, buf[:n]}
187                                 prn.Changed()
188                         } else {
189                                 sync <- SyncEvent{i, buf, nil}
190                                 prn.Unchanged()
191                         }
192                         copy(sumState, sum[:])
193                         close(sync)
194                 }(i)
195         }
196         close(syncs)
197         <-finished
198         prn.Output()
199
200         log.Println("Saving state")
201         stateFile.Write(state)
202         stateFile.Close()
203         if err = os.Rename(stateFile.Name(), *statePath); err != nil {
204                 log.Fatalln(
205                         "Unable to overwrite statefile:", err,
206                         "saved state is in:", stateFile.Name(),
207                 )
208         }
209 }