Sergey Matveev's repositories - btrtrc.git/blob - tracker/udp.go
Merge branch 'v1.19'
[btrtrc.git] / tracker / udp.go
1 package tracker
2
3 import (
4         "bytes"
5         "context"
6         "encoding"
7         "encoding/binary"
8         "fmt"
9         "io"
10         "math/rand"
11         "net"
12         "net/url"
13         "time"
14
15         "github.com/anacrolix/dht/v2/krpc"
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/pproffd"
18         "github.com/pkg/errors"
19 )
20
// Action identifies the kind of a UDP tracker message. It travels as a
// 32-bit integer in both request and response headers.
type Action int32

// Message kinds, numbered consecutively from zero.
const (
	ActionConnect Action = iota
	ActionAnnounce
	ActionScrape
	ActionError
)

// connectRequestConnectionId is the fixed connection ID placed in the header
// of a connect request, before the tracker has issued a real one.
const connectRequestConnectionId = 0x41727101980

// Option types from BEP 41.
const (
	optionTypeEndOfOptions = 0
	optionTypeNOP          = 1
	optionTypeURLData      = 2
)
36
// ConnectionRequest lays out a connect request as fixed-size fields:
// connection ID, then action, then transaction ID.
//
// The spelling of TransctionId (sic) is part of the exported API and is kept
// for compatibility.
type ConnectionRequest struct {
	ConnectionId         int64
	Action, TransctionId int32
}
42
// ConnectionResponse is the body of a connect response, i.e. what follows the
// ResponseHeader: the connection ID to use in subsequent requests.
type ConnectionResponse struct {
	ConnectionId int64
}
46
47 type ResponseHeader struct {
48         Action        Action
49         TransactionId int32
50 }
51
52 type RequestHeader struct {
53         ConnectionId  int64
54         Action        Action
55         TransactionId int32
56 } // 16 bytes
57
// AnnounceResponseHeader is the fixed-size part of an announce response that
// follows the ResponseHeader. The compact peer list comes after it.
type AnnounceResponseHeader struct {
	// Interval is the re-announce interval reported by the tracker
	// (presumably seconds, per BEP 15).
	Interval int32
	// Leechers is the tracker's count of leechers.
	Leechers int32
	// Seeders is the tracker's count of seeders.
	Seeders int32
}
63
// newTransactionId returns a pseudo-random transaction ID, drawn from the
// shared math/rand source and reinterpreted as a signed 32-bit value.
func newTransactionId() int32 {
	id := rand.Uint32()
	return int32(id)
}
67
// timeout returns how long to wait for a response after contiguousTimeouts
// consecutive timeouts: 15 seconds, doubled once per timeout, with at most 8
// doublings (64 minutes). Non-positive counts yield the base 15 seconds.
func timeout(contiguousTimeouts int) (d time.Duration) {
	const (
		base         = 15 * time.Second
		maxDoublings = 8
	)
	doublings := contiguousTimeouts
	switch {
	case doublings > maxDoublings:
		doublings = maxDoublings
	case doublings < 0:
		doublings = 0
	}
	return base << uint(doublings)
}
78
79 type udpAnnounce struct {
80         contiguousTimeouts   int
81         connectionIdReceived time.Time
82         connectionId         int64
83         socket               net.Conn
84         url                  url.URL
85         a                    *Announce
86 }
87
88 func (c *udpAnnounce) Close() error {
89         if c.socket != nil {
90                 return c.socket.Close()
91         }
92         return nil
93 }
94
95 func (c *udpAnnounce) ipv6() bool {
96         if c.a.UdpNetwork == "udp6" {
97                 return true
98         }
99         rip := missinggo.AddrIP(c.socket.RemoteAddr())
100         return rip.To16() != nil && rip.To4() == nil
101 }
102
103 func (c *udpAnnounce) Do(req AnnounceRequest) (res AnnounceResponse, err error) {
104         err = c.connect()
105         if err != nil {
106                 return
107         }
108         reqURI := c.url.RequestURI()
109         if c.ipv6() {
110                 // BEP 15
111                 req.IPAddress = 0
112         } else if req.IPAddress == 0 && c.a.ClientIp4.IP != nil {
113                 req.IPAddress = binary.BigEndian.Uint32(c.a.ClientIp4.IP.To4())
114         }
115         // Clearly this limits the request URI to 255 bytes. BEP 41 supports
116         // longer but I'm not fussed.
117         options := append([]byte{optionTypeURLData, byte(len(reqURI))}, []byte(reqURI)...)
118         b, err := c.request(ActionAnnounce, req, options)
119         if err != nil {
120                 return
121         }
122         var h AnnounceResponseHeader
123         err = readBody(b, &h)
124         if err != nil {
125                 if err == io.EOF {
126                         err = io.ErrUnexpectedEOF
127                 }
128                 err = fmt.Errorf("error parsing announce response: %s", err)
129                 return
130         }
131         res.Interval = h.Interval
132         res.Leechers = h.Leechers
133         res.Seeders = h.Seeders
134         nas := func() interface {
135                 encoding.BinaryUnmarshaler
136                 NodeAddrs() []krpc.NodeAddr
137         } {
138                 if c.ipv6() {
139                         return &krpc.CompactIPv6NodeAddrs{}
140                 } else {
141                         return &krpc.CompactIPv4NodeAddrs{}
142                 }
143         }()
144         err = nas.UnmarshalBinary(b.Bytes())
145         if err != nil {
146                 return
147         }
148         for _, cp := range nas.NodeAddrs() {
149                 res.Peers = append(res.Peers, Peer{}.FromNodeAddr(cp))
150         }
151         return
152 }
153
154 // body is the binary serializable request body. trailer is optional data
155 // following it, such as for BEP 41.
156 func (c *udpAnnounce) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
157         var buf bytes.Buffer
158         err = binary.Write(&buf, binary.BigEndian, h)
159         if err != nil {
160                 panic(err)
161         }
162         if body != nil {
163                 err = binary.Write(&buf, binary.BigEndian, body)
164                 if err != nil {
165                         panic(err)
166                 }
167         }
168         _, err = buf.Write(trailer)
169         if err != nil {
170                 return
171         }
172         n, err := c.socket.Write(buf.Bytes())
173         if err != nil {
174                 return
175         }
176         if n != buf.Len() {
177                 panic("write should send all or error")
178         }
179         return
180 }
181
// read decodes big-endian fixed-size data from r into data, which must be a
// pointer (or slice) accepted by encoding/binary.
func read(r io.Reader, data interface{}) error {
	var order binary.ByteOrder = binary.BigEndian
	return binary.Read(r, order, data)
}
185
// write encodes fixed-size data to w in big-endian byte order.
func write(w io.Writer, data interface{}) error {
	var order binary.ByteOrder = binary.BigEndian
	return binary.Write(w, order, data)
}
189
190 // args is the binary serializable request body. trailer is optional data
191 // following it, such as for BEP 41.
192 func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (*bytes.Buffer, error) {
193         tid := newTransactionId()
194         if err := errors.Wrap(
195                 c.write(
196                         &RequestHeader{
197                                 ConnectionId:  c.connectionId,
198                                 Action:        action,
199                                 TransactionId: tid,
200                         }, args, options),
201                 "writing request",
202         ); err != nil {
203                 return nil, err
204         }
205         c.socket.SetReadDeadline(time.Now().Add(timeout(c.contiguousTimeouts)))
206         b := make([]byte, 0x800) // 2KiB
207         for {
208                 var (
209                         n        int
210                         readErr  error
211                         readDone = make(chan struct{})
212                 )
213                 go func() {
214                         defer close(readDone)
215                         n, readErr = c.socket.Read(b)
216                 }()
217                 ctx := c.a.Context
218                 if ctx == nil {
219                         ctx = context.Background()
220                 }
221                 select {
222                 case <-ctx.Done():
223                         return nil, ctx.Err()
224                 case <-readDone:
225                 }
226                 if opE, ok := readErr.(*net.OpError); ok && opE.Timeout() {
227                         c.contiguousTimeouts++
228                 }
229                 if readErr != nil {
230                         return nil, errors.Wrap(readErr, "reading from socket")
231                 }
232                 buf := bytes.NewBuffer(b[:n])
233                 var h ResponseHeader
234                 err := binary.Read(buf, binary.BigEndian, &h)
235                 switch err {
236                 default:
237                         panic(err)
238                 case io.ErrUnexpectedEOF, io.EOF:
239                         continue
240                 case nil:
241                 }
242                 if h.TransactionId != tid {
243                         continue
244                 }
245                 c.contiguousTimeouts = 0
246                 if h.Action == ActionError {
247                         err = errors.New(buf.String())
248                 }
249                 return buf, err
250         }
251 }
252
// readBody decodes each element of data, in order, from r as big-endian
// fixed-size values. It stops at and returns the first decoding error; with
// no data it returns nil.
func readBody(r io.Reader, data ...interface{}) (err error) {
	for i := 0; i < len(data) && err == nil; i++ {
		err = binary.Read(r, binary.BigEndian, data[i])
	}
	return err
}
262
263 func (c *udpAnnounce) connected() bool {
264         return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
265 }
266
267 func (c *udpAnnounce) dialNetwork() string {
268         if c.a.UdpNetwork != "" {
269                 return c.a.UdpNetwork
270         }
271         return "udp"
272 }
273
274 func (c *udpAnnounce) connect() (err error) {
275         if c.connected() {
276                 return nil
277         }
278         c.connectionId = connectRequestConnectionId
279         if c.socket == nil {
280                 hmp := missinggo.SplitHostMaybePort(c.url.Host)
281                 if hmp.NoPort {
282                         hmp.NoPort = false
283                         hmp.Port = 80
284                 }
285                 c.socket, err = net.Dial(c.dialNetwork(), hmp.String())
286                 if err != nil {
287                         return
288                 }
289                 c.socket = pproffd.WrapNetConn(c.socket)
290         }
291         b, err := c.request(ActionConnect, nil, nil)
292         if err != nil {
293                 return
294         }
295         var res ConnectionResponse
296         err = readBody(b, &res)
297         if err != nil {
298                 return
299         }
300         c.connectionId = res.ConnectionId
301         c.connectionIdReceived = time.Now()
302         return
303 }
304
305 // TODO: Split on IPv6, as BEP 15 says response peer decoding depends on
306 // network in use.
307 func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) {
308         ua := udpAnnounce{
309                 url: *_url,
310                 a:   &opt,
311         }
312         defer ua.Close()
313         return ua.Do(opt.Request)
314 }