TODO | 2 -- data/file/file.go | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++++ data/mmap/mmap.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++ misc.go | 67 ----------------------------------------------------- torrent.go | 38 ++++++++++++++++++++++---------------- diff --git a/TODO b/TODO index d76eeb2ec77195cf7c27a369b24dc0d54f46c031..e99c5b6a5a1504b7d65355f2437da4961ffd0674 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,5 @@ - * Properly encapsulate the mmap/span stuff. * Track upload and download data. * Emulate a UDP server in the UDP tracker tests. * Make use of sparse file regions in download data for faster hashing. - * Add an alternative to mmap()'ed torrent data. * If we're choked and interested, we never send not-interested if there's nothing we want? * Perform announce as part of GetPeers, to reduce closest node calculation overhead, and wasting discarded nodes during the search. \ No newline at end of file diff --git a/data/file/file.go b/data/file/file.go new file mode 100644 index 0000000000000000000000000000000000000000..7017e105efd4eb1e94acb3c5992d31b64a4c0adb --- /dev/null +++ b/data/file/file.go @@ -0,0 +1,117 @@ +package file + +import ( + "io" + "os" + "path/filepath" + + "github.com/anacrolix/libtorgo/metainfo" +) + +type data struct { + info *metainfo.Info + loc string +} + +func TorrentData(md *metainfo.Info, location string) (ret *data, err error) { + ret = &data{md, location} + return +} + +func (me *data) ReadAt(p []byte, off int64) (n int, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := len(p) + if int64(n1) > fi.Length-off { + n1 = int(fi.Length - off) + } + var f *os.File + f, err = os.Open(me.fileInfoName(fi)) + if err != nil { + return + } + n1, err = f.ReadAt(p[:n1], off) + f.Close() + if err != nil { + return + } + n += n1 + off = 0 + p = p[n1:] + if len(p) == 0 { + break + } + } + return +} + +func (me *data) Close() {} + +func (me *data) WriteAt(p []byte, off int64) (n int, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := len(p) + if int64(n1) > fi.Length-off { + n1 = int(fi.Length - off) + } + name := me.fileInfoName(fi) + os.MkdirAll(filepath.Dir(name), 0770) + var f *os.File + f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0660) + if err != nil { + return + } + n1, err = f.WriteAt(p[:n1], off) + f.Close() + if err != nil { + return + } + n += n1 + off = 0 + p = p[n1:] + if len(p) == 0 { + break + } + } + return +} + +func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := fi.Length - off + if n1 > n { + n1 = n + } + var f *os.File + f, err = os.Open(me.fileInfoName(fi)) + if err != nil { + return + } + n1, err = io.Copy(w, io.NewSectionReader(f, off, n1)) + f.Close() + if err != nil { + return + } + written += n1 + off = 0 + n -= n1 + if n == 0 { + break + } + } + return +} + +func (me *data) fileInfoName(fi metainfo.FileInfo) string { + return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...) +} diff --git a/data/mmap/mmap.go b/data/mmap/mmap.go new file mode 100644 index 0000000000000000000000000000000000000000..423cf34c8347a5c298242aa0b21da6597f416238 --- /dev/null +++ b/data/mmap/mmap.go @@ -0,0 +1,66 @@ +package mmap + +import ( + "fmt" + "os" + "path/filepath" + + "bitbucket.org/anacrolix/go.torrent/mmap_span" + "github.com/anacrolix/libtorgo/metainfo" + "launchpad.net/gommap" +) + +func TorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { + mms = &mmap_span.MMapSpan{} + defer func() { + if err != nil { + mms.Close() + mms = nil + } + }() + for _, miFile := range md.UpvertedFiles() { + fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...) + err = os.MkdirAll(filepath.Dir(fileName), 0777) + if err != nil { + err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err) + return + } + var file *os.File + file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return + } + func() { + defer file.Close() + var fi os.FileInfo + fi, err = file.Stat() + if err != nil { + return + } + if fi.Size() < miFile.Length { + err = file.Truncate(miFile.Length) + if err != nil { + return + } + } + if miFile.Length == 0 { + // Can't mmap() regions with length 0. + return + } + var mMap gommap.MMap + mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) + if err != nil { + err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err) + return + } + if int64(len(mMap)) != miFile.Length { + panic("mmap has wrong length") + } + mms.Append(mMap) + }() + if err != nil { + return + } + } + return +} diff --git a/misc.go b/misc.go index 85b8f588411dfbf95f02196578884865cf00b7c1..f14c8f74d9d752b1c605bcb91954b7ac5801d17c 100644 --- a/misc.go +++ b/misc.go @@ -5,15 +5,10 @@ "crypto" "errors" "fmt" "math/rand" - "os" - "path/filepath" "sync" "time" - "bitbucket.org/anacrolix/go.torrent/mmap_span" "bitbucket.org/anacrolix/go.torrent/peer_protocol" - "github.com/anacrolix/libtorgo/metainfo" - "launchpad.net/gommap" ) const ( @@ -103,68 +98,6 @@ var ( // Requested data not yet available. ErrDataNotReady = errors.New("data not ready") ) - -func upvertedSingleFileInfoFiles(info *metainfo.Info) []metainfo.FileInfo { - if len(info.Files) != 0 { - return info.Files - } - return []metainfo.FileInfo{{Length: info.Length, Path: nil}} -} - -func mmapTorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { - mms = &mmap_span.MMapSpan{} - defer func() { - if err != nil { - mms.Close() - mms = nil - } - }() - for _, miFile := range upvertedSingleFileInfoFiles(md) { - fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...) - err = os.MkdirAll(filepath.Dir(fileName), 0777) - if err != nil { - err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err) - return - } - var file *os.File - file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - return - } - func() { - defer file.Close() - var fi os.FileInfo - fi, err = file.Stat() - if err != nil { - return - } - if fi.Size() < miFile.Length { - err = file.Truncate(miFile.Length) - if err != nil { - return - } - } - if miFile.Length == 0 { - // Can't mmap() regions with length 0. - return - } - var mMap gommap.MMap - mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) - if err != nil { - err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err) - return - } - if int64(len(mMap)) != miFile.Length { - panic("mmap has wrong length") - } - mms.Append(mMap) - }() - if err != nil { - return - } - } - return -} // The size in bytes of a metadata extension piece. func metadataPieceSize(totalSize int, piece int) int { diff --git a/torrent.go b/torrent.go index a2901ba5c5188141952d062228376026f16281d2..53cb980c33771c891b4d5b955745719badce9ade 100644 --- a/torrent.go +++ b/torrent.go @@ -10,7 +10,8 @@ "sort" "sync" "time" - "bitbucket.org/anacrolix/go.torrent/mmap_span" + "bitbucket.org/anacrolix/go.torrent/data/file" + pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" "bitbucket.org/anacrolix/go.torrent/util" @@ -39,6 +40,14 @@ IPBytes string Port int } +type torrentData interface { + ReadAt(p []byte, off int64) (n int, err error) + Close() + WriteAt(p []byte, off int64) (n int, err error) + WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) +} + +// Is not aware of Client. type torrent struct { stateMu sync.Mutex closing chan struct{} @@ -50,9 +59,10 @@ InfoHash InfoHash Pieces []*piece length int64 + // Prevent mutations to Data memory maps while in use as they're not safe. dataLock sync.RWMutex - Data *mmap_span.MMapSpan + Data torrentData Info *MetaInfo // Active peer connections. @@ -175,14 +185,12 @@ // Called when metadata for a torrent becomes available. func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) { t.Info = newMetaInfo(&md) + t.length = 0 + for _, f := range t.Info.UpvertedFiles() { + t.length += f.Length + } t.MetaData = infoBytes t.metadataHave = nil - t.Data, err = mmapTorrentData(&md, dataDir) - if err != nil { - err = fmt.Errorf("error mmap'ing torrent data: %s", err) - return - } - t.length = t.Data.Size() for _, hash := range infoPieceHashes(&md) { piece := &piece{} piece.Event.L = eventLocker @@ -195,6 +203,11 @@ if err := conn.setNumPieces(t.numPieces()); err != nil { log.Printf("closing connection: %s", err) conn.Close() } + } + t.Data, err = file.TorrentData(&md, dataDir) + if err != nil { + err = fmt.Errorf("error mmap'ing torrent data: %s", err) + return } return } @@ -571,15 +584,8 @@ func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() t.dataLock.RLock() - n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) + t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) t.dataLock.RUnlock() - if err != nil { - panic(err) - } - if pp.Integer(n) != t.PieceLength(piece) { - // log.Print(t.Info) - panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece)) - } util.CopyExact(ps[:], hash.Sum(nil)) return }