From: Matt Joiner Date: Thu, 26 Sep 2013 09:49:15 +0000 (+1000) Subject: Start implementing a basic client X-Git-Tag: v1.0.0~1821 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=816c263760ba1674677fd151bd43d9e1103df4bd;p=btrtrc.git Start implementing a basic client --- diff --git a/client.go b/client.go new file mode 100644 index 00000000..a3706ae4 --- /dev/null +++ b/client.go @@ -0,0 +1,204 @@ +package torrent + +import ( + "crypto" + "errors" + metainfo "github.com/nsf/libtorgo/torrent" + "io" + "launchpad.net/gommap" + "os" + "path/filepath" +) + +const ( + PieceHash = crypto.SHA1 +) + +type infoHash [20]byte + +type pieceSum [20]byte + +func copyHashSum(dst, src []byte) { + if len(dst) != len(src) || copy(dst, src) != len(dst) { + panic("hash sum sizes differ") + } +} + +func BytesInfoHash(b []byte) (ih infoHash) { + if len(b) != len(ih) { + panic("bad infohash bytes") + } + return +} + +type pieceState uint8 + +const ( + pieceStateUnknown = iota + pieceStateComplete + pieceStateIncomplete +) + +type piece struct { + State pieceState + Hash pieceSum +} + +type torrent struct { + InfoHash infoHash + Pieces []piece + Data MMapSpan + MetaInfo *metainfo.MetaInfo +} + +func (t torrent) PieceSize(piece int) (size int64) { + if piece == len(t.Pieces)-1 { + size = t.Data.Size() % t.MetaInfo.PieceLength + } + if size == 0 { + size = t.MetaInfo.PieceLength + } + return +} + +func (t torrent) PieceReader(piece int) io.Reader { + return io.NewSectionReader(t.Data, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength) +} + +func (t torrent) HashPiece(piece int) (ps pieceSum) { + hash := PieceHash.New() + n, err := io.Copy(hash, t.PieceReader(piece)) + if err != nil { + panic(err) + } + if n != t.PieceSize(piece) { + panic("hashed wrong number of bytes") + } + copyHashSum(ps[:], hash.Sum(nil)) + return +} + +type client struct { + DataDir string + + noTorrents chan struct{} + addTorrent chan *torrent + torrents map[infoHash]*torrent + torrentFinished chan infoHash + actorTask chan func() +} + +func NewClient(dataDir string) *client { + c := &client{ + DataDir: dataDir, + + noTorrents: make(chan struct{}), + addTorrent: make(chan *torrent), + torrents: make(map[infoHash]*torrent), + torrentFinished: make(chan infoHash), + actorTask: make(chan func()), + } + go c.run() + return c +} + +func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) { + defer func() { + if err != nil { + mms.Close() + mms = nil + } + }() + for _, miFile := range metaInfo.Files { + fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...) + var file *os.File + file, err = os.Open(fileName) + if err != nil { + return + } + var mMap gommap.MMap + mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) + if err != nil { + return + } + mms = append(mms, MMap{mMap}) + } + return +} + +func (me *client) AddTorrent(metaInfo *metainfo.MetaInfo) error { + torrent := &torrent{ + InfoHash: BytesInfoHash(metaInfo.InfoHash), + } + for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() { + hash := metaInfo.Pieces[offset : offset+PieceHash.Size()] + if len(hash) != PieceHash.Size() { + return errors.New("bad piece hash in metainfo") + } + piece := piece{} + copyHashSum(piece.Hash[:], hash) + torrent.Pieces = append(torrent.Pieces, piece) + } + var err error + torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir) + if err != nil { + return err + } + me.addTorrent <- torrent + return nil +} + +func (me *client) WaitAll() { + <-me.noTorrents +} + +func (me *client) Close() { +} + +func (me *client) withContext(f func()) { + me.actorTask <- f +} + +func (me *client) pieceHashed(ih infoHash, piece int, correct bool) { + torrent := me.torrents[ih] + torrent.Pieces[piece].State = func() pieceState { + if correct { + return pieceStateComplete + } else { + return pieceStateIncomplete + } + }() + for _, piece := range torrent.Pieces { + if piece.State == pieceStateUnknown { + return + } + } + me.torrentFinished <- ih +} + +func (me *client) run() { + for { + noTorrents := me.noTorrents + if len(me.torrents) != 0 { + noTorrents = nil + } + select { + case noTorrents <- struct{}{}: + case torrent := <-me.addTorrent: + if _, ok := me.torrents[torrent.InfoHash]; ok { + break + } + me.torrents[torrent.InfoHash] = torrent + for i := range torrent.Pieces { + go func(piece int) { + sum := torrent.HashPiece(piece) + me.withContext(func() { + me.pieceHashed(torrent.InfoHash, piece, sum == torrent.Pieces[piece].Hash) + }) + }(i) + } + case infoHash := <-me.torrentFinished: + delete(me.torrents, infoHash) + } + } +} diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go new file mode 100644 index 00000000..372d397c --- /dev/null +++ b/cmd/torrent/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "bitbucket.org/anacrolix/go.torrent" + "flag" + metainfo "github.com/nsf/libtorgo/torrent" + "log" +) + +var ( + downloadDir = flag.String("downloadDir", "", "directory to store download torrent data") +) + +func init() { + flag.Parse() +} + +func main() { + client := torrent.NewClient(*downloadDir) + for _, arg := range flag.Args() { + metaInfo, err := metainfo.LoadFromFile(arg) + if err != nil { + log.Fatal(err) + } + err = client.AddTorrent(metaInfo) + if err != nil { + log.Fatal(err) + } + } + client.WaitAll() + client.Close() +} diff --git a/mmap_span.go b/mmap_span.go index e8514168..f7d91059 100644 --- a/mmap_span.go +++ b/mmap_span.go @@ -5,33 +5,39 @@ import ( "launchpad.net/gommap" ) -type Mmap struct { +type MMap struct { gommap.MMap } -func (me Mmap) Size() int64 { +func (me MMap) Size() int64 { return int64(len(me.MMap)) } -type MmapSpan []Mmap +type MMapSpan []MMap -func (me MmapSpan) span() (s span) { +func (me MMapSpan) span() (s span) { for _, mmap := range me { s = append(s, mmap) } return } -func (me MmapSpan) Size() (ret int64) { +func (me MMapSpan) Close() { + for _, mMap := range me { + mMap.UnsafeUnmap() + } +} + +func (me MMapSpan) Size() (ret int64) { for _, mmap := range me { ret += mmap.Size() } return } -func (me MmapSpan) ReadAt(p []byte, off int64) (n int, err error) { +func (me MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { me.span().ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) { - _n := copy(p, interval.(Mmap).MMap[intervalOffset:]) + _n := copy(p, interval.(MMap).MMap[intervalOffset:]) p = p[_n:] n += _n return len(p) == 0 @@ -42,9 +48,9 @@ func (me MmapSpan) ReadAt(p []byte, off int64) (n int, err error) { return } -func (me MmapSpan) WriteAt(p []byte, off int64) (n int, err error) { +func (me MMapSpan) WriteAt(p []byte, off int64) (n int, err error) { me.span().ApplyTo(off, func(iOff int64, i sizer) (stop bool) { - _n := copy(i.(Mmap).MMap[iOff:], p) + _n := copy(i.(MMap).MMap[iOff:], p) p = p[_n:] n += _n return len(p) == 0