// Uniquely identifies a piece.
type PieceKey struct {
	// The infohash of the torrent the piece belongs to.
	InfoHash Hash
	// Zero-based index of the piece within that torrent.
	Index PieceIndex
}
// Piece identifies a single piece within a torrent's metainfo.
type Piece struct {
	Info *Info // Can we embed the fields here instead, or is it something to do with saving memory?
	// Zero-based piece index within Info.
	i PieceIndex
}
// String implements fmt.Stringer, identifying the piece by the torrent's name
// and the piece index.
func (p Piece) String() string {
	name := p.Info.Name
	return fmt.Sprintf("metainfo.Piece(Info.Name=%q, i=%v)", name, p.i)
}
// PieceIndex is an alias for int, used to make piece-index parameters and
// fields self-documenting. It is an alias (not a defined type) so plain ints
// interoperate without conversion.
type PieceIndex = int
func (p Piece) Length() int64 {
if p.Info.HasV2() {
dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
logger := log.ContextLogger(ctx).Slogger()
logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir))
- var files []file
- for i, fileInfo := range enumIter(info.UpvertedFilesIter()) {
+ metainfoFileInfos := info.UpvertedFiles()
+ files := make([]fileExtra, len(metainfoFileInfos))
+ for i, fileInfo := range metainfoFileInfos {
filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
Info: info,
File: &fileInfo,
err = fmt.Errorf("file %v: path %q is not sub path of %q", i, filePath, dir)
return
}
- f := file{
- safeOsPath: filePath,
- length: fileInfo.Length,
- beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength),
- endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength),
- }
- if f.length == 0 {
- err = CreateNativeZeroLengthFile(f.safeOsPath)
+ files[i].safeOsPath = filePath
+ if metainfoFileInfos[i].Length == 0 {
+ err = CreateNativeZeroLengthFile(filePath)
if err != nil {
err = fmt.Errorf("creating zero length file: %w", err)
return
}
}
- files = append(files, f)
}
t := &fileTorrentImpl{
info,
files,
+ metainfoFileInfos,
info.FileSegmentsIndex(),
infoHash,
fs,
"path/filepath"
"sync"
+ "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
)
return f.Close()
}
// Combines data from different locations required to handle files in file storage.
// Constructed on demand (see fileTorrentImpl.file); the embedded pointers all
// refer to storage owned elsewhere, so copies of this struct share state.
type file struct {
	// Required for piece length.
	*metainfo.Info
	// Enumerated when info is provided.
	*metainfo.FileInfo
	// Per-file mutable storage state (path, lock, etc.).
	*fileExtra
}
+
+func (f *file) beginPieceIndex() int {
+ return f.FileInfo.BeginPieceIndex(f.Info.PieceLength)
+}
+
+func (f *file) endPieceIndex() int {
+ return f.FileInfo.EndPieceIndex(f.Info.PieceLength)
+}
+
// length returns the file's size in bytes as recorded in the torrent metainfo.
func (f *file) length() int64 {
	return f.FileInfo.Length
}
+
// torrentOffset returns the byte offset of the start of this file within the
// torrent's contiguous data.
func (f *file) torrentOffset() int64 {
	return f.FileInfo.TorrentOffset
}
+
// Extra state in the file storage for each file.
type fileExtra struct {
	// This protects high level OS file state like partial file name, permission mod, renaming etc.
	// Contains a mutex: fileExtra must not be copied; it is always referenced
	// through a pointer (see the file struct).
	mu sync.RWMutex
	// The safe, OS-local file path.
	safeOsPath string
	// Utility value to help the race detector find issues for us.
	race byte
}
-func (f *file) partFilePath() string {
+func (f *fileExtra) partFilePath() string {
return f.safeOsPath + ".part"
}
"github.com/anacrolix/torrent/segments"
)
// Piece within File storage. This is created on demand.
type filePieceImpl struct {
	// The torrent-level file storage this piece belongs to.
	t *fileTorrentImpl
	// The piece's metainfo handle (provides Index, extents, etc.).
	p metainfo.Piece
}
}
-func (me *filePieceImpl) pieceFiles() iter.Seq2[int, *file] {
- return func(yield func(int, *file) bool) {
+func (me *filePieceImpl) pieceFiles() iter.Seq[file] {
+ return func(yield func(file) bool) {
for fileIndex := range me.t.segmentLocater.LocateIter(me.extent()) {
- if !yield(fileIndex, &me.t.files[fileIndex]) {
+ f := me.t.file(fileIndex)
+ if !yield(f) {
return
}
}
return me.t.pieceCompletion()
}
-func (me *filePieceImpl) Completion() Completion {
- c := me.t.getCompletion(me.p.Index())
+func (me *filePieceImpl) Completion() (c Completion) {
+ c = me.t.getCompletion(me.p.Index())
if !c.Ok || c.Err != nil {
return c
}
- verified := true
if c.Complete {
+ c = me.checkCompleteFileSizes()
+ }
+ return
+}
+
+func (me *filePieceImpl) iterFileSegments() iter.Seq2[int, segments.Extent] {
+ return func(yield func(int, segments.Extent) bool) {
noFiles := true
- // If it's allegedly complete, check that its constituent files have the necessary length.
for i, extent := range me.t.segmentLocater.LocateIter(me.extent()) {
noFiles = false
- file := &me.t.files[i]
- file.mu.RLock()
- s, err := os.Stat(file.safeOsPath)
- if me.partFiles() && errors.Is(err, fs.ErrNotExist) {
- // Can we use shared files for this? Is it faster?
- s, err = os.Stat(file.partFilePath())
+ if !yield(i, extent) {
+ return
}
- file.mu.RUnlock()
- if err != nil {
+ }
+ if noFiles {
+ panic("files do not cover piece extent")
+ }
+ }
+}
+
+// If a piece is complete, check consituent files have the minimum required sizes.
+func (me *filePieceImpl) checkCompleteFileSizes() (c Completion) {
+ c.Complete = true
+ c.Ok = true
+ for i, extent := range me.iterFileSegments() {
+ file := me.t.file(i)
+ file.mu.RLock()
+ s, err := os.Stat(file.safeOsPath)
+ if me.partFiles() && errors.Is(err, fs.ErrNotExist) {
+ // Can we use shared files for this? Is it faster?
+ s, err = os.Stat(file.partFilePath())
+ }
+ file.mu.RUnlock()
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
me.logger().Warn(
- "error checking file for piece marked as complete",
- "piece", me.p,
+ "error checking file size for piece marked as complete",
"file", file.safeOsPath,
+ "piece", me.p.Index(),
"err", err)
- } else if s.Size() < extent.End() {
- me.logger().Error(
- "file too small for piece marked as complete",
- "piece", me.p,
- "file", file.safeOsPath,
- "size", s.Size(),
- "extent", extent)
- } else {
- continue
+ c.Complete = false
+ me.markIncompletePieces(&file, 0)
+ return
}
- verified = false
- break
+ c.Err = fmt.Errorf("checking file %v: %w", file.safeOsPath, err)
+ c.Complete = false
+ return
}
- // This probably belongs in a wrapper helper of some kind. I will retain the logic for now.
- if noFiles {
- panic("files do not cover piece extent")
+ if s.Size() < extent.End() {
+ me.logger().Warn(
+ "file too small for piece marked as complete",
+ "piece", me.p.Index(),
+ "file", file.safeOsPath,
+ "size", s.Size(),
+ "extent", extent)
+ me.markIncompletePieces(&file, s.Size())
+ c.Complete = false
+ return
}
}
+ return
+}
- if !verified {
- // The completion was wrong, fix it. TODO: Fix all other affected pieces too so we don't
- // spam log messages, or record that the file is known to be bad until it comes good again.
- err := me.MarkNotComplete()
+func (me *filePieceImpl) markIncompletePieces(file *file, size int64) {
+ if size >= file.length() {
+ return
+ }
+ pieceLength := me.t.info.PieceLength
+ begin := metainfo.PieceIndex((file.torrentOffset() + size) / pieceLength)
+ end := metainfo.PieceIndex((file.torrentOffset() + file.length() + pieceLength - 1) / pieceLength)
+ for p := begin; p < end; p++ {
+ key := metainfo.PieceKey{
+ InfoHash: me.t.infoHash,
+ Index: p,
+ }
+ err := me.pieceCompletion().Set(key, false)
if err != nil {
- c.Err = fmt.Errorf("error marking piece not complete: %w", err)
+ me.logger().Error("error marking piece not complete", "piece", p, "err", err)
+ return
}
- c.Complete = false
}
-
- return c
}
func (me *filePieceImpl) MarkComplete() (err error) {
if err != nil {
return
}
- for _, f := range me.pieceFiles() {
+ for f := range me.pieceFiles() {
res := me.allFilePiecesComplete(f)
if res.Err != nil {
err = res.Err
return
}
-func (me *filePieceImpl) allFilePiecesComplete(f *file) (ret g.Result[bool]) {
+func (me *filePieceImpl) allFilePiecesComplete(f file) (ret g.Result[bool]) {
next, stop := iter.Pull(GetPieceCompletionRange(
me.t.pieceCompletion(),
me.t.infoHash,
- f.beginPieceIndex,
- f.endPieceIndex,
+ f.beginPieceIndex(),
+ f.endPieceIndex(),
))
defer stop()
- for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+ for p := f.beginPieceIndex(); p < f.endPieceIndex(); p++ {
cmpl, ok := next()
panicif.False(ok)
if cmpl.Err != nil {
if err != nil {
return
}
- for i, f := range me.pieceFiles() {
- _ = i
+ for f := range me.pieceFiles() {
err = me.onFileNotComplete(f)
if err != nil {
err = fmt.Errorf("preparing incomplete file %q: %w", f.safeOsPath, err)
}
-func (me *filePieceImpl) promotePartFile(f *file) (err error) {
+func (me *filePieceImpl) promotePartFile(f file) (err error) {
f.mu.Lock()
defer f.mu.Unlock()
f.race++
return nil
}
-func (me *filePieceImpl) onFileNotComplete(f *file) (err error) {
+func (me *filePieceImpl) onFileNotComplete(f file) (err error) {
f.mu.Lock()
defer f.mu.Unlock()
f.race++
return
}
}
- info, err := os.Stat(me.pathForWrite(f))
+ info, err := os.Stat(me.pathForWrite(&f))
if errors.Is(err, fs.ErrNotExist) {
return nil
}
return
}
// Ensure the file is writable
- err = os.Chmod(me.pathForWrite(f), info.Mode().Perm()|(filePerm&0o222))
+ err = os.Chmod(me.pathForWrite(&f), info.Mode().Perm()|(filePerm&0o222))
if err != nil {
err = fmt.Errorf("setting file writable: %w", err)
return
}
// Returns EOF on short or missing file.
-func (fst fileTorrentImplIO) readFileAt(file *file, b []byte, off int64) (n int, err error) {
+func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
fst.fts.logger().Debug("readFileAt", "file.safeOsPath", file.safeOsPath)
var f sharedFileIf
file.mu.RLock()
}
defer f.Close()
// Limit the read to within the expected bounds of this file.
- if int64(len(b)) > file.length-off {
- b = b[:file.length-off]
+ if int64(len(b)) > file.length()-off {
+ b = b[:file.length()-off]
}
- for off < file.length && len(b) != 0 {
+ for off < file.length() && len(b) != 0 {
n1, err1 := f.ReadAt(b, off)
b = b[n1:]
n += n1
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
- n1, err1 := fst.readFileAt(&fst.fts.files[i], b[:e.Length], e.Start)
+ n1, err1 := fst.readFileAt(fst.fts.file(i), b[:e.Length], e.Start)
n += n1
b = b[n1:]
err = err1
return
}
-func (fst fileTorrentImplIO) openForWrite(file *file) (f *os.File, err error) {
+func (fst fileTorrentImplIO) openForWrite(file file) (f *os.File, err error) {
// It might be possible to have a writable handle shared files cache if we need it.
fst.fts.logger().Debug("openForWrite", "file.safeOsPath", file.safeOsPath)
- p := fst.fts.pathForWrite(file)
+ p := fst.fts.pathForWrite(&file)
f, err = os.OpenFile(p, os.O_WRONLY|os.O_CREATE, filePerm)
if err == nil {
return
segments.Extent{off, int64(len(p))},
func(i int, e segments.Extent) bool {
var f *os.File
- f, err = fst.openForWrite(&fst.fts.files[i])
+ f, err = fst.openForWrite(fst.fts.file(i))
if err != nil {
return false
}
)
// Per-torrent state for file-backed storage.
type fileTorrentImpl struct {
	info *metainfo.Info
	// Per-file extra storage state; parallel to metainfoFileInfos.
	files []fileExtra
	// Upverted file infos from the metainfo; parallel to files.
	metainfoFileInfos []metainfo.FileInfo
	// Maps torrent byte extents to file indices.
	segmentLocater segments.Index
	infoHash       metainfo.Hash
	// Save memory by pointing to the other data.
	client *fileClientImpl
}
// Set piece completions based on whether all files in each piece are not .part files.
func (fts *fileTorrentImpl) setCompletionFromPartFiles() error {
notComplete := make([]bool, fts.info.NumPieces())
- for _, f := range fts.files {
+ for fileIndex := range fts.files {
+ f := fts.file(fileIndex)
fi, err := os.Stat(f.safeOsPath)
if err == nil {
- if fi.Size() == f.length {
+ if fi.Size() == f.length() {
continue
}
fts.logger().Warn("file has unexpected size", "file", f.safeOsPath, "size", fi.Size(), "expected", f.length)
} else if !errors.Is(err, fs.ErrNotExist) {
fts.logger().Warn("error checking file size", "err", err)
}
- for i := f.beginPieceIndex; i < f.endPieceIndex; i++ {
- notComplete[i] = true
+ for pieceIndex := f.beginPieceIndex(); pieceIndex < f.endPieceIndex(); pieceIndex++ {
+ notComplete[pieceIndex] = true
}
}
for i, nc := range notComplete {
func (fts *fileTorrentImpl) Flush() error {
for i := range fts.files {
- f := &fts.files[i]
+ f := fts.file(i)
fts.logger().Debug("flushing", "file.safeOsPath", f.safeOsPath)
- if err := fsync(fts.pathForWrite(f)); err != nil {
+ if err := fsync(fts.pathForWrite(&f)); err != nil {
return err
}
}
return nil
}
+
+func (fts *fileTorrentImpl) file(index int) file {
+ return file{
+ Info: fts.info,
+ FileInfo: &fts.metainfoFileInfos[index],
+ fileExtra: &fts.files[index],
+ }
+}