package storage
import (
+ "errors"
"fmt"
"io"
+ "io/fs"
"iter"
+ "log/slog"
"os"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
)
+// Piece within File storage.
type filePieceImpl struct {
t *fileTorrentImpl
p metainfo.Piece
var _ PieceImpl = (*filePieceImpl)(nil)
+func (me *filePieceImpl) logger() *slog.Logger {
+ return me.t.client.opts.Logger
+}
+
func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
}
-func (fs *filePieceImpl) extent() segments.Extent {
+func (me *filePieceImpl) extent() segments.Extent {
return segments.Extent{
- Start: fs.p.Offset(),
- Length: fs.p.Length(),
+ Start: me.p.Offset(),
+ Length: me.p.Length(),
}
}
-func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
+func (me *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
return func(yield func(int, file) bool) {
- for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) {
- if !yield(fileIndex, fs.t.files[fileIndex]) {
+ for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
+ if !yield(fileIndex, me.t.files[fileIndex]) {
return
}
}
}
}
-func (fs *filePieceImpl) Completion() Completion {
- c := fs.t.getCompletion(fs.p.Index())
- if !c.Ok {
+func (me *filePieceImpl) pieceCompletion() PieceCompletion {
+ return me.t.client.opts.PieceCompletion
+}
+
+func (me *filePieceImpl) Completion() Completion {
+ c := me.t.getCompletion(me.p.Index())
+ if !c.Ok || c.Err != nil {
return c
}
verified := true
if c.Complete {
+ noFiles := true
// If it's allegedly complete, check that its constituent files have the necessary length.
- if !fs.t.segmentLocater.Locate(
- fs.extent(),
- func(i int, extent segments.Extent) bool {
- file := fs.t.files[i]
- s, err := os.Stat(file.safeOsPath)
- if err != nil || s.Size() < extent.Start+extent.Length {
- verified = false
- return false
- }
- return true
- }) {
+ for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) {
+ noFiles = false
+ file := me.t.files[i]
+ s, err := os.Stat(file.safeOsPath)
+ if errors.Is(err, fs.ErrNotExist) {
+ s, err = os.Stat(file.partFilePath())
+ }
+ if err != nil {
+ me.logger().Warn(
+ "error checking file for piece marked as complete",
+ "piece", me.p,
+ "file", file.safeOsPath,
+ "err", err)
+ } else if s.Size() < extent.Start+extent.Length {
+ me.logger().Error(
+ "file too small for piece marked as complete",
+ "piece", me.p,
+ "file", file.safeOsPath,
+ "size", s.Size(),
+ "extent", extent)
+ } else {
+ continue
+ }
+ verified = false
+ break
+ }
+ // This probably belongs in a wrapper helper of some kind. I will retain the logic for now.
+ if noFiles {
panic("files do not cover piece extent")
}
}
if !verified {
- // The completion was wrong, fix it. TODO: Should we use MarkNotComplete?
- c.Complete = false
- fs.t.completion.Set(fs.pieceKey(), false)
+ // The completion was wrong, fix it.
+ err := me.MarkNotComplete()
+ if err != nil {
+ c.Err = fmt.Errorf("error marking piece not complete: %w", err)
+ }
}
return c
}
-func (fs *filePieceImpl) MarkComplete() (err error) {
- err = fs.t.completion.Set(fs.pieceKey(), true)
+func (me *filePieceImpl) MarkComplete() (err error) {
+ err = me.pieceCompletion().Set(me.pieceKey(), true)
if err != nil {
return
}
nextFile:
- for i, f := range fs.pieceFiles() {
+ for i, f := range me.pieceFiles() {
for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
_ = i
//fmt.Printf("%v %#v %v\n", i, f, p)
- cmpl := fs.t.getCompletion(p)
+ cmpl := me.t.getCompletion(p)
if !cmpl.Ok || !cmpl.Complete {
continue nextFile
}
}
- err = fs.t.promotePartFile(f)
+ err = me.promotePartFile(f)
if err != nil {
err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err)
return
return
}
-func (fs *filePieceImpl) MarkNotComplete() error {
- return fs.t.completion.Set(fs.pieceKey(), false)
+func (me *filePieceImpl) MarkNotComplete() (err error) {
+ err = me.pieceCompletion().Set(me.pieceKey(), false)
+ if err != nil {
+ return
+ }
+ for i, f := range me.pieceFiles() {
+ _ = i
+ err = me.onFileNotComplete(f)
+ if err != nil {
+ err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err)
+ return
+ }
+ }
+ return
+
+}
+
+func (me *filePieceImpl) promotePartFile(f file) (err error) {
+ if me.partFiles() {
+ err = os.Rename(f.partFilePath(), f.safeOsPath)
+ // If we get ENOENT, the file may already be in the final location.
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ err = fmt.Errorf("renaming part file: %w", err)
+ return
+ }
+ }
+ info, err := os.Stat(f.safeOsPath)
+ if err != nil {
+ err = fmt.Errorf("statting file: %w", err)
+ return
+ }
+ // Clear writability for the file.
+ err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
+ if err != nil {
+ err = fmt.Errorf("setting file to read-only: %w", err)
+ return
+ }
+ return
+}
+
+func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
+ if me.partFiles() {
+ err = os.Rename(f.safeOsPath, f.partFilePath())
+ // If we get ENOENT, the file may already be in the final location.
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ err = fmt.Errorf("renaming incomplete file: %w", err)
+ return
+ }
+ }
+ info, err := os.Stat(me.pathForWrite(f))
+ if err != nil {
+ err = fmt.Errorf("statting file: %w", err)
+ return
+ }
+ // Ensure the file is writable
+ err = os.Chmod(f.safeOsPath, info.Mode().Perm()|(filePerm&0o222))
+ if err != nil {
+ err = fmt.Errorf("setting file to read-only: %w", err)
+ return
+ }
+ return
+}
+
+func (me *filePieceImpl) pathForWrite(f file) string {
+ return me.t.pathForWrite(f)
+}
+
+func (me *filePieceImpl) partFiles() bool {
+ return me.t.partFiles()
}
"errors"
"fmt"
"io"
+ "io/fs"
"iter"
"log/slog"
"os"
TorrentDirMaker TorrentDirFilePathMaker
PieceCompletion PieceCompletion
UsePartFiles g.Option[bool]
+ Logger *slog.Logger
}
// NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
if opts.PieceCompletion == nil {
opts.PieceCompletion = pieceCompletionForDir(opts.ClientBaseDir)
}
- return fileClientImpl{opts}
+ if opts.Logger == nil {
+ opts.Logger = log.Default.Slogger()
+ }
+ return &fileClientImpl{opts}
}
-func (me fileClientImpl) Close() error {
+func (me *fileClientImpl) Close() error {
return me.opts.PieceCompletion.Close()
}
}
}
-func (fs fileClientImpl) OpenTorrent(
+func (fs *fileClientImpl) OpenTorrent(
ctx context.Context,
info *metainfo.Info,
infoHash metainfo.Hash,
files,
info.FileSegmentsIndex(),
infoHash,
- fs.opts.PieceCompletion,
- fs.opts.UsePartFiles.UnwrapOr(true),
+ fs,
}
return TorrentImpl{
Piece: t.Piece,
length int64
}
+func (f file) partFilePath() string {
+ return f.safeOsPath + ".part"
+}
+
type fileTorrentImpl struct {
info *metainfo.Info
files []file
segmentLocater segments.Index
infoHash metainfo.Hash
- completion PieceCompletion
- partFiles bool
+ // Save memory by pointing to the other data.
+ client *fileClientImpl
}
-func (fts *fileTorrentImpl) promotePartFile(f file) (err error) {
- //fmt.Printf("promoting %q\n", f.safeOsPath)
- if fts.partFiles {
- err = os.Rename(f.safeOsPath+".part", f.safeOsPath)
- if err != nil {
- err = fmt.Errorf("renaming part file: %w", err)
- return
- }
- }
- info, err := os.Stat(f.safeOsPath)
- if err != nil {
- err = fmt.Errorf("statting file: %w", err)
- return
- }
- // Clear writability for the file.
- err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
- if err != nil {
- err = fmt.Errorf("setting file to read-only: %w", err)
- return
+func (fts *fileTorrentImpl) partFiles() bool {
+ return fts.client.opts.UsePartFiles.UnwrapOr(true)
+}
+
+func (fts *fileTorrentImpl) pathForWrite(f file) string {
+ if fts.partFiles() {
+ return f.partFilePath()
}
- return
+ return f.safeOsPath
}
func (fts *fileTorrentImpl) getCompletion(piece int) Completion {
- cmpl, err := fts.completion.Get(metainfo.PieceKey{
+ cmpl, err := fts.client.opts.PieceCompletion.Get(metainfo.PieceKey{
fts.infoHash, piece,
})
cmpl.Err = errors.Join(cmpl.Err, err)
}
func fsync(filePath string) (err error) {
- _ = os.MkdirAll(filepath.Dir(filePath), dirPerm)
- var f *os.File
- f, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, filePerm)
- if err != nil {
- return err
+ f, err := os.OpenFile(filePath, os.O_WRONLY, filePerm)
+ if err != nil && !errors.Is(err, fs.ErrNotExist) {
+ return
}
defer f.Close()
if err = f.Sync(); err != nil {
func (fts *fileTorrentImpl) Flush() error {
for _, f := range fts.files {
- if err := fsync(f.safeOsPath); err != nil {
+ if err := fsync(fts.pathForWrite(f)); err != nil {
return err
}
}
// Returns EOF on short or missing file.
func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
f, err := os.Open(file.safeOsPath)
- if os.IsNotExist(err) {
- // File missing is treated the same as a short file.
+ if fst.fts.partFiles() && errors.Is(err, fs.ErrNotExist) {
+ f, err = os.Open(file.partFilePath())
+ }
+ if errors.Is(err, fs.ErrNotExist) {
+ // File missing is treated the same as a short file. Should we propagate this through the
+ // interface now that fs.ErrNotExist is a thing?
err = io.EOF
return
}
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
// log.Printf("write at %v: %v bytes", off, len(p))
fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
- name := fst.fts.files[i].safeOsPath
+ name := fst.fts.pathForWrite(fst.fts.files[i])
os.MkdirAll(filepath.Dir(name), dirPerm)
var f *os.File
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, filePerm)