From 4b1b009049bdbda112b73e85f6ee1551c67538df Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 10 Feb 2015 00:14:52 +1100 Subject: [PATCH] Encapsulate torrent data, and provide os.File and mmap-based implementations --- TODO | 2 - data/file/file.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++ data/mmap/mmap.go | 66 ++++++++++++++++++++++++++ misc.go | 67 -------------------------- torrent.go | 38 ++++++++------- 5 files changed, 205 insertions(+), 85 deletions(-) create mode 100644 data/file/file.go create mode 100644 data/mmap/mmap.go diff --git a/TODO b/TODO index d76eeb2e..e99c5b6a 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,5 @@ - * Properly encapsulate the mmap/span stuff. * Track upload and download data. * Emulate a UDP server in the UDP tracker tests. * Make use of sparse file regions in download data for faster hashing. - * Add an alternative to mmap()'ed torrent data. * If we're choked and interested, we never send not-interested if there's nothing we want? * Perform announce as part of GetPeers, to reduce closest node calculation overhead, and wasting discarded nodes during the search. \ No newline at end of file diff --git a/data/file/file.go b/data/file/file.go new file mode 100644 index 00000000..7017e105 --- /dev/null +++ b/data/file/file.go @@ -0,0 +1,117 @@ +package file + +import ( + "io" + "os" + "path/filepath" + + "github.com/anacrolix/libtorgo/metainfo" +) + +type data struct { + info *metainfo.Info + loc string +} + +func TorrentData(md *metainfo.Info, location string) (ret *data, err error) { + ret = &data{md, location} + return +} + +func (me *data) ReadAt(p []byte, off int64) (n int, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := len(p) + if int64(n1) > fi.Length-off { + n1 = int(fi.Length - off) + } + var f *os.File + f, err = os.Open(me.fileInfoName(fi)) + if err != nil { + return + } + n1, err = f.ReadAt(p[:n1], off) + f.Close() + if err != nil { + return + } + n += n1 + off = 0 + p = p[n1:] + if len(p) == 0 { + break + } + } + return +} + +func (me *data) Close() {} + +func (me *data) WriteAt(p []byte, off int64) (n int, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := len(p) + if int64(n1) > fi.Length-off { + n1 = int(fi.Length - off) + } + name := me.fileInfoName(fi) + os.MkdirAll(filepath.Dir(name), 0770) + var f *os.File + f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0660) + if err != nil { + return + } + n1, err = f.WriteAt(p[:n1], off) + f.Close() + if err != nil { + return + } + n += n1 + off = 0 + p = p[n1:] + if len(p) == 0 { + break + } + } + return +} + +func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { + for _, fi := range me.info.UpvertedFiles() { + if off >= fi.Length { + off -= fi.Length + continue + } + n1 := fi.Length - off + if n1 > n { + n1 = n + } + var f *os.File + f, err = os.Open(me.fileInfoName(fi)) + if err != nil { + return + } + n1, err = io.Copy(w, io.NewSectionReader(f, off, n1)) + f.Close() + if err != nil { + return + } + written += n1 + off = 0 + n -= n1 + if n == 0 { + break + } + } + return +} + +func (me *data) fileInfoName(fi metainfo.FileInfo) string { + return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...) +} diff --git a/data/mmap/mmap.go b/data/mmap/mmap.go new file mode 100644 index 00000000..423cf34c --- /dev/null +++ b/data/mmap/mmap.go @@ -0,0 +1,66 @@ +package mmap + +import ( + "fmt" + "os" + "path/filepath" + + "bitbucket.org/anacrolix/go.torrent/mmap_span" + "github.com/anacrolix/libtorgo/metainfo" + "launchpad.net/gommap" +) + +func TorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { + mms = &mmap_span.MMapSpan{} + defer func() { + if err != nil { + mms.Close() + mms = nil + } + }() + for _, miFile := range md.UpvertedFiles() { + fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...) + err = os.MkdirAll(filepath.Dir(fileName), 0777) + if err != nil { + err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err) + return + } + var file *os.File + file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return + } + func() { + defer file.Close() + var fi os.FileInfo + fi, err = file.Stat() + if err != nil { + return + } + if fi.Size() < miFile.Length { + err = file.Truncate(miFile.Length) + if err != nil { + return + } + } + if miFile.Length == 0 { + // Can't mmap() regions with length 0. + return + } + var mMap gommap.MMap + mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) + if err != nil { + err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err) + return + } + if int64(len(mMap)) != miFile.Length { + panic("mmap has wrong length") + } + mms.Append(mMap) + }() + if err != nil { + return + } + } + return +} diff --git a/misc.go b/misc.go index 85b8f588..f14c8f74 100644 --- a/misc.go +++ b/misc.go @@ -5,15 +5,10 @@ import ( "errors" "fmt" "math/rand" - "os" - "path/filepath" "sync" "time" - "bitbucket.org/anacrolix/go.torrent/mmap_span" "bitbucket.org/anacrolix/go.torrent/peer_protocol" - "github.com/anacrolix/libtorgo/metainfo" - "launchpad.net/gommap" ) const ( @@ -104,68 +99,6 @@ var ( ErrDataNotReady = errors.New("data not ready") ) -func upvertedSingleFileInfoFiles(info *metainfo.Info) []metainfo.FileInfo { - if len(info.Files) != 0 { - return info.Files - } - return []metainfo.FileInfo{{Length: info.Length, Path: nil}} -} - -func mmapTorrentData(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, err error) { - mms = &mmap_span.MMapSpan{} - defer func() { - if err != nil { - mms.Close() - mms = nil - } - }() - for _, miFile := range upvertedSingleFileInfoFiles(md) { - fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...) - err = os.MkdirAll(filepath.Dir(fileName), 0777) - if err != nil { - err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err) - return - } - var file *os.File - file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - return - } - func() { - defer file.Close() - var fi os.FileInfo - fi, err = file.Stat() - if err != nil { - return - } - if fi.Size() < miFile.Length { - err = file.Truncate(miFile.Length) - if err != nil { - return - } - } - if miFile.Length == 0 { - // Can't mmap() regions with length 0. - return - } - var mMap gommap.MMap - mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) - if err != nil { - err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err) - return - } - if int64(len(mMap)) != miFile.Length { - panic("mmap has wrong length") - } - mms.Append(mMap) - }() - if err != nil { - return - } - } - return -} - // The size in bytes of a metadata extension piece. func metadataPieceSize(totalSize int, piece int) int { ret := totalSize - piece*(1<<14) diff --git a/torrent.go b/torrent.go index a2901ba5..53cb980c 100644 --- a/torrent.go +++ b/torrent.go @@ -10,7 +10,8 @@ import ( "sync" "time" - "bitbucket.org/anacrolix/go.torrent/mmap_span" + "bitbucket.org/anacrolix/go.torrent/data/file" + pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" "bitbucket.org/anacrolix/go.torrent/util" @@ -39,6 +40,14 @@ type peersKey struct { Port int } +type torrentData interface { + ReadAt(p []byte, off int64) (n int, err error) + Close() + WriteAt(p []byte, off int64) (n int, err error) + WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) +} + +// Is not aware of Client. type torrent struct { stateMu sync.Mutex closing chan struct{} @@ -50,9 +59,10 @@ type torrent struct { InfoHash InfoHash Pieces []*piece length int64 + // Prevent mutations to Data memory maps while in use as they're not safe. dataLock sync.RWMutex - Data *mmap_span.MMapSpan + Data torrentData Info *MetaInfo // Active peer connections. @@ -175,14 +185,12 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) { // Called when metadata for a torrent becomes available. func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) { t.Info = newMetaInfo(&md) + t.length = 0 + for _, f := range t.Info.UpvertedFiles() { + t.length += f.Length + } t.MetaData = infoBytes t.metadataHave = nil - t.Data, err = mmapTorrentData(&md, dataDir) - if err != nil { - err = fmt.Errorf("error mmap'ing torrent data: %s", err) - return - } - t.length = t.Data.Size() for _, hash := range infoPieceHashes(&md) { piece := &piece{} piece.Event.L = eventLocker @@ -196,6 +204,11 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte conn.Close() } } + t.Data, err = file.TorrentData(&md, dataDir) + if err != nil { + err = fmt.Errorf("error mmap'ing torrent data: %s", err) + return + } return } @@ -571,15 +584,8 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) { func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() t.dataLock.RLock() - n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) + t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) t.dataLock.RUnlock() - if err != nil { - panic(err) - } - if pp.Integer(n) != t.PieceLength(piece) { - // log.Print(t.Info) - panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece)) - } util.CopyExact(ps[:], hash.Sum(nil)) return } -- 2.48.1