]> Sergey Matveev's repositories - btrtrc.git/blob - webseed/client.go
Do webseed request parts sequentially
[btrtrc.git] / webseed / client.go
1 package webseed
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "io"
8         "log"
9         "net/http"
10         "strings"
11
12         "github.com/RoaringBitmap/roaring"
13         "github.com/anacrolix/torrent/common"
14         "github.com/anacrolix/torrent/metainfo"
15         "github.com/anacrolix/torrent/segments"
16 )
17
18 type RequestSpec = segments.Extent
19
20 type requestPartResult struct {
21         resp *http.Response
22         err  error
23 }
24
25 type requestPart struct {
26         req    *http.Request
27         e      segments.Extent
28         result chan requestPartResult
29         start  func()
30 }
31
32 type Request struct {
33         cancel func()
34         Result chan RequestResult
35 }
36
37 func (r Request) Cancel() {
38         r.cancel()
39 }
40
41 type Client struct {
42         HttpClient *http.Client
43         Url        string
44         fileIndex  segments.Index
45         info       *metainfo.Info
46         // The pieces we can request with the Url. We're more likely to ban/block at the file-level
47         // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
48         // level. We can map our file-level adjustments to the pieces here. This probably need to be
49         // private in the future, if Client ever starts removing pieces.
50         Pieces roaring.Bitmap
51 }
52
53 func (me *Client) SetInfo(info *metainfo.Info) {
54         if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
55                 // In my experience, this is a non-conforming webseed. For example the
56                 // http://ia600500.us.archive.org/1/items URLs in archive.org torrents.
57                 return
58         }
59         me.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
60         me.info = info
61         me.Pieces.AddRange(0, uint64(info.NumPieces()))
62 }
63
64 type RequestResult struct {
65         Bytes []byte
66         Err   error
67 }
68
69 func (ws *Client) NewRequest(r RequestSpec) Request {
70         ctx, cancel := context.WithCancel(context.Background())
71         var requestParts []requestPart
72         if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
73                 req, err := NewRequest(ws.Url, i, ws.info, e.Start, e.Length)
74                 if err != nil {
75                         panic(err)
76                 }
77                 req = req.WithContext(ctx)
78                 part := requestPart{
79                         req:    req,
80                         result: make(chan requestPartResult, 1),
81                         e:      e,
82                 }
83                 part.start = func() {
84                         go func() {
85                                 resp, err := ws.HttpClient.Do(req)
86                                 part.result <- requestPartResult{
87                                         resp: resp,
88                                         err:  err,
89                                 }
90                         }()
91                 }
92                 requestParts = append(requestParts, part)
93                 return true
94         }) {
95                 panic("request out of file bounds")
96         }
97         req := Request{
98                 cancel: cancel,
99                 Result: make(chan RequestResult, 1),
100         }
101         go func() {
102                 b, err := readRequestPartResponses(ctx, requestParts)
103                 req.Result <- RequestResult{
104                         Bytes: b,
105                         Err:   err,
106                 }
107         }()
108         return req
109 }
110
111 type ErrBadResponse struct {
112         Msg      string
113         Response *http.Response
114 }
115
116 func (me ErrBadResponse) Error() string {
117         return me.Msg
118 }
119
120 func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
121         result := <-part.result
122         // Make sure there's no further results coming, it should be a one-shot channel.
123         close(part.result)
124         if result.err != nil {
125                 return result.err
126         }
127         defer result.resp.Body.Close()
128         if ctx.Err() != nil {
129                 return ctx.Err()
130         }
131         switch result.resp.StatusCode {
132         case http.StatusPartialContent:
133                 copied, err := io.Copy(buf, result.resp.Body)
134                 if err != nil {
135                         return err
136                 }
137                 if copied != part.e.Length {
138                         return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
139                 }
140                 return nil
141         case http.StatusOK:
142                 // This number is based on
143                 // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
144                 // archive.org might be using a webserver implementation that refuses to do partial
145                 // responses to small files.
146                 if part.e.Start < 48<<10 {
147                         if part.e.Start != 0 {
148                                 log.Printf("resp status ok but requested range [url=%q, range=%q]",
149                                         part.req.URL,
150                                         part.req.Header.Get("Range"))
151                         }
152                         // Instead of discarding, we could try receiving all the chunks present in the response
153                         // body. I don't know how one would handle multiple chunk requests resulting in an OK
154                         // response for the same file. The request algorithm might be need to be smarter for
155                         // that.
156                         discarded, _ := io.CopyN(io.Discard, result.resp.Body, part.e.Start)
157                         if discarded != 0 {
158                                 log.Printf("discarded %v bytes in webseed request response part", discarded)
159                         }
160                         _, err := io.CopyN(buf, result.resp.Body, part.e.Length)
161                         return err
162                 } else {
163                         return ErrBadResponse{"resp status ok but requested range", result.resp}
164                 }
165         default:
166                 return ErrBadResponse{
167                         fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
168                         result.resp,
169                 }
170         }
171 }
172
173 func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
174         var buf bytes.Buffer
175         for _, part := range parts {
176                 part.start()
177                 err = recvPartResult(ctx, &buf, part)
178                 if err != nil {
179                         err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
180                         break
181                 }
182         }
183         return buf.Bytes(), err
184 }