]> Sergey Matveev's repositories - syncer.git/blob - syncer.go
Preset destination value is dangerous
[syncer.git] / syncer.go
1 /*
2 syncer -- stateful file/device data syncer.
3 Copyright (C) 2015 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 // Stateful file/device data syncer.
20 package main
21
22 import (
23         "bytes"
24         "encoding/binary"
25         "flag"
26         "io"
27         "io/ioutil"
28         "log"
29         "os"
30         "runtime"
31
32         "github.com/dchest/blake2b"
33 )
34
35 var (
36         blkSize   = flag.Int64("blk", 2*1<<10, "Block size (KiB)")
37         statePath = flag.String("state", "state.bin", "Path to statefile")
38         dstPath   = flag.String("dst", "", "Path to destination disk")
39         srcPath   = flag.String("src", "/dev/da0", "Path to source disk")
40 )
41
42 type SyncEvent struct {
43         i    int64
44         buf  []byte
45         data []byte
46 }
47
48 func prn(s string) {
49         os.Stdout.Write([]byte(s))
50         os.Stdout.Sync()
51 }
52
53 func main() {
54         flag.Parse()
55         bs := *blkSize * int64(1<<10)
56
57         if *dstPath == "" {
58                 log.Fatalln("Not destination is specified")
59         }
60
61         // Open source, calculate number of blocks
62         var size int64
63         src, err := os.Open(*srcPath)
64         if err != nil {
65                 log.Fatalln("Unable to open src:", err)
66         }
67         defer src.Close()
68         fi, err := src.Stat()
69         if err != nil {
70                 log.Fatalln("Unable to read src stat:", err)
71         }
72         if fi.Mode()&os.ModeDevice == os.ModeDevice {
73                 size, err = src.Seek(0, 2)
74                 if err != nil {
75                         log.Fatalln("Unable to seek src:", err)
76                 }
77                 src.Seek(0, 0)
78         } else {
79                 size = fi.Size()
80         }
81         blocks := size / bs
82         if size%bs != 0 {
83                 blocks++
84         }
85         log.Println(blocks, bs, "byte blocks")
86
87         // Open destination
88         dst, err := os.OpenFile(*dstPath, os.O_WRONLY|os.O_CREATE, 0600)
89         if err != nil {
90                 log.Fatalln("Unable to open dst:", err)
91         }
92         defer dst.Close()
93
94         // Check if we already have statefile and read the state
95         state := make([]byte, blake2b.Size*blocks)
96         var i int64
97         var tmp []byte
98         if _, err := os.Stat(*statePath); err == nil {
99                 log.Println("State file found")
100                 stateFile, err := os.Open(*statePath)
101                 if err != nil {
102                         log.Fatalln("Unable to read statefile:", err)
103                 }
104
105                 // Check previously used size and block size
106                 tmp = make([]byte, 8)
107                 n, err := stateFile.Read(tmp)
108                 if err != nil || n != 8 {
109                         log.Fatalln("Invalid statefile")
110                 }
111                 prevSize := int64(binary.BigEndian.Uint64(tmp))
112                 if size != prevSize {
113                         log.Fatalln(
114                                 "Size differs with state file:",
115                                 prevSize, "instead of", size,
116                         )
117                 }
118                 tmp = make([]byte, 8)
119                 n, err = stateFile.Read(tmp)
120                 if err != nil || n != 8 {
121                         log.Fatalln("Invalid statefile")
122                 }
123                 prevBs := int64(binary.BigEndian.Uint64(tmp))
124                 if bs != prevBs {
125                         log.Fatalln(
126                                 "Blocksize differs with state file:",
127                                 prevBs, "instead of", bs,
128                         )
129                 }
130
131                 n, err = stateFile.Read(state)
132                 if err != nil || n != len(state) {
133                         log.Fatalln("Corrupted statefile")
134                 }
135                 stateFile.Close()
136         }
137         stateFile, err := ioutil.TempFile(".", "syncer")
138         if err != nil {
139                 log.Fatalln("Unable to create temporary file:", err)
140         }
141         tmp = make([]byte, 8)
142         binary.BigEndian.PutUint64(tmp, uint64(size))
143         stateFile.Write(tmp)
144         tmp = make([]byte, 8)
145         binary.BigEndian.PutUint64(tmp, uint64(bs))
146         stateFile.Write(tmp)
147
148         // Create buffers and event channel
149         workers := runtime.NumCPU()
150         log.Println(workers, "workers")
151         bufs := make(chan []byte, workers)
152         for i := 0; i < workers; i++ {
153                 bufs <- make([]byte, int(bs))
154         }
155         syncs := make(chan chan SyncEvent, workers)
156
157         // Writer
158         prn("[")
159         finished := make(chan struct{})
160         go func() {
161                 var event SyncEvent
162                 for sync := range syncs {
163                         event = <-sync
164                         if event.data != nil {
165                                 dst.Seek(event.i*bs, 0)
166                                 dst.Write(event.data)
167                         }
168                         bufs <- event.buf
169                         <-sync
170                 }
171                 close(finished)
172         }()
173
174         // Reader
175         for i = 0; i < blocks; i++ {
176                 buf := <-bufs
177                 n, err := src.Read(buf)
178                 if err != nil {
179                         if err != io.EOF {
180                                 log.Fatalln("Error during src read:", err)
181                         }
182                         break
183                 }
184                 sync := make(chan SyncEvent)
185                 syncs <- sync
186                 go func(i int64) {
187                         sum := blake2b.Sum512(buf[:n])
188                         sumState := state[i*blake2b.Size : i*blake2b.Size+blake2b.Size]
189                         if bytes.Compare(sumState, sum[:]) != 0 {
190                                 sync <- SyncEvent{i, buf, buf[:n]}
191                                 prn("%")
192                         } else {
193                                 sync <- SyncEvent{i, buf, nil}
194                                 prn(".")
195                         }
196                         copy(sumState, sum[:])
197                         close(sync)
198                 }(i)
199         }
200         close(syncs)
201         <-finished
202         prn("]\n")
203
204         log.Println("Saving state")
205         stateFile.Write(state)
206         stateFile.Close()
207         if err = os.Rename(stateFile.Name(), *statePath); err != nil {
208                 log.Fatalln(
209                         "Unable to overwrite statefile:", err,
210                         "saved state is in:", stateFile.Name(),
211                 )
212         }
213 }