From 6158b1f6ec184cb77a8270596b7c3759f779b451 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 23 Jul 2014 01:54:11 +1000 Subject: [PATCH] torrentfs: Match the active torrents to those found in the given directory --- client.go | 29 +++++++++ cmd/torrentfs/main.go | 59 +++++++++--------- torrent.go | 24 ++++++-- util/dirwatch/dirwatch.go | 122 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 35 deletions(-) create mode 100644 util/dirwatch/dirwatch.go diff --git a/client.go b/client.go index 9463a8e1..0567192b 100644 --- a/client.go +++ b/client.go @@ -895,6 +895,22 @@ func (cl *Client) AddMagnet(uri string) (err error) { return } +func (me *Client) DropTorrent(infoHash InfoHash) (err error) { + me.mu.Lock() + defer me.mu.Unlock() + t, ok := me.torrents[infoHash] + if !ok { + err = fmt.Errorf("no such torrent") + return + } + err = t.Close() + if err != nil { + panic(err) + } + delete(me.torrents, infoHash) + return +} + func (me *Client) addTorrent(t *torrent) (err error) { if _, ok := me.torrents[t.InfoHash]; ok { err = fmt.Errorf("torrent infohash collision") @@ -929,6 +945,15 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) { return } +func (me *Client) AddTorrentFromFile(name string) (err error) { + mi, err := metainfo.LoadFromFile(name) + if err != nil { + err = fmt.Errorf("error loading metainfo from file: %s", err) + return + } + return me.AddTorrent(mi) +} + func (cl *Client) listenerAnnouncePort() (port int16) { l := cl.Listener if l == nil { @@ -1206,6 +1231,10 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { for p.Hashing { cl.event.Wait() } + if t.isClosed() { + cl.mu.Unlock() + return + } p.Hashing = true p.QueuedForHash = false cl.mu.Unlock() diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 21444052..dfe82050 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -1,6 +1,8 @@ package main import ( + "bitbucket.org/anacrolix/go.torrent/util" + "bitbucket.org/anacrolix/go.torrent/util/dirwatch" "flag" "log" "net" @@ -17,7 +19,6 @@ import ( fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" "bitbucket.org/anacrolix/go.torrent/fs" - "github.com/anacrolix/libtorgo/metainfo" ) var ( @@ -68,34 +69,6 @@ func setSignalHandlers() { }() } -func addTorrent(c *torrent.Client, file string) { - metaInfo, err := metainfo.LoadFromFile(file) - if err != nil { - log.Print(err) - return - } - err = c.AddTorrent(metaInfo) - if err != nil { - log.Print(err) - return - } -} - -func addTorrentDir(c *torrent.Client, _path string) { - torrentDir, err := os.Open(torrentPath) - defer torrentDir.Close() - if err != nil { - log.Fatal(err) - } - names, err := torrentDir.Readdirnames(-1) - if err != nil { - log.Fatal(err) - } - for _, name := range names { - go addTorrent(c, filepath.Join(_path, name)) - } -} - func addTestPeer(client *torrent.Client) { for _, t := range client.Torrents() { if testPeerAddr != nil { @@ -140,7 +113,33 @@ func main() { client.WriteStatus(w) }) client.Start() - addTorrentDir(client, torrentPath) + dw, err := dirwatch.New(torrentPath) + if err != nil { + log.Fatal(err) + } + go func() { + for ev := range dw.Events { + switch ev.Change { + case dirwatch.Added: + if ev.TorrentFilePath != "" { + err := client.AddTorrentFromFile(ev.TorrentFilePath) + if err != nil { + log.Printf("error adding torrent to client: %s", err) + } + } else if ev.Magnet != "" { + err := client.AddMagnet(ev.Magnet) + if err != nil { + log.Printf("error adding magnet: %s", err) + } + } + case dirwatch.Removed: + err := client.DropTorrent(ev.InfoHash) + if err != nil { + log.Printf("error dropping torrent: %s", err) + } + } + } + }() resolveTestPeerAddr() fs := torrentfs.New(client) go func() { diff --git a/torrent.go b/torrent.go index 1e842029..7140652f 100644 --- a/torrent.go +++ b/torrent.go @@ -11,6 +11,7 @@ import ( "io" "log" "net" + "sync" ) func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) { @@ -35,14 +36,17 @@ type torrentPiece struct { } type torrent struct { + closed bool InfoHash InfoHash Pieces []*torrentPiece PiecesByBytesLeft *OrderedList Data mmap_span.MMapSpan - Info *metainfo.Info - Conns []*connection - Peers []Peer - Priorities *list.List + // Prevent mutations to Data memory maps while in use as they're not safe. + dataLock sync.RWMutex + Info *metainfo.Info + Conns []*connection + Peers []Peer + Priorities *list.List // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key. Trackers [][]tracker.Client @@ -276,8 +280,16 @@ func (t *torrent) Length() int64 { return int64(t.LastPieceSize()) + int64(len(t.Pieces)-1)*int64(t.UsualPieceSize()) } +func (t *torrent) isClosed() bool { + return t.closed +} + func (t *torrent) Close() (err error) { + t.closed = true + t.dataLock.Lock() t.Data.Close() + t.Data = nil + t.dataLock.Unlock() for _, conn := range t.Conns { conn.Close() } @@ -373,12 +385,14 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) { func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() + t.dataLock.RLock() n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) + t.dataLock.RUnlock() if err != nil { panic(err) } if pp.Integer(n) != t.PieceLength(piece) { - log.Print(t.Info) + // log.Print(t.Info) panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece)) } copyHashSum(ps[:], hash.Sum(nil)) diff --git a/util/dirwatch/dirwatch.go b/util/dirwatch/dirwatch.go new file mode 100644 index 00000000..7689edd0 --- /dev/null +++ b/util/dirwatch/dirwatch.go @@ -0,0 +1,122 @@ +package dirwatch + +import ( + "bitbucket.org/anacrolix/go.torrent" + "github.com/anacrolix/libtorgo/metainfo" + "github.com/go-fsnotify/fsnotify" + "log" + "os" + "path/filepath" +) + +type Change uint + +const ( + Added Change = iota + Removed +) + +type Event struct { + Magnet string + Change + TorrentFilePath string + InfoHash torrent.InfoHash +} + +type Instance struct { + w *fsnotify.Watcher + dirName string + Events chan Event + torrentFileInfoHashes map[string]torrent.InfoHash +} + +func (me *Instance) handleEvents() { + for e := range me.w.Events { + log.Printf("event: %s", e) + me.processFile(e.Name) + } +} + +func (me *Instance) handleErrors() { + for err := range me.w.Errors { + log.Printf("error in torrent directory watcher: %s", err) + } +} + +func torrentFileInfoHash(fileName string) (ih torrent.InfoHash, ok bool) { + mi, _ := metainfo.LoadFromFile(fileName) + if mi == nil { + return + } + if 20 != copy(ih[:], mi.Info.Hash) { + panic(mi.Info.Hash) + } + ok = true + return +} + +func (me *Instance) processFile(name string) { + name = filepath.Clean(name) + log.Print(name) + if filepath.Ext(name) != ".torrent" { + return + } + ih, ok := me.torrentFileInfoHashes[name] + if ok { + me.Events <- Event{ + TorrentFilePath: name, + Change: Removed, + InfoHash: ih, + } + } + delete(me.torrentFileInfoHashes, name) + ih, ok = torrentFileInfoHash(name) + if ok { + me.torrentFileInfoHashes[name] = ih + me.Events <- Event{ + TorrentFilePath: name, + Change: Added, + InfoHash: ih, + } + } +} + +func (me *Instance) addDir() (err error) { + f, err := os.Open(me.dirName) + if err != nil { + return + } + defer f.Close() + names, err := f.Readdirnames(-1) + if err != nil { + return + } + for _, n := range names { + me.processFile(filepath.Join(me.dirName, n)) + } + return +} + +func New(dirName string) (i *Instance, err error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return + } + err = w.Add(dirName) + if err != nil { + w.Close() + return + } + i = &Instance{ + w: w, + dirName: dirName, + Events: make(chan Event), + torrentFileInfoHashes: make(map[string]torrent.InfoHash, 20), + } + go func() { + i.addDir() + go i.handleEvents() + go i.handleErrors() + }() + return +} -- 2.48.1