]> Sergey Matveev's repositories - syncer.git/blob - syncer.go
Raise copyright years
[syncer.git] / syncer.go
1 /*
2 syncer -- stateful file/device data syncer.
3 Copyright (C) 2015-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 // Stateful file/device data syncer.
19 package main
20
21 import (
22         "bytes"
23         "encoding/binary"
24         "flag"
25         "io"
26         "io/ioutil"
27         "log"
28         "os"
29         "runtime"
30
31         "github.com/dchest/blake2b"
32 )
33
34 var (
35         blkSize   = flag.Int64("blk", 2*1<<10, "Block size (KiB)")
36         statePath = flag.String("state", "state.bin", "Path to statefile")
37         dstPath   = flag.String("dst", "", "Path to destination disk")
38         srcPath   = flag.String("src", "/dev/da0", "Path to source disk")
39 )
40
41 type SyncEvent struct {
42         i    int64
43         buf  []byte
44         data []byte
45 }
46
47 func main() {
48         flag.Parse()
49         bs := *blkSize * int64(1<<10)
50
51         if *dstPath == "" {
52                 log.Fatalln("Not destination is specified")
53         }
54
55         // Open source, calculate number of blocks
56         var size int64
57         src, err := os.Open(*srcPath)
58         if err != nil {
59                 log.Fatalln("Unable to open src:", err)
60         }
61         defer src.Close()
62         fi, err := src.Stat()
63         if err != nil {
64                 log.Fatalln("Unable to read src stat:", err)
65         }
66         if fi.Mode()&os.ModeDevice == os.ModeDevice {
67                 size, err = src.Seek(0, 2)
68                 if err != nil {
69                         log.Fatalln("Unable to seek src:", err)
70                 }
71                 src.Seek(0, 0)
72         } else {
73                 size = fi.Size()
74         }
75         blocks := size / bs
76         if size%bs != 0 {
77                 blocks++
78         }
79         log.Println(blocks, bs, "byte blocks")
80
81         // Open destination
82         dst, err := os.OpenFile(*dstPath, os.O_WRONLY|os.O_CREATE, 0600)
83         if err != nil {
84                 log.Fatalln("Unable to open dst:", err)
85         }
86         defer dst.Close()
87
88         // Check if we already have statefile and read the state
89         state := make([]byte, blake2b.Size*blocks)
90         var i int64
91         var tmp []byte
92         if _, err := os.Stat(*statePath); err == nil {
93                 log.Println("State file found")
94                 stateFile, err := os.Open(*statePath)
95                 if err != nil {
96                         log.Fatalln("Unable to read statefile:", err)
97                 }
98
99                 // Check previously used size and block size
100                 tmp = make([]byte, 8)
101                 n, err := stateFile.Read(tmp)
102                 if err != nil || n != 8 {
103                         log.Fatalln("Invalid statefile")
104                 }
105                 prevSize := int64(binary.BigEndian.Uint64(tmp))
106                 if size != prevSize {
107                         log.Fatalln(
108                                 "Size differs with state file:",
109                                 prevSize, "instead of", size,
110                         )
111                 }
112                 tmp = make([]byte, 8)
113                 n, err = stateFile.Read(tmp)
114                 if err != nil || n != 8 {
115                         log.Fatalln("Invalid statefile")
116                 }
117                 prevBs := int64(binary.BigEndian.Uint64(tmp))
118                 if bs != prevBs {
119                         log.Fatalln(
120                                 "Blocksize differs with state file:",
121                                 prevBs, "instead of", bs,
122                         )
123                 }
124
125                 n, err = stateFile.Read(state)
126                 if err != nil || n != len(state) {
127                         log.Fatalln("Corrupted statefile")
128                 }
129                 stateFile.Close()
130         }
131         stateFile, err := ioutil.TempFile(".", "syncer")
132         if err != nil {
133                 log.Fatalln("Unable to create temporary file:", err)
134         }
135         tmp = make([]byte, 8)
136         binary.BigEndian.PutUint64(tmp, uint64(size))
137         stateFile.Write(tmp)
138         tmp = make([]byte, 8)
139         binary.BigEndian.PutUint64(tmp, uint64(bs))
140         stateFile.Write(tmp)
141
142         // Create buffers and event channel
143         workers := runtime.NumCPU()
144         log.Println(workers, "workers")
145         bufs := make(chan []byte, workers)
146         for i := 0; i < workers; i++ {
147                 bufs <- make([]byte, int(bs))
148         }
149         syncs := make(chan chan SyncEvent, workers)
150
151         // Writer
152         prn := NewPrinter(blocks)
153         finished := make(chan struct{})
154         go func() {
155                 var event SyncEvent
156                 for sync := range syncs {
157                         event = <-sync
158                         if event.data != nil {
159                                 dst.Seek(event.i*bs, 0)
160                                 dst.Write(event.data)
161                         }
162                         bufs <- event.buf
163                         <-sync
164                 }
165                 close(finished)
166         }()
167
168         // Reader
169         for i = 0; i < blocks; i++ {
170                 buf := <-bufs
171                 n, err := src.Read(buf)
172                 if err != nil {
173                         if err != io.EOF {
174                                 log.Fatalln("Error during src read:", err)
175                         }
176                         break
177                 }
178                 sync := make(chan SyncEvent)
179                 syncs <- sync
180                 go func(i int64) {
181                         sum := blake2b.Sum512(buf[:n])
182                         sumState := state[i*blake2b.Size : i*blake2b.Size+blake2b.Size]
183                         if bytes.Compare(sumState, sum[:]) != 0 {
184                                 sync <- SyncEvent{i, buf, buf[:n]}
185                                 prn.Changed()
186                         } else {
187                                 sync <- SyncEvent{i, buf, nil}
188                                 prn.Unchanged()
189                         }
190                         copy(sumState, sum[:])
191                         close(sync)
192                 }(i)
193         }
194         close(syncs)
195         <-finished
196         prn.Output()
197
198         log.Println("Saving state")
199         stateFile.Write(state)
200         stateFile.Close()
201         if err = os.Rename(stateFile.Name(), *statePath); err != nil {
202                 log.Fatalln(
203                         "Unable to overwrite statefile:", err,
204                         "saved state is in:", stateFile.Name(),
205                 )
206         }
207 }