]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Remove events from webseed
authorMatt Joiner <anacrolix@gmail.com>
Tue, 2 Jun 2020 03:54:26 +0000 (13:54 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 2 Jun 2020 03:54:26 +0000 (13:54 +1000)
Manage this stuff inside the webseed peer instead.

torrent.go
web_seed.go
webseed/client.go

index fcbf7ae9798029024380a90a416d7632f9a25d03..c785575c47181b7e240b1b3b4a0ef23499353d27 100644 (file)
@@ -2009,6 +2009,7 @@ func (t *Torrent) addWebSeed(url string) {
        if _, ok := t.webSeeds[url]; ok {
                return
        }
+       const maxRequests = 10
        ws := webSeed{
                peer: peer{
                        t:                        t,
@@ -2017,17 +2018,16 @@ func (t *Torrent) addWebSeed(url string) {
                        network:                  "http",
                        reconciledHandshakeStats: true,
                        peerSentHaveAll:          true,
-                       PeerMaxRequests:          10,
+                       PeerMaxRequests:          maxRequests,
                },
                client: webseed.Client{
                        HttpClient: http.DefaultClient,
                        Url:        url,
                        FileIndex:  t.fileIndex,
                        Info:       t.info,
-                       Events:     make(chan webseed.ClientEvent),
                },
+               requests: make(map[request]webseed.Request, maxRequests),
        }
-       go ws.eventProcessor()
        ws.peer.PeerImpl = &ws
        t.webSeeds[url] = &ws.peer
 }
index 6e88d34b0626e5a883f7931caea7f718b25bcfec..13acc41dd0d76bd636f3724523679c1e8b1d0700 100644 (file)
@@ -24,15 +24,9 @@ type webseedRequest struct {
 }
 
 type webSeed struct {
-       client webseed.Client
-       peer   peer
-}
-
-type webseedClientEvent interface{}
-
-type webseedRequestFailed struct {
-       r   request
-       err error
+       client   webseed.Client
+       requests map[request]webseed.Request
+       peer     peer
 }
 
 var _ PeerImpl = (*webSeed)(nil)
@@ -46,12 +40,18 @@ func (ws *webSeed) WriteInterested(interested bool) bool {
 }
 
 func (ws *webSeed) Cancel(r request) bool {
-       //panic("implement me")
+       ws.requests[r].Cancel()
        return true
 }
 
+func (ws *webSeed) intoSpec(r request) webseed.RequestSpec {
+       return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+}
+
 func (ws *webSeed) Request(r request) bool {
-       ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)})
+       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+       ws.requests[r] = webseedRequest
+       go ws.requestResultHandler(r, webseedRequest)
        return true
 }
 
@@ -68,25 +68,17 @@ func (ws *webSeed) UpdateRequests() {
 
 func (ws *webSeed) Close() {}
 
-func (ws *webSeed) eventProcessor() {
-       for ev := range ws.client.Events {
-               if ev.Err != nil {
-                       panic(ev)
-               }
-               r, ok := ws.peer.t.offsetRequest(ev.RequestSpec.Start)
-               if !ok {
-                       panic(ev)
-               }
-               ws.peer.t.cl.lock()
-               err := ws.peer.receiveChunk(&pp.Message{
-                       Type:  pp.Piece,
-                       Index: r.Index,
-                       Begin: r.Begin,
-                       Piece: ev.Bytes,
-               })
-               ws.peer.t.cl.unlock()
-               if err != nil {
-                       panic(err)
-               }
+func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
+       webseedRequestResult := <-webseedRequest.Result
+       ws.peer.t.cl.lock()
+       err := ws.peer.receiveChunk(&pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: webseedRequestResult.Bytes,
+       })
+       ws.peer.t.cl.unlock()
+       if err != nil {
+               panic(err)
        }
 }
index 7d7ce24d602469d255dc8b0746e4c18168e0cdf6..0b98b931d647abfffd21e1c49b6d7043589ac2b1 100644 (file)
@@ -13,7 +13,7 @@ import (
 
 type RequestSpec = segments.Extent
 
-type httpRequestResult struct {
+type requestPartResult struct {
        resp *http.Response
        err  error
 }
@@ -21,11 +21,16 @@ type httpRequestResult struct {
 type requestPart struct {
        req    *http.Request
        e      segments.Extent
-       result chan httpRequestResult
+       result chan requestPartResult
 }
 
-type request struct {
+type Request struct {
        cancel func()
+       Result chan RequestResult
+}
+
+func (r Request) Cancel() {
+       r.cancel()
 }
 
 type Client struct {
@@ -33,22 +38,14 @@ type Client struct {
        Url        string
        FileIndex  segments.Index
        Info       *metainfo.Info
-
-       requests map[RequestSpec]request
-       Events   chan ClientEvent
-}
-
-type ClientEvent struct {
-       RequestSpec RequestSpec
-       Bytes       []byte
-       Err         error
 }
 
-func (ws *Client) Cancel(r RequestSpec) {
-       ws.requests[r].cancel()
+type RequestResult struct {
+       Bytes []byte
+       Err   error
 }
 
-func (ws *Client) Request(r RequestSpec) {
+func (ws *Client) NewRequest(r RequestSpec) Request {
        ctx, cancel := context.WithCancel(context.Background())
        var requestParts []requestPart
        if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
@@ -59,12 +56,12 @@ func (ws *Client) Request(r RequestSpec) {
                req = req.WithContext(ctx)
                part := requestPart{
                        req:    req,
-                       result: make(chan httpRequestResult, 1),
+                       result: make(chan requestPartResult, 1),
                        e:      e,
                }
                go func() {
                        resp, err := ws.HttpClient.Do(req)
-                       part.result <- httpRequestResult{
+                       part.result <- requestPartResult{
                                resp: resp,
                                err:  err,
                        }
@@ -74,18 +71,18 @@ func (ws *Client) Request(r RequestSpec) {
        }) {
                panic("request out of file bounds")
        }
-       if ws.requests == nil {
-               ws.requests = make(map[RequestSpec]request)
+       req := Request{
+               cancel: cancel,
+               Result: make(chan RequestResult, 1),
        }
-       ws.requests[r] = request{cancel}
        go func() {
                b, err := readRequestPartResponses(requestParts)
-               ws.Events <- ClientEvent{
-                       RequestSpec: r,
-                       Bytes:       b,
-                       Err:         err,
+               req.Result <- RequestResult{
+                       Bytes: b,
+                       Err:   err,
                }
        }()
+       return req
 }
 
 func recvPartResult(buf io.Writer, part requestPart) error {