]> Sergey Matveev's repositories - btrtrc.git/commitdiff
torrentfs: Match the active torrents to those found in the given directory
authorMatt Joiner <anacrolix@gmail.com>
Tue, 22 Jul 2014 15:54:11 +0000 (01:54 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 22 Jul 2014 15:54:11 +0000 (01:54 +1000)
client.go
cmd/torrentfs/main.go
torrent.go
util/dirwatch/dirwatch.go [new file with mode: 0644]

index 9463a8e1f570c156ee14b5663d2a227b7c2124b8..0567192b72efe19f0990285b65db66912cd8df45 100644 (file)
--- a/client.go
+++ b/client.go
@@ -895,6 +895,22 @@ func (cl *Client) AddMagnet(uri string) (err error) {
        return
 }
 
+func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       t, ok := me.torrents[infoHash]
+       if !ok {
+               err = fmt.Errorf("no such torrent")
+               return
+       }
+       err = t.Close()
+       if err != nil {
+               panic(err)
+       }
+       delete(me.torrents, infoHash)
+       return
+}
+
 func (me *Client) addTorrent(t *torrent) (err error) {
        if _, ok := me.torrents[t.InfoHash]; ok {
                err = fmt.Errorf("torrent infohash collision")
@@ -929,6 +945,15 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
        return
 }
 
+func (me *Client) AddTorrentFromFile(name string) (err error) {
+       mi, err := metainfo.LoadFromFile(name)
+       if err != nil {
+               err = fmt.Errorf("error loading metainfo from file: %s", err)
+               return
+       }
+       return me.AddTorrent(mi)
+}
+
 func (cl *Client) listenerAnnouncePort() (port int16) {
        l := cl.Listener
        if l == nil {
@@ -1206,6 +1231,10 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
        for p.Hashing {
                cl.event.Wait()
        }
+       if t.isClosed() {
+               cl.mu.Unlock()
+               return
+       }
        p.Hashing = true
        p.QueuedForHash = false
        cl.mu.Unlock()
index 21444052a9dc08f7c1853987011857ba887c3310..dfe820501dd4b0f28d981c25b92b181005aa8ed0 100644 (file)
@@ -1,6 +1,8 @@
 package main
 
 import (
+       "bitbucket.org/anacrolix/go.torrent/util"
+       "bitbucket.org/anacrolix/go.torrent/util/dirwatch"
        "flag"
        "log"
        "net"
@@ -17,7 +19,6 @@ import (
        fusefs "bazil.org/fuse/fs"
        "bitbucket.org/anacrolix/go.torrent"
        "bitbucket.org/anacrolix/go.torrent/fs"
-       "github.com/anacrolix/libtorgo/metainfo"
 )
 
 var (
@@ -68,34 +69,6 @@ func setSignalHandlers() {
        }()
 }
 
-func addTorrent(c *torrent.Client, file string) {
-       metaInfo, err := metainfo.LoadFromFile(file)
-       if err != nil {
-               log.Print(err)
-               return
-       }
-       err = c.AddTorrent(metaInfo)
-       if err != nil {
-               log.Print(err)
-               return
-       }
-}
-
-func addTorrentDir(c *torrent.Client, _path string) {
-       torrentDir, err := os.Open(torrentPath)
-       defer torrentDir.Close()
-       if err != nil {
-               log.Fatal(err)
-       }
-       names, err := torrentDir.Readdirnames(-1)
-       if err != nil {
-               log.Fatal(err)
-       }
-       for _, name := range names {
-               go addTorrent(c, filepath.Join(_path, name))
-       }
-}
-
 func addTestPeer(client *torrent.Client) {
        for _, t := range client.Torrents() {
                if testPeerAddr != nil {
@@ -140,7 +113,33 @@ func main() {
                client.WriteStatus(w)
        })
        client.Start()
-       addTorrentDir(client, torrentPath)
+       dw, err := dirwatch.New(torrentPath)
+       if err != nil {
+               log.Fatal(err)
+       }
+       go func() {
+               for ev := range dw.Events {
+                       switch ev.Change {
+                       case dirwatch.Added:
+                               if ev.TorrentFilePath != "" {
+                                       err := client.AddTorrentFromFile(ev.TorrentFilePath)
+                                       if err != nil {
+                                               log.Printf("error adding torrent to client: %s", err)
+                                       }
+                               } else if ev.Magnet != "" {
+                                       err := client.AddMagnet(ev.Magnet)
+                                       if err != nil {
+                                               log.Printf("error adding magnet: %s", err)
+                                       }
+                               }
+                       case dirwatch.Removed:
+                               err := client.DropTorrent(ev.InfoHash)
+                               if err != nil {
+                                       log.Printf("error dropping torrent: %s", err)
+                               }
+                       }
+               }
+       }()
        resolveTestPeerAddr()
        fs := torrentfs.New(client)
        go func() {
index 1e842029df9aced0f5ed462821c7e752cf5ece75..7140652f5fd31963f1823cb1f1ffc4edeba8d489 100644 (file)
@@ -11,6 +11,7 @@ import (
        "io"
        "log"
        "net"
+       "sync"
 )
 
 func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
@@ -35,14 +36,17 @@ type torrentPiece struct {
 }
 
 type torrent struct {
+       closed            bool
        InfoHash          InfoHash
        Pieces            []*torrentPiece
        PiecesByBytesLeft *OrderedList
        Data              mmap_span.MMapSpan
-       Info              *metainfo.Info
-       Conns             []*connection
-       Peers             []Peer
-       Priorities        *list.List
+       // Prevent mutations to Data memory maps while in use as they're not safe.
+       dataLock   sync.RWMutex
+       Info       *metainfo.Info
+       Conns      []*connection
+       Peers      []Peer
+       Priorities *list.List
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.
        Trackers      [][]tracker.Client
@@ -276,8 +280,16 @@ func (t *torrent) Length() int64 {
        return int64(t.LastPieceSize()) + int64(len(t.Pieces)-1)*int64(t.UsualPieceSize())
 }
 
+func (t *torrent) isClosed() bool {
+       return t.closed
+}
+
 func (t *torrent) Close() (err error) {
+       t.closed = true
+       t.dataLock.Lock()
        t.Data.Close()
+       t.Data = nil
+       t.dataLock.Unlock()
        for _, conn := range t.Conns {
                conn.Close()
        }
@@ -373,12 +385,14 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
 
 func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
        hash := pieceHash.New()
+       t.dataLock.RLock()
        n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
+       t.dataLock.RUnlock()
        if err != nil {
                panic(err)
        }
        if pp.Integer(n) != t.PieceLength(piece) {
-               log.Print(t.Info)
+               // log.Print(t.Info)
                panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
        }
        copyHashSum(ps[:], hash.Sum(nil))
diff --git a/util/dirwatch/dirwatch.go b/util/dirwatch/dirwatch.go
new file mode 100644 (file)
index 0000000..7689edd
--- /dev/null
@@ -0,0 +1,122 @@
+package dirwatch
+
+import (
+       "bitbucket.org/anacrolix/go.torrent"
+       "github.com/anacrolix/libtorgo/metainfo"
+       "github.com/go-fsnotify/fsnotify"
+       "log"
+       "os"
+       "path/filepath"
+)
+
+type Change uint
+
+const (
+       Added Change = iota
+       Removed
+)
+
+type Event struct {
+       Magnet string
+       Change
+       TorrentFilePath string
+       InfoHash        torrent.InfoHash
+}
+
+type Instance struct {
+       w                     *fsnotify.Watcher
+       dirName               string
+       Events                chan Event
+       torrentFileInfoHashes map[string]torrent.InfoHash
+}
+
+func (me *Instance) handleEvents() {
+       for e := range me.w.Events {
+               log.Printf("event: %s", e)
+               me.processFile(e.Name)
+       }
+}
+
+func (me *Instance) handleErrors() {
+       for err := range me.w.Errors {
+               log.Printf("error in torrent directory watcher: %s", err)
+       }
+}
+
+func torrentFileInfoHash(fileName string) (ih torrent.InfoHash, ok bool) {
+       mi, _ := metainfo.LoadFromFile(fileName)
+       if mi == nil {
+               return
+       }
+       if 20 != copy(ih[:], mi.Info.Hash) {
+               panic(mi.Info.Hash)
+       }
+       ok = true
+       return
+}
+
+func (me *Instance) processFile(name string) {
+       name = filepath.Clean(name)
+       log.Print(name)
+       if filepath.Ext(name) != ".torrent" {
+               return
+       }
+       ih, ok := me.torrentFileInfoHashes[name]
+       if ok {
+               me.Events <- Event{
+                       TorrentFilePath: name,
+                       Change:          Removed,
+                       InfoHash:        ih,
+               }
+       }
+       delete(me.torrentFileInfoHashes, name)
+       ih, ok = torrentFileInfoHash(name)
+       if ok {
+               me.torrentFileInfoHashes[name] = ih
+               me.Events <- Event{
+                       TorrentFilePath: name,
+                       Change:          Added,
+                       InfoHash:        ih,
+               }
+       }
+}
+
+func (me *Instance) addDir() (err error) {
+       f, err := os.Open(me.dirName)
+       if err != nil {
+               return
+       }
+       defer f.Close()
+       names, err := f.Readdirnames(-1)
+       if err != nil {
+               return
+       }
+       for _, n := range names {
+               me.processFile(filepath.Join(me.dirName, n))
+       }
+       return
+}
+
+func New(dirName string) (i *Instance, err error) {
+       w, err := fsnotify.NewWatcher()
+       if err != nil {
+               return
+       }
+       err = w.Add(dirName)
+       if err != nil {
+               w.Close()
+               return
+       }
+       i = &Instance{
+               w:                     w,
+               dirName:               dirName,
+               Events:                make(chan Event),
+               torrentFileInfoHashes: make(map[string]torrent.InfoHash, 20),
+       }
+       go func() {
+               i.addDir()
+               go i.handleEvents()
+               go i.handleErrors()
+       }()
+       return
+}