if _, ok := t.webSeeds[url]; ok {
return
}
+ const maxRequests = 10
ws := webSeed{
peer: peer{
t: t,
network: "http",
reconciledHandshakeStats: true,
peerSentHaveAll: true,
- PeerMaxRequests: 10,
+ PeerMaxRequests: maxRequests,
},
client: webseed.Client{
HttpClient: http.DefaultClient,
Url: url,
FileIndex: t.fileIndex,
Info: t.info,
- Events: make(chan webseed.ClientEvent),
},
+ requests: make(map[request]webseed.Request, maxRequests),
}
- go ws.eventProcessor()
ws.peer.PeerImpl = &ws
t.webSeeds[url] = &ws.peer
}
}
type webSeed struct {
- client webseed.Client
- peer peer
-}
-
-type webseedClientEvent interface{}
-
-type webseedRequestFailed struct {
- r request
- err error
+ client webseed.Client
+ requests map[request]webseed.Request
+ peer peer
}
var _ PeerImpl = (*webSeed)(nil)
}
func (ws *webSeed) Cancel(r request) bool {
- //panic("implement me")
+ ws.requests[r].Cancel()
return true
}
+func (ws *webSeed) intoSpec(r request) webseed.RequestSpec {
+ return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+}
+
func (ws *webSeed) Request(r request) bool {
- ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)})
+ webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+ ws.requests[r] = webseedRequest
+ go ws.requestResultHandler(r, webseedRequest)
return true
}
func (ws *webSeed) Close() {}
-func (ws *webSeed) eventProcessor() {
- for ev := range ws.client.Events {
- if ev.Err != nil {
- panic(ev)
- }
- r, ok := ws.peer.t.offsetRequest(ev.RequestSpec.Start)
- if !ok {
- panic(ev)
- }
- ws.peer.t.cl.lock()
- err := ws.peer.receiveChunk(&pp.Message{
- Type: pp.Piece,
- Index: r.Index,
- Begin: r.Begin,
- Piece: ev.Bytes,
- })
- ws.peer.t.cl.unlock()
- if err != nil {
- panic(err)
- }
+func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
+ webseedRequestResult := <-webseedRequest.Result
+ ws.peer.t.cl.lock()
+ err := ws.peer.receiveChunk(&pp.Message{
+ Type: pp.Piece,
+ Index: r.Index,
+ Begin: r.Begin,
+ Piece: webseedRequestResult.Bytes,
+ })
+ ws.peer.t.cl.unlock()
+ if err != nil {
+ panic(err)
}
}
type RequestSpec = segments.Extent
-type httpRequestResult struct {
+type requestPartResult struct {
resp *http.Response
err error
}
type requestPart struct {
req *http.Request
e segments.Extent
- result chan httpRequestResult
+ result chan requestPartResult
}
-type request struct {
+type Request struct {
cancel func()
+ Result chan RequestResult
+}
+
+func (r Request) Cancel() {
+ r.cancel()
}
type Client struct {
Url string
FileIndex segments.Index
Info *metainfo.Info
-
- requests map[RequestSpec]request
- Events chan ClientEvent
-}
-
-type ClientEvent struct {
- RequestSpec RequestSpec
- Bytes []byte
- Err error
}
-func (ws *Client) Cancel(r RequestSpec) {
- ws.requests[r].cancel()
+type RequestResult struct {
+ Bytes []byte
+ Err error
}
-func (ws *Client) Request(r RequestSpec) {
+func (ws *Client) NewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
req = req.WithContext(ctx)
part := requestPart{
req: req,
- result: make(chan httpRequestResult, 1),
+ result: make(chan requestPartResult, 1),
e: e,
}
go func() {
resp, err := ws.HttpClient.Do(req)
- part.result <- httpRequestResult{
+ part.result <- requestPartResult{
resp: resp,
err: err,
}
}) {
panic("request out of file bounds")
}
- if ws.requests == nil {
- ws.requests = make(map[RequestSpec]request)
+ req := Request{
+ cancel: cancel,
+ Result: make(chan RequestResult, 1),
}
- ws.requests[r] = request{cancel}
go func() {
b, err := readRequestPartResponses(requestParts)
- ws.Events <- ClientEvent{
- RequestSpec: r,
- Bytes: b,
- Err: err,
+ req.Result <- RequestResult{
+ Bytes: b,
+ Err: err,
}
}()
+ return req
}
func recvPartResult(buf io.Writer, part requestPart) error {