]> Sergey Matveev's repositories - syncer.git/blob - syncer.go
README language corrections
[syncer.git] / syncer.go
1 /*
2 syncer -- stateful file/device data syncer.
3 Copyright (C) 2015 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 // Stateful file/device data syncer.
20 package main
21
22 import (
23         "bytes"
24         "encoding/binary"
25         "flag"
26         "io"
27         "io/ioutil"
28         "log"
29         "os"
30         "runtime"
31
32         "github.com/dchest/blake2b"
33 )
34
35 var (
36         blkSize   = flag.Int64("blk", 2*1<<10, "Block size (KiB)")
37         statePath = flag.String("state", "state.bin", "Path to statefile")
38         dstPath   = flag.String("dst", "/dev/ada0", "Path to destination disk")
39         srcPath   = flag.String("src", "/dev/da0", "Path to source disk")
40 )
41
42 type SyncEvent struct {
43         i    int64
44         buf  []byte
45         data []byte
46 }
47
48 func prn(s string) {
49         os.Stdout.Write([]byte(s))
50         os.Stdout.Sync()
51 }
52
53 func main() {
54         flag.Parse()
55         bs := *blkSize * int64(1<<10)
56
57         // Open source, calculate number of blocks
58         var size int64
59         src, err := os.Open(*srcPath)
60         if err != nil {
61                 log.Fatalln("Unable to open src:", err)
62         }
63         defer src.Close()
64         fi, err := src.Stat()
65         if err != nil {
66                 log.Fatalln("Unable to read src stat:", err)
67         }
68         if fi.Mode()&os.ModeDevice == os.ModeDevice {
69                 size, err = src.Seek(0, 2)
70                 if err != nil {
71                         log.Fatalln("Unable to seek src:", err)
72                 }
73                 src.Seek(0, 0)
74         } else {
75                 size = fi.Size()
76         }
77         blocks := size / bs
78         if size%bs != 0 {
79                 blocks++
80         }
81         log.Println(blocks, bs, "byte blocks")
82
83         // Open destination
84         dst, err := os.OpenFile(*dstPath, os.O_WRONLY|os.O_CREATE, 0600)
85         if err != nil {
86                 log.Fatalln("Unable to open dst:", err)
87         }
88         defer dst.Close()
89
90         // Check if we already have statefile and read the state
91         state := make([]byte, blake2b.Size*blocks)
92         var i int64
93         var tmp []byte
94         if _, err := os.Stat(*statePath); err == nil {
95                 log.Println("State file found")
96                 stateFile, err := os.Open(*statePath)
97                 if err != nil {
98                         log.Fatalln("Unable to read statefile:", err)
99                 }
100
101                 // Check previously used size and block size
102                 tmp = make([]byte, 8)
103                 n, err := stateFile.Read(tmp)
104                 if err != nil || n != 8 {
105                         log.Fatalln("Invalid statefile")
106                 }
107                 prevSize := int64(binary.BigEndian.Uint64(tmp))
108                 if size != prevSize {
109                         log.Fatalln(
110                                 "Size differs with state file:",
111                                 prevSize, "instead of", size,
112                         )
113                 }
114                 tmp = make([]byte, 8)
115                 n, err = stateFile.Read(tmp)
116                 if err != nil || n != 8 {
117                         log.Fatalln("Invalid statefile")
118                 }
119                 prevBs := int64(binary.BigEndian.Uint64(tmp))
120                 if bs != prevBs {
121                         log.Fatalln(
122                                 "Blocksize differs with state file:",
123                                 prevBs, "instead of", bs,
124                         )
125                 }
126
127                 n, err = stateFile.Read(state)
128                 if err != nil || n != len(state) {
129                         log.Fatalln("Corrupted statefile")
130                 }
131                 stateFile.Close()
132         }
133         stateFile, err := ioutil.TempFile(".", "syncer")
134         if err != nil {
135                 log.Fatalln("Unable to create temporary file:", err)
136         }
137         tmp = make([]byte, 8)
138         binary.BigEndian.PutUint64(tmp, uint64(size))
139         stateFile.Write(tmp)
140         tmp = make([]byte, 8)
141         binary.BigEndian.PutUint64(tmp, uint64(bs))
142         stateFile.Write(tmp)
143
144         // Create buffers and event channel
145         workers := runtime.NumCPU()
146         log.Println(workers, "workers")
147         bufs := make(chan []byte, workers)
148         for i := 0; i < workers; i++ {
149                 bufs <- make([]byte, int(bs))
150         }
151         syncs := make(chan chan SyncEvent, workers)
152
153         // Writer
154         prn("[")
155         finished := make(chan struct{})
156         go func() {
157                 var event SyncEvent
158                 for sync := range syncs {
159                         event = <-sync
160                         if event.data != nil {
161                                 dst.Seek(event.i*bs, 0)
162                                 dst.Write(event.data)
163                         }
164                         bufs <- event.buf
165                         <-sync
166                 }
167                 close(finished)
168         }()
169
170         // Reader
171         for i = 0; i < blocks; i++ {
172                 buf := <-bufs
173                 n, err := src.Read(buf)
174                 if err != nil {
175                         if err != io.EOF {
176                                 log.Fatalln("Error during src read:", err)
177                         }
178                         break
179                 }
180                 sync := make(chan SyncEvent)
181                 syncs <- sync
182                 go func(i int64) {
183                         sum := blake2b.Sum512(buf[:n])
184                         sumState := state[i*blake2b.Size : i*blake2b.Size+blake2b.Size]
185                         if bytes.Compare(sumState, sum[:]) != 0 {
186                                 sync <- SyncEvent{i, buf, buf[:n]}
187                                 prn("%")
188                         } else {
189                                 sync <- SyncEvent{i, buf, nil}
190                                 prn(".")
191                         }
192                         copy(sumState, sum[:])
193                         close(sync)
194                 }(i)
195         }
196         close(syncs)
197         <-finished
198         prn("]\n")
199
200         log.Println("Saving state")
201         stateFile.Write(state)
202         stateFile.Close()
203         if err = os.Rename(stateFile.Name(), *statePath); err != nil {
204                 log.Fatalln(
205                         "Unable to overwrite statefile:", err,
206                         "saved state is in:", stateFile.Name(),
207                 )
208         }
209 }