]> Sergey Matveev's repositories - btrtrc.git/blob - webseed/client.go
Add a comment about not discarding in webseed OK response bodies
[btrtrc.git] / webseed / client.go
1 package webseed
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "io"
8         "log"
9         "net/http"
10         "strings"
11
12         "github.com/RoaringBitmap/roaring"
13         "github.com/anacrolix/torrent/common"
14         "github.com/anacrolix/torrent/metainfo"
15         "github.com/anacrolix/torrent/segments"
16 )
17
18 type RequestSpec = segments.Extent
19
20 type requestPartResult struct {
21         resp *http.Response
22         err  error
23 }
24
25 type requestPart struct {
26         req    *http.Request
27         e      segments.Extent
28         result chan requestPartResult
29 }
30
31 type Request struct {
32         cancel func()
33         Result chan RequestResult
34 }
35
36 func (r Request) Cancel() {
37         r.cancel()
38 }
39
40 type Client struct {
41         HttpClient *http.Client
42         Url        string
43         fileIndex  segments.Index
44         info       *metainfo.Info
45         // The pieces we can request with the Url. We're more likely to ban/block at the file-level
46         // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
47         // level. We can map our file-level adjustments to the pieces here. This probably need to be
48         // private in the future, if Client ever starts removing pieces.
49         Pieces roaring.Bitmap
50 }
51
52 func (me *Client) SetInfo(info *metainfo.Info) {
53         if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
54                 // In my experience, this is a non-conforming webseed. For example the
55                 // http://ia600500.us.archive.org/1/items URLs in archive.org torrents.
56                 return
57         }
58         me.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
59         me.info = info
60         me.Pieces.AddRange(0, uint64(info.NumPieces()))
61 }
62
63 type RequestResult struct {
64         Bytes []byte
65         Err   error
66 }
67
68 func (ws *Client) NewRequest(r RequestSpec) Request {
69         ctx, cancel := context.WithCancel(context.Background())
70         var requestParts []requestPart
71         if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
72                 req, err := NewRequest(ws.Url, i, ws.info, e.Start, e.Length)
73                 if err != nil {
74                         panic(err)
75                 }
76                 req = req.WithContext(ctx)
77                 part := requestPart{
78                         req:    req,
79                         result: make(chan requestPartResult, 1),
80                         e:      e,
81                 }
82                 go func() {
83                         resp, err := ws.HttpClient.Do(req)
84                         part.result <- requestPartResult{
85                                 resp: resp,
86                                 err:  err,
87                         }
88                 }()
89                 requestParts = append(requestParts, part)
90                 return true
91         }) {
92                 panic("request out of file bounds")
93         }
94         req := Request{
95                 cancel: cancel,
96                 Result: make(chan RequestResult, 1),
97         }
98         go func() {
99                 b, err := readRequestPartResponses(ctx, requestParts)
100                 req.Result <- RequestResult{
101                         Bytes: b,
102                         Err:   err,
103                 }
104         }()
105         return req
106 }
107
108 type ErrBadResponse struct {
109         Msg      string
110         Response *http.Response
111 }
112
113 func (me ErrBadResponse) Error() string {
114         return me.Msg
115 }
116
117 func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
118         result := <-part.result
119         if result.err != nil {
120                 return result.err
121         }
122         defer result.resp.Body.Close()
123         if ctx.Err() != nil {
124                 return ctx.Err()
125         }
126         switch result.resp.StatusCode {
127         case http.StatusPartialContent:
128                 copied, err := io.Copy(buf, result.resp.Body)
129                 if err != nil {
130                         return err
131                 }
132                 if copied != part.e.Length {
133                         return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
134                 }
135                 return nil
136         case http.StatusOK:
137                 // This number is based on
138                 // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
139                 // archive.org might be using a webserver implementation that refuses to do partial
140                 // responses to small files.
141                 if part.e.Start < 48<<10 {
142                         if part.e.Start != 0 {
143                                 log.Printf("resp status ok but requested range [url=%q, range=%q]",
144                                         part.req.URL,
145                                         part.req.Header.Get("Range"))
146                         }
147                         // Instead of discarding, we could try receiving all the chunks present in the response
148                         // body. I don't know how one would handle multiple chunk requests resulting in an OK
149                         // response for the same file. The request algorithm might be need to be smarter for
150                         // that.
151                         discarded, _ := io.CopyN(io.Discard, result.resp.Body, part.e.Start)
152                         if discarded != 0 {
153                                 log.Printf("discarded %v bytes in webseed request response part", discarded)
154                         }
155                         _, err := io.CopyN(buf, result.resp.Body, part.e.Length)
156                         return err
157                 } else {
158                         return ErrBadResponse{"resp status ok but requested range", result.resp}
159                 }
160         default:
161                 return ErrBadResponse{
162                         fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
163                         result.resp,
164                 }
165         }
166 }
167
168 func readRequestPartResponses(ctx context.Context, parts []requestPart) ([]byte, error) {
169         ctx, cancel := context.WithCancel(ctx)
170         defer cancel()
171         var buf bytes.Buffer
172         firstErr := make(chan error, 1)
173         go func() {
174                 for _, part := range parts {
175                         err := recvPartResult(ctx, &buf, part)
176                         if err != nil {
177                                 // Ensure no further unnecessary response reads occur.
178                                 cancel()
179                                 select {
180                                 case firstErr <- fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err):
181                                 default:
182                                 }
183                         }
184                 }
185                 select {
186                 case firstErr <- nil:
187                 default:
188                 }
189         }()
190         // This can't be merged into the return statement, because buf.Bytes is called first!
191         err := <-firstErr
192         return buf.Bytes(), err
193 }