]> Sergey Matveev's repositories - btrtrc.git/blob - webseed/client.go
Handle 503 returns from webseed peer endpoints
[btrtrc.git] / webseed / client.go
1 package webseed
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "strings"
12
13         "github.com/RoaringBitmap/roaring"
14         "github.com/anacrolix/torrent/common"
15         "github.com/anacrolix/torrent/metainfo"
16         "github.com/anacrolix/torrent/segments"
17 )
18
19 type RequestSpec = segments.Extent
20
21 type requestPartResult struct {
22         resp *http.Response
23         err  error
24 }
25
26 type requestPart struct {
27         req    *http.Request
28         e      segments.Extent
29         result chan requestPartResult
30         start  func()
31 }
32
33 type Request struct {
34         cancel func()
35         Result chan RequestResult
36 }
37
38 func (r Request) Cancel() {
39         r.cancel()
40 }
41
42 type Client struct {
43         HttpClient *http.Client
44         Url        string
45         fileIndex  segments.Index
46         info       *metainfo.Info
47         // The pieces we can request with the Url. We're more likely to ban/block at the file-level
48         // given that's how requests are mapped to webseeds, but the torrent.Client works at the piece
49         // level. We can map our file-level adjustments to the pieces here. This probably need to be
50         // private in the future, if Client ever starts removing pieces.
51         Pieces roaring.Bitmap
52 }
53
54 func (me *Client) SetInfo(info *metainfo.Info) {
55         if !strings.HasSuffix(me.Url, "/") && info.IsDir() {
56                 // In my experience, this is a non-conforming webseed. For example the
57                 // http://ia600500.us.archive.org/1/items URLs in archive.org torrents.
58                 return
59         }
60         me.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
61         me.info = info
62         me.Pieces.AddRange(0, uint64(info.NumPieces()))
63 }
64
65 type RequestResult struct {
66         Bytes []byte
67         Err   error
68 }
69
70 func (ws *Client) NewRequest(r RequestSpec) Request {
71         ctx, cancel := context.WithCancel(context.Background())
72         var requestParts []requestPart
73         if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
74                 req, err := NewRequest(ws.Url, i, ws.info, e.Start, e.Length)
75                 if err != nil {
76                         panic(err)
77                 }
78                 req = req.WithContext(ctx)
79                 part := requestPart{
80                         req:    req,
81                         result: make(chan requestPartResult, 1),
82                         e:      e,
83                 }
84                 part.start = func() {
85                         go func() {
86                                 resp, err := ws.HttpClient.Do(req)
87                                 part.result <- requestPartResult{
88                                         resp: resp,
89                                         err:  err,
90                                 }
91                         }()
92                 }
93                 requestParts = append(requestParts, part)
94                 return true
95         }) {
96                 panic("request out of file bounds")
97         }
98         req := Request{
99                 cancel: cancel,
100                 Result: make(chan RequestResult, 1),
101         }
102         go func() {
103                 b, err := readRequestPartResponses(ctx, requestParts)
104                 req.Result <- RequestResult{
105                         Bytes: b,
106                         Err:   err,
107                 }
108         }()
109         return req
110 }
111
112 type ErrBadResponse struct {
113         Msg      string
114         Response *http.Response
115 }
116
117 func (me ErrBadResponse) Error() string {
118         return me.Msg
119 }
120
121 func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error {
122         result := <-part.result
123         // Make sure there's no further results coming, it should be a one-shot channel.
124         close(part.result)
125         if result.err != nil {
126                 return result.err
127         }
128         defer result.resp.Body.Close()
129         if ctx.Err() != nil {
130                 return ctx.Err()
131         }
132         switch result.resp.StatusCode {
133         case http.StatusPartialContent:
134                 copied, err := io.Copy(buf, result.resp.Body)
135                 if err != nil {
136                         return err
137                 }
138                 if copied != part.e.Length {
139                         return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
140                 }
141                 return nil
142         case http.StatusOK:
143                 // This number is based on
144                 // https://archive.org/download/BloodyPitOfHorror/BloodyPitOfHorror.asr.srt. It seems that
145                 // archive.org might be using a webserver implementation that refuses to do partial
146                 // responses to small files.
147                 if part.e.Start < 48<<10 {
148                         if part.e.Start != 0 {
149                                 log.Printf("resp status ok but requested range [url=%q, range=%q]",
150                                         part.req.URL,
151                                         part.req.Header.Get("Range"))
152                         }
153                         // Instead of discarding, we could try receiving all the chunks present in the response
154                         // body. I don't know how one would handle multiple chunk requests resulting in an OK
155                         // response for the same file. The request algorithm might be need to be smarter for
156                         // that.
157                         discarded, _ := io.CopyN(io.Discard, result.resp.Body, part.e.Start)
158                         if discarded != 0 {
159                                 log.Printf("discarded %v bytes in webseed request response part", discarded)
160                         }
161                         _, err := io.CopyN(buf, result.resp.Body, part.e.Length)
162                         return err
163                 } else {
164                         return ErrBadResponse{"resp status ok but requested range", result.resp}
165                 }
166         case http.StatusServiceUnavailable:
167                 return ErrTooFast
168         default:
169                 return ErrBadResponse{
170                         fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
171                         result.resp,
172                 }
173         }
174 }
175
176 var ErrTooFast = errors.New("making requests too fast")
177
178 func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) {
179         var buf bytes.Buffer
180         for _, part := range parts {
181                 part.start()
182                 err = recvPartResult(ctx, &buf, part)
183                 if err != nil {
184                         err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
185                         break
186                 }
187         }
188         return buf.Bytes(), err
189 }