var ss []string
slices.MakeInto(&ss, mi.Nodes)
cl.AddDHTNodes(ss)
- //for _, url := range mi.UrlList {
- //T.addWebSeed(url)
- //}
+ for _, url := range mi.UrlList {
+ T.addWebSeed(url)
+ }
return
}
UpdateRequests()
WriteInterested(interested bool) bool
Cancel(request) bool
+ // Return true if there's room for more activity.
Request(request) bool
ConnectionFlags() string
Close()
return cn.conn.LocalAddr()
}
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+// Reports whether the peer advertised the named extension in its extended
+// handshake (present in PeerExtensionIDs).
+func (cn *peer) supportsExtension(ext pp.ExtensionName) bool {
_, ok := cn.PeerExtensionIDs[ext]
return ok
}
cn.tickleWriter()
}
+// Returns true if there's room to write more.
func (cn *PeerConn) write(msg pp.Message) bool {
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
-func (c *PeerConn) useful() bool {
+func (c *peer) useful() bool {
t := c.t
if c.closed.IsSet() {
return false
return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
}
-func (c *PeerConn) peerHasWantedPieces() bool {
+// Reports whether the peer has any pieces we still want, i.e. the piece
+// request order is non-empty.
+func (c *peer) peerHasWantedPieces() bool {
return !c._pieceRequestOrder.IsEmpty()
}
t.reconcileHandshakeStats(c)
}
-func (c *PeerConn) peerPriority() (peerPriority, error) {
+// peerPriority computes the BEP 40 canonical priority for this peer from its
+// remote address and our public address.
+func (c *peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
}
return addrIpOrNil(c.remoteAddr)
}
-func (c *PeerConn) remoteIpPort() IpPort {
+// remoteIpPort splits the remote address into IP and port. The conversion
+// error is deliberately ignored; a non-IP address yields the zero IpPort.
+func (c *peer) remoteIpPort() IpPort {
ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
return IpPort{ipa.IP, uint16(ipa.Port)}
}
}
}
-func (me Index) Locate(e Extent, output Callback) {
+// Locate maps the extent e onto the index's segments, invoking output for
+// each overlapping segment with its index and the intersecting extent.
+// Returns false when e starts past the final segment; otherwise returns the
+// result of scanning the remaining segments (see Scan).
+func (me Index) Locate(e Extent, output Callback) bool {
first := sort.Search(len(me.segments), func(i int) bool {
_e := me.segments[i]
return _e.End() > e.Start
})
if first == len(me.segments) {
- return
+ return false
}
e.Start -= me.segments[first].Start
me.segments = me.segments[first:]
- Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+ return Scan(me.iterSegments(), e, func(i int, e Extent) bool {
return output(i+first, e)
})
}
LengthIter = func() (Length, bool)
)
-func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
+// Scan walks the lengths produced by haystack, invoking callback with the
+// index and intersecting extent of each length the needle overlaps.
+// Returns false if the haystack is exhausted before the needle is fully
+// consumed. Returns true when the needle is consumed, and also (by design)
+// when the callback stops the scan early.
+func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
i := 0
for needle.Length != 0 {
l, ok := haystack()
if !ok {
- return
+ return false
}
if needle.Start < l || needle.Start == l && l == 0 {
e1 := Extent{
}
if e1.Length >= 0 {
if !callback(i, e1) {
+ // Callback declined more data: the extent was located, so
+ // this still counts as success.
- return
+ return true
}
needle.Start = 0
needle.Length -= e1.Length
}
i++
}
+ return true
}
+// LocaterFromLengthIter adapts a LengthIter into a Locater by scanning the
+// iterator for each requested extent.
func LocaterFromLengthIter(li LengthIter) Locater {
- return func(e Extent, c Callback) {
- Scan(li, e, c)
+ return func(e Extent, c Callback) bool {
+ return Scan(li, e, c)
}
}
-type Locater func(Extent, Callback)
+// Locater resolves an extent to callback invocations, reporting whether the
+// extent was located (see Scan for the return semantics).
+type Locater func(Extent, Callback) bool
"fmt"
"io"
"math/rand"
+ "net/http"
"net/url"
+ "sort"
"sync"
"text/tabwriter"
"time"
"unsafe"
+ "github.com/anacrolix/torrent/common"
+ "github.com/anacrolix/torrent/segments"
+ "github.com/anacrolix/torrent/webseed"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
- info *metainfo.Info
- files *[]*File
+ info *metainfo.Info
+ fileIndex segments.Index
+ files *[]*File
webSeeds map[string]*peer
t.nameMu.Lock()
t.info = info
t.nameMu.Unlock()
+ t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
t.displayName = "" // Save a few bytes lol.
t.initFiles()
t.cacheLength()
spew.NewDefaultConfig()
spew.Fdump(w, t.statsLocked())
- conns := t.connsAsSlice()
- slices.Sort(conns, worseConn)
- for i, c := range conns {
+ peers := t.peersAsSlice()
+ sort.Slice(peers, func(i, j int) bool {
+ return worseConn(peers[i], peers[j])
+ })
+ for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
c.writeStatus(w, t)
}
})
}
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
+// established connections for more than a minute.
func (t *Torrent) worstBadConn() *PeerConn {
wcs := worseConnSlice{t.unclosedConnsAsSlice()}
heap.Init(&wcs)
defer t.cl.unlock()
oldMax = t.maxEstablishedConns
t.maxEstablishedConns = max
- wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+ wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+ return worseConn(&l.peer, &r.peer)
+ })
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
t.dropConnection(wcs.Pop().(*PeerConn))
}
}
}
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
- for c := range t.conns {
- ret = append(ret, c)
- }
+// peersAsSlice collects every peer — regular connections and web seeds — into
+// a slice via iterPeers.
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+ t.iterPeers(func(p *peer) {
+ ret = append(ret, p)
+ })
return
}
if _, ok := t.webSeeds[url]; ok {
return
}
- p := &peer{
- t: t,
- connString: url,
- outgoing: true,
- network: "http",
- reconciledHandshakeStats: true,
- peerSentHaveAll: true,
- }
ws := webSeed{
- peer: p,
+ peer: peer{
+ t: t,
+ connString: url,
+ outgoing: true,
+ network: "http",
+ reconciledHandshakeStats: true,
+ peerSentHaveAll: true,
+ PeerMaxRequests: 10,
+ },
+ client: webseed.Client{
+ HttpClient: http.DefaultClient,
+ Url: url,
+ FileIndex: t.fileIndex,
+ Info: t.info,
+ Events: make(chan webseed.ClientEvent),
+ },
}
- p.PeerImpl = &ws
- t.webSeeds[url] = p
+ ws.peer.PeerImpl = &ws
+ t.webSeeds[url] = &ws.peer
}
import (
"net/http"
+
+ "github.com/anacrolix/torrent/segments"
+ "github.com/anacrolix/torrent/webseed"
)
+// NOTE(review): httpRequestResult and requestPart duplicate identically-named
+// types added in the webseed package in this same change — confirm both
+// copies are needed, or drop the unused pair here.
+type httpRequestResult struct {
+ resp *http.Response
+ err error
+}
+
+type requestPart struct {
+ req *http.Request
+ e segments.Extent
+ result chan httpRequestResult
+}
+
+// NOTE(review): webseedRequest appears unreferenced in this change — confirm
+// before keeping.
+type webseedRequest struct {
+ cancel func()
+}
+
+// webSeed adapts an HTTP web seed (BEP 19) to the peer interface, pairing the
+// webseed client with an embedded peer.
type webSeed struct {
- peer *peer
- httpClient *http.Client
- url string
+ client webseed.Client
+ peer peer
}
+// NOTE(review): webseedClientEvent and webseedRequestFailed are not used by
+// the visible code — confirm they are consumed elsewhere.
+type webseedClientEvent interface{}
+
+type webseedRequestFailed struct {
+ r request
+ err error
+}
+
+// Compile-time check that webSeed satisfies PeerImpl.
+var _ PeerImpl = (*webSeed)(nil)
+
+
+// PostCancel handles a cancel after the request was already issued. A web
+// seed has no wire protocol to flush, so it simply delegates to Cancel.
func (ws *webSeed) PostCancel(r request) {
- panic("implement me")
+ ws.Cancel(r)
}
func (ws *webSeed) WriteInterested(interested bool) bool {
}
func (ws *webSeed) Cancel(r request) bool {
- panic("implement me")
+ //panic("implement me")
+ // TODO(review): this is a no-op that reports success; it should cancel the
+ // corresponding in-flight webseed.Client request once Client.Cancel is
+ // safe to call for specs it doesn't know about.
+ return true
}
func (ws *webSeed) Request(r request) bool {
- panic("implement me")
+ // Kick off the HTTP range request(s) backing this chunk. Keyed fields keep
+ // the cross-package literal valid if segments.Extent is ever reordered
+ // (go vet composites). Always reports room for more; throttling is left to
+ // the peer layer (PeerMaxRequests).
+ ws.client.Request(webseed.RequestSpec{Start: ws.peer.t.requestOffset(r), Length: int64(r.Length)})
+ return true
}
func (ws *webSeed) ConnectionFlags() string {
package webseed
import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
"net/http"
- pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/segments"
)
-type RequestSpec = pp.RequestSpec
+// RequestSpec identifies a byte extent of the torrent data to fetch from the
+// web seed.
+type RequestSpec = segments.Extent
+
+// httpRequestResult carries the outcome of a single HTTP round trip.
+type httpRequestResult struct {
+ resp *http.Response
+ err error
+}
+
+// requestPart is one HTTP request covering a single file's slice of a
+// RequestSpec (a spec spanning file boundaries produces several parts).
+type requestPart struct {
+ req *http.Request
+ e segments.Extent
+ result chan httpRequestResult
+}
+
+// request tracks an in-flight RequestSpec so it can be cancelled.
+type request struct {
+ cancel func()
+}
+// Client fetches torrent data from a single web seed URL (BEP 19).
type Client struct {
HttpClient *http.Client
Url string
+ FileIndex segments.Index
+ Info *metainfo.Info
+ // In-flight requests by spec. NOTE(review): written in Request and read in
+ // Cancel with no lock, and entries are never removed on completion —
+ // confirm the caller serializes access.
requests map[RequestSpec]request
+ // Outcomes of completed/failed requests. Sends block until received, so a
+ // consumer must be running.
+ Events chan ClientEvent
}
-type request struct {
- cancel func()
+// ClientEvent reports the final outcome of a Request: the bytes read and/or
+// the error that stopped it.
+type ClientEvent struct {
+ RequestSpec RequestSpec
+ Bytes []byte
+ Err error
+}
+
+// Cancel aborts the in-flight request for r, if any. Unknown specs (already
+// completed, never issued, or a nil map) are ignored rather than invoking a
+// nil cancel func, and the bookkeeping entry is dropped so it can't leak.
+func (ws *Client) Cancel(r RequestSpec) {
+ if req, ok := ws.requests[r]; ok {
+ req.cancel()
+ delete(ws.requests, r)
+ }
+}
+
+// Request begins fetching the extent r, splitting it across the files it
+// covers and issuing one HTTP request per part. The final outcome is
+// delivered asynchronously on ws.Events.
+func (ws *Client) Request(r RequestSpec) {
+ ctx, cancel := context.WithCancel(context.Background())
+ var requestParts []requestPart
+ if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
+ req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
+ if err != nil {
+ // NOTE(review): this can only fail for a malformed URL; consider
+ // reporting via Events instead of panicking.
+ panic(err)
+ }
+ req = req.WithContext(ctx)
+ part := requestPart{
+ req: req,
+ // Buffered so the fetching goroutine never blocks if the result
+ // is abandoned.
+ result: make(chan httpRequestResult, 1),
+ e: e,
+ }
+ go func() {
+ resp, err := ws.HttpClient.Do(req)
+ part.result <- httpRequestResult{
+ resp: resp,
+ err: err,
+ }
+ }()
+ requestParts = append(requestParts, part)
+ return true
+ }) {
+ panic("request out of file bounds")
+ }
+ if ws.requests == nil {
+ ws.requests = make(map[RequestSpec]request)
+ }
+ // NOTE(review): this entry is never deleted when the request completes,
+ // and the map isn't synchronized against Cancel — confirm single-goroutine
+ // use by the caller.
+ ws.requests[r] = request{cancel}
+ go func() {
+ b, err := readRequestPartResponses(requestParts)
+ ws.Events <- ClientEvent{
+ RequestSpec: r,
+ Bytes: b,
+ Err: err,
+ }
+ }()
+}
+
+// recvPartResult waits for one part's HTTP response, validates it, and
+// streams the body into buf. The response body is always closed.
+func recvPartResult(buf io.Writer, part requestPart) error {
+ result := <-part.result
+ if result.err != nil {
+ return result.err
+ }
+ defer result.resp.Body.Close()
+ if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent {
+ return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode)
+ }
+ // Surface outright failures (404, 5xx, ...) explicitly instead of letting
+ // them show up as a confusing byte-count mismatch below.
+ if result.resp.StatusCode < 200 || result.resp.StatusCode >= 300 {
+ return fmt.Errorf("unexpected response status: %v", result.resp.Status)
+ }
+ copied, err := io.Copy(buf, result.resp.Body)
+ if err != nil {
+ return err
+ }
+ if copied != part.e.Length {
+ return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
+ }
+ return nil
}
-func (cl *Client) Request(r RequestSpec) {
- //cl.HttpClient.Do()
+// readRequestPartResponses concatenates the parts' bodies in order, returning
+// the bytes read so far and the first error encountered. On error it still
+// receives every remaining part's result and closes its body, so abandoned
+// responses don't leak connections.
+func readRequestPartResponses(parts []requestPart) ([]byte, error) {
+ var buf bytes.Buffer
+ var firstErr error
+ for _, part := range parts {
+ if firstErr == nil {
+ firstErr = recvPartResult(&buf, part)
+ continue
+ }
+ // Drain abandoned parts: their goroutines deliver into a buffered
+ // channel regardless; closing the body lets the transport reuse the
+ // connection.
+ if result := <-part.result; result.err == nil {
+ result.resp.Body.Close()
+ }
+ }
+ return buf.Bytes(), firstErr
}
--- /dev/null
+package webseed
+
+import (
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+// NewRequest creates an HTTP GET for the given extent of a file, per BEP 19.
+// A base URL ending in "/" names a directory: the torrent name and file path
+// are appended, each component percent-escaped so characters such as '?',
+// '#' and spaces survive in the URL. Otherwise the URL is used as-is
+// (single-file layout). The parameter is url_ to avoid shadowing net/url.
+func NewRequest(url_ string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) {
+ fileInfo := info.UpvertedFiles()[fileIndex]
+ if strings.HasSuffix(url_, "/") {
+ parts := append([]string{info.Name}, fileInfo.Path...)
+ for i, part := range parts {
+ parts[i] = url.PathEscape(part)
+ }
+ url_ += strings.Join(parts, "/")
+ }
+ req, err := http.NewRequest(http.MethodGet, url_, nil)
+ if err != nil {
+ return nil, err
+ }
+ if offset != 0 || length != fileInfo.Length {
+ // Inclusive byte range (RFC 7233); omitted when the whole file is
+ // wanted so servers without Range support still work.
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
+ }
+ return req, nil
+}
"github.com/anacrolix/multiless"
)
-func worseConn(l, r *PeerConn) bool {
+func worseConn(l, r *peer) bool {
less, ok := multiless.New().Bool(
l.useful(), r.useful()).CmpInt64(
l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64(
}
+// Less orders connections worst-first by comparing the embedded peers with
+// worseConn.
func (me worseConnSlice) Less(i, j int) bool {
- return worseConn(me.conns[i], me.conns[j])
+ return worseConn(&me.conns[i].peer, &me.conns[j].peer)
}
func (me *worseConnSlice) Pop() interface{} {