// Returns the index of the first piece containing data for the file.
func (f *File) BeginPieceIndex() int {
- if f.t.usualPieceSize() == 0 {
- return 0
- }
- return pieceIndex(f.offset / int64(f.t.usualPieceSize()))
+ return f.fi.BeginPieceIndex(int64(f.t.usualPieceSize()))
}
// Returns the index of the piece after the last one containing data for the file.
func (f *File) EndPieceIndex() int {
- if f.t.usualPieceSize() == 0 {
- return 0
- }
- return pieceIndex((f.offset + f.length + int64(f.t.usualPieceSize()) - 1) / int64(f.t.usualPieceSize()))
+ return f.fi.EndPieceIndex(int64(f.t.usualPieceSize()))
}
func (f *File) numPieces() int {
PiecesRoot: ft.PiecesRootAsByteArray(),
TorrentOffset: *offset,
})
+ // v2 files are piece aligned. This bumps up the offset to the next piece boundary.
*offset += (ft.File.Length + pieceLength - 1) / pieceLength * pieceLength
}
}
return fi.Path
}
+
+func (fi *FileInfo) BeginPieceIndex(pieceLength int64) int {
+ if pieceLength == 0 {
+ return 0
+ }
+ return int(fi.TorrentOffset / pieceLength)
+}
+
+func (fi *FileInfo) EndPieceIndex(pieceLength int64) int {
+ if pieceLength == 0 {
+ return 0
+ }
+ return int((fi.TorrentOffset + fi.Length + pieceLength - 1) / pieceLength)
+}
}
}))
}
+
+// TODO: Add NumFiles helper?
package segments
import (
+ "iter"
"sort"
g "github.com/anacrolix/generics"
return output(i+first, e)
})
}
+
+func (me Index) LocateIter(e Extent) iter.Seq2[int, Extent] {
+ return func(yield func(int, Extent) bool) {
+ first := sort.Search(len(me.segments), func(i int) bool {
+ _e := me.segments[i]
+ return _e.End() > e.Start
+ })
+ if first == len(me.segments) {
+ return
+ }
+ e.Start -= me.segments[first].Start
+ // The extent is before the first segment.
+ if e.Start < 0 {
+ e.Length += e.Start
+ e.Start = 0
+ }
+ me.segments = me.segments[first:]
+ ScanConsecutive(me.iterSegments(), e, func(i int, e Extent) bool {
+ return yield(i+first, e)
+ })
+ }
+}
}
// Returns true if callback returns false early, or all segments in the haystack for the needle are
-// found.
+// found. TODO: Does this handle discontiguous extents?
func ScanConsecutive(haystack ConsecutiveExtentIter, needle Extent, callback Callback) bool {
i := 0
- // Extents have been found in the haystack and we're waiting for the needle to end. This is kind
- // of for backwards compatibility for some tests that expect to have zero-length extents.
+ // Extents have been found in the haystack, and we're waiting for the needle to end. This is
+ // kind of for backwards compatibility for some tests that expect to have zero-length extents.
startedNeedle := false
for needle.Length != 0 {
l, ok := haystack()
package storage
import (
+ "fmt"
"io"
- "log"
+ "iter"
"os"
"github.com/anacrolix/torrent/metainfo"
)
type filePieceImpl struct {
- *fileTorrentImpl
+ t *fileTorrentImpl
p metainfo.Piece
io.WriterAt
io.ReaderAt
var _ PieceImpl = (*filePieceImpl)(nil)
func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
- return metainfo.PieceKey{me.infoHash, me.p.Index()}
+ return metainfo.PieceKey{me.t.infoHash, me.p.Index()}
+}
+
+func (fs *filePieceImpl) extent() segments.Extent {
+ return segments.Extent{
+ Start: fs.p.Offset(),
+ Length: fs.p.Length(),
+ }
+}
+
+func (fs *filePieceImpl) pieceFiles() iter.Seq2[int, file] {
+ return func(yield func(int, file) bool) {
+ for fileIndex := range fs.t.segmentLocater.LocateIter(fs.extent()) {
+ if !yield(fileIndex, fs.t.files[fileIndex]) {
+ return
+ }
+ }
+ }
}
func (fs *filePieceImpl) Completion() Completion {
- c, err := fs.completion.Get(fs.pieceKey())
- if err != nil {
- log.Printf("error getting piece completion: %s", err)
- c.Ok = false
+ c := fs.t.getCompletion(fs.p.Index())
+ if !c.Ok {
return c
}
-
verified := true
if c.Complete {
// If it's allegedly complete, check that its constituent files have the necessary length.
- if !fs.segmentLocater.Locate(segments.Extent{
- Start: fs.p.Offset(),
- Length: fs.p.Length(),
- }, func(i int, extent segments.Extent) bool {
- file := fs.files[i]
- s, err := os.Stat(file.path)
- if err != nil || s.Size() < extent.Start+extent.Length {
- verified = false
- return false
- }
- return true
- }) {
+ if !fs.t.segmentLocater.Locate(
+ fs.extent(),
+ func(i int, extent segments.Extent) bool {
+ file := fs.t.files[i]
+ s, err := os.Stat(file.safeOsPath)
+ if err != nil || s.Size() < extent.Start+extent.Length {
+ verified = false
+ return false
+ }
+ return true
+ }) {
panic("files do not cover piece extent")
}
}
if !verified {
- // The completion was wrong, fix it.
+ // The completion was wrong, fix it. TODO: Should we use MarkNotComplete?
c.Complete = false
- fs.completion.Set(fs.pieceKey(), false)
+ fs.t.completion.Set(fs.pieceKey(), false)
}
return c
}
-func (fs *filePieceImpl) MarkComplete() error {
- return fs.completion.Set(fs.pieceKey(), true)
+func (fs *filePieceImpl) MarkComplete() (err error) {
+ err = fs.t.completion.Set(fs.pieceKey(), true)
+ if err != nil {
+ return
+ }
+nextFile:
+ for i, f := range fs.pieceFiles() {
+ for p := f.beginPieceIndex; p < f.endPieceIndex; p++ {
+ _ = i
+ //fmt.Printf("%v %#v %v\n", i, f, p)
+ cmpl := fs.t.getCompletion(p)
+ if !cmpl.Ok || !cmpl.Complete {
+ continue nextFile
+ }
+ }
+ err = fs.t.promotePartFile(f)
+ if err != nil {
+ err = fmt.Errorf("error promoting part file %q: %w", f.safeOsPath, err)
+ return
+ }
+ }
+ return
}
func (fs *filePieceImpl) MarkNotComplete() error {
- return fs.completion.Set(fs.pieceKey(), false)
+ return fs.t.completion.Set(fs.pieceKey(), false)
}
import (
"context"
+ "errors"
"fmt"
"io"
+ "iter"
"log/slog"
"os"
"path/filepath"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2"
FilePathMaker FilePathMaker
TorrentDirMaker TorrentDirFilePathMaker
PieceCompletion PieceCompletion
+ UsePartFiles g.Option[bool]
}
// NewFileOpts creates a new ClientImplCloser that stores files using the OS native filesystem.
return me.opts.PieceCompletion.Close()
}
+func enumIter[T any](i iter.Seq[T]) iter.Seq2[int, T] {
+ return func(yield func(int, T) bool) {
+ j := 0
+ for t := range i {
+ if !yield(j, t) {
+ return
+ }
+ j++
+ }
+ }
+}
+
func (fs fileClientImpl) OpenTorrent(
ctx context.Context,
info *metainfo.Info,
dir := fs.opts.TorrentDirMaker(fs.opts.ClientBaseDir, info, infoHash)
logger := log.ContextLogger(ctx).Slogger()
logger.DebugContext(ctx, "opened file torrent storage", slog.String("dir", dir))
- upvertedFiles := info.UpvertedFiles()
- files := make([]file, 0, len(upvertedFiles))
- for i, fileInfo := range upvertedFiles {
+ var files []file
+ for i, fileInfo := range enumIter(info.UpvertedFilesIter()) {
filePath := filepath.Join(dir, fs.opts.FilePathMaker(FilePathMakerOpts{
Info: info,
File: &fileInfo,
return
}
f := file{
- path: filePath,
- length: fileInfo.Length,
+ safeOsPath: filePath,
+ length: fileInfo.Length,
+ beginPieceIndex: fileInfo.BeginPieceIndex(info.PieceLength),
+ endPieceIndex: fileInfo.EndPieceIndex(info.PieceLength),
}
if f.length == 0 {
- err = CreateNativeZeroLengthFile(f.path)
+ err = CreateNativeZeroLengthFile(f.safeOsPath)
if err != nil {
err = fmt.Errorf("creating zero length file: %w", err)
return
files = append(files, f)
}
t := &fileTorrentImpl{
+ info,
files,
info.FileSegmentsIndex(),
infoHash,
fs.opts.PieceCompletion,
+ fs.opts.UsePartFiles.UnwrapOr(true),
}
return TorrentImpl{
Piece: t.Piece,
type file struct {
// The safe, OS-local file path.
- path string
- length int64
+ safeOsPath string
+ beginPieceIndex int
+ endPieceIndex int
+ length int64
}
type fileTorrentImpl struct {
+ info *metainfo.Info
files []file
segmentLocater segments.Index
infoHash metainfo.Hash
completion PieceCompletion
+ partFiles bool
+}
+
+func (fts *fileTorrentImpl) promotePartFile(f file) (err error) {
+ //fmt.Printf("promoting %q\n", f.safeOsPath)
+ if fts.partFiles {
+ err = os.Rename(f.safeOsPath+".part", f.safeOsPath)
+ if err != nil {
+ err = fmt.Errorf("renaming part file: %w", err)
+ return
+ }
+ }
+ info, err := os.Stat(f.safeOsPath)
+ if err != nil {
+ err = fmt.Errorf("statting file: %w", err)
+ return
+ }
+ // Clear writability for the file.
+ err = os.Chmod(f.safeOsPath, info.Mode().Perm()&^0o222)
+ if err != nil {
+ err = fmt.Errorf("setting file to read-only: %w", err)
+ return
+ }
+ return
+}
+
+func (fts *fileTorrentImpl) getCompletion(piece int) Completion {
+ cmpl, err := fts.completion.Get(metainfo.PieceKey{
+ fts.infoHash, piece,
+ })
+ cmpl.Err = errors.Join(cmpl.Err, err)
+ return cmpl
}
func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
func (fts *fileTorrentImpl) Flush() error {
for _, f := range fts.files {
- if err := fsync(f.path); err != nil {
+ if err := fsync(f.safeOsPath); err != nil {
return err
}
}
// Returns EOF on short or missing file.
func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
- f, err := os.Open(file.path)
+ f, err := os.Open(file.safeOsPath)
if os.IsNotExist(err) {
// File missing is treated the same as a short file.
err = io.EOF
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
// log.Printf("write at %v: %v bytes", off, len(p))
fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
- name := fst.fts.files[i].path
+ name := fst.fts.files[i].safeOsPath
os.MkdirAll(filepath.Dir(name), 0o777)
var f *os.File
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
)
type PieceCompletionGetSetter interface {
+ // I think the extra error parameter is vestigial. Looks like you should put your error in
+ // Completion.Err.
Get(metainfo.PieceKey) (Completion, error)
Set(_ metainfo.PieceKey, complete bool) error
}
}
err := p.Storage().MarkComplete()
if err != nil {
- t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
+ t.logger.Levelf(log.Error, "%T: error marking piece complete %d: %s", t.storage, piece, err)
}
t.cl.lock()