Sergey Matveev's repositories - syncer.git/blob - syncer.go
081cce6e0a23df51b5af3d45b0a827e4b2672ff6
[syncer.git] / syncer.go
1 /*
2 syncer -- stateful file/device data syncer.
3 Copyright (C) 2015-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 // Stateful file/device data syncer.
20 package main
21
22 import (
23         "bytes"
24         "encoding/binary"
25         "flag"
26         "io"
27         "io/ioutil"
28         "log"
29         "os"
30         "runtime"
31
32         "github.com/dchest/blake2b"
33 )
34
// Command-line options. Each variable points at the parsed flag value
// and is only meaningful after flag.Parse has been called.
var (
	// srcPath is the file or device that data is read from.
	srcPath = flag.String("src", "/dev/da0", "Path to source disk")

	// dstPath is the file or device that changed blocks are written to.
	// main refuses to run when it is left empty.
	dstPath = flag.String("dst", "", "Path to destination disk")

	// statePath is where the per-block digests are kept between runs.
	statePath = flag.String("state", "state.bin", "Path to statefile")

	// blkSize is the block size in KiB (main multiplies it by 1<<10);
	// the default is 2048 KiB.
	blkSize = flag.Int64("blk", 2<<10, "Block size (KiB)")
)
41
// SyncEvent is the outcome of hashing one source block. It is handed
// from a hashing goroutine to the writer goroutine in main.
type SyncEvent struct {
	// i is the zero-based index of the block inside the source.
	i int64
	// buf is the pooled read buffer that the writer gives back for reuse.
	// data is the part of buf that has to be written to the destination,
	// or nil when the block's digest matched the saved state.
	buf, data []byte
}
47
48 func main() {
49         flag.Parse()
50         bs := *blkSize * int64(1<<10)
51
52         if *dstPath == "" {
53                 log.Fatalln("Not destination is specified")
54         }
55
56         // Open source, calculate number of blocks
57         var size int64
58         src, err := os.Open(*srcPath)
59         if err != nil {
60                 log.Fatalln("Unable to open src:", err)
61         }
62         defer src.Close()
63         fi, err := src.Stat()
64         if err != nil {
65                 log.Fatalln("Unable to read src stat:", err)
66         }
67         if fi.Mode()&os.ModeDevice == os.ModeDevice {
68                 size, err = src.Seek(0, 2)
69                 if err != nil {
70                         log.Fatalln("Unable to seek src:", err)
71                 }
72                 src.Seek(0, 0)
73         } else {
74                 size = fi.Size()
75         }
76         blocks := size / bs
77         if size%bs != 0 {
78                 blocks++
79         }
80         log.Println(blocks, bs, "byte blocks")
81
82         // Open destination
83         dst, err := os.OpenFile(*dstPath, os.O_WRONLY|os.O_CREATE, 0600)
84         if err != nil {
85                 log.Fatalln("Unable to open dst:", err)
86         }
87         defer dst.Close()
88
89         // Check if we already have statefile and read the state
90         state := make([]byte, blake2b.Size*blocks)
91         var i int64
92         var tmp []byte
93         if _, err := os.Stat(*statePath); err == nil {
94                 log.Println("State file found")
95                 stateFile, err := os.Open(*statePath)
96                 if err != nil {
97                         log.Fatalln("Unable to read statefile:", err)
98                 }
99
100                 // Check previously used size and block size
101                 tmp = make([]byte, 8)
102                 n, err := stateFile.Read(tmp)
103                 if err != nil || n != 8 {
104                         log.Fatalln("Invalid statefile")
105                 }
106                 prevSize := int64(binary.BigEndian.Uint64(tmp))
107                 if size != prevSize {
108                         log.Fatalln(
109                                 "Size differs with state file:",
110                                 prevSize, "instead of", size,
111                         )
112                 }
113                 tmp = make([]byte, 8)
114                 n, err = stateFile.Read(tmp)
115                 if err != nil || n != 8 {
116                         log.Fatalln("Invalid statefile")
117                 }
118                 prevBs := int64(binary.BigEndian.Uint64(tmp))
119                 if bs != prevBs {
120                         log.Fatalln(
121                                 "Blocksize differs with state file:",
122                                 prevBs, "instead of", bs,
123                         )
124                 }
125
126                 n, err = stateFile.Read(state)
127                 if err != nil || n != len(state) {
128                         log.Fatalln("Corrupted statefile")
129                 }
130                 stateFile.Close()
131         }
132         stateFile, err := ioutil.TempFile(".", "syncer")
133         if err != nil {
134                 log.Fatalln("Unable to create temporary file:", err)
135         }
136         tmp = make([]byte, 8)
137         binary.BigEndian.PutUint64(tmp, uint64(size))
138         stateFile.Write(tmp)
139         tmp = make([]byte, 8)
140         binary.BigEndian.PutUint64(tmp, uint64(bs))
141         stateFile.Write(tmp)
142
143         // Create buffers and event channel
144         workers := runtime.NumCPU()
145         log.Println(workers, "workers")
146         bufs := make(chan []byte, workers)
147         for i := 0; i < workers; i++ {
148                 bufs <- make([]byte, int(bs))
149         }
150         syncs := make(chan chan SyncEvent, workers)
151
152         // Writer
153         prn := NewPrinter(blocks)
154         finished := make(chan struct{})
155         go func() {
156                 var event SyncEvent
157                 for sync := range syncs {
158                         event = <-sync
159                         if event.data != nil {
160                                 dst.Seek(event.i*bs, 0)
161                                 dst.Write(event.data)
162                         }
163                         bufs <- event.buf
164                         <-sync
165                 }
166                 close(finished)
167         }()
168
169         // Reader
170         for i = 0; i < blocks; i++ {
171                 buf := <-bufs
172                 n, err := src.Read(buf)
173                 if err != nil {
174                         if err != io.EOF {
175                                 log.Fatalln("Error during src read:", err)
176                         }
177                         break
178                 }
179                 sync := make(chan SyncEvent)
180                 syncs <- sync
181                 go func(i int64) {
182                         sum := blake2b.Sum512(buf[:n])
183                         sumState := state[i*blake2b.Size : i*blake2b.Size+blake2b.Size]
184                         if bytes.Compare(sumState, sum[:]) != 0 {
185                                 sync <- SyncEvent{i, buf, buf[:n]}
186                                 prn.Changed()
187                         } else {
188                                 sync <- SyncEvent{i, buf, nil}
189                                 prn.Unchanged()
190                         }
191                         copy(sumState, sum[:])
192                         close(sync)
193                 }(i)
194         }
195         close(syncs)
196         <-finished
197         prn.Output()
198
199         log.Println("Saving state")
200         stateFile.Write(state)
201         stateFile.Close()
202         if err = os.Rename(stateFile.Name(), *statePath); err != nil {
203                 log.Fatalln(
204                         "Unable to overwrite statefile:", err,
205                         "saved state is in:", stateFile.Name(),
206                 )
207         }
208 }