"os"
"testing"
- "github.com/stretchr/testify/assert"
+ qt "github.com/frankban/quicktest"
+
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/internal/testutil"
t.Fatalf("removing torrent dummy data dir: %v", err)
}
}()
+ logErr := func(f func() error, msg string) {
+ err := f()
+ t.Logf("%s: %v", msg, err)
+ if err != nil {
+ t.Fail()
+ }
+ }
cfg := TestingConfig(t)
cfg.Seed = false
cfg.Debug = true
cfg.DataDir = dir
comp, err := storage.NewBoltPieceCompletion(dir)
- require.NoError(t, err)
- defer comp.Close()
- cfg.DefaultStorage = storage.NewMMapWithCompletion(dir, comp)
+ c := qt.New(t)
+ c.Assert(err, qt.IsNil)
+ defer logErr(comp.Close, "closing bolt piece completion")
+ mmapStorage := storage.NewMMapWithCompletion(dir, comp)
+ defer logErr(mmapStorage.Close, "closing mmap storage")
+ cfg.DefaultStorage = mmapStorage
cl, err := NewClient(cfg)
- require.NoError(t, err)
- defer cl.Close()
+ c.Assert(err, qt.IsNil)
+ defer logErr(cl.Close, "closing client")
tor, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
- require.NoError(t, err)
- assert.True(t, new)
- require.True(t, cl.WaitAll())
+ c.Assert(err, qt.IsNil)
+ c.Assert(new, qt.IsTrue)
+ c.Assert(cl.WaitAll(), qt.IsTrue)
tor.Drop()
_, new, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
- require.NoError(t, err)
- assert.True(t, new)
- require.True(t, cl.WaitAll())
+ c.Assert(err, qt.IsNil)
+ c.Assert(new, qt.IsTrue)
+ c.Assert(cl.WaitAll(), qt.IsTrue)
}
}
// Stops the client. All connections to peers are closed and all activity will come to a halt.
-func (cl *Client) Close() (errs []error) {
+func (cl *Client) Close() error {
var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
cl.lock()
+ var errs []error
for _, t := range cl.torrents {
err := t.close(&closeGroup)
if err != nil {
cl.unlock()
cl.event.Broadcast()
closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
- return
+ return errors.Join(errs...)
}
func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
"os"
"path/filepath"
+ "github.com/anacrolix/torrent/storage"
+
"github.com/anacrolix/tagflag"
"github.com/edsrzf/mmap-go"
"github.com/anacrolix/torrent/mmap_span"
)
-func mmapFile(name string) (mm mmap.MMap, err error) {
+func mmapFile(name string) (mm storage.FileMapping, err error) {
f, err := os.Open(name)
if err != nil {
return
}
- defer f.Close()
+ defer func() {
+ if err != nil {
+ f.Close()
+ }
+ }()
fi, err := f.Stat()
if err != nil {
return
if fi.Size() == 0 {
return
}
- return mmap.MapRegion(f, -1, mmap.RDONLY, mmap.COPY, 0)
+ reg, err := mmap.MapRegion(f, -1, mmap.RDONLY, mmap.COPY, 0)
+ if err != nil {
+ return
+ }
+ return storage.WrapFileMapping(reg, f), nil
}
func verifyTorrent(info *metainfo.Info, root string) error {
if err != nil {
return err
}
- if int64(len(mm)) != file.Length {
+ if int64(len(mm.Bytes())) != file.Length {
return fmt.Errorf("file %q has wrong length", filename)
}
span.Append(mm)
"io"
"sync"
- "github.com/edsrzf/mmap-go"
-
"github.com/anacrolix/torrent/segments"
)
+type Mmap interface {
+ Flush() error
+ Unmap() error
+ Bytes() []byte
+}
+
type MMapSpan struct {
mu sync.RWMutex
- mMaps []mmap.MMap
+ mMaps []Mmap
segmentLocater segments.Index
}
-func (ms *MMapSpan) Append(mMap mmap.MMap) {
+func (ms *MMapSpan) Append(mMap Mmap) {
ms.mMaps = append(ms.mMaps, mMap)
}
if i == len(me.mMaps) {
return -1, false
}
- l := int64(len(me.mMaps[i]))
+ l := int64(len(me.mMaps[i].Bytes()))
i++
return l, true
})
func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
- mMapBytes := ms.mMaps[i][e.Start:]
+ mMapBytes := ms.mMaps[i].Bytes()[e.Start:]
// log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
_n := copyBytes(copyArgs(p, mMapBytes))
p = p[_n:]
func TestShortFile(t *testing.T) {
td := t.TempDir()
s := NewFile(td)
+ defer s.Close()
info := &metainfo.Info{
Name: "a",
Length: 2,
}
t1, err := c.OpenTorrent(i1, metainfo.HashBytes([]byte("a")))
require.NoError(t, err)
+ defer t1.Close()
i2 := &metainfo.Info{
Files: []metainfo.FileInfo{{Path: []string{"a"}}},
Pieces: make([]byte, 20),
}
t2, err := c.OpenTorrent(i2, metainfo.HashBytes([]byte("b")))
require.NoError(t, err)
+ defer t2.Close()
t2p := t2.Piece(i2.Piece(0))
assert.NoError(t, t1.Close())
assert.NotPanics(t, func() { t2p.Completion() })
func TestIssue95File(t *testing.T) {
td := t.TempDir()
- testIssue95(t, NewFile(td))
+ cs := NewFile(td)
+ defer cs.Close()
+ testIssue95(t, cs)
}
func TestIssue95MMap(t *testing.T) {
td := t.TempDir()
- testIssue95(t, NewMMap(td))
+ cs := NewMMap(td)
+ defer cs.Close()
+ testIssue95(t, cs)
}
func TestIssue95ResourcePieces(t *testing.T) {
func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImplCloser) {
td := t.TempDir()
- cs := NewClient(csf(td))
+ cic := csf(td)
+ defer cic.Close()
+ cs := NewClient(cic)
info := &metainfo.Info{
PieceLength: 1,
Files: []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}},
return
}
fileName := filepath.Join(location, safeName)
- var mm mmap.MMap
+ var mm FileMapping
mm, err = mmapFile(fileName, miFile.Length)
if err != nil {
err = fmt.Errorf("file %q: %s", miFile.DisplayPath(md), err)
return
}
- if mm != nil {
- mms.Append(mm)
- }
+ mms.Append(mm)
}
mms.InitIndex()
return
}
-func mmapFile(name string, size int64) (ret mmap.MMap, err error) {
+func mmapFile(name string, size int64) (_ FileMapping, err error) {
dir := filepath.Dir(name)
err = os.MkdirAll(dir, 0o750)
if err != nil {
if err != nil {
return
}
- defer file.Close()
+ defer func() {
+ if err != nil {
+ file.Close()
+ }
+ }()
var fi os.FileInfo
fi, err = file.Stat()
if err != nil {
return
}
}
- if size == 0 {
- // Can't mmap() regions with length 0.
- return
- }
- intLen := int(size)
- if int64(intLen) != size {
- err = errors.New("size too large for system")
+ return func() (ret mmapWithFile, err error) {
+ ret.f = file
+ if size == 0 {
+ // Can't mmap() regions with length 0.
+ return
+ }
+ intLen := int(size)
+ if int64(intLen) != size {
+ err = errors.New("size too large for system")
+ return
+ }
+ ret.mmap, err = mmap.MapRegion(file, intLen, mmap.RDWR, 0, 0)
+ if err != nil {
+ err = fmt.Errorf("error mapping region: %s", err)
+ return
+ }
+ if int64(len(ret.mmap)) != size {
+ panic(len(ret.mmap))
+ }
return
+ }()
+}
+
+// Combines a mmapped region and file into a storage Mmap abstraction, which handles closing the
+// mmap file handle.
+func WrapFileMapping(region mmap.MMap, file *os.File) FileMapping {
+ return mmapWithFile{
+ f: file,
+ mmap: region,
}
- ret, err = mmap.MapRegion(file, intLen, mmap.RDWR, 0, 0)
- if err != nil {
- err = fmt.Errorf("error mapping region: %s", err)
- return
+}
+
+type FileMapping = mmap_span.Mmap
+
+// Handles closing the mmap's file handle (needed for Windows). Could be implemented differently by
+// OS.
+type mmapWithFile struct {
+ f *os.File
+ mmap mmap.MMap
+}
+
+func (m mmapWithFile) Flush() error {
+ return m.mmap.Flush()
+}
+
+func (m mmapWithFile) Unmap() (err error) {
+ if m.mmap != nil {
+ err = m.mmap.Unmap()
}
- if int64(len(ret)) != size {
- panic(len(ret))
+ return errors.Join(err, m.f.Close())
+}
+
+func (m mmapWithFile) Bytes() []byte {
+ if m.mmap == nil {
+ return nil
}
- return
+ return m.mmap
}
--- /dev/null
+package storage
+
+import (
+ "testing"
+
+ qt "github.com/frankban/quicktest"
+
+ "github.com/anacrolix/torrent/internal/testutil"
+)
+
+func TestMmapWindows(t *testing.T) {
+ c := qt.New(t)
+ dir, mi := testutil.GreetingTestTorrent()
+ cs := NewMMap(dir)
+ defer func() {
+ c.Check(cs.Close(), qt.IsNil)
+ }()
+ info, err := mi.UnmarshalInfo()
+ c.Assert(err, qt.IsNil)
+ ts, err := cs.OpenTorrent(&info, mi.HashInfoBytes())
+ c.Assert(err, qt.IsNil)
+ defer func() {
+ c.Check(ts.Close(), qt.IsNil)
+ }()
+}
defer wg.Wait()
t.cl.lock()
defer t.cl.unlock()
- t.cl.dropTorrent(t.infoHash, &wg)
+ err := t.cl.dropTorrent(t.infoHash, &wg)
+ if err != nil {
+ panic(err)
+ }
}
// Number of bytes of the entire torrent we have completed. This is the sum of