}
}
-func (ms *MMapSpan) Flush() (errs []error) {
+func (ms *MMapSpan) Flush() (err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
for _, mMap := range ms.mMaps {
- err := mMap.Flush()
- if err != nil {
- errs = append(errs, err)
- }
+ err = errors.Join(err, mMap.Flush())
}
return
}
return p.t.storage.PieceWithHash(p.Info(), pieceHash)
}
-func (p *Piece) Flush() {
- if p.t.storage.Flush != nil {
- _ = p.t.storage.Flush()
- }
+func (p *Piece) Flush() error {
+ return p.Storage().Flush()
}
func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
return TorrentImpl{
Piece: t.Piece,
Close: t.Close,
- Flush: t.Flush,
}, nil
}
type classicFileIo struct{}
-func (i classicFileIo) openForSharedRead(name string) (sharedFileIf, error) {
- return sharedFiles.Open(name)
-}
-
-type classicFileReader struct {
- *os.File
+func (me classicFileIo) flush(name string, offset, nbytes int64) error {
+ // Plain file I/O has no way to sync a byte range, so fsync the whole file; offset and
+ // nbytes are ignored.
+ return fsync(name)
}
-func (c classicFileReader) seekData(offset int64) (ret int64, err error) {
- return seekData(c.File, offset)
+func (me classicFileIo) openForSharedRead(name string) (sharedFileIf, error) {
+ return sharedFiles.Open(name)
}
-func (i classicFileIo) openForRead(name string) (fileReader, error) {
+func (me classicFileIo) openForRead(name string) (fileReader, error) {
f, err := os.Open(name)
return classicFileReader{f}, err
}
func (classicFileIo) openForWrite(p string, size int64) (f fileWriter, err error) {
return openFileExtra(p, os.O_WRONLY)
}
+
+type classicFileReader struct {
+ *os.File
+}
+
+func (c classicFileReader) seekData(offset int64) (ret int64, err error) {
+ return seekData(c.File, offset)
+}
)
type mmapFileIo struct {
- mu sync.Mutex
+ mu sync.RWMutex
paths map[string]*fileMmap
}
+func (me *mmapFileIo) flush(name string, offset, nbytes int64) error {
+ // We only flush writes that we made ourselves, and we don't currently unmap files after
+ // opening them, so if the mmap doesn't exist yet there's nothing to flush.
+ me.mu.RLock()
+ defer me.mu.RUnlock()
+ v, ok := me.paths[name]
+ if !ok {
+ return nil
+ }
+ if !v.writable {
+ return nil
+ }
+ return msync(v.m, int(offset), int(nbytes))
+}
+
type fileMmap struct {
m mmap.MMap
writable bool
openForSharedRead(name string) (sharedFileIf, error)
openForRead(name string) (fileReader, error)
openForWrite(name string, size int64) (fileWriter, error)
+ flush(name string, offset, nbytes int64) error
}
package storage
import (
- "errors"
"io"
- "io/fs"
"os"
"path/filepath"
"sync"
func fsync(filePath string) (err error) {
f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm)
- if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ if err != nil {
return
}
defer f.Close()
PieceImpl
//PieceReaderer
io.WriterTo
+ Flusher
} = (*filePieceImpl)(nil)
+func (me *filePieceImpl) Flush() (err error) {
+ for fileIndex, extent := range me.fileExtents() {
+ file := me.t.file(fileIndex)
+ name := me.t.pathForWrite(&file)
+ err1 := me.t.io.flush(name, extent.Start, extent.Length)
+ if err1 != nil {
+ err = errors.Join(err, fmt.Errorf("flushing %q: %w", name, err1))
+ }
+ }
+ return
+}
+
func (me *filePieceImpl) logger() *slog.Logger {
return me.t.client.opts.Logger
}
}
}
+func (me *filePieceImpl) fileExtents() iter.Seq2[int, segments.Extent] {
+ return me.t.segmentLocater.LocateIter(me.extent())
+}
+
func (me *filePieceImpl) pieceFiles() iter.Seq[file] {
return func(yield func(file) bool) {
- for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
+ for fileIndex := range me.fileExtents() {
f := me.t.file(fileIndex)
if !yield(f) {
return
return nil
}
-func (fts *fileTorrentImpl) Flush() error {
- for i := range fts.files {
- f := fts.file(i)
- fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
- if err := fsync(fts.pathForWrite(&f)); err != nil {
- return err
- }
- }
- return nil
-}
-
func (fts *fileTorrentImpl) file(index int) file {
return file{
Info: fts.info,
// Preferred over PieceWithHash. Called with the piece hash if it's available.
PieceWithHash func(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl
Close func() error
- Flush func() error
// Storages that share the same space, will provide equal pointers. The function is called once
// to determine the storage for torrents sharing the same function pointer, and mutated in
// place.
Completion() Completion
}
+// Flusher is implemented by storage implementations that can flush buffered or
+// memory-mapped writes through to the underlying storage.
+type Flusher interface {
+ Flush() error
+}
+
// Completion state of a piece.
type Completion struct {
Err error
span: span,
pc: s.pc,
}
- return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err
+ return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
}
func (s *mmapClientImpl) Close() error {
func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
return mmapStoragePiece{
- pc: ts.pc,
+ t: ts,
p: p,
- ih: ts.infoHash,
ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()),
WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()),
}
return ts.span.Close()
}
-func (ts *mmapTorrentStorage) Flush() error {
- errs := ts.span.Flush()
- if len(errs) > 0 {
- return errs[0]
- }
- return nil
-}
-
type mmapStoragePiece struct {
- pc PieceCompletionGetSetter
- p metainfo.Piece
- ih metainfo.Hash
+ t *mmapTorrentStorage
+ p metainfo.Piece
io.ReaderAt
io.WriterAt
}
+var _ Flusher = mmapStoragePiece{}
+
+func (me mmapStoragePiece) Flush() error {
+ // TODO: Flush only the regions of the files backing this piece. Flushing the whole
+ // span is no worse than the previous behaviour.
+ return me.t.span.Flush()
+}
+
func (me mmapStoragePiece) pieceKey() metainfo.PieceKey {
- return metainfo.PieceKey{me.ih, me.p.Index()}
+ return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
}
func (sp mmapStoragePiece) Completion() Completion {
- c, err := sp.pc.Get(sp.pieceKey())
+ c, err := sp.t.pc.Get(sp.pieceKey())
if err != nil {
panic(err)
}
}
func (sp mmapStoragePiece) MarkComplete() error {
- return sp.pc.Set(sp.pieceKey(), true)
+ return sp.t.pc.Set(sp.pieceKey(), true)
}
func (sp mmapStoragePiece) MarkNotComplete() error {
- return sp.pc.Set(sp.pieceKey(), false)
+ return sp.t.pc.Set(sp.pieceKey(), false)
}
func mMapTorrent(md *metainfo.Info, location string) (mms *mmapSpan.MMapSpan, err error) {
+++ /dev/null
-package storage
-
-import (
- "io"
- "os"
-)
-
-func seekData(f *os.File, offset int64) (ret int64, err error) {
- return f.Seek(offset, io.SeekStart)
-}
"io"
"os"
+ "github.com/edsrzf/mmap-go"
"golang.org/x/sys/unix"
)
}
return
}
+
+func msync(mm mmap.MMap, offset, nbytes int) error {
+ // Msync requires a page-aligned address, so round the offset down to a page boundary.
+ rem := offset % os.Getpagesize()
+ return unix.Msync(mm[offset-rem:offset+nbytes], unix.MS_SYNC)
+}
--- /dev/null
+package storage
+
+import (
+ "io"
+ "os"
+
+ "github.com/edsrzf/mmap-go"
+)
+
+func seekData(f *os.File, offset int64) (ret int64, err error) {
+ return f.Seek(offset, io.SeekStart)
+}
+
+func msync(mm mmap.MMap, offset, nbytes int) error {
+ // mmap-go doesn't expose a ranged flush here, so flush the whole mapping; offset and
+ // nbytes are ignored. TODO: Use windows.FlushViewOfFile to flush just the requested range.
+ return mm.Flush()
+}
}, nil
}
+// Flush flushes the piece's storage if the underlying PieceImpl implements Flusher.
+func (p Piece) Flush() error {
+ if fl, ok := p.PieceImpl.(Flusher); ok {
+ return fl.Flush()
+ }
+ return nil
+}
+
type nopCloser struct{}
func (nopCloser) Close() error {
--- /dev/null
+package torrent
hasDirty := p.hasDirtyChunks()
t.cl.unlock()
if hasDirty {
- p.Flush() // You can be synchronous here!
+ // Flush could return fs.ErrNotExist, but that would be unexpected here: the piece
+ // hasn't been marked complete yet, so nothing should have moved its files.
+ err := p.Flush() // You can be synchronous here!
+ if err != nil {
+ t.slogger().Warn("error flushing piece storage", "piece", piece, "err", err)
+ }
}
p.race++
err := p.Storage().MarkComplete()