]> Sergey Matveev's repositories - btrtrc.git/blob - data/blob/store.go
Rework piece completion
[btrtrc.git] / data / blob / store.go
1 package blob
2
3 import (
4         "bytes"
5         "crypto/sha1"
6         "encoding/hex"
7         "errors"
8         "fmt"
9         "io"
10         "math/rand"
11         "os"
12         "path/filepath"
13
14         dataPkg "bitbucket.org/anacrolix/go.torrent/data"
15         "github.com/anacrolix/libtorgo/metainfo"
16 )
17
18 const (
19         filePerm = 0640
20         dirPerm  = 0750
21 )
22
23 type store struct {
24         baseDir   string
25         capacity  int64
26         completed map[string]struct{}
27 }
28
29 func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
30         return &data{info, me}
31 }
32
33 type StoreOption func(*store)
34
35 func Capacity(bytes int64) StoreOption {
36         return func(s *store) {
37                 s.capacity = bytes
38         }
39 }
40
41 func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store {
42         s := &store{baseDir, -1, nil}
43         for _, o := range opt {
44                 o(s)
45         }
46         s.initCompleted()
47         return s
48 }
49
50 func (me *store) initCompleted() {
51         fis, err := me.readCompletedDir()
52         if err != nil {
53                 panic(err)
54         }
55         me.completed = make(map[string]struct{}, len(fis))
56         for _, fi := range fis {
57                 me.completed[fi.Name()] = struct{}{}
58         }
59 }
60
61 func (me *store) completePieceDirPath() string {
62         return filepath.Join(me.baseDir, "complete")
63 }
64
65 func (me *store) path(p metainfo.Piece, completed bool) string {
66         return filepath.Join(me.baseDir, func() string {
67                 if completed {
68                         return "complete"
69                 } else {
70                         return "incomplete"
71                 }
72         }(), fmt.Sprintf("%x", p.Hash()))
73 }
74
75 func (me *store) pieceComplete(p metainfo.Piece) bool {
76         _, ok := me.completed[hex.EncodeToString(p.Hash())]
77         return ok
78 }
79
80 func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) {
81         if me.pieceComplete(p) {
82                 return
83         }
84         name := me.path(p, false)
85         os.MkdirAll(filepath.Dir(name), dirPerm)
86         f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm)
87         if err != nil {
88                 panic(err)
89         }
90         return
91 }
92
93 func (me *store) pieceRead(p metainfo.Piece) (f *os.File) {
94         f, err := os.Open(me.path(p, true))
95         if err == nil {
96                 return
97         }
98         if !os.IsNotExist(err) {
99                 panic(err)
100         }
101         f, err = os.Open(me.path(p, false))
102         if err == nil {
103                 return
104         }
105         if !os.IsNotExist(err) {
106                 panic(err)
107         }
108         return
109 }
110
111 func (me *store) readCompletedDir() (fis []os.FileInfo, err error) {
112         f, err := os.Open(me.completePieceDirPath())
113         if err != nil {
114                 if os.IsNotExist(err) {
115                         err = nil
116                 }
117                 return
118         }
119         fis, err = f.Readdir(-1)
120         f.Close()
121         return
122 }
123
124 func (me *store) removeCompleted(name string) (err error) {
125         err = os.Remove(filepath.Join(me.completePieceDirPath(), name))
126         if os.IsNotExist(err) {
127                 err = nil
128         }
129         if err != nil {
130                 return err
131         }
132         delete(me.completed, name)
133         return
134 }
135
136 func (me *store) makeSpace(space int64) error {
137         if me.capacity < 0 {
138                 return nil
139         }
140         if space > me.capacity {
141                 return errors.New("space requested exceeds capacity")
142         }
143         fis, err := me.readCompletedDir()
144         if err != nil {
145                 return err
146         }
147         var size int64
148         for _, fi := range fis {
149                 size += fi.Size()
150         }
151         for size > me.capacity-space {
152                 i := rand.Intn(len(fis))
153                 me.removeCompleted(fis[i].Name())
154                 size -= fis[i].Size()
155                 fis[i] = fis[len(fis)-1]
156                 fis = fis[:len(fis)-1]
157         }
158         return nil
159 }
160
161 func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
162         err = me.makeSpace(p.Length())
163         if err != nil {
164                 return
165         }
166         var (
167                 incompletePiecePath = me.path(p, false)
168                 completedPiecePath  = me.path(p, true)
169         )
170         fSrc, err := os.Open(incompletePiecePath)
171         if err != nil {
172                 return
173         }
174         defer fSrc.Close()
175         os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
176         fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
177         if err != nil {
178                 return
179         }
180         defer fDst.Close()
181         hasher := sha1.New()
182         r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher)
183         _, err = io.Copy(fDst, r)
184         if err != nil {
185                 return
186         }
187         if !bytes.Equal(hasher.Sum(nil), p.Hash()) {
188                 err = errors.New("piece incomplete")
189                 os.Remove(completedPiecePath)
190                 return
191         }
192         os.Remove(incompletePiecePath)
193         me.completed[hex.EncodeToString(p.Hash())] = struct{}{}
194         return
195 }