From 054ea59e6d9f3a44811f0da6bb5dc5e1ac692369 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 2 Jun 2020 13:54:26 +1000 Subject: [PATCH] Remove events from webseed Manage this stuff inside the webseed peer instead. --- torrent.go | 6 +++--- web_seed.go | 54 ++++++++++++++++++++--------------------------- webseed/client.go | 45 ++++++++++++++++++--------------------- 3 files changed, 47 insertions(+), 58 deletions(-) diff --git a/torrent.go b/torrent.go index fcbf7ae9..c785575c 100644 --- a/torrent.go +++ b/torrent.go @@ -2009,6 +2009,7 @@ func (t *Torrent) addWebSeed(url string) { if _, ok := t.webSeeds[url]; ok { return } + const maxRequests = 10 ws := webSeed{ peer: peer{ t: t, @@ -2017,17 +2018,16 @@ func (t *Torrent) addWebSeed(url string) { network: "http", reconciledHandshakeStats: true, peerSentHaveAll: true, - PeerMaxRequests: 10, + PeerMaxRequests: maxRequests, }, client: webseed.Client{ HttpClient: http.DefaultClient, Url: url, FileIndex: t.fileIndex, Info: t.info, - Events: make(chan webseed.ClientEvent), }, + requests: make(map[request]webseed.Request, maxRequests), } - go ws.eventProcessor() ws.peer.PeerImpl = &ws t.webSeeds[url] = &ws.peer } diff --git a/web_seed.go b/web_seed.go index 6e88d34b..13acc41d 100644 --- a/web_seed.go +++ b/web_seed.go @@ -24,15 +24,9 @@ type webseedRequest struct { } type webSeed struct { - client webseed.Client - peer peer -} - -type webseedClientEvent interface{} - -type webseedRequestFailed struct { - r request - err error + client webseed.Client + requests map[request]webseed.Request + peer peer } var _ PeerImpl = (*webSeed)(nil) @@ -46,12 +40,18 @@ func (ws *webSeed) WriteInterested(interested bool) bool { } func (ws *webSeed) Cancel(r request) bool { - //panic("implement me") + ws.requests[r].Cancel() return true } +func (ws *webSeed) intoSpec(r request) webseed.RequestSpec { + return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} +} + func (ws *webSeed) Request(r request) bool { - ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}) + webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) + ws.requests[r] = webseedRequest + go ws.requestResultHandler(r, webseedRequest) return true } @@ -68,25 +68,17 @@ func (ws *webSeed) UpdateRequests() { func (ws *webSeed) Close() {} -func (ws *webSeed) eventProcessor() { - for ev := range ws.client.Events { - if ev.Err != nil { - panic(ev) - } - r, ok := ws.peer.t.offsetRequest(ev.RequestSpec.Start) - if !ok { - panic(ev) - } - ws.peer.t.cl.lock() - err := ws.peer.receiveChunk(&pp.Message{ - Type: pp.Piece, - Index: r.Index, - Begin: r.Begin, - Piece: ev.Bytes, - }) - ws.peer.t.cl.unlock() - if err != nil { - panic(err) - } +func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) { + webseedRequestResult := <-webseedRequest.Result + ws.peer.t.cl.lock() + err := ws.peer.receiveChunk(&pp.Message{ + Type: pp.Piece, + Index: r.Index, + Begin: r.Begin, + Piece: webseedRequestResult.Bytes, + }) + ws.peer.t.cl.unlock() + if err != nil { + panic(err) } } diff --git a/webseed/client.go b/webseed/client.go index 7d7ce24d..0b98b931 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -13,7 +13,7 @@ import ( type RequestSpec = segments.Extent -type httpRequestResult struct { +type requestPartResult struct { resp *http.Response err error } @@ -21,11 +21,16 @@ type httpRequestResult struct { type requestPart struct { req *http.Request e segments.Extent - result chan httpRequestResult + result chan requestPartResult } -type request struct { +type Request struct { cancel func() + Result chan RequestResult +} + +func (r Request) Cancel() { + r.cancel() } type Client struct { @@ -33,22 +38,14 @@ type Client struct { Url string FileIndex segments.Index Info *metainfo.Info - - requests map[RequestSpec]request - Events chan ClientEvent -} - -type ClientEvent struct { - RequestSpec RequestSpec - Bytes []byte - Err error } -func (ws *Client) Cancel(r RequestSpec) { - ws.requests[r].cancel() +type RequestResult struct { + Bytes []byte + Err error } -func (ws *Client) Request(r RequestSpec) { +func (ws *Client) NewRequest(r RequestSpec) Request { ctx, cancel := context.WithCancel(context.Background()) var requestParts []requestPart if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool { @@ -59,12 +56,12 @@ func (ws *Client) Request(r RequestSpec) { req = req.WithContext(ctx) part := requestPart{ req: req, - result: make(chan httpRequestResult, 1), + result: make(chan requestPartResult, 1), e: e, } go func() { resp, err := ws.HttpClient.Do(req) - part.result <- httpRequestResult{ + part.result <- requestPartResult{ resp: resp, err: err, } @@ -74,18 +71,18 @@ func (ws *Client) Request(r RequestSpec) { }) { panic("request out of file bounds") } - if ws.requests == nil { - ws.requests = make(map[RequestSpec]request) + req := Request{ + cancel: cancel, + Result: make(chan RequestResult, 1), } - ws.requests[r] = request{cancel} go func() { b, err := readRequestPartResponses(requestParts) - ws.Events <- ClientEvent{ - RequestSpec: r, - Bytes: b, - Err: err, + req.Result <- RequestResult{ + Bytes: b, + Err: err, } }() + return req } func recvPartResult(buf io.Writer, part requestPart) error { -- 2.44.0