]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Allow configuring Client torrent data opener, config dir, disabling metainfo cache...
authorMatt Joiner <anacrolix@gmail.com>
Wed, 25 Feb 2015 03:48:39 +0000 (14:48 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 25 Feb 2015 03:48:39 +0000 (14:48 +1100)
client.go
client_test.go
config.go
data/file/file.go
fs/torrentfs_test.go
torrent.go
torrent_test.go

index 55bb9aca03c234280fae11929cf558ee80bb0d01..28ba233e14ab22a61a4959535c6787cdb02984bd 100644 (file)
--- a/client.go
+++ b/client.go
@@ -36,6 +36,8 @@ import (
        "syscall"
        "time"
 
+       filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
+
        "bitbucket.org/anacrolix/go.torrent/dht"
        "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
        "bitbucket.org/anacrolix/go.torrent/iplist"
@@ -80,6 +82,8 @@ const (
        // impact of a few bad apples. 4s loses 1% of successful handshakes that
        // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
        handshakeTimeout = 4 * time.Second
+
+       pruneInterval = 10 * time.Second
 )
 
 // Currently doesn't really queue, but should in the future.
@@ -116,6 +120,11 @@ type Client struct {
        disableTCP       bool
        ipBlockList      *iplist.IPList
        bannedTorrents   map[InfoHash]struct{}
+       _configDir       string
+       config           Config
+       pruneTimer       *time.Timer
+
+       torrentDataOpener TorrentDataOpener
 
        mu    sync.RWMutex
        event sync.Cond
@@ -219,8 +228,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
        }
 }
 
-// Read torrent data at the given offset. Returns ErrDataNotReady if the data
-// isn't available.
+// Read torrent data at the given offset. Will block until it is available.
 func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
@@ -248,10 +256,10 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err
        if len(p) == 0 {
                panic(len(p))
        }
-       for !piece.Complete() {
+       for !piece.Complete() && !t.isClosed() {
                piece.Event.Wait()
        }
-       return t.Data.ReadAt(p, off)
+       return t.data.ReadAt(p, off)
 }
 
 func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
@@ -272,7 +280,10 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
 }
 
 func (cl *Client) configDir() string {
-       return filepath.Join(os.Getenv("HOME"), ".config/torrent")
+       if cl._configDir == "" {
+               return filepath.Join(os.Getenv("HOME"), ".config/torrent")
+       }
+       return cl._configDir
 }
 
 func (cl *Client) ConfigDir() string {
@@ -393,6 +404,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                dataDir:          cfg.DataDir,
                disableUTP:       cfg.DisableUTP,
                disableTCP:       cfg.DisableTCP,
+               _configDir:       cfg.ConfigDir,
+               config:           *cfg,
+               torrentDataOpener: func(md *metainfo.Info) (TorrentData, error) {
+                       return filePkg.TorrentData(md, cfg.DataDir), nil
+               },
 
                quit:     make(chan struct{}),
                torrents: make(map[InfoHash]*torrent),
@@ -1163,7 +1179,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        // routine.
                        // c.PeerRequests[request] = struct{}{}
                        p := make([]byte, msg.Length)
-                       n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+                       n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
                        if err != nil {
                                return fmt.Errorf("reading t data to serve request %q: %s", request, err)
                        }
@@ -1499,22 +1515,10 @@ func (cl *Client) saveTorrentFile(t *torrent) error {
        return nil
 }
 
-func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
-       err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu)
-       if err != nil {
-               return
-       }
-
-       if err := cl.saveTorrentFile(t); err != nil {
-               log.Printf("error saving torrent file for %s: %s", t, err)
+func (cl *Client) startTorrent(t *torrent) {
+       if t.Info == nil || t.data == nil {
+               panic("nope")
        }
-
-       if strings.Contains(strings.ToLower(md.Name), "porn") {
-               cl.dropTorrent(t.InfoHash)
-               err = errors.New("no porn plx")
-               return
-       }
-
        // If the client intends to upload, it needs to know what state pieces are
        // in.
        if !cl.noUpload {
@@ -1529,9 +1533,43 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
                        }
                }()
        }
-
        cl.downloadStrategy.TorrentStarted(t)
+}
+
+// Storage cannot be changed once it's set.
+func (cl *Client) setStorage(t *torrent, td TorrentData) (err error) {
+       err = t.setStorage(td)
+       cl.event.Broadcast()
+       if err != nil {
+               return
+       }
+       cl.startTorrent(t)
+       return
+}
+
+type TorrentDataOpener func(*metainfo.Info) (TorrentData, error)
+
+func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
+       err = t.setMetadata(md, bytes, &cl.mu)
+       if err != nil {
+               return
+       }
+       if !cl.config.DisableMetainfoCache {
+               if err := cl.saveTorrentFile(t); err != nil {
+                       log.Printf("error saving torrent file for %s: %s", t, err)
+               }
+       }
+       if strings.Contains(strings.ToLower(md.Name), "porn") {
+               cl.dropTorrent(t.InfoHash)
+               err = errors.New("no porn plx")
+               return
+       }
        close(t.gotMetainfo)
+       td, err := cl.torrentDataOpener(&md)
+       if err != nil {
+               return
+       }
+       err = cl.setStorage(t, td)
        return
 }
 
@@ -1722,6 +1760,9 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
 
 // Returns nil metainfo if it isn't in the cache.
 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
+       if cl.config.DisableMetainfoCache {
+               return
+       }
        f, err := os.Open(cl.torrentFileCachePath(ih))
        if err != nil {
                if os.IsNotExist(err) {
@@ -1768,34 +1809,33 @@ func (cl *Client) AddMagnet(uri string) (T Torrent, err error) {
        return
 }
 
-// Actively prunes unused connections. This is required to make space to dial
-// for replacements.
-func (cl *Client) connectionPruner(t *torrent) {
-       for {
-               select {
-               case <-t.ceasingNetworking:
-                       return
-               case <-t.closing:
-                       return
-               case <-time.After(15 * time.Second):
+// Prunes unused connections. This is required to make space to dial for
+// replacements.
+func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
+       select {
+       case <-t.ceasingNetworking:
+               return
+       case <-t.closing:
+               return
+       default:
+       }
+       cl.mu.Lock()
+       license := len(t.Conns) - (socketsPerTorrent+1)/2
+       for _, c := range t.Conns {
+               if license <= 0 {
+                       break
                }
-               cl.mu.Lock()
-               license := len(t.Conns) - (socketsPerTorrent+1)/2
-               for _, c := range t.Conns {
-                       if license <= 0 {
-                               break
-                       }
-                       if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
-                               continue
-                       }
-                       if time.Now().Sub(c.completedHandshake) < time.Minute {
-                               continue
-                       }
-                       c.Close()
-                       license--
+               if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
+                       continue
                }
-               cl.mu.Unlock()
+               if time.Now().Sub(c.completedHandshake) < time.Minute {
+                       continue
+               }
+               c.Close()
+               license--
        }
+       cl.mu.Unlock()
+       t.pruneTimer.Reset(pruneInterval)
 }
 
 func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
@@ -1835,7 +1875,9 @@ func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Tor
                if me.dHT != nil {
                        go me.announceTorrentDHT(T.torrent, true)
                }
-               go me.connectionPruner(T.torrent)
+               T.torrent.pruneTimer = time.AfterFunc(0, func() {
+                       me.pruneConnectionsUnlocked(T.torrent)
+               })
        }
        return
 }
@@ -2178,7 +2220,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        p := t.Pieces[index]
-       for p.Hashing {
+       for p.Hashing || t.data == nil {
                cl.event.Wait()
        }
        if t.isClosed() {
index 71f419c7886fb6674a2ef6798fe2d4387b0764ec..c480fecd66c8266a9a72002d2d78d541019acccb 100644 (file)
@@ -70,7 +70,7 @@ func TestTorrentInitialState(t *testing.T) {
        if err != nil {
                t.Fatal(err)
        }
-       err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes, nil)
+       err = tor.setMetadata(mi.Info.Info, mi.Info.Bytes, nil)
        if err != nil {
                t.Fatal(err)
        }
index c78db5be6f84d246dd6f1e31ee863f82f67c1e4a..29d35c6ab5164e5c72ed94bf83e8ba66b66681b5 100644 (file)
--- a/config.go
+++ b/config.go
@@ -16,4 +16,8 @@ type Config struct {
        DisableUTP         bool
        DisableTCP         bool
        NoDefaultBlocklist bool
+       // Defaults to "$HOME/.config/torrent"
+       ConfigDir            string
+       DisableMetainfoCache bool
+       TorrentDataOpener
 }
index 7017e105efd4eb1e94acb3c5992d31b64a4c0adb..5e6608defb6f178cbe8111f26214f68af583d9c2 100644 (file)
@@ -13,12 +13,11 @@ type data struct {
        loc  string
 }
 
-func TorrentData(md *metainfo.Info, location string) (ret *data, err error) {
-       ret = &data{md, location}
-       return
+func TorrentData(md *metainfo.Info, location string) data {
+       return data{md, location}
 }
 
-func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
+func (me data) ReadAt(p []byte, off int64) (n int, err error) {
        for _, fi := range me.info.UpvertedFiles() {
                if off >= fi.Length {
                        off -= fi.Length
@@ -48,9 +47,9 @@ func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
        return
 }
 
-func (me *data) Close() {}
+func (me data) Close() {}
 
-func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
+func (me data) WriteAt(p []byte, off int64) (n int, err error) {
        for _, fi := range me.info.UpvertedFiles() {
                if off >= fi.Length {
                        off -= fi.Length
@@ -82,7 +81,7 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
        return
 }
 
-func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
+func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
        for _, fi := range me.info.UpvertedFiles() {
                if off >= fi.Length {
                        off -= fi.Length
@@ -112,6 +111,6 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
        return
 }
 
-func (me *data) fileInfoName(fi metainfo.FileInfo) string {
+func (me data) fileInfoName(fi metainfo.FileInfo) string {
        return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...)
 }
index 049e5f8cdc39d46fe14c1714fe075c1ac4c2059f..f6a3cef997b52d629a0dbbbbd0c6b2e1b2fc94e9 100644 (file)
@@ -169,6 +169,9 @@ func TestDownloadOnDemand(t *testing.T) {
                ListenAddr:      ":0",
 
                NoDefaultBlocklist: true,
+               // Ensure that the metainfo is obtained over the wire, since we added
+               // the torrent to the seeder by magnet.
+               DisableMetainfoCache: true,
        })
        if err != nil {
                t.Fatalf("error creating seeder client: %s", err)
@@ -183,7 +186,6 @@ func TestDownloadOnDemand(t *testing.T) {
                t.Fatal(err)
        }
        leecher, err := torrent.NewClient(&torrent.Config{
-               DataDir:         filepath.Join(layout.BaseDir, "download"),
                DisableTrackers: true,
                NoDHT:           true,
                ListenAddr:      ":0",
@@ -191,6 +193,10 @@ func TestDownloadOnDemand(t *testing.T) {
 
                NoDefaultBlocklist: true,
 
+               TorrentDataOpener: func(info *metainfo.Info) (torrent.TorrentData, error) {
+                       return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
+               },
+
                // This can be used to check if clients can connect to other clients
                // with the same ID.
 
index 450f15fe35a043290f209009740d793ab71c65b0..e962fdca0a739f6cc823af40730552adc794aeb6 100644 (file)
@@ -10,8 +10,6 @@ import (
        "sync"
        "time"
 
-       "bitbucket.org/anacrolix/go.torrent/data/file"
-
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
        "bitbucket.org/anacrolix/go.torrent/tracker"
        "bitbucket.org/anacrolix/go.torrent/util"
@@ -40,7 +38,7 @@ type peersKey struct {
        Port    int
 }
 
-type torrentData interface {
+type TorrentData interface {
        ReadAt(p []byte, off int64) (n int, err error)
        Close()
        WriteAt(p []byte, off int64) (n int, err error)
@@ -60,9 +58,7 @@ type torrent struct {
        Pieces   []*piece
        length   int64
 
-       // Prevent mutations to Data memory maps while in use as they're not safe.
-       dataLock sync.RWMutex
-       Data     torrentData
+       data TorrentData
 
        Info *MetaInfo
        // Active peer connections.
@@ -85,6 +81,8 @@ type torrent struct {
 
        gotMetainfo chan struct{}
        GotMetainfo <-chan struct{}
+
+       pruneTimer *time.Timer
 }
 
 func (t *torrent) numConnsUnchoked() (num int) {
@@ -129,6 +127,7 @@ func (t *torrent) ceaseNetworking() {
        for _, c := range t.Conns {
                c.Close()
        }
+       t.pruneTimer.Stop()
 }
 
 func (t *torrent) AddPeers(pp []Peer) {
@@ -183,7 +182,7 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) {
 }
 
 // Called when metadata for a torrent becomes available.
-func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) {
+func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
        t.Info = newMetaInfo(&md)
        t.length = 0
        for _, f := range t.Info.UpvertedFiles() {
@@ -204,11 +203,14 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
                        conn.Close()
                }
        }
-       t.Data, err = file.TorrentData(&md, dataDir)
-       if err != nil {
-               err = fmt.Errorf("error mmap'ing torrent data: %s", err)
-               return
+       return
+}
+
+func (t *torrent) setStorage(td TorrentData) (err error) {
+       if t.data != nil {
+               t.data.Close()
        }
+       t.data = td
        return
 }
 
@@ -477,12 +479,9 @@ func (t *torrent) close() (err error) {
        }
        t.ceaseNetworking()
        close(t.closing)
-       t.dataLock.Lock()
-       if t.Data != nil {
-               t.Data.Close()
-               t.Data = nil
+       if t.data != nil {
+               t.data.Close()
        }
-       t.dataLock.Unlock()
        for _, conn := range t.Conns {
                conn.Close()
        }
@@ -525,7 +524,7 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
 }
 
 func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
-       _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+       _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
        return
 }
 
@@ -583,9 +582,7 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
 
 func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
        hash := pieceHash.New()
-       t.dataLock.RLock()
-       t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
-       t.dataLock.RUnlock()
+       t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
        util.CopyExact(ps[:], hash.Sum(nil))
        return
 }
index ab92429bb06fb457c26b6451e6bb5a0bce8ff170..cd73b62ab13f0c46600df0ca740c28cb672fb13e 100644 (file)
@@ -3,6 +3,7 @@ package torrent
 import (
        "sync"
        "testing"
+       "time"
 
        "bitbucket.org/anacrolix/go.torrent/peer_protocol"
 )
@@ -45,6 +46,7 @@ func TestTorrentRequest(t *testing.T) {
 
 func TestTorrentDoubleClose(t *testing.T) {
        tt, err := newTorrent(InfoHash{}, nil, 0)
+       tt.pruneTimer = time.NewTimer(0)
        if err != nil {
                t.Fatal(err)
        }