package mmap_span
import (
+ "fmt"
"io"
"log"
"sync"
+ "github.com/anacrolix/torrent/segments"
"github.com/edsrzf/mmap-go"
)
-type segment struct {
- *mmap.MMap
-}
-
-func (s segment) Size() int64 {
- return int64(len(*s.MMap))
-}
-
type MMapSpan struct {
- mu sync.RWMutex
- span
+ mu sync.RWMutex
+ mMaps []mmap.MMap
+ segmentLocater segments.Index
}
-func (ms *MMapSpan) Append(mmap mmap.MMap) {
- ms.span = append(ms.span, segment{&mmap})
+func (ms *MMapSpan) Append(mMap mmap.MMap) {
+ ms.mMaps = append(ms.mMaps, mMap)
}
-func (ms *MMapSpan) Close() error {
+func (ms *MMapSpan) Close() (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- for _, mMap := range ms.span {
- err := mMap.(segment).Unmap()
+ for _, mMap := range ms.mMaps {
+ err := mMap.Unmap()
if err != nil {
- log.Print(err)
+ errs = append(errs, err)
}
}
- return nil
+ // This is for issue 211.
+ ms.mMaps = nil
+ ms.InitIndex()
+ return
}
-func (ms *MMapSpan) Size() (ret int64) {
+func (me *MMapSpan) InitIndex() {
+ i := 0
+ me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
+ if i == len(me.mMaps) {
+ return -1, false
+ }
+ l := int64(len(me.mMaps[i]))
+ i++
+ return l, true
+ })
+ //log.Printf("made mmapspan index: %v", me.segmentLocater)
+}
+
+func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
+ //log.Printf("reading %v bytes at %v", len(p), off)
ms.mu.RLock()
defer ms.mu.RUnlock()
- for _, seg := range ms.span {
- ret += seg.Size()
+ n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
+ if n != len(p) {
+ err = io.EOF
}
return
}
-func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
- ms.mu.RLock()
- defer ms.mu.RUnlock()
- ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
- _n := copy(p, (*interval.(segment).MMap)[intervalOffset:])
+func copyBytes(dst, src []byte) int {
+ return copy(dst, src)
+}
+
+func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
+ ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+ mMapBytes := ms.mMaps[i][e.Start:]
+ //log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
+ _n := copyBytes(copyArgs(p, mMapBytes))
p = p[_n:]
n += _n
- return len(p) == 0
+ if segments.Int(_n) != e.Length {
+ panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
+ }
+ return true
})
- if len(p) != 0 {
- err = io.EOF
- }
return
}
func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
+ log.Printf("writing %v bytes at %v", len(p), off)
ms.mu.RLock()
defer ms.mu.RUnlock()
- ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
- mMap := i.(segment)
- _n := copy((*mMap.MMap)[iOff:], p)
- // err = mMap.Sync(gommap.MS_ASYNC)
- // if err != nil {
- // return true
- // }
- p = p[_n:]
- n += _n
- return len(p) == 0
- })
- if err != nil && len(p) != 0 {
+ n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
+ if n != len(p) {
err = io.ErrShortWrite
}
return
+++ /dev/null
-package mmap_span
-
-type sizer interface {
- Size() int64
-}
-
-type span []sizer
-
-func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
- for _, interval := range s {
- iSize := interval.Size()
- if off >= iSize {
- off -= iSize
- } else {
- if f(off, interval) {
- return
- }
- off = 0
- }
- }
-}
--- /dev/null
+package segments
+
+import (
+ "sort"
+)
+
+func NewIndex(segments LengthIter) (ret Index) {
+ var start Length
+ for l, ok := segments(); ok; l, ok = segments() {
+ ret.segments = append(ret.segments, Extent{start, l})
+ start += l
+ }
+ return
+}
+
+type Index struct {
+ segments []Extent
+}
+
+func (me Index) iterSegments() func() (Length, bool) {
+ return func() (Length, bool) {
+ if len(me.segments) == 0 {
+ return 0, false
+ } else {
+ l := me.segments[0].Length
+ me.segments = me.segments[1:]
+ return l, true
+ }
+ }
+}
+
+func (me Index) Locate(e Extent, output Callback) {
+ first := sort.Search(len(me.segments), func(i int) bool {
+ _e := me.segments[i]
+ return _e.End() > e.Start
+ })
+ if first == len(me.segments) {
+ return
+ }
+ e.Start -= me.segments[first].Start
+ me.segments = me.segments[first:]
+ Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+ return output(i+first, e)
+ })
+}
--- /dev/null
+package segments
+
+type Int = int64
+
+type Length = Int
+
+func min(i Int, rest ...Int) Int {
+ ret := i
+ for _, i := range rest {
+ if i < ret {
+ ret = i
+ }
+ }
+ return ret
+}
+
+type Extent struct {
+ Start, Length Int
+}
+
+func (e Extent) End() Int {
+ return e.Start + e.Length
+}
+
+type (
+ Callback = func(int, Extent) bool
+ LengthIter = func() (Length, bool)
+)
+
+func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
+ i := 0
+ for needle.Length != 0 {
+ l, ok := haystack()
+ if !ok {
+ return
+ }
+ if needle.Start < l || needle.Start == l && l == 0 {
+ e1 := Extent{
+ Start: needle.Start,
+ Length: min(l, needle.End()) - needle.Start,
+ }
+ if e1.Length >= 0 {
+ if !callback(i, e1) {
+ return
+ }
+ needle.Start -= e1.Length
+ needle.Length -= e1.Length
+ }
+ } else {
+ needle.Start -= l
+ }
+ i++
+ }
+}
+
+func LocaterFromLengthIter(li LengthIter) Locater {
+ return func(e Extent, c Callback) {
+ Scan(li, e, c)
+ }
+}
+
+type Locater func(Extent, Callback)
--- /dev/null
+package segments
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func LengthIterFromSlice(ls []Length) LengthIter {
+ return func() (Length, bool) {
+ switch len(ls) {
+ case 0:
+ return -1, false
+ default:
+ l := ls[0]
+ ls = ls[1:]
+ return l, true
+ }
+ }
+}
+
+type ScanCallbackValue struct {
+ Index int
+ Extent
+}
+
+type collectExtents []ScanCallbackValue
+
+func (me *collectExtents) scanCallback(i int, e Extent) bool {
+ *me = append(*me, ScanCallbackValue{
+ Index: i,
+ Extent: e,
+ })
+ return true
+}
+
+type newLocater func(LengthIter) Locater
+
+func assertLocate(t *testing.T, nl newLocater, ls []Length, needle Extent, firstExpectedIndex int, expectedExtents []Extent) {
+ var actual collectExtents
+ var expected collectExtents
+ for i, e := range expectedExtents {
+ expected.scanCallback(firstExpectedIndex+i, e)
+ }
+ nl(LengthIterFromSlice(ls))(needle, actual.scanCallback)
+ assert.EqualValues(t, expected, actual)
+}
+
+func testLocater(t *testing.T, newLocater newLocater) {
+ assertLocate(t, newLocater,
+ []Length{1, 0, 2, 0, 3},
+ Extent{2, 2},
+ 2,
+ []Extent{{1, 1}, {0, 0}, {0, 1}})
+ assertLocate(t, newLocater,
+ []Length{1, 0, 2, 0, 3},
+ Extent{6, 2},
+ 2,
+ []Extent{})
+}
+
+func TestScan(t *testing.T) {
+ testLocater(t, LocaterFromLengthIter)
+}
+
+func TestIndex(t *testing.T) {
+ testLocater(t, func(li LengthIter) Locater {
+ return NewIndex(li).Locate
+ })
+}
}
func (ts *mmapTorrentStorage) Close() error {
- return ts.span.Close()
+ errs := ts.span.Close()
+ if len(errs) > 0 {
+ return errs[0]
+ }
+ return nil
}
type mmapStoragePiece struct {
mms.Append(mm)
}
}
+ mms.InitIndex()
return
}