]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Limit parallelism at the piece store http backend connection level instead of at...
authorMatt Joiner <anacrolix@gmail.com>
Thu, 24 Dec 2015 14:31:50 +0000 (01:31 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 24 Dec 2015 14:31:50 +0000 (01:31 +1100)
This is in preparation for improvements that HTTP2 should provide.

data/pieceStore/dataBackend/http/backend.go
data/pieceStore/store.go

index 42557fbfa629821712d43df9ae2979d6026758d3..f45fc6d562320e715d198372a08513fc07a69da4 100644 (file)
@@ -2,9 +2,12 @@ package httpDataBackend
 
 import (
        "io"
+       "net"
        "net/http"
        "net/url"
        "path"
+       "sync"
+       "time"
 
        "github.com/anacrolix/missinggo/httpfile"
        "github.com/anacrolix/missinggo/httptoo"
@@ -12,16 +15,64 @@ import (
        "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
 )
 
-var client = http.DefaultClient
+// A net.Conn that releases a handle from the dial pool when closed.
+type dialPoolNetConn struct {
+       net.Conn
+
+       dialPool chan struct{}
+       mu       sync.Mutex
+       released bool
+}
+
+func (me *dialPoolNetConn) Close() error {
+       err := me.Conn.Close()
+       me.mu.Lock()
+       if !me.released {
+               <-me.dialPool
+               me.released = true
+       }
+       me.mu.Unlock()
+       return err
+}
 
 type backend struct {
        // Backend URL.
        url url.URL
+
+       FS httpfile.FS
 }
 
 func New(u url.URL) *backend {
+       // Limit concurrent connections at a time.
+       dialPool := make(chan struct{}, 5)
+       // Allows an extra connection through once a second to break deadlocks and
+       // help with stalls.
+       ticker := time.NewTicker(time.Second)
        return &backend{
                url: *httptoo.CopyURL(&u),
+               FS: httpfile.FS{
+                       Client: &http.Client{
+                               Transport: &http.Transport{
+                                       Dial: func(_net, addr string) (net.Conn, error) {
+                                               select {
+                                               case dialPool <- struct{}{}:
+                                               case <-ticker.C:
+                                                       go func() { dialPool <- struct{}{} }()
+                                               }
+                                               nc, err := net.Dial(_net, addr)
+                                               if err != nil {
+                                                       <-dialPool
+                                                       return nil, err
+                                               }
+                                               return &dialPoolNetConn{
+                                                       Conn:     nc,
+                                                       dialPool: dialPool,
+                                               }, nil
+                                       },
+                               },
+                       },
+                       // Client: http.DefaultClient,
+               },
        }
 }
 
@@ -41,25 +92,25 @@ func (me *backend) urlStr(_path string) string {
 }
 
 func (me *backend) Delete(path string) (err error) {
-       err = httpfile.Delete(me.urlStr(path))
+       err = me.FS.Delete(me.urlStr(path))
        err = fixErrNotFound(err)
        return
 }
 
 func (me *backend) GetLength(path string) (ret int64, err error) {
-       ret, err = httpfile.GetLength(me.urlStr(path))
+       ret, err = me.FS.GetLength(me.urlStr(path))
        err = fixErrNotFound(err)
        return
 }
 
 func (me *backend) Open(path string, flags int) (ret dataBackend.File, err error) {
-       ret, err = httpfile.Open(me.urlStr(path), flags)
+       ret, err = me.FS.Open(me.urlStr(path), flags)
        err = fixErrNotFound(err)
        return
 }
 
 func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
-       ret, err = httpfile.OpenSectionReader(me.urlStr(path), off, n)
+       ret, err = me.FS.OpenSectionReader(me.urlStr(path), off, n)
        err = fixErrNotFound(err)
        return
 }
index e15c2c3db89d8e63dea8633d198c0c9641d31e69..822ccef5457567412ebc134c3d04faf135853f9a 100644 (file)
@@ -14,14 +14,14 @@ import (
        "sync"
        "time"
 
+       "github.com/bradfitz/iter"
+
        "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
        "github.com/anacrolix/torrent/metainfo"
 )
 
 type store struct {
        db dataBackend.I
-       // Limit backend requests.
-       requestPool chan struct{}
 
        mu sync.Mutex
        // The cached completion state for pieces.
@@ -46,8 +46,7 @@ func (me *store) OpenTorrentData(info *metainfo.Info) *data {
 
 func New(db dataBackend.I) *store {
        s := &store{
-               db:          db,
-               requestPool: make(chan struct{}, 5),
+               db: db,
        }
        return s
 }
@@ -111,10 +110,6 @@ func (me *store) pieceComplete(p metainfo.Piece) bool {
        if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second {
                return false
        }
-       me.requestPool <- struct{}{}
-       defer func() {
-               <-me.requestPool
-       }()
        length, err := me.db.GetLength(me.completedPiecePath(p))
        if err == dataBackend.ErrNotFound {
                me.setCompletion(p, false)
@@ -138,10 +133,6 @@ func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err
                err = errors.New("already have piece")
                return
        }
-       me.requestPool <- struct{}{}
-       defer func() {
-               <-me.requestPool
-       }()
        f, err := me.db.Open(me.incompletePiecePath(p), os.O_WRONLY|os.O_CREATE)
        if err != nil {
                err = fmt.Errorf("error opening %q: %s", me.incompletePiecePath(p), err)
@@ -161,26 +152,6 @@ func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err
        return
 }
 
-// Wraps a Closer, releases a slot from a channel pool the first time Close is
-// called.
-type poolCloser struct {
-       mu       sync.Mutex
-       released bool
-       pool     <-chan struct{}
-       io.Closer
-}
-
-func (me *poolCloser) Close() (err error) {
-       err = me.Closer.Close()
-       me.mu.Lock()
-       if !me.released {
-               <-me.pool
-               me.released = true
-       }
-       me.mu.Unlock()
-       return
-}
-
 func (me *store) forgetCompletions() {
        me.mu.Lock()
        me.completion = nil
@@ -188,7 +159,6 @@ func (me *store) forgetCompletions() {
 }
 
 func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) {
-       me.requestPool <- struct{}{}
        rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n)
        if err == dataBackend.ErrNotFound {
                if me.isComplete(p) {
@@ -198,26 +168,15 @@ func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadClose
                rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n)
        }
        if err == dataBackend.ErrNotFound {
-               <-me.requestPool
                err = io.ErrUnexpectedEOF
                return
        }
        if err != nil {
-               <-me.requestPool
                return
        }
        // Wrap up the response body so that the request slot is released when the
        // response body is closed.
-       ret = struct {
-               io.Reader
-               io.Closer
-       }{
-               rc,
-               &poolCloser{
-                       pool:   me.requestPool,
-                       Closer: rc,
-               },
-       }
+       ret = rc
        return
 }
 
@@ -235,10 +194,6 @@ func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err
 }
 
 func (me *store) removePath(path string) (err error) {
-       me.requestPool <- struct{}{}
-       defer func() {
-               <-me.requestPool
-       }()
        err = me.db.Delete(path)
        return
 }
@@ -253,12 +208,6 @@ func (me *store) deleteCompleted(p metainfo.Piece) {
 }
 
 func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) {
-       // Yes, 2 requests occur here simultaneously, but we're not trying to be
-       // pedantic.
-       me.requestPool <- struct{}{}
-       defer func() {
-               <-me.requestPool
-       }()
        src, err := me.db.OpenSection(from, 0, n)
        if err != nil {
                return