From 7fec6785bfcd05f8a2f7dfb3df75dfd8f2487e13 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 31 May 2020 21:00:19 +1000 Subject: [PATCH] Abstract out segments mapping and use it in mmap storage --- mmap_span/mmap_span.go | 90 +++++++++++++++++++++------------------ mmap_span/span.go | 21 --------- segments/index.go | 45 ++++++++++++++++++++ segments/segments.go | 62 +++++++++++++++++++++++++++ segments/segments_test.go | 70 ++++++++++++++++++++++++++++++ storage/mmap.go | 7 ++- 6 files changed, 232 insertions(+), 63 deletions(-) delete mode 100644 mmap_span/span.go create mode 100644 segments/index.go create mode 100644 segments/segments.go create mode 100644 segments/segments_test.go diff --git a/mmap_span/mmap_span.go b/mmap_span/mmap_span.go index 21f6bd26..c698a89c 100644 --- a/mmap_span/mmap_span.go +++ b/mmap_span/mmap_span.go @@ -1,81 +1,89 @@ package mmap_span import ( + "fmt" "io" "log" "sync" + "github.com/anacrolix/torrent/segments" "github.com/edsrzf/mmap-go" ) -type segment struct { - *mmap.MMap -} - -func (s segment) Size() int64 { - return int64(len(*s.MMap)) -} - type MMapSpan struct { - mu sync.RWMutex - span + mu sync.RWMutex + mMaps []mmap.MMap + segmentLocater segments.Index } -func (ms *MMapSpan) Append(mmap mmap.MMap) { - ms.span = append(ms.span, segment{&mmap}) +func (ms *MMapSpan) Append(mMap mmap.MMap) { + ms.mMaps = append(ms.mMaps, mMap) } -func (ms *MMapSpan) Close() error { +func (ms *MMapSpan) Close() (errs []error) { ms.mu.Lock() defer ms.mu.Unlock() - for _, mMap := range ms.span { - err := mMap.(segment).Unmap() + for _, mMap := range ms.mMaps { + err := mMap.Unmap() if err != nil { - log.Print(err) + errs = append(errs, err) } } - return nil + // This is for issue 211. + ms.mMaps = nil + ms.InitIndex() + return } -func (ms *MMapSpan) Size() (ret int64) { +func (me *MMapSpan) InitIndex() { + i := 0 + me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) { + if i == len(me.mMaps) { + return -1, false + } + l := int64(len(me.mMaps[i])) + i++ + return l, true + }) + //log.Printf("made mmapspan index: %v", me.segmentLocater) +} + +func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { + //log.Printf("reading %v bytes at %v", len(p), off) ms.mu.RLock() defer ms.mu.RUnlock() - for _, seg := range ms.span { - ret += seg.Size() + n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off) + if n != len(p) { + err = io.EOF } return } -func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { - ms.mu.RLock() - defer ms.mu.RUnlock() - ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) { - _n := copy(p, (*interval.(segment).MMap)[intervalOffset:]) +func copyBytes(dst, src []byte) int { + return copy(dst, src) +} + +func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) { + ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { + mMapBytes := ms.mMaps[i][e.Start:] + //log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes)) + _n := copyBytes(copyArgs(p, mMapBytes)) p = p[_n:] n += _n - return len(p) == 0 + if segments.Int(_n) != e.Length { + panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length)) + } + return true }) - if len(p) != 0 { - err = io.EOF - } return } func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) { + log.Printf("writing %v bytes at %v", len(p), off) ms.mu.RLock() defer ms.mu.RUnlock() - ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) { - mMap := i.(segment) - _n := copy((*mMap.MMap)[iOff:], p) - // err = mMap.Sync(gommap.MS_ASYNC) - // if err != nil { - // return true - // } - p = p[_n:] - n += _n - return len(p) == 0 - }) - if err != nil && len(p) != 0 { + n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off) + if n != len(p) { err = io.ErrShortWrite } return diff --git a/mmap_span/span.go b/mmap_span/span.go deleted file mode 100644 index 141ea990..00000000 --- a/mmap_span/span.go +++ /dev/null @@ -1,21 +0,0 @@ -package mmap_span - -type sizer interface { - Size() int64 -} - -type span []sizer - -func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) { - for _, interval := range s { - iSize := interval.Size() - if off >= iSize { - off -= iSize - } else { - if f(off, interval) { - return - } - off = 0 - } - } -} diff --git a/segments/index.go b/segments/index.go new file mode 100644 index 00000000..c469734d --- /dev/null +++ b/segments/index.go @@ -0,0 +1,45 @@ +package segments + +import ( + "sort" +) + +func NewIndex(segments LengthIter) (ret Index) { + var start Length + for l, ok := segments(); ok; l, ok = segments() { + ret.segments = append(ret.segments, Extent{start, l}) + start += l + } + return +} + +type Index struct { + segments []Extent +} + +func (me Index) iterSegments() func() (Length, bool) { + return func() (Length, bool) { + if len(me.segments) == 0 { + return 0, false + } else { + l := me.segments[0].Length + me.segments = me.segments[1:] + return l, true + } + } +} + +func (me Index) Locate(e Extent, output Callback) { + first := sort.Search(len(me.segments), func(i int) bool { + _e := me.segments[i] + return _e.End() > e.Start + }) + if first == len(me.segments) { + return + } + e.Start -= me.segments[first].Start + me.segments = me.segments[first:] + Scan(me.iterSegments(), e, func(i int, e Extent) bool { + return output(i+first, e) + }) +} diff --git a/segments/segments.go b/segments/segments.go new file mode 100644 index 00000000..97b611da --- /dev/null +++ b/segments/segments.go @@ -0,0 +1,62 @@ +package segments + +type Int = int64 + +type Length = Int + +func min(i Int, rest ...Int) Int { + ret := i + for _, i := range rest { + if i < ret { + ret = i + } + } + return ret +} + +type Extent struct { + Start, Length Int +} + +func (e Extent) End() Int { + return e.Start + e.Length +} + +type ( + Callback = func(int, Extent) bool + LengthIter = func() (Length, bool) +) + +func Scan(haystack func() (Length, bool), needle Extent, callback Callback) { + i := 0 + for needle.Length != 0 { + l, ok := haystack() + if !ok { + return + } + if needle.Start < l || needle.Start == l && l == 0 { + e1 := Extent{ + Start: needle.Start, + Length: min(l, needle.End()) - needle.Start, + } + if e1.Length >= 0 { + if !callback(i, e1) { + return + } + needle.Start -= e1.Length + needle.Length -= e1.Length + } + } else { + needle.Start -= l + } + i++ + } +} + +func LocaterFromLengthIter(li LengthIter) Locater { + return func(e Extent, c Callback) { + Scan(li, e, c) + } +} + +type Locater func(Extent, Callback) diff --git a/segments/segments_test.go b/segments/segments_test.go new file mode 100644 index 00000000..836fdbe0 --- /dev/null +++ b/segments/segments_test.go @@ -0,0 +1,70 @@ +package segments + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func LengthIterFromSlice(ls []Length) LengthIter { + return func() (Length, bool) { + switch len(ls) { + case 0: + return -1, false + default: + l := ls[0] + ls = ls[1:] + return l, true + } + } +} + +type ScanCallbackValue struct { + Index int + Extent +} + +type collectExtents []ScanCallbackValue + +func (me *collectExtents) scanCallback(i int, e Extent) bool { + *me = append(*me, ScanCallbackValue{ + Index: i, + Extent: e, + }) + return true +} + +type newLocater func(LengthIter) Locater + +func assertLocate(t *testing.T, nl newLocater, ls []Length, needle Extent, firstExpectedIndex int, expectedExtents []Extent) { + var actual collectExtents + var expected collectExtents + for i, e := range expectedExtents { + expected.scanCallback(firstExpectedIndex+i, e) + } + nl(LengthIterFromSlice(ls))(needle, actual.scanCallback) + assert.EqualValues(t, expected, actual) +} + +func testLocater(t *testing.T, newLocater newLocater) { + assertLocate(t, newLocater, + []Length{1, 0, 2, 0, 3}, + Extent{2, 2}, + 2, + []Extent{{1, 1}, {0, 0}, {0, 1}}) + assertLocate(t, newLocater, + []Length{1, 0, 2, 0, 3}, + Extent{6, 2}, + 2, + []Extent{}) +} + +func TestScan(t *testing.T) { + testLocater(t, LocaterFromLengthIter) +} + +func TestIndex(t *testing.T) { + testLocater(t, func(li LengthIter) Locater { + return NewIndex(li).Locate + }) +} diff --git a/storage/mmap.go b/storage/mmap.go index aebf1094..f811e24c 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -61,7 +61,11 @@ func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl { } func (ts *mmapTorrentStorage) Close() error { - return ts.span.Close() + errs := ts.span.Close() + if len(errs) > 0 { + return errs[0] + } + return nil } type mmapStoragePiece struct { @@ -113,6 +117,7 @@ func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e mms.Append(mm) } } + mms.InitIndex() return } -- 2.48.1