From ff53ab860c6e0b45830f6fbb4c95ae49af18e1e3 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 1 Jun 2020 18:25:45 +1000 Subject: [PATCH] Further progress on webseeding --- client.go | 6 +-- peerconn.go | 12 +++-- segments/index.go | 6 +-- segments/segments.go | 13 +++--- torrent.go | 67 +++++++++++++++++---------- web_seed.go | 40 +++++++++++++--- webseed/client.go | 107 ++++++++++++++++++++++++++++++++++++++++--- webseed/misc.go | 26 +++++++++++ worst_conns.go | 4 +- 9 files changed, 225 insertions(+), 56 deletions(-) create mode 100644 webseed/misc.go diff --git a/client.go b/client.go index a607b912..9861f7ef 100644 --- a/client.go +++ b/client.go @@ -1233,9 +1233,9 @@ func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) { var ss []string slices.MakeInto(&ss, mi.Nodes) cl.AddDHTNodes(ss) - //for _, url := range mi.UrlList { - //T.addWebSeed(url) - //} + for _, url := range mi.UrlList { + T.addWebSeed(url) + } return } diff --git a/peerconn.go b/peerconn.go index 0d8edcf4..b10c95c0 100644 --- a/peerconn.go +++ b/peerconn.go @@ -39,6 +39,7 @@ type PeerImpl interface { UpdateRequests() WriteInterested(interested bool) bool Cancel(request) bool + // Return true if there's room for more activity. Request(request) bool ConnectionFlags() string Close() @@ -211,7 +212,7 @@ func (cn *PeerConn) localAddr() net.Addr { return cn.conn.LocalAddr() } -func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool { +func (cn *peer) supportsExtension(ext pp.ExtensionName) bool { _, ok := cn.PeerExtensionIDs[ext] return ok } @@ -373,6 +374,7 @@ func (cn *PeerConn) post(msg pp.Message) { cn.tickleWriter() } +// Returns true if there's room to write more. func (cn *PeerConn) write(msg pp.Message) bool { cn.wroteMsg(&msg) cn.writeBuffer.Write(msg.MustMarshalBinary()) @@ -954,7 +956,7 @@ func (cn *PeerConn) readBytes(n int64) { // Returns whether the connection could be useful to us. We're seeding and // they want data, we don't have metainfo and they can provide it, etc. -func (c *PeerConn) useful() bool { +func (c *peer) useful() bool { t := c.t if c.closed.IsSet() { return false @@ -1448,7 +1450,7 @@ func (cn *peer) netGoodPiecesDirtied() int64 { return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64() } -func (c *PeerConn) peerHasWantedPieces() bool { +func (c *peer) peerHasWantedPieces() bool { return !c._pieceRequestOrder.IsEmpty() } @@ -1541,7 +1543,7 @@ func (c *PeerConn) setTorrent(t *Torrent) { t.reconcileHandshakeStats(c) } -func (c *PeerConn) peerPriority() (peerPriority, error) { +func (c *peer) peerPriority() (peerPriority, error) { return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) } @@ -1549,7 +1551,7 @@ func (c *peer) remoteIp() net.IP { return addrIpOrNil(c.remoteAddr) } -func (c *PeerConn) remoteIpPort() IpPort { +func (c *peer) remoteIpPort() IpPort { ipa, _ := tryIpPortFromNetAddr(c.remoteAddr) return IpPort{ipa.IP, uint16(ipa.Port)} } diff --git a/segments/index.go b/segments/index.go index c469734d..6717dcba 100644 --- a/segments/index.go +++ b/segments/index.go @@ -29,17 +29,17 @@ func (me Index) iterSegments() func() (Length, bool) { } } -func (me Index) Locate(e Extent, output Callback) { +func (me Index) Locate(e Extent, output Callback) bool { first := sort.Search(len(me.segments), func(i int) bool { _e := me.segments[i] return _e.End() > e.Start }) if first == len(me.segments) { - return + return false } e.Start -= me.segments[first].Start me.segments = me.segments[first:] - Scan(me.iterSegments(), e, func(i int, e Extent) bool { + return Scan(me.iterSegments(), e, func(i int, e Extent) bool { return output(i+first, e) }) } diff --git a/segments/segments.go b/segments/segments.go index 4b3f5e8b..90e77ce0 100644 --- a/segments/segments.go +++ b/segments/segments.go @@ -27,12 +27,12 @@ type ( LengthIter = func() (Length, bool) ) -func Scan(haystack func() (Length, bool), needle Extent, callback Callback) { +func Scan(haystack LengthIter, needle Extent, callback Callback) bool { i := 0 for needle.Length != 0 { l, ok := haystack() if !ok { - return + return false } if needle.Start < l || needle.Start == l && l == 0 { e1 := Extent{ @@ -41,7 +41,7 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) { } if e1.Length >= 0 { if !callback(i, e1) { - return + return true } needle.Start = 0 needle.Length -= e1.Length @@ -51,12 +51,13 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) { } i++ } + return true } func LocaterFromLengthIter(li LengthIter) Locater { - return func(e Extent, c Callback) { - Scan(li, e, c) + return func(e Extent, c Callback) bool { + return Scan(li, e, c) } } -type Locater func(Extent, Callback) +type Locater func(Extent, Callback) bool diff --git a/torrent.go b/torrent.go index c87ddc4d..bf18a441 100644 --- a/torrent.go +++ b/torrent.go @@ -8,12 +8,17 @@ import ( "fmt" "io" "math/rand" + "net/http" "net/url" + "sort" "sync" "text/tabwriter" "time" "unsafe" + "github.com/anacrolix/torrent/common" + "github.com/anacrolix/torrent/segments" + "github.com/anacrolix/torrent/webseed" "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" @@ -74,8 +79,9 @@ type Torrent struct { metainfo metainfo.MetaInfo // The info dict. nil if we don't have it (yet). - info *metainfo.Info - files *[]*File + info *metainfo.Info + fileIndex segments.Index + files *[]*File webSeeds map[string]*peer @@ -392,6 +398,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { t.nameMu.Lock() t.info = info t.nameMu.Unlock() + t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles())) t.displayName = "" // Save a few bytes lol. t.initFiles() t.cacheLength() @@ -630,9 +637,11 @@ func (t *Torrent) writeStatus(w io.Writer) { spew.NewDefaultConfig() spew.Fdump(w, t.statsLocked()) - conns := t.connsAsSlice() - slices.Sort(conns, worseConn) - for i, c := range conns { + peers := t.peersAsSlice() + sort.Slice(peers, func(i, j int) bool { + return worseConn(peers[i], peers[j]) + }) + for i, c := range peers { fmt.Fprintf(w, "%2d. ", i+1) c.writeStatus(w, t) } @@ -849,10 +858,9 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool { }) } -// The worst connection is one that hasn't been sent, or sent anything useful -// for the longest. A bad connection is one that usually sends us unwanted -// pieces, or has been in worser half of the established connections for more -// than a minute. +// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad +// connection is one that usually sends us unwanted pieces, or has been in worser half of the +// established connections for more than a minute. func (t *Torrent) worstBadConn() *PeerConn { wcs := worseConnSlice{t.unclosedConnsAsSlice()} heap.Init(&wcs) @@ -1650,7 +1658,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { defer t.cl.unlock() oldMax = t.maxEstablishedConns t.maxEstablishedConns = max - wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn) + wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool { + return worseConn(&l.peer, &r.peer) + }) for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { t.dropConnection(wcs.Pop().(*PeerConn)) } @@ -1844,10 +1854,10 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) { } } -func (t *Torrent) connsAsSlice() (ret []*PeerConn) { - for c := range t.conns { - ret = append(ret, c) - } +func (t *Torrent) peersAsSlice() (ret []*peer) { + t.iterPeers(func(p *peer) { + ret = append(ret, p) + }) return } @@ -1999,19 +2009,26 @@ func (t *Torrent) addWebSeed(url string) { if _, ok := t.webSeeds[url]; ok { return } - p := &peer{ - t: t, - connString: url, - outgoing: true, - network: "http", - reconciledHandshakeStats: true, - peerSentHaveAll: true, - } ws := webSeed{ - peer: p, + peer: peer{ + t: t, + connString: url, + outgoing: true, + network: "http", + reconciledHandshakeStats: true, + peerSentHaveAll: true, + PeerMaxRequests: 10, + }, + client: webseed.Client{ + HttpClient: http.DefaultClient, + Url: url, + FileIndex: t.fileIndex, + Info: t.info, + Events: make(chan webseed.ClientEvent), + }, } - p.PeerImpl = &ws - t.webSeeds[url] = p + ws.peer.PeerImpl = &ws + t.webSeeds[url] = &ws.peer } diff --git a/web_seed.go b/web_seed.go index 5d22606d..4db8b725 100644 --- a/web_seed.go +++ b/web_seed.go @@ -2,16 +2,42 @@ package torrent import ( "net/http" + + "github.com/anacrolix/torrent/segments" + "github.com/anacrolix/torrent/webseed" ) +type httpRequestResult struct { + resp *http.Response + err error +} + +type requestPart struct { + req *http.Request + e segments.Extent + result chan httpRequestResult +} + +type webseedRequest struct { + cancel func() +} + type webSeed struct { - peer *peer - httpClient *http.Client - url string + client webseed.Client + peer peer } +type webseedClientEvent interface{} + +type webseedRequestFailed struct { + r request + err error +} + +var _ PeerImpl = (*webSeed)(nil) + func (ws *webSeed) PostCancel(r request) { - panic("implement me") + ws.Cancel(r) } func (ws *webSeed) WriteInterested(interested bool) bool { @@ -19,11 +45,13 @@ func (ws *webSeed) WriteInterested(interested bool) bool { } func (ws *webSeed) Cancel(r request) bool { - panic("implement me") + //panic("implement me") + return true } func (ws *webSeed) Request(r request) bool { - panic("implement me") + ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}) + return true } func (ws *webSeed) ConnectionFlags() string { diff --git a/webseed/client.go b/webseed/client.go index eba6a90c..7d7ce24d 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -1,24 +1,119 @@ package webseed import ( + "bytes" + "context" + "fmt" + "io" "net/http" - pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/segments" ) -type RequestSpec = pp.RequestSpec +type RequestSpec = segments.Extent + +type httpRequestResult struct { + resp *http.Response + err error +} + +type requestPart struct { + req *http.Request + e segments.Extent + result chan httpRequestResult +} + +type request struct { + cancel func() +} type Client struct { HttpClient *http.Client Url string + FileIndex segments.Index + Info *metainfo.Info requests map[RequestSpec]request + Events chan ClientEvent } -type request struct { - cancel func() +type ClientEvent struct { + RequestSpec RequestSpec + Bytes []byte + Err error +} + +func (ws *Client) Cancel(r RequestSpec) { + ws.requests[r].cancel() +} + +func (ws *Client) Request(r RequestSpec) { + ctx, cancel := context.WithCancel(context.Background()) + var requestParts []requestPart + if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool { + req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length) + if err != nil { + panic(err) + } + req = req.WithContext(ctx) + part := requestPart{ + req: req, + result: make(chan httpRequestResult, 1), + e: e, + } + go func() { + resp, err := ws.HttpClient.Do(req) + part.result <- httpRequestResult{ + resp: resp, + err: err, + } + }() + requestParts = append(requestParts, part) + return true + }) { + panic("request out of file bounds") + } + if ws.requests == nil { + ws.requests = make(map[RequestSpec]request) + } + ws.requests[r] = request{cancel} + go func() { + b, err := readRequestPartResponses(requestParts) + ws.Events <- ClientEvent{ + RequestSpec: r, + Bytes: b, + Err: err, + } + }() +} + +func recvPartResult(buf io.Writer, part requestPart) error { + result := <-part.result + if result.err != nil { + return result.err + } + defer result.resp.Body.Close() + if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent { + return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode) + } + copied, err := io.Copy(buf, result.resp.Body) + if err != nil { + return err + } + if copied != part.e.Length { + return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length) + } + return nil } -func (cl *Client) Request(r RequestSpec) { - //cl.HttpClient.Do() +func readRequestPartResponses(parts []requestPart) ([]byte, error) { + var buf bytes.Buffer + for _, part := range parts { + err := recvPartResult(&buf, part) + if err != nil { + return buf.Bytes(), err + } + } + return buf.Bytes(), nil } diff --git a/webseed/misc.go b/webseed/misc.go new file mode 100644 index 00000000..140bc23b --- /dev/null +++ b/webseed/misc.go @@ -0,0 +1,26 @@ +package webseed + +import ( + "fmt" + "net/http" + "path" + "strings" + + "github.com/anacrolix/torrent/metainfo" +) + +// Creates a request per BEP 19. +func NewRequest(url string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) { + fileInfo := info.UpvertedFiles()[fileIndex] + if strings.HasSuffix(url, "/") { + url += path.Join(append([]string{info.Name}, fileInfo.Path...)...) + } + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if offset != 0 || length != fileInfo.Length { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) + } + return req, nil +} diff --git a/worst_conns.go b/worst_conns.go index 17188505..36ac6bc5 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -8,7 +8,7 @@ import ( "github.com/anacrolix/multiless" ) -func worseConn(l, r *PeerConn) bool { +func worseConn(l, r *peer) bool { less, ok := multiless.New().Bool( l.useful(), r.useful()).CmpInt64( l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64( @@ -45,7 +45,7 @@ func (me worseConnSlice) Len() int { } func (me worseConnSlice) Less(i, j int) bool { - return worseConn(me.conns[i], me.conns[j]) + return worseConn(&me.conns[i].peer, &me.conns[j].peer) } func (me *worseConnSlice) Pop() interface{} { -- 2.48.1