]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Further progress on webseeding
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 Jun 2020 08:25:45 +0000 (18:25 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 1 Jun 2020 08:25:45 +0000 (18:25 +1000)
client.go
peerconn.go
segments/index.go
segments/segments.go
torrent.go
web_seed.go
webseed/client.go
webseed/misc.go [new file with mode: 0644]
worst_conns.go

index a607b912477232d54b9c15c024b7f628af7f279d..9861f7ef3ee052c9aa3e664ed6fe933a8b25f840 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1233,9 +1233,9 @@ func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
        var ss []string
        slices.MakeInto(&ss, mi.Nodes)
        cl.AddDHTNodes(ss)
-       //for _, url := range mi.UrlList {
-       //T.addWebSeed(url)
-       //}
+       for _, url := range mi.UrlList {
+               T.addWebSeed(url)
+       }
        return
 }
 
index 0d8edcf4ad83a8eb93220eaf5d29bed88cfbbbb9..b10c95c03f9df47e2ed68b27a0b5e45eb4b05bfe 100644 (file)
@@ -39,6 +39,7 @@ type PeerImpl interface {
        UpdateRequests()
        WriteInterested(interested bool) bool
        Cancel(request) bool
+       // Return true if there's room for more activity.
        Request(request) bool
        ConnectionFlags() string
        Close()
@@ -211,7 +212,7 @@ func (cn *PeerConn) localAddr() net.Addr {
        return cn.conn.LocalAddr()
 }
 
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+func (cn *peer) supportsExtension(ext pp.ExtensionName) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
 }
@@ -373,6 +374,7 @@ func (cn *PeerConn) post(msg pp.Message) {
        cn.tickleWriter()
 }
 
+// Returns true if there's room to write more.
 func (cn *PeerConn) write(msg pp.Message) bool {
        cn.wroteMsg(&msg)
        cn.writeBuffer.Write(msg.MustMarshalBinary())
@@ -954,7 +956,7 @@ func (cn *PeerConn) readBytes(n int64) {
 
 // Returns whether the connection could be useful to us. We're seeding and
 // they want data, we don't have metainfo and they can provide it, etc.
-func (c *PeerConn) useful() bool {
+func (c *peer) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
@@ -1448,7 +1450,7 @@ func (cn *peer) netGoodPiecesDirtied() int64 {
        return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
 }
 
-func (c *PeerConn) peerHasWantedPieces() bool {
+func (c *peer) peerHasWantedPieces() bool {
        return !c._pieceRequestOrder.IsEmpty()
 }
 
@@ -1541,7 +1543,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
        t.reconcileHandshakeStats(c)
 }
 
-func (c *PeerConn) peerPriority() (peerPriority, error) {
+func (c *peer) peerPriority() (peerPriority, error) {
        return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
 }
 
@@ -1549,7 +1551,7 @@ func (c *peer) remoteIp() net.IP {
        return addrIpOrNil(c.remoteAddr)
 }
 
-func (c *PeerConn) remoteIpPort() IpPort {
+func (c *peer) remoteIpPort() IpPort {
        ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
        return IpPort{ipa.IP, uint16(ipa.Port)}
 }
index c469734d1928e764f6aa52cb12af52d9151f5da4..6717dcba463db0ae0ed059a56283b26fc5dd1cdc 100644 (file)
@@ -29,17 +29,17 @@ func (me Index) iterSegments() func() (Length, bool) {
        }
 }
 
-func (me Index) Locate(e Extent, output Callback) {
+func (me Index) Locate(e Extent, output Callback) bool {
        first := sort.Search(len(me.segments), func(i int) bool {
                _e := me.segments[i]
                return _e.End() > e.Start
        })
        if first == len(me.segments) {
-               return
+               return false
        }
        e.Start -= me.segments[first].Start
        me.segments = me.segments[first:]
-       Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+       return Scan(me.iterSegments(), e, func(i int, e Extent) bool {
                return output(i+first, e)
        })
 }
index 4b3f5e8b67c4fb26bbb939df60fadb182488be89..90e77ce0d7eb21d98cc4300f5fcfdde856971c29 100644 (file)
@@ -27,12 +27,12 @@ type (
        LengthIter = func() (Length, bool)
 )
 
-func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
+func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
        i := 0
        for needle.Length != 0 {
                l, ok := haystack()
                if !ok {
-                       return
+                       return false
                }
                if needle.Start < l || needle.Start == l && l == 0 {
                        e1 := Extent{
@@ -41,7 +41,7 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
                        }
                        if e1.Length >= 0 {
                                if !callback(i, e1) {
-                                       return
+                                       return true
                                }
                                needle.Start = 0
                                needle.Length -= e1.Length
@@ -51,12 +51,13 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
                }
                i++
        }
+       return true
 }
 
 func LocaterFromLengthIter(li LengthIter) Locater {
-       return func(e Extent, c Callback) {
-               Scan(li, e, c)
+       return func(e Extent, c Callback) bool {
+               return Scan(li, e, c)
        }
 }
 
-type Locater func(Extent, Callback)
+type Locater func(Extent, Callback) bool
index c87ddc4dc2b21c93ef5932bfc83c11e6d723bdd1..bf18a441f8fa86a94cb3d6617b20fbbf798d491d 100644 (file)
@@ -8,12 +8,17 @@ import (
        "fmt"
        "io"
        "math/rand"
+       "net/http"
        "net/url"
+       "sort"
        "sync"
        "text/tabwriter"
        "time"
        "unsafe"
 
+       "github.com/anacrolix/torrent/common"
+       "github.com/anacrolix/torrent/segments"
+       "github.com/anacrolix/torrent/webseed"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -74,8 +79,9 @@ type Torrent struct {
        metainfo metainfo.MetaInfo
 
        // The info dict. nil if we don't have it (yet).
-       info  *metainfo.Info
-       files *[]*File
+       info      *metainfo.Info
+       fileIndex segments.Index
+       files     *[]*File
 
        webSeeds map[string]*peer
 
@@ -392,6 +398,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
        t.initFiles()
        t.cacheLength()
@@ -630,9 +637,11 @@ func (t *Torrent) writeStatus(w io.Writer) {
        spew.NewDefaultConfig()
        spew.Fdump(w, t.statsLocked())
 
-       conns := t.connsAsSlice()
-       slices.Sort(conns, worseConn)
-       for i, c := range conns {
+       peers := t.peersAsSlice()
+       sort.Slice(peers, func(i, j int) bool {
+               return worseConn(peers[i], peers[j])
+       })
+       for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
                c.writeStatus(w, t)
        }
@@ -849,10 +858,9 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
        })
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in worser half of the
+// established connections for more than a minute.
 func (t *Torrent) worstBadConn() *PeerConn {
        wcs := worseConnSlice{t.unclosedConnsAsSlice()}
        heap.Init(&wcs)
@@ -1650,7 +1658,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        defer t.cl.unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
-       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+               return worseConn(&l.peer, &r.peer)
+       })
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(wcs.Pop().(*PeerConn))
        }
@@ -1844,10 +1854,10 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
        }
 }
 
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
-       for c := range t.conns {
-               ret = append(ret, c)
-       }
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+       t.iterPeers(func(p *peer) {
+               ret = append(ret, p)
+       })
        return
 }
 
@@ -1999,19 +2009,26 @@ func (t *Torrent) addWebSeed(url string) {
        if _, ok := t.webSeeds[url]; ok {
                return
        }
-       p := &peer{
-               t:                        t,
-               connString:               url,
-               outgoing:                 true,
-               network:                  "http",
-               reconciledHandshakeStats: true,
-               peerSentHaveAll:          true,
-       }
        ws := webSeed{
-               peer: p,
+               peer: peer{
+                       t:                        t,
+                       connString:               url,
+                       outgoing:                 true,
+                       network:                  "http",
+                       reconciledHandshakeStats: true,
+                       peerSentHaveAll:          true,
+                       PeerMaxRequests:          10,
+               },
+               client: webseed.Client{
+                       HttpClient: http.DefaultClient,
+                       Url:        url,
+                       FileIndex:  t.fileIndex,
+                       Info:       t.info,
+                       Events:     make(chan webseed.ClientEvent),
+               },
        }
-       p.PeerImpl = &ws
-       t.webSeeds[url] = p
+       ws.peer.PeerImpl = &ws
+       t.webSeeds[url] = &ws.peer
 
 }
 
index 5d22606d313c29803a1ddffbf482fef70c6f64d1..4db8b725c0029d94f23b724020aa8add59ff71f4 100644 (file)
@@ -2,16 +2,42 @@ package torrent
 
 import (
        "net/http"
+
+       "github.com/anacrolix/torrent/segments"
+       "github.com/anacrolix/torrent/webseed"
 )
 
+type httpRequestResult struct {
+       resp *http.Response
+       err  error
+}
+
+type requestPart struct {
+       req    *http.Request
+       e      segments.Extent
+       result chan httpRequestResult
+}
+
+type webseedRequest struct {
+       cancel func()
+}
+
 type webSeed struct {
-       peer       *peer
-       httpClient *http.Client
-       url        string
+       client webseed.Client
+       peer   peer
 }
 
+type webseedClientEvent interface{}
+
+type webseedRequestFailed struct {
+       r   request
+       err error
+}
+
+var _ PeerImpl = (*webSeed)(nil)
+
 func (ws *webSeed) PostCancel(r request) {
-       panic("implement me")
+       ws.Cancel(r)
 }
 
 func (ws *webSeed) WriteInterested(interested bool) bool {
@@ -19,11 +45,13 @@ func (ws *webSeed) WriteInterested(interested bool) bool {
 }
 
 func (ws *webSeed) Cancel(r request) bool {
-       panic("implement me")
+       //panic("implement me")
+       return true
 }
 
 func (ws *webSeed) Request(r request) bool {
-       panic("implement me")
+       ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)})
+       return true
 }
 
 func (ws *webSeed) ConnectionFlags() string {
index eba6a90ccd5fdec3228c1fea471487f1a9c7d347..7d7ce24d602469d255dc8b0746e4c18168e0cdf6 100644 (file)
 package webseed
 
 import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
        "net/http"
 
-       pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/segments"
 )
 
-type RequestSpec = pp.RequestSpec
+type RequestSpec = segments.Extent
+
+type httpRequestResult struct {
+       resp *http.Response
+       err  error
+}
+
+type requestPart struct {
+       req    *http.Request
+       e      segments.Extent
+       result chan httpRequestResult
+}
+
+type request struct {
+       cancel func()
+}
 
 type Client struct {
        HttpClient *http.Client
        Url        string
+       FileIndex  segments.Index
+       Info       *metainfo.Info
 
        requests map[RequestSpec]request
+       Events   chan ClientEvent
 }
 
-type request struct {
-       cancel func()
+type ClientEvent struct {
+       RequestSpec RequestSpec
+       Bytes       []byte
+       Err         error
+}
+
+func (ws *Client) Cancel(r RequestSpec) {
+       ws.requests[r].cancel()
+}
+
+func (ws *Client) Request(r RequestSpec) {
+       ctx, cancel := context.WithCancel(context.Background())
+       var requestParts []requestPart
+       if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
+               req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
+               if err != nil {
+                       panic(err)
+               }
+               req = req.WithContext(ctx)
+               part := requestPart{
+                       req:    req,
+                       result: make(chan httpRequestResult, 1),
+                       e:      e,
+               }
+               go func() {
+                       resp, err := ws.HttpClient.Do(req)
+                       part.result <- httpRequestResult{
+                               resp: resp,
+                               err:  err,
+                       }
+               }()
+               requestParts = append(requestParts, part)
+               return true
+       }) {
+               panic("request out of file bounds")
+       }
+       if ws.requests == nil {
+               ws.requests = make(map[RequestSpec]request)
+       }
+       ws.requests[r] = request{cancel}
+       go func() {
+               b, err := readRequestPartResponses(requestParts)
+               ws.Events <- ClientEvent{
+                       RequestSpec: r,
+                       Bytes:       b,
+                       Err:         err,
+               }
+       }()
+}
+
+func recvPartResult(buf io.Writer, part requestPart) error {
+       result := <-part.result
+       if result.err != nil {
+               return result.err
+       }
+       defer result.resp.Body.Close()
+       if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent {
+               return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode)
+       }
+       copied, err := io.Copy(buf, result.resp.Body)
+       if err != nil {
+               return err
+       }
+       if copied != part.e.Length {
+               return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
+       }
+       return nil
 }
 
-func (cl *Client) Request(r RequestSpec) {
-       //cl.HttpClient.Do()
+func readRequestPartResponses(parts []requestPart) ([]byte, error) {
+       var buf bytes.Buffer
+       for _, part := range parts {
+               err := recvPartResult(&buf, part)
+               if err != nil {
+                       return buf.Bytes(), err
+               }
+       }
+       return buf.Bytes(), nil
 }
diff --git a/webseed/misc.go b/webseed/misc.go
new file mode 100644 (file)
index 0000000..140bc23
--- /dev/null
@@ -0,0 +1,26 @@
+package webseed
+
+import (
+       "fmt"
+       "net/http"
+       "path"
+       "strings"
+
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+// Creates a request per BEP 19.
+func NewRequest(url string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) {
+       fileInfo := info.UpvertedFiles()[fileIndex]
+       if strings.HasSuffix(url, "/") {
+               url += path.Join(append([]string{info.Name}, fileInfo.Path...)...)
+       }
+       req, err := http.NewRequest(http.MethodGet, url, nil)
+       if err != nil {
+               return nil, err
+       }
+       if offset != 0 || length != fileInfo.Length {
+               req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
+       }
+       return req, nil
+}
index 171885054de7af12baa3c80ce0e80a0f9fcb693d..36ac6bc5e6584fcdbcf98427a18ce498e2cf1899 100644 (file)
@@ -8,7 +8,7 @@ import (
        "github.com/anacrolix/multiless"
 )
 
-func worseConn(l, r *PeerConn) bool {
+func worseConn(l, r *peer) bool {
        less, ok := multiless.New().Bool(
                l.useful(), r.useful()).CmpInt64(
                l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64(
@@ -45,7 +45,7 @@ func (me worseConnSlice) Len() int {
 }
 
 func (me worseConnSlice) Less(i, j int) bool {
-       return worseConn(me.conns[i], me.conns[j])
+       return worseConn(&me.conns[i].peer, &me.conns[j].peer)
 }
 
 func (me *worseConnSlice) Pop() interface{} {