]> Sergey Matveev's repositories - btrtrc.git/blob - mmap_span/mmap_span.go
Fix overflow in average download rate
[btrtrc.git] / mmap_span / mmap_span.go
1 package mmap_span
2
3 import (
4         "fmt"
5         "io"
6         "sync"
7
8         "github.com/edsrzf/mmap-go"
9
10         "github.com/anacrolix/torrent/segments"
11 )
12
13 type MMapSpan struct {
14         mu             sync.RWMutex
15         mMaps          []mmap.MMap
16         segmentLocater segments.Index
17 }
18
19 func (ms *MMapSpan) Append(mMap mmap.MMap) {
20         ms.mMaps = append(ms.mMaps, mMap)
21 }
22
23 func (ms *MMapSpan) Flush() (errs []error) {
24         ms.mu.RLock()
25         defer ms.mu.RUnlock()
26         for _, mMap := range ms.mMaps {
27                 err := mMap.Flush()
28                 if err != nil {
29                         errs = append(errs, err)
30                 }
31         }
32         return
33 }
34
35 func (ms *MMapSpan) Close() (errs []error) {
36         ms.mu.Lock()
37         defer ms.mu.Unlock()
38         for _, mMap := range ms.mMaps {
39                 err := mMap.Unmap()
40                 if err != nil {
41                         errs = append(errs, err)
42                 }
43         }
44         // This is for issue 211.
45         ms.mMaps = nil
46         ms.InitIndex()
47         return
48 }
49
50 func (me *MMapSpan) InitIndex() {
51         i := 0
52         me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
53                 if i == len(me.mMaps) {
54                         return -1, false
55                 }
56                 l := int64(len(me.mMaps[i]))
57                 i++
58                 return l, true
59         })
60         // log.Printf("made mmapspan index: %v", me.segmentLocater)
61 }
62
63 func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
64         // log.Printf("reading %v bytes at %v", len(p), off)
65         ms.mu.RLock()
66         defer ms.mu.RUnlock()
67         n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
68         if n != len(p) {
69                 err = io.EOF
70         }
71         return
72 }
73
74 func copyBytes(dst, src []byte) int {
75         return copy(dst, src)
76 }
77
78 func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
79         ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
80                 mMapBytes := ms.mMaps[i][e.Start:]
81                 // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
82                 _n := copyBytes(copyArgs(p, mMapBytes))
83                 p = p[_n:]
84                 n += _n
85
86                 if segments.Int(_n) != e.Length {
87                         panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
88                 }
89                 return true
90         })
91         return
92 }
93
94 func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
95         // log.Printf("writing %v bytes at %v", len(p), off)
96         ms.mu.RLock()
97         defer ms.mu.RUnlock()
98         n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
99         if n != len(p) {
100                 err = io.ErrShortWrite
101         }
102         return
103 }