return
}
+func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ t, ok := me.torrents[infoHash]
+ if !ok {
+ err = fmt.Errorf("no such torrent")
+ return
+ }
+ err = t.Close()
+ if err != nil {
+ panic(err)
+ }
+ delete(me.torrents, infoHash)
+ return
+}
+
func (me *Client) addTorrent(t *torrent) (err error) {
if _, ok := me.torrents[t.InfoHash]; ok {
err = fmt.Errorf("torrent infohash collision")
return
}
+func (me *Client) AddTorrentFromFile(name string) (err error) {
+ mi, err := metainfo.LoadFromFile(name)
+ if err != nil {
+ err = fmt.Errorf("error loading metainfo from file: %s", err)
+ return
+ }
+ return me.AddTorrent(mi)
+}
+
func (cl *Client) listenerAnnouncePort() (port int16) {
l := cl.Listener
if l == nil {
for p.Hashing {
cl.event.Wait()
}
+ if t.isClosed() {
+ cl.mu.Unlock()
+ return
+ }
p.Hashing = true
p.QueuedForHash = false
cl.mu.Unlock()
package main
import (
+ "bitbucket.org/anacrolix/go.torrent/util"
+ "bitbucket.org/anacrolix/go.torrent/util/dirwatch"
"flag"
"log"
"net"
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
"bitbucket.org/anacrolix/go.torrent/fs"
- "github.com/anacrolix/libtorgo/metainfo"
)
var (
}()
}
-func addTorrent(c *torrent.Client, file string) {
- metaInfo, err := metainfo.LoadFromFile(file)
- if err != nil {
- log.Print(err)
- return
- }
- err = c.AddTorrent(metaInfo)
- if err != nil {
- log.Print(err)
- return
- }
-}
-
-func addTorrentDir(c *torrent.Client, _path string) {
- torrentDir, err := os.Open(torrentPath)
- defer torrentDir.Close()
- if err != nil {
- log.Fatal(err)
- }
- names, err := torrentDir.Readdirnames(-1)
- if err != nil {
- log.Fatal(err)
- }
- for _, name := range names {
- go addTorrent(c, filepath.Join(_path, name))
- }
-}
-
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
if testPeerAddr != nil {
client.WriteStatus(w)
})
client.Start()
- addTorrentDir(client, torrentPath)
+ dw, err := dirwatch.New(torrentPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+ go func() {
+ for ev := range dw.Events {
+ switch ev.Change {
+ case dirwatch.Added:
+ if ev.TorrentFilePath != "" {
+ err := client.AddTorrentFromFile(ev.TorrentFilePath)
+ if err != nil {
+ log.Printf("error adding torrent to client: %s", err)
+ }
+ } else if ev.Magnet != "" {
+ err := client.AddMagnet(ev.Magnet)
+ if err != nil {
+ log.Printf("error adding magnet: %s", err)
+ }
+ }
+ case dirwatch.Removed:
+ err := client.DropTorrent(ev.InfoHash)
+ if err != nil {
+ log.Printf("error dropping torrent: %s", err)
+ }
+ }
+ }
+ }()
resolveTestPeerAddr()
fs := torrentfs.New(client)
go func() {
"io"
"log"
"net"
+ "sync"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
}
type torrent struct {
+ closed bool
InfoHash InfoHash
Pieces []*torrentPiece
PiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
- Info *metainfo.Info
- Conns []*connection
- Peers []Peer
- Priorities *list.List
+ // Prevent mutations to Data memory maps while in use as they're not safe.
+ dataLock sync.RWMutex
+ Info *metainfo.Info
+ Conns []*connection
+ Peers []Peer
+ Priorities *list.List
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
return int64(t.LastPieceSize()) + int64(len(t.Pieces)-1)*int64(t.UsualPieceSize())
}
+func (t *torrent) isClosed() bool {
+ return t.closed
+}
+
func (t *torrent) Close() (err error) {
+ t.closed = true
+ t.dataLock.Lock()
t.Data.Close()
+ t.Data = nil
+ t.dataLock.Unlock()
for _, conn := range t.Conns {
conn.Close()
}
func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New()
+ t.dataLock.RLock()
n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
+ t.dataLock.RUnlock()
if err != nil {
panic(err)
}
if pp.Integer(n) != t.PieceLength(piece) {
- log.Print(t.Info)
+ // log.Print(t.Info)
panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
}
copyHashSum(ps[:], hash.Sum(nil))
--- /dev/null
+package dirwatch
+
+import (
+ "bitbucket.org/anacrolix/go.torrent"
+ "github.com/anacrolix/libtorgo/metainfo"
+ "github.com/go-fsnotify/fsnotify"
+ "log"
+ "os"
+ "path/filepath"
+)
+
+type Change uint
+
+const (
+ Added Change = iota
+ Removed
+)
+
+type Event struct {
+ Magnet string
+ Change
+ TorrentFilePath string
+ InfoHash torrent.InfoHash
+}
+
+type Instance struct {
+ w *fsnotify.Watcher
+ dirName string
+ Events chan Event
+ torrentFileInfoHashes map[string]torrent.InfoHash
+}
+
+func (me *Instance) handleEvents() {
+ for e := range me.w.Events {
+ log.Printf("event: %s", e)
+ me.processFile(e.Name)
+ }
+}
+
+func (me *Instance) handleErrors() {
+ for err := range me.w.Errors {
+ log.Printf("error in torrent directory watcher: %s", err)
+ }
+}
+
+func torrentFileInfoHash(fileName string) (ih torrent.InfoHash, ok bool) {
+ mi, _ := metainfo.LoadFromFile(fileName)
+ if mi == nil {
+ return
+ }
+ if 20 != copy(ih[:], mi.Info.Hash) {
+ panic(mi.Info.Hash)
+ }
+ ok = true
+ return
+}
+
+func (me *Instance) processFile(name string) {
+ name = filepath.Clean(name)
+ log.Print(name)
+ if filepath.Ext(name) != ".torrent" {
+ return
+ }
+ ih, ok := me.torrentFileInfoHashes[name]
+ if ok {
+ me.Events <- Event{
+ TorrentFilePath: name,
+ Change: Removed,
+ InfoHash: ih,
+ }
+ }
+ delete(me.torrentFileInfoHashes, name)
+ ih, ok = torrentFileInfoHash(name)
+ if ok {
+ me.torrentFileInfoHashes[name] = ih
+ me.Events <- Event{
+ TorrentFilePath: name,
+ Change: Added,
+ InfoHash: ih,
+ }
+ }
+}
+
+func (me *Instance) addDir() (err error) {
+ f, err := os.Open(me.dirName)
+ if err != nil {
+ return
+ }
+ defer f.Close()
+ names, err := f.Readdirnames(-1)
+ if err != nil {
+ return
+ }
+ for _, n := range names {
+ me.processFile(filepath.Join(me.dirName, n))
+ }
+ return
+}
+
+func New(dirName string) (i *Instance, err error) {
+ w, err := fsnotify.NewWatcher()
+ if err != nil {
+ return
+ }
+ err = w.Add(dirName)
+ if err != nil {
+ w.Close()
+ return
+ }
+ i = &Instance{
+ w: w,
+ dirName: dirName,
+ Events: make(chan Event),
+ torrentFileInfoHashes: make(map[string]torrent.InfoHash, 20),
+ }
+ go func() {
+ i.addDir()
+ go i.handleEvents()
+ go i.handleErrors()
+ }()
+ return
+}