From: Matt Joiner Date: Fri, 25 Apr 2025 05:04:47 +0000 (+1000) Subject: Use iters in a few places around segments and v2 file trees and ensure mmap storage... X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ddc29fedcde07727be871edfcb51ad8e45be0452;p=btrtrc.git Use iters in a few places around segments and v2 file trees and ensure mmap storage has v2 compatible segment index --- diff --git a/common/upverted_files.go b/common/upverted_files.go deleted file mode 100644 index 8984eb25..00000000 --- a/common/upverted_files.go +++ /dev/null @@ -1,27 +0,0 @@ -package common - -import ( - "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/segments" -) - -func LengthIterFromUpvertedFiles(fis []metainfo.FileInfo) segments.LengthIter { - i := 0 - return func() (segments.Length, bool) { - if i == len(fis) { - return -1, false - } - l := fis[i].Length - i++ - return l, true - } -} - -// Returns file segments, BitTorrent v2 aware. -func TorrentOffsetFileSegments(info *metainfo.Info) (ret []segments.Extent) { - files := info.UpvertedFiles() - for _, fi := range files { - ret = append(ret, segments.Extent{fi.TorrentOffset, fi.Length}) - } - return -} diff --git a/file.go b/file.go index a71055cf..2ff47442 100644 --- a/file.go +++ b/file.go @@ -119,9 +119,17 @@ func fileBytesLeft( } func (f *File) bytesLeft() (left int64) { - return fileBytesLeft(int64(f.t.usualPieceSize()), f.BeginPieceIndex(), f.EndPieceIndex(), f.offset, f.length, &f.t._completedPieces, func(pieceIndex int) int64 { - return int64(f.t.piece(pieceIndex).numDirtyBytes()) - }) + return fileBytesLeft( + int64(f.t.usualPieceSize()), + f.BeginPieceIndex(), + f.EndPieceIndex(), + f.offset, + f.length, + &f.t._completedPieces, + func(pieceIndex int) int64 { + return int64(f.t.piece(pieceIndex).numDirtyBytes()) + }, + ) } // The relative file path for a multi-file torrent, and the torrent name for a diff --git a/go.sum b/go.sum index 882fee6b..c1771431 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,6 @@ github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e h1:A0Ty9UeyBDI github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e/go.mod h1:4YFqy+788tLJWtin2jNliYVJi+8aDejG9zcu/2/pONw= github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d h1:ypNOsIwvdumNRlqWj/hsnLs5TyQWQOylwi+T9Qs454A= github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ= -github.com/anacrolix/chansync v0.5.1 h1:j+R9DtotkXm40VFjZ8rJTSJkg2Gv1ldZt8kl96lyJJ0= -github.com/anacrolix/chansync v0.5.1/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/chansync v0.6.0 h1:/aQVvZ1yLRhmqEYrr9dC92JwzNBQ/SNnFi4uk+fTkQY= github.com/anacrolix/chansync v0.6.0/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/dht/v2 v2.19.2-0.20221121215055-066ad8494444 h1:8V0K09lrGoeT2KRJNOtspA7q+OMxGwQqK/Ug0IiaaRE= diff --git a/metainfo/file-tree.go b/metainfo/file-tree.go index 85cbd736..e04867d0 100644 --- a/metainfo/file-tree.go +++ b/metainfo/file-tree.go @@ -1,6 +1,7 @@ package metainfo import ( + "iter" "maps" "slices" @@ -94,35 +95,41 @@ func (ft *FileTree) orderedKeys() []string { return slices.Sorted(maps.Keys(ft.Dir)) } -func (ft *FileTree) upvertedFiles(pieceLength int64, out func(fi FileInfo)) { +func (ft *FileTree) upvertedFiles(pieceLength int64) iter.Seq[FileInfo] { var offset int64 - ft.upvertedFilesInner(pieceLength, nil, &offset, out) + return ft.upvertedFilesInner(pieceLength, nil, &offset) } func (ft *FileTree) upvertedFilesInner( pieceLength int64, path []string, offset *int64, - out func(fi FileInfo), -) { - if ft.IsDir() { - for _, key := range ft.orderedKeys() { - if key == FileTreePropertiesKey { - continue +) iter.Seq[FileInfo] { + return func(yield func(FileInfo) bool) { + if ft.IsDir() { + for _, key := range ft.orderedKeys() { + if key == FileTreePropertiesKey { + continue + } + sub := g.MapMustGet(ft.Dir, key) + for fi := range sub.upvertedFilesInner(pieceLength, append(path, key), offset) { + if !yield(fi) { + return + } + } } - sub := g.MapMustGet(ft.Dir, key) - sub.upvertedFilesInner(pieceLength, append(path, key), offset, out) + } else { + yield(FileInfo{ + Length: ft.File.Length, + Path: append([]string(nil), path...), + // BEP 52 requires paths be UTF-8 if possible. + PathUtf8: append([]string(nil), path...), + PiecesRoot: ft.PiecesRootAsByteArray(), + TorrentOffset: *offset, + }) + *offset += (ft.File.Length + pieceLength - 1) / pieceLength * pieceLength } - } else { - out(FileInfo{ - Length: ft.File.Length, - Path: append([]string(nil), path...), - // BEP 52 requires paths be UTF-8 if possible. - PathUtf8: append([]string(nil), path...), - PiecesRoot: ft.PiecesRootAsByteArray(), - TorrentOffset: *offset, - }) - *offset += (ft.File.Length + pieceLength - 1) / pieceLength * pieceLength + } } diff --git a/metainfo/info.go b/metainfo/info.go index e58bb10e..e6c29530 100644 --- a/metainfo/info.go +++ b/metainfo/info.go @@ -4,11 +4,14 @@ import ( "errors" "fmt" "io" + "iter" "os" "path/filepath" + "slices" + "sort" "strings" - "github.com/anacrolix/missinggo/v2/slices" + "github.com/anacrolix/torrent/segments" ) // The info dictionary. See BEP 3 and BEP 52. @@ -77,7 +80,8 @@ func (info *Info) BuildFromFilePath(root string) (err error) { if err != nil { return } - slices.Sort(info.Files, func(l, r FileInfo) bool { + sort.Slice(info.Files, func(i, j int) bool { + l, r := info.Files[i], info.Files[j] return strings.Join(l.BestPath(), "/") < strings.Join(r.BestPath(), "/") }) if info.PieceLength == 0 { @@ -154,33 +158,41 @@ func (info *Info) IsDir() bool { // The files field, converted up from the old single-file in the parent info dict if necessary. This // is a helper to avoid having to conditionally handle single and multi-file torrent infos. func (info *Info) UpvertedFiles() (files []FileInfo) { + return slices.Collect(info.UpvertedFilesIter()) +} + +// The files field, converted up from the old single-file in the parent info dict if necessary. This +// is a helper to avoid having to conditionally handle single and multi-file torrent infos. +func (info *Info) UpvertedFilesIter() iter.Seq[FileInfo] { if info.HasV2() { - info.FileTree.upvertedFiles(info.PieceLength, func(fi FileInfo) { - files = append(files, fi) - }) - return + return info.FileTree.upvertedFiles(info.PieceLength) } return info.UpvertedV1Files() } // UpvertedFiles but specific to the files listed in the v1 info fields. This will include padding // files for example that wouldn't appear in v2 file trees. -func (info *Info) UpvertedV1Files() (files []FileInfo) { - if len(info.Files) == 0 { - return []FileInfo{{ - Length: info.Length, - // Callers should determine that Info.Name is the basename, and - // thus a regular file. - Path: nil, - }} - } - var offset int64 - for _, fi := range info.Files { - fi.TorrentOffset = offset - offset += fi.Length - files = append(files, fi) +func (info *Info) UpvertedV1Files() iter.Seq[FileInfo] { + return func(yield func(FileInfo) bool) { + if len(info.Files) == 0 { + yield(FileInfo{ + Length: info.Length, + // Callers should determine that Info.Name is the basename, and + // thus a regular file. + Path: nil, + }) + } + var offset int64 + for _, fi := range info.Files { + fi.TorrentOffset = offset + offset += fi.Length + if !yield(fi) { + return + } + } + return + } - return } func (info *Info) Piece(index int) Piece { @@ -207,3 +219,11 @@ func (info *Info) HasV1() bool { func (info *Info) FilesArePieceAligned() bool { return info.HasV2() } + +func (info *Info) FileSegmentsIndex() segments.Index { + return segments.NewIndexFromSegments(slices.Collect(func(yield func(segments.Extent) bool) { + for fi := range info.UpvertedFilesIter() { + yield(segments.Extent{fi.TorrentOffset, fi.Length}) + } + })) +} diff --git a/metainfo/piece.go b/metainfo/piece.go index 946e874b..87950a51 100644 --- a/metainfo/piece.go +++ b/metainfo/piece.go @@ -1,6 +1,8 @@ package metainfo import ( + "iter" + g "github.com/anacrolix/generics" ) @@ -16,19 +18,15 @@ func (p Piece) Length() int64 { var offset int64 pieceLength := p.Info.PieceLength lastFileEnd := int64(0) - done := false - p.Info.FileTree.upvertedFiles(pieceLength, func(fi FileInfo) { - if done { - return - } + for fi := range p.Info.FileTree.upvertedFiles(pieceLength) { fileStartPiece := int(offset / pieceLength) if fileStartPiece > p.i { - done = true - return + break } lastFileEnd = offset + fi.Length offset = (lastFileEnd + pieceLength - 1) / pieceLength * pieceLength - }) + + } ret := min(lastFileEnd-int64(p.i)*pieceLength, pieceLength) if ret <= 0 { panic(ret) @@ -38,6 +36,13 @@ func (p Piece) Length() int64 { return p.V1Length() } +func iterLast[T any](i iter.Seq[T]) (last g.Option[T]) { + for t := range i { + last.Set(t) + } + return +} + func (p Piece) V1Length() int64 { i := p.i lastPiece := p.Info.NumPieces() - 1 @@ -45,8 +50,7 @@ func (p Piece) V1Length() int64 { case 0 <= i && i < lastPiece: return p.Info.PieceLength case lastPiece >= 0 && i == lastPiece: - files := p.Info.UpvertedV1Files() - lastFile := files[len(files)-1] + lastFile := iterLast(p.Info.UpvertedV1Files()).Unwrap() length := lastFile.TorrentOffset + lastFile.Length - int64(i)*p.Info.PieceLength if length <= 0 || length > p.Info.PieceLength { panic(length) diff --git a/mmap_span/mmap_span.go b/mmap_span/mmap_span.go index 22c394f7..4ba2efdd 100644 --- a/mmap_span/mmap_span.go +++ b/mmap_span/mmap_span.go @@ -1,8 +1,10 @@ package mmap_span import ( + "errors" "fmt" "io" + "io/fs" "sync" "github.com/anacrolix/torrent/segments" @@ -16,12 +18,16 @@ type Mmap interface { type MMapSpan struct { mu sync.RWMutex + closed bool mMaps []Mmap segmentLocater segments.Index } -func (ms *MMapSpan) Append(mMap Mmap) { - ms.mMaps = append(ms.mMaps, mMap) +func New(mMaps []Mmap, index segments.Index) *MMapSpan { + return &MMapSpan{ + mMaps: mMaps, + segmentLocater: index, + } } func (ms *MMapSpan) Flush() (errs []error) { @@ -36,38 +42,25 @@ func (ms *MMapSpan) Flush() (errs []error) { return } -func (ms *MMapSpan) Close() (errs []error) { +func (ms *MMapSpan) Close() (err error) { ms.mu.Lock() defer ms.mu.Unlock() for _, mMap := range ms.mMaps { - err := mMap.Unmap() - if err != nil { - errs = append(errs, err) - } + err = errors.Join(err, mMap.Unmap()) } // This is for issue 211. ms.mMaps = nil - ms.InitIndex() + ms.closed = true return } -func (me *MMapSpan) InitIndex() { - i := 0 - me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) { - if i == len(me.mMaps) { - return -1, false - } - l := int64(len(me.mMaps[i].Bytes())) - i++ - return l, true - }) - // log.Printf("made mmapspan index: %v", me.segmentLocater) -} - func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { // log.Printf("reading %v bytes at %v", len(p), off) ms.mu.RLock() defer ms.mu.RUnlock() + if ms.closed { + err = fs.ErrClosed + } n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off) if n != len(p) { err = io.EOF @@ -79,19 +72,26 @@ func copyBytes(dst, src []byte) int { return copy(dst, src) } -func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) { - ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool { - mMapBytes := ms.mMaps[i].Bytes()[e.Start:] - // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes)) - _n := copyBytes(copyArgs(p, mMapBytes)) - p = p[_n:] - n += _n +func (ms *MMapSpan) locateCopy( + copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), + p []byte, + off int64, +) (n int) { + ms.segmentLocater.Locate( + segments.Extent{off, int64(len(p))}, + func(i int, e segments.Extent) bool { + mMapBytes := ms.mMaps[i].Bytes()[e.Start:] + // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes)) + _n := copyBytes(copyArgs(p, mMapBytes)) + p = p[_n:] + n += _n - if segments.Int(_n) != e.Length { - panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length)) - } - return true - }) + if segments.Int(_n) != e.Length { + panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length)) + } + return true + }, + ) return } diff --git a/segments/index.go b/segments/index.go index 7ed30805..577ec90f 100644 --- a/segments/index.go +++ b/segments/index.go @@ -40,7 +40,8 @@ func (me Index) iterSegments() func() (Extent, bool) { } // Returns true if the callback returns false early, or extents are found in the index for all parts -// of the given extent. +// of the given extent. TODO: This might not handle discontiguous extents. To be tested. Needed for +// BitTorrent v2 possibly. func (me Index) Locate(e Extent, output Callback) bool { first := sort.Search(len(me.segments), func(i int) bool { _e := me.segments[i] diff --git a/segments/segments_test.go b/segments/segments_test.go index ef165714..c8c45376 100644 --- a/segments/segments_test.go +++ b/segments/segments_test.go @@ -36,7 +36,7 @@ func (me *collectExtents) scanCallback(i int, e Extent) bool { type newLocater func(LengthIter) Locater -func assertLocate( +func checkContiguous( t *testing.T, nl newLocater, ls []Length, @@ -54,17 +54,17 @@ func assertLocate( } func testLocater(t *testing.T, newLocater newLocater) { - assertLocate(t, newLocater, + checkContiguous(t, newLocater, []Length{1, 0, 2, 0, 3}, Extent{2, 2}, 2, []Extent{{1, 1}, {0, 0}, {0, 1}}) - assertLocate(t, newLocater, + checkContiguous(t, newLocater, []Length{1, 0, 2, 0, 3}, Extent{6, 2}, 2, []Extent{}) - assertLocate(t, newLocater, + checkContiguous(t, newLocater, []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537}, // 128737588 Extent{0, 16384}, 0, @@ -76,7 +76,7 @@ func testLocater(t *testing.T, newLocater newLocater) { {0, 1546}, {0, 8500}, }) - assertLocate(t, newLocater, + checkContiguous(t, newLocater, []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537, 1536, 1551}, // 128737588 Extent{129236992, 16384}, 5, diff --git a/storage/file-misc_test.go b/storage/file-misc_test.go index ee373e54..b2afd97c 100644 --- a/storage/file-misc_test.go +++ b/storage/file-misc_test.go @@ -5,9 +5,7 @@ import ( "github.com/go-quicktest/qt" - "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/segments" ) type requiredLength struct { @@ -18,7 +16,7 @@ type requiredLength struct { // The required file indices and file lengths for the given extent to be "complete". This is the // outdated interface used by some tests. func extentCompleteRequiredLengths(info *metainfo.Info, off, n int64) (ret []requiredLength) { - index := segments.NewIndexFromSegments(common.TorrentOffsetFileSegments(info)) + index := info.FileSegmentsIndex() minFileLengthsForTorrentExtent(index, off, n, func(fileIndex int, length int64) bool { ret = append(ret, requiredLength{fileIndex, length}) return true diff --git a/storage/file.go b/storage/file.go index f9fe743a..aa52034c 100644 --- a/storage/file.go +++ b/storage/file.go @@ -11,7 +11,6 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" - "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) @@ -92,7 +91,7 @@ func (fs fileClientImpl) OpenTorrent( } t := &fileTorrentImpl{ files, - segments.NewIndexFromSegments(common.TorrentOffsetFileSegments(info)), + info.FileSegmentsIndex(), infoHash, fs.opts.PieceCompletion, } @@ -174,7 +173,7 @@ type fileTorrentImplIO struct { } // Returns EOF on short or missing file. -func (fst *fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { +func (fst fileTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) { f, err := os.Open(file.path) if os.IsNotExist(err) { // File missing is treated the same as a short file. diff --git a/storage/mmap.go b/storage/mmap.go index 15ff9901..1640a139 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -70,11 +70,7 @@ func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl { } func (ts *mmapTorrentStorage) Close() error { - errs := ts.span.Close() - if len(errs) > 0 { - return errs[0] - } - return nil + return ts.span.Close() } func (ts *mmapTorrentStorage) Flush() error { @@ -114,10 +110,12 @@ func (sp mmapStoragePiece) MarkNotComplete() error { } func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { - mms = &mmap_span.MMapSpan{} + var mMaps []FileMapping defer func() { if err != nil { - mms.Close() + for _, mm := range mMaps { + err = errors.Join(err, mm.Unmap()) + } } }() for _, miFile := range md.UpvertedFiles() { @@ -130,13 +128,12 @@ func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e var mm FileMapping mm, err = mmapFile(fileName, miFile.Length) if err != nil { - err = fmt.Errorf("file %q: %s", miFile.DisplayPath(md), err) + err = fmt.Errorf("file %q: %w", miFile.DisplayPath(md), err) return } - mms.Append(mm) + mMaps = append(mMaps, mm) } - mms.InitIndex() - return + return mmap_span.New(mMaps, md.FileSegmentsIndex()), nil } func mmapFile(name string, size int64) (_ FileMapping, err error) { diff --git a/webseed/client.go b/webseed/client.go index 288c0fb7..9e9a96ee 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -12,7 +12,6 @@ import ( "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/segments" ) @@ -61,7 +60,7 @@ func (me *Client) SetInfo(info *metainfo.Info) { // http://ia600500.us.archive.org/1/items URLs in archive.org torrents. return } - me.fileIndex = segments.NewIndexFromSegments(common.TorrentOffsetFileSegments(info)) + me.fileIndex = info.FileSegmentsIndex() me.info = info me.Pieces.AddRange(0, uint64(info.NumPieces())) }