From: Matt Joiner Date: Thu, 24 Dec 2015 14:31:50 +0000 (+1100) Subject: Limit parallelism at the piece store http backend connection level instead of at... X-Git-Tag: v1.0.0~953 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=08f8d7bd194ee51c790480fc0f5ebf695753c074;p=btrtrc.git Limit parallelism at the piece store http backend connection level instead of at the request level This is in preparation for improvements that HTTP2 should provide. --- diff --git a/data/pieceStore/dataBackend/http/backend.go b/data/pieceStore/dataBackend/http/backend.go index 42557fbf..f45fc6d5 100644 --- a/data/pieceStore/dataBackend/http/backend.go +++ b/data/pieceStore/dataBackend/http/backend.go @@ -2,9 +2,12 @@ package httpDataBackend import ( "io" + "net" "net/http" "net/url" "path" + "sync" + "time" "github.com/anacrolix/missinggo/httpfile" "github.com/anacrolix/missinggo/httptoo" @@ -12,16 +15,64 @@ import ( "github.com/anacrolix/torrent/data/pieceStore/dataBackend" ) -var client = http.DefaultClient +// A net.Conn that releases a handle from the dial pool when closed. +type dialPoolNetConn struct { + net.Conn + + dialPool chan struct{} + mu sync.Mutex + released bool +} + +func (me *dialPoolNetConn) Close() error { + err := me.Conn.Close() + me.mu.Lock() + if !me.released { + <-me.dialPool + me.released = true + } + me.mu.Unlock() + return err +} type backend struct { // Backend URL. url url.URL + + FS httpfile.FS } func New(u url.URL) *backend { + // Limit concurrent connections at a time. + dialPool := make(chan struct{}, 5) + // Allows an extra connection through once a second to break deadlocks and + // help with stalls. + ticker := time.NewTicker(time.Second) return &backend{ url: *httptoo.CopyURL(&u), + FS: httpfile.FS{ + Client: &http.Client{ + Transport: &http.Transport{ + Dial: func(_net, addr string) (net.Conn, error) { + select { + case dialPool <- struct{}{}: + case <-ticker.C: + go func() { dialPool <- struct{}{} }() + } + nc, err := net.Dial(_net, addr) + if err != nil { + <-dialPool + return nil, err + } + return &dialPoolNetConn{ + Conn: nc, + dialPool: dialPool, + }, nil + }, + }, + }, + // Client: http.DefaultClient, + }, } } @@ -41,25 +92,25 @@ func (me *backend) urlStr(_path string) string { } func (me *backend) Delete(path string) (err error) { - err = httpfile.Delete(me.urlStr(path)) + err = me.FS.Delete(me.urlStr(path)) err = fixErrNotFound(err) return } func (me *backend) GetLength(path string) (ret int64, err error) { - ret, err = httpfile.GetLength(me.urlStr(path)) + ret, err = me.FS.GetLength(me.urlStr(path)) err = fixErrNotFound(err) return } func (me *backend) Open(path string, flags int) (ret dataBackend.File, err error) { - ret, err = httpfile.Open(me.urlStr(path), flags) + ret, err = me.FS.Open(me.urlStr(path), flags) err = fixErrNotFound(err) return } func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) { - ret, err = httpfile.OpenSectionReader(me.urlStr(path), off, n) + ret, err = me.FS.OpenSectionReader(me.urlStr(path), off, n) err = fixErrNotFound(err) return } diff --git a/data/pieceStore/store.go b/data/pieceStore/store.go index e15c2c3d..822ccef5 100644 --- a/data/pieceStore/store.go +++ b/data/pieceStore/store.go @@ -14,14 +14,14 @@ import ( "sync" "time" + "github.com/bradfitz/iter" + "github.com/anacrolix/torrent/data/pieceStore/dataBackend" "github.com/anacrolix/torrent/metainfo" ) type store struct { db dataBackend.I - // Limit backend requests. - requestPool chan struct{} mu sync.Mutex // The cached completion state for pieces. @@ -46,8 +46,7 @@ func (me *store) OpenTorrentData(info *metainfo.Info) *data { func New(db dataBackend.I) *store { s := &store{ - db: db, - requestPool: make(chan struct{}, 5), + db: db, } return s } @@ -111,10 +110,6 @@ func (me *store) pieceComplete(p metainfo.Piece) bool { if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second { return false } - me.requestPool <- struct{}{} - defer func() { - <-me.requestPool - }() length, err := me.db.GetLength(me.completedPiecePath(p)) if err == dataBackend.ErrNotFound { me.setCompletion(p, false) @@ -138,10 +133,6 @@ func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err err = errors.New("already have piece") return } - me.requestPool <- struct{}{} - defer func() { - <-me.requestPool - }() f, err := me.db.Open(me.incompletePiecePath(p), os.O_WRONLY|os.O_CREATE) if err != nil { err = fmt.Errorf("error opening %q: %s", me.incompletePiecePath(p), err) @@ -161,26 +152,6 @@ func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err return } -// Wraps a Closer, releases a slot from a channel pool the first time Close is -// called. -type poolCloser struct { - mu sync.Mutex - released bool - pool <-chan struct{} - io.Closer -} - -func (me *poolCloser) Close() (err error) { - err = me.Closer.Close() - me.mu.Lock() - if !me.released { - <-me.pool - me.released = true - } - me.mu.Unlock() - return -} - func (me *store) forgetCompletions() { me.mu.Lock() me.completion = nil @@ -188,7 +159,6 @@ func (me *store) forgetCompletions() { } func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) { - me.requestPool <- struct{}{} rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n) if err == dataBackend.ErrNotFound { if me.isComplete(p) { @@ -198,26 +168,15 @@ func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadClose rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n) } if err == dataBackend.ErrNotFound { - <-me.requestPool err = io.ErrUnexpectedEOF return } if err != nil { - <-me.requestPool return } // Wrap up the response body so that the request slot is released when the // response body is closed. - ret = struct { - io.Reader - io.Closer - }{ - rc, - &poolCloser{ - pool: me.requestPool, - Closer: rc, - }, - } + ret = rc return } @@ -235,10 +194,6 @@ func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err } func (me *store) removePath(path string) (err error) { - me.requestPool <- struct{}{} - defer func() { - <-me.requestPool - }() err = me.db.Delete(path) return } @@ -253,12 +208,6 @@ func (me *store) deleteCompleted(p metainfo.Piece) { } func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) { - // Yes, 2 requests occur here simultaneously, but we're not trying to be - // pedantic. - me.requestPool <- struct{}{} - defer func() { - <-me.requestPool - }() src, err := me.db.OpenSection(from, 0, n) if err != nil { return