import (
"errors"
- "fmt"
"io"
"io/fs"
"sync"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/segments"
)
p []byte,
off int64,
) (n int) {
- ms.segmentLocater.Locate(
- segments.Extent{off, int64(len(p))},
- func(i int, e segments.Extent) bool {
- mMapBytes := ms.mMaps[i].Bytes()[e.Start:]
- // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
- _n := copyBytes(copyArgs(p, mMapBytes))
- p = p[_n:]
- n += _n
-
- if segments.Int(_n) != e.Length {
- panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
- }
- return true
- },
- )
+ for i, e := range ms.segmentLocater.LocateIter(segments.Extent{off, int64(len(p))}) {
+ mMapBytes := ms.mMaps[i].Bytes()[e.Start:]
+ // log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
+ _n := copyBytes(copyArgs(p, mMapBytes))
+ p = p[_n:]
+ n += _n
+ panicif.NotEq(segments.Int(_n), e.Length)
+ }
return
}
fileSegmentsIndex segments.Index,
off, n int64,
each func(fileIndex int, length int64) bool,
-) bool {
- return fileSegmentsIndex.Locate(segments.Extent{
+) {
+ for fileIndex, segmentBounds := range fileSegmentsIndex.LocateIter(segments.Extent{
Start: off,
Length: n,
- }, func(fileIndex int, segmentBounds segments.Extent) bool {
- return each(fileIndex, segmentBounds.Start+segmentBounds.Length)
- })
+ }) {
+ if !each(fileIndex, segmentBounds.Start+segmentBounds.Length) {
+ return
+ }
+ }
}
func fsync(filePath string) (err error) {
"os"
"path/filepath"
+ "github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/torrent/segments"
)
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
- fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
+ for i, e := range fst.fts.segmentLocater.LocateIter(
+ segments.Extent{off, int64(len(b))},
+ ) {
n1, err1 := fst.readFileAt(fst.fts.file(i), b[:e.Length], e.Start)
n += n1
b = b[n1:]
- err = err1
- return err == nil // && int64(n1) == e.Length
- })
- if len(b) != 0 && err == nil {
+ if segments.Int(n1) == e.Length {
+ switch err1 {
+ // ReaderAt.ReadAt contract.
+ case nil, io.EOF:
+ default:
+ err = err1
+ return
+ }
+ } else {
+ panicif.Nil(err1)
+ err = err1
+ return
+ }
+ }
+ if len(b) != 0 {
+ // We're at the end of the torrent.
err = io.EOF
}
return
}
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
- // log.Printf("write at %v: %v bytes", off, len(p))
- fst.fts.segmentLocater.Locate(
+ for i, e := range fst.fts.segmentLocater.LocateIter(
segments.Extent{off, int64(len(p))},
- func(i int, e segments.Extent) bool {
- var f *os.File
- f, err = fst.openForWrite(fst.fts.file(i))
- if err != nil {
- return false
- }
- var n1 int
- n1, err = f.WriteAt(p[:e.Length], e.Start)
- // log.Printf("%v %v wrote %v: %v", i, e, n1, err)
- closeErr := f.Close()
- n += n1
- p = p[n1:]
- if err == nil {
- err = closeErr
- }
- if err == nil && int64(n1) != e.Length {
- err = io.ErrShortWrite
- }
- return err == nil
- })
+ ) {
+ var f *os.File
+ f, err = fst.openForWrite(fst.fts.file(i))
+ if err != nil {
+ return
+ }
+ var n1 int
+ n1, err = f.WriteAt(p[:e.Length], e.Start)
+ closeErr := f.Close()
+ n += n1
+ p = p[n1:]
+ if err == nil {
+ err = closeErr
+ }
+ if err == nil && int64(n1) != e.Length {
+ err = io.ErrShortWrite
+ }
+ if err != nil {
+ return
+ }
+ }
return
}
func (ws *Client) StartNewRequest(r RequestSpec, debugLogger *slog.Logger) Request {
ctx, cancel := context.WithCancel(context.TODO())
var requestParts []requestPart
- if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
+ for i, e := range ws.fileIndex.LocateIter(r) {
req, err := newRequest(
ctx,
ws.Url, i, ws.info, e.Start, e.Length,
ws.PathEscaper,
)
- if err != nil {
- panic(err)
- }
+ panicif.Err(err)
part := requestPart{
req: req,
e: e,
return ws.HttpClient.Do(req)
}
requestParts = append(requestParts, part)
- return true
- }) {
- panic("request out of file bounds")
}
+ // Technically what we want to ensure is that all parts exist consecutively. If the file data
+ // isn't consecutive, then it is piece aligned and we wouldn't need to be doing multiple
+ // requests. TODO: Assert this.
+ panicif.Zero(len(requestParts))
body, w := io.Pipe()
req := Request{
cancel: cancel,