From: Matt Joiner Date: Mon, 28 Mar 2016 10:57:04 +0000 (+1100) Subject: Get mmap storage working X-Git-Tag: v1.0.0~814 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=775cf538095845eea24a52d02a9489c4ad879b2b;p=btrtrc.git Get mmap storage working --- diff --git a/client.go b/client.go index 9ab59ce3..7f0d5484 100644 --- a/client.go +++ b/client.go @@ -144,7 +144,7 @@ type Client struct { utpSock *utp.Socket dHT *dht.Server ipBlockList iplist.Ranger - bannedTorrents map[InfoHash]struct{} + bannedTorrents map[metainfo.InfoHash]struct{} config Config pruneTimer *time.Timer extensionBytes peerExtensionBytes @@ -159,7 +159,7 @@ type Client struct { event sync.Cond closed missinggo.Event - torrents map[InfoHash]*torrent + torrents map[metainfo.InfoHash]*torrent } func (me *Client) IPBlockList() iplist.Ranger { @@ -190,7 +190,7 @@ func (me *Client) ListenAddr() (addr net.Addr) { } type hashSorter struct { - Hashes []InfoHash + Hashes []metainfo.InfoHash } func (me hashSorter) Len() int { @@ -338,7 +338,7 @@ func (cl *Client) initBannedTorrents() error { } defer f.Close() scanner := bufio.NewScanner(f) - cl.bannedTorrents = make(map[InfoHash]struct{}) + cl.bannedTorrents = make(map[metainfo.InfoHash]struct{}) for scanner.Scan() { if strings.HasPrefix(strings.TrimSpace(scanner.Text()), "#") { continue @@ -354,7 +354,7 @@ func (cl *Client) initBannedTorrents() error { if len(ihs) != 20 { return errors.New("bad infohash") } - var ih InfoHash + var ih metainfo.InfoHash CopyExact(&ih, ihs) cl.bannedTorrents[ih] = struct{}{} } @@ -380,7 +380,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { config: *cfg, defaultStorage: cfg.DefaultStorage, dopplegangerAddrs: make(map[string]struct{}), - torrents: make(map[InfoHash]*torrent), + torrents: make(map[metainfo.InfoHash]*torrent), } CopyExact(&cl.extensionBytes, defaultExtensionBytes) cl.event.L = &cl.mu @@ -577,7 +577,7 @@ func (cl *Client) incomingConnection(nc net.Conn, utp bool) { } // Returns a handle to the given torrent, if it's present in the client. -func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) { +func (cl *Client) Torrent(ih metainfo.InfoHash) (T Torrent, ok bool) { cl.mu.Lock() defer cl.mu.Unlock() t, ok := cl.torrents[ih] @@ -588,7 +588,7 @@ func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) { return } -func (me *Client) torrent(ih InfoHash) *torrent { +func (me *Client) torrent(ih metainfo.InfoHash) *torrent { return me.torrents[ih] } @@ -854,14 +854,14 @@ func (me *peerExtensionBytes) SupportsFast() bool { type handshakeResult struct { peerExtensionBytes peerID - InfoHash + metainfo.InfoHash } // ih is nil if we expect the peer to declare the InfoHash, such as when the // peer initiated the connection. Returns ok if the handshake was successful, // and err if there was an unexpected condition other than the peer simply // abandoning the handshake. -func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) { +func handshake(sock io.ReadWriter, ih *metainfo.InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) { // Bytes to be sent to the peer. Should never block the sender. postCh := make(chan []byte, 4) // A single error value sent when the writer completes. @@ -1025,7 +1025,7 @@ func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) { } // Returns !ok if handshake failed for valid reasons. -func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) { +func (cl *Client) connBTHandshake(c *connection, ih *metainfo.InfoHash) (ret metainfo.InfoHash, ok bool, err error) { res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes) if err != nil || !ok { return @@ -1192,7 +1192,7 @@ func (cl *Client) requestPendingMetadata(t *torrent, c *connection) { func (cl *Client) completedMetadata(t *torrent) { h := sha1.New() h.Write(t.MetaData) - var ih InfoHash + var ih metainfo.InfoHash CopyExact(&ih, h.Sum(nil)) if ih != t.InfoHash { log.Print("bad metadata") @@ -1683,7 +1683,7 @@ func (me *Client) addPeers(t *torrent, peers []Peer) { } } -func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string { +func (cl *Client) cachedMetaInfoFilename(ih metainfo.InfoHash) string { return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent") } @@ -1706,7 +1706,7 @@ func (cl *Client) saveTorrentFile(t *torrent) error { // able to save the torrent, but not load it again to check it. return nil } - if !bytes.Equal(mi.Info.Hash, t.InfoHash[:]) { + if !bytes.Equal(mi.Info.Hash.Bytes(), t.InfoHash[:]) { log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:]) } return nil @@ -1730,7 +1730,7 @@ func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err // Prepare a Torrent without any attachment to a Client. That means we can // initialize fields all fields that don't require the Client without locking // it. -func newTorrent(ih InfoHash) (t *torrent) { +func newTorrent(ih metainfo.InfoHash) (t *torrent) { t = &torrent{ InfoHash: ih, chunkSize: defaultChunkSize, @@ -1854,7 +1854,7 @@ func (t Torrent) DownloadAll() { // Returns nil metainfo if it isn't in the cache. Checks that the retrieved // metainfo has the correct infohash. -func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { +func (cl *Client) torrentCacheMetaInfo(ih metainfo.InfoHash) (mi *metainfo.MetaInfo, err error) { if cl.config.DisableMetainfoCache { return } @@ -1871,7 +1871,7 @@ func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err if err != nil { return } - if !bytes.Equal(mi.Info.Hash, ih[:]) { + if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) { err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:]) return } @@ -1883,7 +1883,7 @@ func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err type TorrentSpec struct { // The tiered tracker URIs. Trackers [][]string - InfoHash InfoHash + InfoHash metainfo.InfoHash Info *metainfo.InfoEx // The name to use if the Name field from the Info isn't available. DisplayName string @@ -1919,7 +1919,7 @@ func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) { spec.Trackers[0] = append(spec.Trackers[0], mi.Announce) } - CopyExact(&spec.InfoHash, &mi.Info.Hash) + CopyExact(&spec.InfoHash, mi.Info.Hash) return } @@ -1995,7 +1995,7 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er return } -func (me *Client) dropTorrent(infoHash InfoHash) (err error) { +func (me *Client) dropTorrent(infoHash metainfo.InfoHash) (err error) { t, ok := me.torrents[infoHash] if !ok { err = fmt.Errorf("no such torrent") diff --git a/client_test.go b/client_test.go index df024899..6a33f3b8 100644 --- a/client_test.go +++ b/client_test.go @@ -50,27 +50,19 @@ var TestingConfig = Config{ func TestClientDefault(t *testing.T) { cl, err := NewClient(&TestingConfig) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) cl.Close() } func TestAddDropTorrent(t *testing.T) { cl, err := NewClient(&TestingConfig) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer cl.Close() dir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(dir) tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) - if err != nil { - t.Fatal(err) - } - if !new { - t.FailNow() - } + require.NoError(t, err) + assert.True(t, new) tt.Drop() } @@ -95,7 +87,7 @@ func TestPieceHashSize(t *testing.T) { func TestTorrentInitialState(t *testing.T) { dir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(dir) - tor := newTorrent(func() (ih InfoHash) { + tor := newTorrent(func() (ih metainfo.InfoHash) { missinggo.CopyExact(ih[:], mi.Info.Hash) return }()) @@ -265,16 +257,23 @@ func TestClientTransferSmallCache(t *testing.T) { } func TestClientTransferVarious(t *testing.T) { - for _, responsive := range []bool{false, true} { - testClientTransfer(t, testClientTransferParams{ - Responsive: responsive, - }) - for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} { + for _, ss := range []func(string) storage.I{ + storage.NewFile, + storage.NewMMap, + } { + for _, responsive := range []bool{false, true} { testClientTransfer(t, testClientTransferParams{ - Responsive: responsive, - SetReadahead: true, - Readahead: readahead, + Responsive: responsive, + SeederStorage: ss, }) + for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} { + testClientTransfer(t, testClientTransferParams{ + SeederStorage: ss, + Responsive: responsive, + SetReadahead: true, + Readahead: readahead, + }) + } } } } @@ -286,6 +285,7 @@ type testClientTransferParams struct { ExportClientStatus bool SetLeecherStorageCapacity bool LeecherStorageCapacity int64 + SeederStorage func(string) storage.I } func testClientTransfer(t *testing.T, ps testClientTransferParams) { @@ -293,7 +293,11 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { defer os.RemoveAll(greetingTempDir) cfg := TestingConfig cfg.Seed = true - cfg.DataDir = greetingTempDir + if ps.SeederStorage != nil { + cfg.DefaultStorage = ps.SeederStorage(greetingTempDir) + } else { + cfg.DataDir = greetingTempDir + } seeder, err := NewClient(&cfg) require.NoError(t, err) defer seeder.Close() @@ -673,7 +677,7 @@ func TestAddTorrentMetainfoInCache(t *testing.T) { require.NoError(t, err) require.True(t, new) require.NotNil(t, tt.Info()) - _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash))) + _, err = os.Stat(filepath.Join(cfg.ConfigDir, "torrents", fmt.Sprintf("%x.torrent", mi.Info.Hash.Bytes()))) require.NoError(t, err) // Contains only the infohash. var ts TorrentSpec diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index b19ff343..bf8dc0b0 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -17,8 +17,8 @@ import ( "github.com/gosuri/uiprogress" "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/data/mmap" "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" ) func resolvedPeerAddrs(ss []string) (ret []torrent.Peer, err error) { @@ -121,13 +121,7 @@ func main() { tagflag.Parse(&opts, tagflag.SkipBadTypes()) clientConfig := opts.Config if opts.Mmap { - clientConfig.TorrentDataOpener = func(info *metainfo.Info) torrent.Storage { - ret, err := mmap.TorrentData(info, "") - if err != nil { - log.Fatalf("error opening torrent data for %q: %s", info.Name, err) - } - return ret - } + clientConfig.DefaultStorage = storage.NewMMap("") } client, err := torrent.NewClient(&clientConfig) diff --git a/metainfo/metainfo.go b/metainfo/metainfo.go index 0ded430b..87974825 100644 --- a/metainfo/metainfo.go +++ b/metainfo/metainfo.go @@ -157,7 +157,7 @@ func (me *Info) NumPieces() int { } type Piece struct { - Info *Info + Info *InfoEx i int } @@ -177,7 +177,7 @@ func (me Piece) Hash() (ret InfoHash) { return } -func (me *Info) Piece(i int) Piece { +func (me *InfoEx) Piece(i int) Piece { return Piece{me, i} } @@ -204,7 +204,7 @@ func (i *Info) UpvertedFiles() []FileInfo { // important to Bittorrent. type InfoEx struct { Info - Hash []byte + Hash *InfoHash Bytes []byte } @@ -214,14 +214,14 @@ var ( ) func (this *InfoEx) UnmarshalBencode(data []byte) error { - this.Bytes = make([]byte, 0, len(data)) - this.Bytes = append(this.Bytes, data...) + this.Bytes = append(make([]byte, 0, len(data)), data...) h := sha1.New() _, err := h.Write(this.Bytes) if err != nil { panic(err) } - this.Hash = h.Sum(nil) + this.Hash = new(InfoHash) + missinggo.CopyExact(this.Hash, h.Sum(nil)) return bencode.Unmarshal(data, &this.Info) } @@ -258,3 +258,15 @@ func (mi *MetaInfo) SetDefaults() { } type InfoHash [20]byte + +func (me *InfoHash) Bytes() []byte { + return me[:] +} + +func (ih *InfoHash) AsString() string { + return string(ih[:]) +} + +func (ih *InfoHash) HexString() string { + return fmt.Sprintf("%x", ih[:]) +} diff --git a/misc.go b/misc.go index 8f13a179..aec997d4 100644 --- a/misc.go +++ b/misc.go @@ -3,7 +3,6 @@ package torrent import ( "crypto" "errors" - "fmt" "time" "github.com/anacrolix/torrent/metainfo" @@ -23,18 +22,9 @@ const ( ) type ( - InfoHash [20]byte pieceSum [20]byte ) -func (ih InfoHash) AsString() string { - return string(ih[:]) -} - -func (ih InfoHash) HexString() string { - return fmt.Sprintf("%x", ih[:]) -} - func lastChunkSpec(pieceLength, chunkSize pp.Integer) (cs chunkSpec) { cs.Begin = (pieceLength - 1) / chunkSize * chunkSize cs.Length = pieceLength - cs.Begin diff --git a/storage/file.go b/storage/file.go index 58670ddd..7f9d61b3 100644 --- a/storage/file.go +++ b/storage/file.go @@ -15,7 +15,7 @@ type fileStorage struct { completed map[[20]byte]bool } -func NewFile(baseDir string) *fileStorage { +func NewFile(baseDir string) I { return &fileStorage{ baseDir: baseDir, } @@ -54,7 +54,7 @@ func (me *fileStoragePiece) MarkComplete() error { } type fileStorageTorrent struct { - info *metainfo.Info + info *metainfo.InfoEx baseDir string } diff --git a/storage/file_test.go b/storage/file_test.go index 55ed19bf..0297c389 100644 --- a/storage/file_test.go +++ b/storage/file_test.go @@ -20,10 +20,12 @@ func TestShortFile(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(td) data := NewFile(td) - info := &metainfo.Info{ - Name: "a", - Length: 2, - PieceLength: missinggo.MiB, + info := &metainfo.InfoEx{ + Info: metainfo.Info{ + Name: "a", + Length: 2, + PieceLength: missinggo.MiB, + }, } f, err := os.Create(filepath.Join(td, "a")) err = f.Truncate(1) diff --git a/storage/interface.go b/storage/interface.go index 50347763..0662ebd2 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -23,3 +23,20 @@ type Piece interface { // Returns true if the piece is complete. GetIsComplete() bool } + +// type PieceStorage interface { +// ReadAt(metainfo.Piece, []byte, int64) (int, error) +// WriteAt(metainfo.Piece, []byte, int64) (int, error) +// MarkComplete(metainfo.Piece) error +// GetIsComplete(metainfo.Piece) bool +// } + +// type wrappedPieceStorage struct { +// ps PieceStorage +// } + +// func WrapPieceStorage(ps PieceStorage) I { +// return wrappedPieceStorage{ps} +// } + +// func (me wrappedPieceStorage) Piece(metainfo.Piece) diff --git a/storage/mmap.go b/storage/mmap.go new file mode 100644 index 00000000..e24c9c69 --- /dev/null +++ b/storage/mmap.go @@ -0,0 +1,128 @@ +package storage + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "github.com/anacrolix/missinggo" + "github.com/edsrzf/mmap-go" + + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/mmap_span" +) + +type mmapStorage struct { + baseDir string + spans map[metainfo.InfoHash]mmap_span.MMapSpan + completed map[metainfo.InfoHash]bool +} + +func NewMMap(baseDir string) I { + return &mmapStorage{ + baseDir: baseDir, + } +} + +func (me *mmapStorage) lazySpan(info *metainfo.InfoEx) error { + if me.spans == nil { + me.spans = make(map[metainfo.InfoHash]mmap_span.MMapSpan) + } + if _, ok := me.spans[*info.Hash]; ok { + return nil + } + span, err := MMapTorrent(&info.Info, me.baseDir) + if err != nil { + return err + } + me.spans[*info.Hash] = span + return nil +} + +func (me *mmapStorage) Piece(p metainfo.Piece) Piece { + err := me.lazySpan(p.Info) + if err != nil { + panic(err) + } + return mmapStoragePiece{ + storage: me, + p: p, + ReaderAt: io.NewSectionReader(me.spans[*p.Info.Hash], p.Offset(), p.Length()), + WriterAt: missinggo.NewSectionWriter(me.spans[*p.Info.Hash], p.Offset(), p.Length()), + } +} + +type mmapStoragePiece struct { + storage *mmapStorage + p metainfo.Piece + io.ReaderAt + io.WriterAt +} + +func (me mmapStoragePiece) GetIsComplete() bool { + return me.storage.completed[me.p.Hash()] +} + +func (me mmapStoragePiece) MarkComplete() error { + if me.storage.completed == nil { + me.storage.completed = make(map[metainfo.InfoHash]bool) + } + me.storage.completed[me.p.Hash()] = true + return nil +} + +func MMapTorrent(md *metainfo.Info, location string) (mms mmap_span.MMapSpan, err error) { + defer func() { + if err != nil { + mms.Close() + } + }() + for _, miFile := range md.UpvertedFiles() { + fileName := filepath.Join(append([]string{location, md.Name}, miFile.Path...)...) + err = os.MkdirAll(filepath.Dir(fileName), 0777) + if err != nil { + err = fmt.Errorf("error creating data directory %q: %s", filepath.Dir(fileName), err) + return + } + var file *os.File + file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return + } + func() { + defer file.Close() + var fi os.FileInfo + fi, err = file.Stat() + if err != nil { + return + } + if fi.Size() < miFile.Length { + err = file.Truncate(miFile.Length) + if err != nil { + return + } + } + if miFile.Length == 0 { + // Can't mmap() regions with length 0. + return + } + var mMap mmap.MMap + mMap, err = mmap.MapRegion(file, + int(miFile.Length), // Probably not great on <64 bit systems. + mmap.RDWR, 0, 0) + if err != nil { + err = fmt.Errorf("error mapping file %q, length %d: %s", file.Name(), miFile.Length, err) + return + } + if int64(len(mMap)) != miFile.Length { + panic("mmap has wrong length") + } + mms.Append(mMap) + }() + if err != nil { + return + } + } + return +} diff --git a/t.go b/t.go index 59bb285f..a4be4f63 100644 --- a/t.go +++ b/t.go @@ -17,7 +17,7 @@ type Torrent struct { // The torrent's infohash. This is fixed and cannot change. It uniquely // identifies a torrent. -func (t Torrent) InfoHash() InfoHash { +func (t Torrent) InfoHash() metainfo.InfoHash { return t.torrent.InfoHash } @@ -29,7 +29,7 @@ func (t Torrent) GotInfo() <-chan struct{} { } // Returns the metainfo info dictionary, or nil if it's not yet available. -func (t Torrent) Info() *metainfo.Info { +func (t Torrent) Info() *metainfo.InfoEx { return t.torrent.Info } diff --git a/torrent.go b/torrent.go index 5385cdee..228135c3 100644 --- a/torrent.go +++ b/torrent.go @@ -45,7 +45,7 @@ type torrent struct { // announcing, and communicating with peers. ceasingNetworking chan struct{} - InfoHash InfoHash + InfoHash metainfo.InfoHash Pieces []piece // Values are the piece indices that changed. pieceStateChanges *pubsub.PubSub @@ -57,7 +57,7 @@ type torrent struct { storage storage.I // The info dict. Nil if we don't have it (yet). - Info *metainfo.Info + Info *metainfo.InfoEx // Active peer connections, running message stream loops. Conns []*connection // Set of addrs to which we're attempting to connect. Connections are @@ -225,7 +225,11 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) { err = fmt.Errorf("bad info: %s", err) return } - t.Info = md + t.Info = &metainfo.InfoEx{ + Info: *md, + Bytes: infoBytes, + Hash: &t.InfoHash, + } t.length = 0 for _, f := range t.Info.UpvertedFiles() { t.length += f.Length @@ -472,10 +476,7 @@ func (t *torrent) MetaInfo() *metainfo.MetaInfo { panic("info bytes not set") } return &metainfo.MetaInfo{ - Info: metainfo.InfoEx{ - Info: *t.Info, - Bytes: t.MetaData, - }, + Info: *t.Info, CreationDate: time.Now().Unix(), Comment: "dynamic metainfo from client", CreatedBy: "go.torrent",