+++ /dev/null
-package mmap
-
-import (
- "fmt"
- "os"
- "path/filepath"
-
- "github.com/edsrzf/mmap-go"
-
- "github.com/anacrolix/torrent/metainfo"
- "github.com/anacrolix/torrent/mmap_span"
-)
-
// torrentData is a torrent.Data implementation backed by memory-mapped
// files, one mapping per non-empty file in the torrent.
type torrentData struct {
	// Supports non-torrent specific data operations for the torrent.Data
	// interface.
	mmap_span.MMapSpan

	// completed[i] is true once piece i has been marked complete. Not
	// persisted; lives only for this instance.
	completed []bool
}
-
// PieceComplete reports whether the piece at the given index has been
// marked complete via PieceCompleted.
func (me *torrentData) PieceComplete(piece int) bool {
	return me.completed[piece]
}
-
// PieceCompleted marks the piece at the given index as complete. It never
// fails; the error return satisfies the torrent.Data interface.
func (me *torrentData) PieceCompleted(piece int) error {
	me.completed[piece] = true
	return nil
}
-
// TorrentData creates (and truncates to the expected length, as needed) the
// files described by md under location, memory-maps each non-empty file,
// and returns a torrentData spanning all the mappings.
func TorrentData(md *metainfo.Info, location string) (ret *torrentData, err error) {
	var mms mmap_span.MMapSpan
	// On any error, unmap everything appended so far before returning.
	defer func() {
		if err != nil {
			mms.Close()
		}
	}()
	for _, miFile := range md.UpvertedFiles() {
		fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...)
		err = os.MkdirAll(filepath.Dir(fileName), 0777)
		if err != nil {
			err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err)
			return
		}
		var file *os.File
		file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
		if err != nil {
			return
		}
		// A closure so the deferred Close fires at the end of each
		// iteration rather than accumulating until function return.
		func() {
			defer file.Close()
			var fi os.FileInfo
			fi, err = file.Stat()
			if err != nil {
				return
			}
			// Grow the file so the mapping can cover the full expected
			// length.
			if fi.Size() < miFile.Length {
				err = file.Truncate(miFile.Length)
				if err != nil {
					return
				}
			}
			if miFile.Length == 0 {
				// Can't mmap() regions with length 0.
				return
			}
			var mMap mmap.MMap
			mMap, err = mmap.MapRegion(file,
				int(miFile.Length), // Probably not great on <64 bit systems.
				mmap.RDWR, 0, 0)
			if err != nil {
				err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err)
				return
			}
			if int64(len(mMap)) != miFile.Length {
				panic("mmap has wrong length")
			}
			mms.Append(mMap)
		}()
		if err != nil {
			return
		}
	}
	ret = &torrentData{
		MMapSpan:  mms,
		completed: make([]bool, md.NumPieces()),
	}
	return
}
+++ /dev/null
-package pieceStore
-
-import (
- "encoding/hex"
- "io"
-
- "github.com/anacrolix/torrent/metainfo"
-)
-
// data exposes a single torrent's data through a shared piece store,
// addressing pieces by their hash.
type data struct {
	info  *metainfo.Info
	store *store
}
-
-func (me *data) pieceHashHex(i int) string {
- return hex.EncodeToString(me.info.Pieces[i*20 : (i+1)*20])
-}
-
-func (me *data) Close() {}
-
// TODO: Make sure that reading completed can't read from incomplete. Then
// also it'll be possible to verify that the Content-Range on completed
// returns the correct piece length so there aren't short reads.

// ReadAt reads torrent data starting at off, spanning piece boundaries by
// issuing one store read per piece touched. Returns io.EOF when off reaches
// the torrent's total length.
func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
	for len(b) != 0 {
		if off >= me.info.TotalLength() {
			err = io.EOF
			break
		}
		// The piece containing off.
		p := me.info.Piece(int(off / me.info.PieceLength))
		b1 := b
		// Clamp this read to the remainder of the current piece.
		maxN1 := int(p.Length() - off%me.info.PieceLength)
		if len(b1) > maxN1 {
			b1 = b1[:maxN1]
		}
		var n1 int
		n1, err = me.store.pieceReadAt(p, b1, off%me.info.PieceLength)
		n += n1
		off += int64(n1)
		b = b[n1:]
		if err != nil {
			break
		}
	}
	return
}
-
// TODO: Rewrite this later, on short writes to a piece it will start to play up.

// WriteAt writes torrent data starting at off, splitting the buffer at
// piece boundaries and writing each chunk to the store. After the first
// piece, writes continue from offset 0 of each subsequent piece.
func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
	// Index and intra-piece offset of the first piece touched.
	i := int(off / me.info.PieceLength)
	off %= me.info.PieceLength
	for len(p) != 0 {
		p1 := p
		// Clamp this write to the remainder of the current piece.
		maxN := me.info.Piece(i).Length() - off
		if int64(len(p1)) > maxN {
			p1 = p1[:maxN]
		}
		var n1 int
		n1, err = me.store.pieceWriteAt(me.info.Piece(i), p1, off)
		n += n1
		if err != nil {
			return
		}
		p = p[n1:]
		off = 0
		i++
	}
	return
}
-
// pieceReader returns a reader over piece p from off to the end of the
// piece.
func (me *data) pieceReader(p metainfo.Piece, off int64) (ret io.ReadCloser, err error) {
	return me.store.getPieceRange(p, off, p.Length()-off)
}
-
// PieceCompleted promotes the piece at index from incomplete to completed
// in the store, verifying its hash in the process.
func (me *data) PieceCompleted(index int) (err error) {
	return me.store.pieceCompleted(me.info.Piece(index))
}
-
// PieceComplete reports whether the store holds a completed copy of the
// piece at the given index.
func (me *data) PieceComplete(piece int) bool {
	return me.store.pieceComplete(me.info.Piece(piece))
}
+++ /dev/null
-package fileCacheDataBackend
-
-import (
- "io"
- "os"
-
- "github.com/anacrolix/missinggo/filecache"
-
- "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
-)
-
// backend implements dataBackend.I on top of a local file cache.
type backend struct {
	c *filecache.Cache
}

// New returns a backend that stores its data in fc.
func New(fc *filecache.Cache) *backend {
	return &backend{
		c: fc,
	}
}

// Compile-time check that backend satisfies the data backend interface.
var _ dataBackend.I = &backend{}
-
-func (me *backend) Delete(path string) (err error) {
- err = me.c.Remove(path)
- return
-}
-
-func (me *backend) GetLength(path string) (ret int64, err error) {
- fi, err := me.c.StatFile(path)
- if os.IsNotExist(err) {
- err = dataBackend.ErrNotFound
- }
- if err != nil {
- return
- }
- ret = fi.Size()
- return
-}
-
-func (me *backend) Open(path string, flag int) (ret dataBackend.File, err error) {
- ret, err = me.c.OpenFile(path, flag)
- return
-}
-
-func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
- f, err := me.c.OpenFile(path, os.O_RDONLY)
- if os.IsNotExist(err) {
- err = dataBackend.ErrNotFound
- }
- if err != nil {
- return
- }
- ret = struct {
- io.Reader
- io.Closer
- }{
- io.NewSectionReader(f, off, n),
- f,
- }
- return
-}
+++ /dev/null
-package httpDataBackend
-
-import (
- "crypto/tls"
- "io"
- "net/http"
- "net/url"
- "path"
-
- "github.com/anacrolix/missinggo/httpfile"
- "github.com/anacrolix/missinggo/httptoo"
- "golang.org/x/net/http2"
-
- "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
-)
-
// backend implements dataBackend.I over HTTP(S), with all paths resolved
// relative to a base URL.
type backend struct {
	// Backend URL.
	url url.URL

	FS httpfile.FS
}

// New returns a backend rooted at u, using an HTTP/2 client.
func New(u url.URL) *backend {
	return &backend{
		url: *httptoo.CopyURL(&u),
		FS: httpfile.FS{
			Client: &http.Client{
				Transport: &http2.Transport{
					TLSClientConfig: &tls.Config{
						// NOTE(review): TLS certificate verification is
						// disabled here — confirm this is intentional
						// outside of test environments.
						InsecureSkipVerify: true,
						NextProtos:         []string{"h2"},
					},
				},
			},
		},
	}
}

// Compile-time check that backend satisfies the data backend interface.
var _ dataBackend.I = &backend{}
-
-func fixErrNotFound(err error) error {
- if err == httpfile.ErrNotFound {
- return dataBackend.ErrNotFound
- }
- return err
-}
-
// urlStr returns the backend's base URL with _path joined onto its path
// component. It copies the URL so the receiver is never mutated.
func (me *backend) urlStr(_path string) string {
	u := me.url
	u.Path = path.Join(u.Path, _path)
	return u.String()
}
-
-func (me *backend) Delete(path string) (err error) {
- err = me.FS.Delete(me.urlStr(path))
- err = fixErrNotFound(err)
- return
-}
-
-func (me *backend) GetLength(path string) (ret int64, err error) {
- ret, err = me.FS.GetLength(me.urlStr(path))
- err = fixErrNotFound(err)
- return
-}
-
-func (me *backend) Open(path string, flags int) (ret dataBackend.File, err error) {
- ret, err = me.FS.Open(me.urlStr(path), flags)
- err = fixErrNotFound(err)
- return
-}
-
-func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
- ret, err = me.FS.OpenSectionReader(me.urlStr(path), off, n)
- err = fixErrNotFound(err)
- return
-}
+++ /dev/null
-package dataBackend
-
-import (
- "io"
- "os"
-)
-
// All functions must return ErrNotFound as required.
//
// I is the storage interface a piece store writes to and reads from.
type I interface {
	// GetLength returns the length of the object at path.
	GetLength(path string) (int64, error)
	// Open opens the object at path with os-style flags.
	Open(path string, flags int) (File, error)
	// OpenSection returns a reader over n bytes of the object at path
	// starting at off.
	OpenSection(path string, off, n int64) (io.ReadCloser, error)
	// Delete removes the object at path.
	Delete(path string) error
}

// ErrNotFound is returned by backends when a requested object does not
// exist.
var ErrNotFound = os.ErrNotExist

// File is the handle returned by I.Open.
type File interface {
	io.Closer
	io.Seeker
	io.Writer
	io.Reader
}
+++ /dev/null
-package pieceStore
-
-import (
- "bytes"
- "crypto/sha1"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "log"
- "os"
- "path"
- "strconv"
- "sync"
- "time"
-
- "github.com/bradfitz/iter"
-
- "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
- "github.com/anacrolix/torrent/metainfo"
-)
-
// store provides access to pieces held in a data backend, with an in-memory
// cache of per-piece completion state keyed by piece hash.
type store struct {
	db dataBackend.I

	mu sync.Mutex
	// The cached completion state for pieces.
	completion map[[20]byte]bool
	// Time of the most recent backend error, used to throttle backend
	// re-checks.
	lastError time.Time
}
-
-func (me *store) completedPiecePath(p metainfo.Piece) string {
- return path.Join("completed", hex.EncodeToString(p.Hash()))
-}
-
-func (me *store) incompletePiecePath(p metainfo.Piece) string {
- return path.Join(
- "incomplete",
- strconv.FormatInt(int64(os.Getpid()), 10),
- hex.EncodeToString(p.Hash()))
-}
-
// OpenTorrentData returns a view of info's data backed by this store, and
// starts a goroutine per piece to warm the completion cache in the
// background.
func (me *store) OpenTorrentData(info *metainfo.Info) (ret *data) {
	ret = &data{info, me}
	for i := range iter.N(info.NumPieces()) {
		// Results are cached in the store, so these goroutines only do
		// backend work once per piece.
		go ret.PieceComplete(i)
	}
	return
}
-
-func New(db dataBackend.I) *store {
- s := &store{
- db: db,
- }
- return s
-}
-
-// Turns 40 byte hex string into its equivalent binary byte array.
-func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) {
- if len(s) != 40 {
- return
- }
- n, err := hex.Decode(ret[:], []byte(s))
- if err != nil {
- return
- }
- if n != 20 {
- panic(n)
- }
- ok = true
- return
-}
-
// sliceToPieceHashArray copies a 20-byte hash slice into an array, panicking
// if b holds fewer than 20 bytes.
func sliceToPieceHashArray(b []byte) (ret [20]byte) {
	if n := copy(ret[:], b); n != len(ret) {
		panic(n)
	}
	return
}
-
// pieceHashArray returns p's 20-byte hash as an array, suitable for use as
// a completion-map key.
func pieceHashArray(p metainfo.Piece) [20]byte {
	return sliceToPieceHashArray(p.Hash())
}
-
-func (me *store) completionKnown(p metainfo.Piece) bool {
- me.mu.Lock()
- _, ok := me.completion[pieceHashArray(p)]
- me.mu.Unlock()
- return ok
-}
-
-func (me *store) isComplete(p metainfo.Piece) bool {
- me.mu.Lock()
- ret, _ := me.completion[pieceHashArray(p)]
- me.mu.Unlock()
- return ret
-}
-
-func (me *store) setCompletion(p metainfo.Piece, complete bool) {
- me.mu.Lock()
- if me.completion == nil {
- me.completion = make(map[[20]byte]bool)
- }
- me.completion[pieceHashArray(p)] = complete
- me.mu.Unlock()
-}
-
-func (me *store) pieceComplete(p metainfo.Piece) bool {
- if me.completionKnown(p) {
- return me.isComplete(p)
- }
- // Prevent a errors from stalling the caller.
- if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second {
- return false
- }
- length, err := me.db.GetLength(me.completedPiecePath(p))
- if err == dataBackend.ErrNotFound {
- me.setCompletion(p, false)
- return false
- }
- if err != nil {
- me.lastError = time.Now()
- log.Printf("%+v", err)
- return false
- }
- complete := length == p.Length()
- if !complete {
- log.Printf("completed piece %x has wrong length: %d", p.Hash(), length)
- }
- me.setCompletion(p, complete)
- return complete
-}
-
-func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
- if me.pieceComplete(p) {
- err = errors.New("already have piece")
- return
- }
- f, err := me.db.Open(me.incompletePiecePath(p), os.O_WRONLY|os.O_CREATE)
- if err != nil {
- err = fmt.Errorf("error opening %q: %s", me.incompletePiecePath(p), err)
- return
- }
- defer func() {
- closeErr := f.Close()
- if err == nil {
- err = closeErr
- }
- }()
- _, err = f.Seek(off, os.SEEK_SET)
- if err != nil {
- return
- }
- n, err = f.Write(b)
- return
-}
-
-func (me *store) forgetCompletions() {
- me.mu.Lock()
- me.completion = nil
- me.mu.Unlock()
-}
-
// getPieceRange returns a reader over [off, off+n) of piece p, preferring
// the completed copy and falling back to this process's incomplete copy.
// If neither exists, io.ErrUnexpectedEOF is returned.
func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) {
	rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n)
	if err == dataBackend.ErrNotFound {
		// The completed copy is missing; if we thought it was complete the
		// whole cache is suspect, so drop it.
		if me.isComplete(p) {
			me.forgetCompletions()
		}
		me.setCompletion(p, false)
		rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n)
	}
	if err == dataBackend.ErrNotFound {
		// Neither the completed nor the incomplete copy exists.
		err = io.ErrUnexpectedEOF
		return
	}
	if err != nil {
		return
	}
	// Wrap up the response body so that the request slot is released when the
	// response body is closed.
	ret = rc
	return
}
-
-func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
- rc, err := me.getPieceRange(p, off, int64(len(b)))
- if err != nil {
- return
- }
- defer rc.Close()
- n, err = io.ReadFull(rc, b)
- if err == io.EOF {
- err = io.ErrUnexpectedEOF
- }
- return
-}
-
-func (me *store) removePath(path string) (err error) {
- err = me.db.Delete(path)
- return
-}
-
// Remove the completed piece if it exists, and mark the piece not completed.
// Mustn't fail.
func (me *store) deleteCompleted(p metainfo.Piece) {
	// A failed deletion would leave a possibly-bad completed copy in place,
	// so it is treated as fatal.
	if err := me.removePath(me.completedPiecePath(p)); err != nil {
		panic(err)
	}
	me.setCompletion(p, false)
}
-
-func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) {
- src, err := me.db.OpenSection(from, 0, n)
- if err != nil {
- return
- }
- defer src.Close()
- hasher := sha1.New()
- tee := io.TeeReader(src, hasher)
- dest, err := me.db.Open(to, os.O_WRONLY|os.O_CREATE)
- if err != nil {
- return
- }
- defer dest.Close()
- _, err = io.Copy(dest, tee)
- if err != nil {
- return
- }
- hash = hasher.Sum(nil)
- return
-}
-
-func (me *store) pieceCompleted(p metainfo.Piece) (err error) {
- if me.pieceComplete(p) {
- return
- }
- hash, err := me.hashCopyFile(me.incompletePiecePath(p), me.completedPiecePath(p), p.Length())
- if err == nil && !bytes.Equal(hash, p.Hash()) {
- err = errors.New("piece incomplete")
- }
- if err != nil {
- me.deleteCompleted(p)
- return
- }
- me.removePath(me.incompletePiecePath(p))
- me.setCompletion(p, true)
- return
-}