]> Sergey Matveev's repositories - btrtrc.git/blob - tracker/udp.go
033598e50f5163ab489038f51515c601e8a757c7
[btrtrc.git] / tracker / udp.go
1 package tracker
2
3 import (
4         "bytes"
5         "context"
6         "encoding"
7         "encoding/binary"
8         "fmt"
9         "io"
10         "math/rand"
11         "net"
12         "net/url"
13         "time"
14
15         "github.com/anacrolix/dht/v2/krpc"
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/pproffd"
18         "github.com/pkg/errors"
19 )
20
// Action is the 32-bit action code carried in the RequestHeader and
// ResponseHeader of every UDP tracker message handled in this file.
type Action int32
22
const (
	// Action codes. They are numbered from zero by iota, so the declaration
	// order below is significant and must not be changed.
	ActionConnect Action = iota
	ActionAnnounce
	ActionScrape
	ActionError

	// connectRequestConnectionId is the connection ID that connect() places
	// in the request header of an ActionConnect request, before the tracker
	// has handed out a real connection ID.
	connectRequestConnectionId = 0x41727101980

	// BEP 41
	// Option type bytes for the data that may trail an announce request.
	// Only optionTypeURLData is used by the code visible in this file.
	optionTypeEndOfOptions = 0
	optionTypeNOP          = 1
	optionTypeURLData      = 2
)
36
// ConnectionRequest has the same field layout as RequestHeader (an int64
// connection ID followed by two int32 values). It is not referenced by the
// code visible in this file, which builds its requests from RequestHeader.
//
// NOTE(review): TransctionId looks like a misspelling of TransactionId; it is
// an exported field, so renaming it would break any external users.
type ConnectionRequest struct {
	ConnectionId int64
	Action       int32
	TransctionId int32
}
42
// ConnectionResponse is the body of a connect response, i.e. what follows the
// ResponseHeader. connect() decodes it and stores ConnectionId for use in
// subsequent request headers.
type ConnectionResponse struct {
	ConnectionId int64
}
46
// ResponseHeader is decoded (big-endian) from the start of every datagram
// received by request(). TransactionId is compared against the ID sent with
// the request, and an Action of ActionError marks an error response.
type ResponseHeader struct {
	Action        Action
	TransactionId int32
}
51
// RequestHeader is written (big-endian) at the start of every outgoing
// request, ahead of the optional body and trailing options.
type RequestHeader struct {
	ConnectionId  int64
	Action        Action
	TransactionId int32
} // 16 bytes
57
// AnnounceResponseHeader is the fixed-size part of an announce response body.
// Do decodes it immediately after the ResponseHeader; the remaining bytes of
// the datagram are treated as the compact peer list.
type AnnounceResponseHeader struct {
	Interval int32
	Leechers int32
	Seeders  int32
}
63
// newTransactionId returns a pseudo-random 32-bit value used to pair a
// response datagram with the request that triggered it.
func newTransactionId() int32 {
	raw := rand.Uint32()
	return int32(raw)
}
67
// timeout returns how long to wait for a response after contiguousTimeouts
// consecutive timeouts: 15 seconds doubled once per timeout, with the number
// of doublings capped at 8. Zero or negative counts yield the base 15 seconds.
func timeout(contiguousTimeouts int) (d time.Duration) {
	const maxDoublings = 8
	doublings := contiguousTimeouts
	switch {
	case doublings > maxDoublings:
		doublings = maxDoublings
	case doublings < 0:
		doublings = 0
	}
	return (15 * time.Second) << uint(doublings)
}
78
// udpAnnounce holds the state of a UDP tracker session: the socket, the
// connection ID obtained from the tracker, and timeout bookkeeping.
type udpAnnounce struct {
	// Number of read timeouts since the last matching response. request()
	// increments it on a timeout, resets it on success, and feeds it to
	// timeout() to choose the next read deadline.
	contiguousTimeouts int
	// When connectionId was stored by connect(). connected() treats the ID
	// as usable for one minute after this instant.
	connectionIdReceived time.Time
	// Connection ID placed in the header of every request.
	connectionId int64
	// Dialed lazily by connect() and released by Close().
	socket net.Conn
	// Tracker URL. Its Host is dialed and its request URI is sent along
	// with announces as BEP 41 URL data.
	url url.URL
	// Announce options; this file reads UdpNetwork, ClientIp4 and Context.
	a *Announce
}
87
88 func (c *udpAnnounce) Close() error {
89         if c.socket != nil {
90                 return c.socket.Close()
91         }
92         return nil
93 }
94
95 func (c *udpAnnounce) ipv6() bool {
96         if c.a.UdpNetwork == "udp6" {
97                 return true
98         }
99         rip := missinggo.AddrIP(c.socket.RemoteAddr())
100         return rip.To16() != nil && rip.To4() == nil
101 }
102
103 func (c *udpAnnounce) Do(req AnnounceRequest) (res AnnounceResponse, err error) {
104         err = c.connect()
105         if err != nil {
106                 return
107         }
108         reqURI := c.url.RequestURI()
109         if c.ipv6() {
110                 // BEP 15
111                 req.IPAddress = 0
112         } else if req.IPAddress == 0 && c.a.ClientIp4.IP != nil {
113                 req.IPAddress = binary.BigEndian.Uint32(c.a.ClientIp4.IP.To4())
114         }
115         // Clearly this limits the request URI to 255 bytes. BEP 41 supports
116         // longer but I'm not fussed.
117         options := append([]byte{optionTypeURLData, byte(len(reqURI))}, []byte(reqURI)...)
118         vars.Add("udp tracker announces", 1)
119         b, err := c.request(ActionAnnounce, req, options)
120         if err != nil {
121                 return
122         }
123         var h AnnounceResponseHeader
124         err = readBody(b, &h)
125         if err != nil {
126                 if err == io.EOF {
127                         err = io.ErrUnexpectedEOF
128                 }
129                 err = fmt.Errorf("error parsing announce response: %s", err)
130                 return
131         }
132         res.Interval = h.Interval
133         res.Leechers = h.Leechers
134         res.Seeders = h.Seeders
135         nas := func() interface {
136                 encoding.BinaryUnmarshaler
137                 NodeAddrs() []krpc.NodeAddr
138         } {
139                 if c.ipv6() {
140                         return &krpc.CompactIPv6NodeAddrs{}
141                 } else {
142                         return &krpc.CompactIPv4NodeAddrs{}
143                 }
144         }()
145         err = nas.UnmarshalBinary(b.Bytes())
146         if err != nil {
147                 return
148         }
149         for _, cp := range nas.NodeAddrs() {
150                 res.Peers = append(res.Peers, Peer{}.FromNodeAddr(cp))
151         }
152         return
153 }
154
155 // body is the binary serializable request body. trailer is optional data
156 // following it, such as for BEP 41.
157 func (c *udpAnnounce) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
158         var buf bytes.Buffer
159         err = binary.Write(&buf, binary.BigEndian, h)
160         if err != nil {
161                 panic(err)
162         }
163         if body != nil {
164                 err = binary.Write(&buf, binary.BigEndian, body)
165                 if err != nil {
166                         panic(err)
167                 }
168         }
169         _, err = buf.Write(trailer)
170         if err != nil {
171                 return
172         }
173         n, err := c.socket.Write(buf.Bytes())
174         if err != nil {
175                 return
176         }
177         if n != buf.Len() {
178                 panic("write should send all or error")
179         }
180         return
181 }
182
// read decodes big-endian binary data from r into data, which must be a
// pointer to a fixed-size value as accepted by encoding/binary.
func read(r io.Reader, data interface{}) error {
	err := binary.Read(r, binary.BigEndian, data)
	return err
}
186
// write encodes data to w in big-endian binary form. data must be a
// fixed-size value as accepted by encoding/binary.
func write(w io.Writer, data interface{}) error {
	err := binary.Write(w, binary.BigEndian, data)
	return err
}
190
191 // args is the binary serializable request body. trailer is optional data
192 // following it, such as for BEP 41.
193 func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (*bytes.Buffer, error) {
194         tid := newTransactionId()
195         if err := errors.Wrap(
196                 c.write(
197                         &RequestHeader{
198                                 ConnectionId:  c.connectionId,
199                                 Action:        action,
200                                 TransactionId: tid,
201                         }, args, options),
202                 "writing request",
203         ); err != nil {
204                 return nil, err
205         }
206         c.socket.SetReadDeadline(time.Now().Add(timeout(c.contiguousTimeouts)))
207         b := make([]byte, 0x800) // 2KiB
208         for {
209                 var (
210                         n        int
211                         readErr  error
212                         readDone = make(chan struct{})
213                 )
214                 go func() {
215                         defer close(readDone)
216                         n, readErr = c.socket.Read(b)
217                 }()
218                 ctx := c.a.Context
219                 if ctx == nil {
220                         ctx = context.Background()
221                 }
222                 select {
223                 case <-ctx.Done():
224                         return nil, ctx.Err()
225                 case <-readDone:
226                 }
227                 if opE, ok := readErr.(*net.OpError); ok && opE.Timeout() {
228                         c.contiguousTimeouts++
229                 }
230                 if readErr != nil {
231                         return nil, errors.Wrap(readErr, "reading from socket")
232                 }
233                 buf := bytes.NewBuffer(b[:n])
234                 var h ResponseHeader
235                 err := binary.Read(buf, binary.BigEndian, &h)
236                 switch err {
237                 default:
238                         panic(err)
239                 case io.ErrUnexpectedEOF, io.EOF:
240                         continue
241                 case nil:
242                 }
243                 if h.TransactionId != tid {
244                         continue
245                 }
246                 c.contiguousTimeouts = 0
247                 if h.Action == ActionError {
248                         err = errors.New(buf.String())
249                 }
250                 return buf, err
251         }
252 }
253
// readBody decodes each element of data, in order, from r as big-endian
// binary. It stops at the first failure and returns that error; with no
// failure (or no data) it returns nil.
func readBody(r io.Reader, data ...interface{}) (err error) {
	for i := range data {
		if err = binary.Read(r, binary.BigEndian, data[i]); err != nil {
			return err
		}
	}
	return nil
}
263
264 func (c *udpAnnounce) connected() bool {
265         return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
266 }
267
268 func (c *udpAnnounce) dialNetwork() string {
269         if c.a.UdpNetwork != "" {
270                 return c.a.UdpNetwork
271         }
272         return "udp"
273 }
274
275 func (c *udpAnnounce) connect() (err error) {
276         if c.connected() {
277                 return nil
278         }
279         c.connectionId = connectRequestConnectionId
280         if c.socket == nil {
281                 hmp := missinggo.SplitHostMaybePort(c.url.Host)
282                 if hmp.NoPort {
283                         hmp.NoPort = false
284                         hmp.Port = 80
285                 }
286                 c.socket, err = net.Dial(c.dialNetwork(), hmp.String())
287                 if err != nil {
288                         return
289                 }
290                 c.socket = pproffd.WrapNetConn(c.socket)
291         }
292         vars.Add("udp tracker connects", 1)
293         b, err := c.request(ActionConnect, nil, nil)
294         if err != nil {
295                 return
296         }
297         var res ConnectionResponse
298         err = readBody(b, &res)
299         if err != nil {
300                 return
301         }
302         c.connectionId = res.ConnectionId
303         c.connectionIdReceived = time.Now()
304         return
305 }
306
307 // TODO: Split on IPv6, as BEP 15 says response peer decoding depends on
308 // network in use.
309 func announceUDP(opt Announce, _url *url.URL) (AnnounceResponse, error) {
310         ua := udpAnnounce{
311                 url: *_url,
312                 a:   &opt,
313         }
314         defer ua.Close()
315         return ua.Do(opt.Request)
316 }