]> Sergey Matveev's repositories - btrtrc.git/blob - mmap_span/mmap_span.go
Do torrent storage flush on piece completion (#755)
[btrtrc.git] / mmap_span / mmap_span.go
1 package mmap_span
2
3 import (
4         "fmt"
5         "io"
6         "sync"
7
8         "github.com/anacrolix/torrent/segments"
9         "github.com/edsrzf/mmap-go"
10 )
11
12 type MMapSpan struct {
13         mu             sync.RWMutex
14         mMaps          []mmap.MMap
15         segmentLocater segments.Index
16 }
17
18 func (ms *MMapSpan) Append(mMap mmap.MMap) {
19         ms.mMaps = append(ms.mMaps, mMap)
20 }
21
22 func (ms *MMapSpan) Flush() (errs []error) {
23         ms.mu.RLock()
24         defer ms.mu.RUnlock()
25         for _, mMap := range ms.mMaps {
26                 err := mMap.Flush()
27                 if err != nil {
28                         errs = append(errs, err)
29                 }
30         }
31         return
32 }
33
34 func (ms *MMapSpan) Close() (errs []error) {
35         ms.mu.Lock()
36         defer ms.mu.Unlock()
37         for _, mMap := range ms.mMaps {
38                 err := mMap.Unmap()
39                 if err != nil {
40                         errs = append(errs, err)
41                 }
42         }
43         // This is for issue 211.
44         ms.mMaps = nil
45         ms.InitIndex()
46         return
47 }
48
49 func (me *MMapSpan) InitIndex() {
50         i := 0
51         me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
52                 if i == len(me.mMaps) {
53                         return -1, false
54                 }
55                 l := int64(len(me.mMaps[i]))
56                 i++
57                 return l, true
58         })
59         // log.Printf("made mmapspan index: %v", me.segmentLocater)
60 }
61
62 func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
63         // log.Printf("reading %v bytes at %v", len(p), off)
64         ms.mu.RLock()
65         defer ms.mu.RUnlock()
66         n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
67         if n != len(p) {
68                 err = io.EOF
69         }
70         return
71 }
72
73 func copyBytes(dst, src []byte) int {
74         return copy(dst, src)
75 }
76
77 func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
78         ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
79                 mMapBytes := ms.mMaps[i][e.Start:]
80                 // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
81                 _n := copyBytes(copyArgs(p, mMapBytes))
82                 p = p[_n:]
83                 n += _n
84
85                 if segments.Int(_n) != e.Length {
86                         panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
87                 }
88                 return true
89         })
90         return
91 }
92
93 func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
94         // log.Printf("writing %v bytes at %v", len(p), off)
95         ms.mu.RLock()
96         defer ms.mu.RUnlock()
97         n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
98         if n != len(p) {
99                 err = io.ErrShortWrite
100         }
101         return
102 }