import (
"io"
+ "net"
"net/http"
"net/url"
"path"
+ "sync"
+ "time"
"github.com/anacrolix/missinggo/httpfile"
"github.com/anacrolix/missinggo/httptoo"
"github.com/anacrolix/torrent/data/pieceStore/dataBackend"
)
-var client = http.DefaultClient
+// A net.Conn that releases a handle from the dial pool when closed.
+type dialPoolNetConn struct {
+ net.Conn
+
+ dialPool chan struct{}
+ mu sync.Mutex
+ released bool
+}
+
+func (me *dialPoolNetConn) Close() error {
+ err := me.Conn.Close()
+ me.mu.Lock()
+ if !me.released {
+ <-me.dialPool
+ me.released = true
+ }
+ me.mu.Unlock()
+ return err
+}
type backend struct {
// Backend URL.
url url.URL
+
+ FS httpfile.FS
}
func New(u url.URL) *backend {
+ // Limit concurrent connections at a time.
+ dialPool := make(chan struct{}, 5)
+ // Allows an extra connection through once a second to break deadlocks and
+ // help with stalls.
+ ticker := time.NewTicker(time.Second)
return &backend{
url: *httptoo.CopyURL(&u),
+ FS: httpfile.FS{
+ Client: &http.Client{
+ Transport: &http.Transport{
+ Dial: func(_net, addr string) (net.Conn, error) {
+ select {
+ case dialPool <- struct{}{}:
+ case <-ticker.C:
+ go func() { dialPool <- struct{}{} }()
+ }
+ nc, err := net.Dial(_net, addr)
+ if err != nil {
+ <-dialPool
+ return nil, err
+ }
+ return &dialPoolNetConn{
+ Conn: nc,
+ dialPool: dialPool,
+ }, nil
+ },
+ },
+ },
+ // Client: http.DefaultClient,
+ },
}
}
}
func (me *backend) Delete(path string) (err error) {
- err = httpfile.Delete(me.urlStr(path))
+ err = me.FS.Delete(me.urlStr(path))
err = fixErrNotFound(err)
return
}
func (me *backend) GetLength(path string) (ret int64, err error) {
- ret, err = httpfile.GetLength(me.urlStr(path))
+ ret, err = me.FS.GetLength(me.urlStr(path))
err = fixErrNotFound(err)
return
}
func (me *backend) Open(path string, flags int) (ret dataBackend.File, err error) {
- ret, err = httpfile.Open(me.urlStr(path), flags)
+ ret, err = me.FS.Open(me.urlStr(path), flags)
err = fixErrNotFound(err)
return
}
func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
- ret, err = httpfile.OpenSectionReader(me.urlStr(path), off, n)
+ ret, err = me.FS.OpenSectionReader(me.urlStr(path), off, n)
err = fixErrNotFound(err)
return
}
"sync"
"time"
+ "github.com/bradfitz/iter"
+
"github.com/anacrolix/torrent/data/pieceStore/dataBackend"
"github.com/anacrolix/torrent/metainfo"
)
type store struct {
db dataBackend.I
- // Limit backend requests.
- requestPool chan struct{}
mu sync.Mutex
// The cached completion state for pieces.
func New(db dataBackend.I) *store {
s := &store{
- db: db,
- requestPool: make(chan struct{}, 5),
+ db: db,
}
return s
}
if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second {
return false
}
- me.requestPool <- struct{}{}
- defer func() {
- <-me.requestPool
- }()
length, err := me.db.GetLength(me.completedPiecePath(p))
if err == dataBackend.ErrNotFound {
me.setCompletion(p, false)
err = errors.New("already have piece")
return
}
- me.requestPool <- struct{}{}
- defer func() {
- <-me.requestPool
- }()
f, err := me.db.Open(me.incompletePiecePath(p), os.O_WRONLY|os.O_CREATE)
if err != nil {
err = fmt.Errorf("error opening %q: %s", me.incompletePiecePath(p), err)
return
}
-// Wraps a Closer, releases a slot from a channel pool the first time Close is
-// called.
-type poolCloser struct {
- mu sync.Mutex
- released bool
- pool <-chan struct{}
- io.Closer
-}
-
-func (me *poolCloser) Close() (err error) {
- err = me.Closer.Close()
- me.mu.Lock()
- if !me.released {
- <-me.pool
- me.released = true
- }
- me.mu.Unlock()
- return
-}
-
func (me *store) forgetCompletions() {
me.mu.Lock()
me.completion = nil
}
func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) {
- me.requestPool <- struct{}{}
rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n)
if err == dataBackend.ErrNotFound {
if me.isComplete(p) {
rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n)
}
if err == dataBackend.ErrNotFound {
- <-me.requestPool
err = io.ErrUnexpectedEOF
return
}
if err != nil {
- <-me.requestPool
return
}
// Wrap up the response body so that the request slot is released when the
// response body is closed.
- ret = struct {
- io.Reader
- io.Closer
- }{
- rc,
- &poolCloser{
- pool: me.requestPool,
- Closer: rc,
- },
- }
+ ret = rc
return
}
}
func (me *store) removePath(path string) (err error) {
- me.requestPool <- struct{}{}
- defer func() {
- <-me.requestPool
- }()
err = me.db.Delete(path)
return
}
}
func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) {
- // Yes, 2 requests occur here simultaneously, but we're not trying to be
- // pedantic.
- me.requestPool <- struct{}{}
- defer func() {
- <-me.requestPool
- }()
src, err := me.db.OpenSection(from, 0, n)
if err != nil {
return