]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Abstract out segments mapping and use it in mmap storage
authorMatt Joiner <anacrolix@gmail.com>
Sun, 31 May 2020 11:00:19 +0000 (21:00 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 31 May 2020 11:00:19 +0000 (21:00 +1000)
mmap_span/mmap_span.go
mmap_span/span.go [deleted file]
segments/index.go [new file with mode: 0644]
segments/segments.go [new file with mode: 0644]
segments/segments_test.go [new file with mode: 0644]
storage/mmap.go

index 21f6bd26b76a93b5229a895f69de404b041054b9..c698a89c1f2595ea1c2897404de85b73c4376731 100644 (file)
@@ -1,81 +1,89 @@
 package mmap_span
 
 import (
+       "fmt"
        "io"
        "log"
        "sync"
 
+       "github.com/anacrolix/torrent/segments"
        "github.com/edsrzf/mmap-go"
 )
 
-type segment struct {
-       *mmap.MMap
-}
-
-func (s segment) Size() int64 {
-       return int64(len(*s.MMap))
-}
-
 type MMapSpan struct {
-       mu sync.RWMutex
-       span
+       mu             sync.RWMutex
+       mMaps          []mmap.MMap
+       segmentLocater segments.Index
 }
 
-func (ms *MMapSpan) Append(mmap mmap.MMap) {
-       ms.span = append(ms.span, segment{&mmap})
+func (ms *MMapSpan) Append(mMap mmap.MMap) {
+       ms.mMaps = append(ms.mMaps, mMap)
 }
 
-func (ms *MMapSpan) Close() error {
+func (ms *MMapSpan) Close() (errs []error) {
        ms.mu.Lock()
        defer ms.mu.Unlock()
-       for _, mMap := range ms.span {
-               err := mMap.(segment).Unmap()
+       for _, mMap := range ms.mMaps {
+               err := mMap.Unmap()
                if err != nil {
-                       log.Print(err)
+                       errs = append(errs, err)
                }
        }
-       return nil
+       // This is for issue 211.
+       ms.mMaps = nil
+       ms.InitIndex()
+       return
 }
 
-func (ms *MMapSpan) Size() (ret int64) {
+func (me *MMapSpan) InitIndex() {
+       i := 0
+       me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
+               if i == len(me.mMaps) {
+                       return -1, false
+               }
+               l := int64(len(me.mMaps[i]))
+               i++
+               return l, true
+       })
+       //log.Printf("made mmapspan index: %v", me.segmentLocater)
+}
+
+func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
+       //log.Printf("reading %v bytes at %v", len(p), off)
        ms.mu.RLock()
        defer ms.mu.RUnlock()
-       for _, seg := range ms.span {
-               ret += seg.Size()
+       n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
+       if n != len(p) {
+               err = io.EOF
        }
        return
 }
 
-func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
-       ms.mu.RLock()
-       defer ms.mu.RUnlock()
-       ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
-               _n := copy(p, (*interval.(segment).MMap)[intervalOffset:])
+func copyBytes(dst, src []byte) int {
+       return copy(dst, src)
+}
+
+func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
+       ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+               mMapBytes := ms.mMaps[i][e.Start:]
+               //log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
+               _n := copyBytes(copyArgs(p, mMapBytes))
                p = p[_n:]
                n += _n
-               return len(p) == 0
+               if segments.Int(_n) != e.Length {
+                       panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
+               }
+               return true
        })
-       if len(p) != 0 {
-               err = io.EOF
-       }
        return
 }
 
 func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
+       log.Printf("writing %v bytes at %v", len(p), off)
        ms.mu.RLock()
        defer ms.mu.RUnlock()
-       ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
-               mMap := i.(segment)
-               _n := copy((*mMap.MMap)[iOff:], p)
-               // err = mMap.Sync(gommap.MS_ASYNC)
-               // if err != nil {
-               //      return true
-               // }
-               p = p[_n:]
-               n += _n
-               return len(p) == 0
-       })
-       if err != nil && len(p) != 0 {
+       n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
+       if n != len(p) {
                err = io.ErrShortWrite
        }
        return
diff --git a/mmap_span/span.go b/mmap_span/span.go
deleted file mode 100644 (file)
index 141ea99..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package mmap_span
-
-type sizer interface {
-       Size() int64
-}
-
-type span []sizer
-
-func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
-       for _, interval := range s {
-               iSize := interval.Size()
-               if off >= iSize {
-                       off -= iSize
-               } else {
-                       if f(off, interval) {
-                               return
-                       }
-                       off = 0
-               }
-       }
-}
diff --git a/segments/index.go b/segments/index.go
new file mode 100644 (file)
index 0000000..c469734
--- /dev/null
@@ -0,0 +1,45 @@
+package segments
+
+import (
+       "sort"
+)
+
+func NewIndex(segments LengthIter) (ret Index) {
+       var start Length
+       for l, ok := segments(); ok; l, ok = segments() {
+               ret.segments = append(ret.segments, Extent{start, l})
+               start += l
+       }
+       return
+}
+
+type Index struct {
+       segments []Extent
+}
+
+func (me Index) iterSegments() func() (Length, bool) {
+       return func() (Length, bool) {
+               if len(me.segments) == 0 {
+                       return 0, false
+               } else {
+                       l := me.segments[0].Length
+                       me.segments = me.segments[1:]
+                       return l, true
+               }
+       }
+}
+
+func (me Index) Locate(e Extent, output Callback) {
+       first := sort.Search(len(me.segments), func(i int) bool {
+               _e := me.segments[i]
+               return _e.End() > e.Start
+       })
+       if first == len(me.segments) {
+               return
+       }
+       e.Start -= me.segments[first].Start
+       me.segments = me.segments[first:]
+       Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+               return output(i+first, e)
+       })
+}
diff --git a/segments/segments.go b/segments/segments.go
new file mode 100644 (file)
index 0000000..97b611d
--- /dev/null
@@ -0,0 +1,62 @@
+package segments
+
+type Int = int64
+
+type Length = Int
+
+func min(i Int, rest ...Int) Int {
+       ret := i
+       for _, i := range rest {
+               if i < ret {
+                       ret = i
+               }
+       }
+       return ret
+}
+
+type Extent struct {
+       Start, Length Int
+}
+
+func (e Extent) End() Int {
+       return e.Start + e.Length
+}
+
+type (
+       Callback   = func(int, Extent) bool
+       LengthIter = func() (Length, bool)
+)
+
+func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
+       i := 0
+       for needle.Length != 0 {
+               l, ok := haystack()
+               if !ok {
+                       return
+               }
+               if needle.Start < l || needle.Start == l && l == 0 {
+                       e1 := Extent{
+                               Start:  needle.Start,
+                               Length: min(l, needle.End()) - needle.Start,
+                       }
+                       if e1.Length >= 0 {
+                               if !callback(i, e1) {
+                                       return
+                               }
+                               needle.Start -= e1.Length
+                               needle.Length -= e1.Length
+                       }
+               } else {
+                       needle.Start -= l
+               }
+               i++
+       }
+}
+
+func LocaterFromLengthIter(li LengthIter) Locater {
+       return func(e Extent, c Callback) {
+               Scan(li, e, c)
+       }
+}
+
+type Locater func(Extent, Callback)
diff --git a/segments/segments_test.go b/segments/segments_test.go
new file mode 100644 (file)
index 0000000..836fdbe
--- /dev/null
@@ -0,0 +1,70 @@
+package segments
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func LengthIterFromSlice(ls []Length) LengthIter {
+       return func() (Length, bool) {
+               switch len(ls) {
+               case 0:
+                       return -1, false
+               default:
+                       l := ls[0]
+                       ls = ls[1:]
+                       return l, true
+               }
+       }
+}
+
+type ScanCallbackValue struct {
+       Index int
+       Extent
+}
+
+type collectExtents []ScanCallbackValue
+
+func (me *collectExtents) scanCallback(i int, e Extent) bool {
+       *me = append(*me, ScanCallbackValue{
+               Index:  i,
+               Extent: e,
+       })
+       return true
+}
+
+type newLocater func(LengthIter) Locater
+
+func assertLocate(t *testing.T, nl newLocater, ls []Length, needle Extent, firstExpectedIndex int, expectedExtents []Extent) {
+       var actual collectExtents
+       var expected collectExtents
+       for i, e := range expectedExtents {
+               expected.scanCallback(firstExpectedIndex+i, e)
+       }
+       nl(LengthIterFromSlice(ls))(needle, actual.scanCallback)
+       assert.EqualValues(t, expected, actual)
+}
+
+func testLocater(t *testing.T, newLocater newLocater) {
+       assertLocate(t, newLocater,
+               []Length{1, 0, 2, 0, 3},
+               Extent{2, 2},
+               2,
+               []Extent{{1, 1}, {0, 0}, {0, 1}})
+       assertLocate(t, newLocater,
+               []Length{1, 0, 2, 0, 3},
+               Extent{6, 2},
+               2,
+               []Extent{})
+}
+
+func TestScan(t *testing.T) {
+       testLocater(t, LocaterFromLengthIter)
+}
+
+func TestIndex(t *testing.T) {
+       testLocater(t, func(li LengthIter) Locater {
+               return NewIndex(li).Locate
+       })
+}
index aebf1094ce7e78c39300502c35c2e9f09b3bbaae..f811e24caa8a21c517e29882daf8714e88862c89 100644 (file)
@@ -61,7 +61,11 @@ func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
 }
 
 func (ts *mmapTorrentStorage) Close() error {
-       return ts.span.Close()
+       errs := ts.span.Close()
+       if len(errs) > 0 {
+               return errs[0]
+       }
+       return nil
 }
 
 type mmapStoragePiece struct {
@@ -113,6 +117,7 @@ func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e
                        mms.Append(mm)
                }
        }
+       mms.InitIndex()
        return
 }