]> Sergey Matveev's repositories - btrtrc.git/blob - tracker/udp/client.go
Cancel writer after request
[btrtrc.git] / tracker / udp / client.go
1 package udp
2
3 import (
4         "bytes"
5         "context"
6         "encoding/binary"
7         "errors"
8         "fmt"
9         "io"
10         "time"
11 )
12
13 type Client struct {
14         connId       ConnectionId
15         connIdIssued time.Time
16         Dispatcher   *Dispatcher
17         Writer       io.Writer
18 }
19
20 func (cl *Client) Announce(
21         ctx context.Context, req AnnounceRequest, peers AnnounceResponsePeers, opts Options,
22 ) (
23         respHdr AnnounceResponseHeader, err error,
24 ) {
25         body, err := marshal(req)
26         if err != nil {
27                 return
28         }
29         respBody, err := cl.request(ctx, ActionAnnounce, append(body, opts.Encode()...))
30         if err != nil {
31                 return
32         }
33         r := bytes.NewBuffer(respBody)
34         err = Read(r, &respHdr)
35         if err != nil {
36                 err = fmt.Errorf("reading response header: %w", err)
37                 return
38         }
39         err = peers.UnmarshalBinary(r.Bytes())
40         if err != nil {
41                 err = fmt.Errorf("reading response peers: %w", err)
42         }
43         return
44 }
45
46 func (cl *Client) connect(ctx context.Context) (err error) {
47         if time.Since(cl.connIdIssued) < time.Minute {
48                 return nil
49         }
50         respBody, err := cl.request(ctx, ActionConnect, nil)
51         if err != nil {
52                 return err
53         }
54         var connResp ConnectionResponse
55         err = binary.Read(bytes.NewReader(respBody), binary.BigEndian, &connResp)
56         if err != nil {
57                 return
58         }
59         cl.connId = connResp.ConnectionId
60         cl.connIdIssued = time.Now()
61         return
62 }
63
64 func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id ConnectionId, err error) {
65         if action == ActionConnect {
66                 id = ConnectRequestConnectionId
67                 return
68         }
69         err = cl.connect(ctx)
70         if err != nil {
71                 return
72         }
73         id = cl.connId
74         return
75 }
76
77 func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
78         var buf bytes.Buffer
79         for n := 0; ; n++ {
80                 var connId ConnectionId
81                 connId, err = cl.connIdForRequest(ctx, action)
82                 if err != nil {
83                         return
84                 }
85                 buf.Reset()
86                 err = binary.Write(&buf, binary.BigEndian, RequestHeader{
87                         ConnectionId:  connId,
88                         Action:        action,
89                         TransactionId: tId,
90                 })
91                 if err != nil {
92                         panic(err)
93                 }
94                 buf.Write(body)
95                 _, err = cl.Writer.Write(buf.Bytes())
96                 if err != nil {
97                         return
98                 }
99                 select {
100                 case <-ctx.Done():
101                         return ctx.Err()
102                 case <-time.After(timeout(n)):
103                 }
104         }
105 }
106
107 func (cl *Client) request(ctx context.Context, action Action, body []byte) (respBody []byte, err error) {
108         respChan := make(chan DispatchedResponse, 1)
109         t := cl.Dispatcher.NewTransaction(func(dr DispatchedResponse) {
110                 respChan <- dr
111         })
112         defer t.End()
113         ctx, cancel := context.WithCancel(ctx)
114         defer cancel()
115         writeErr := make(chan error, 1)
116         go func() {
117                 writeErr <- cl.requestWriter(ctx, action, body, t.Id())
118         }()
119         select {
120         case dr := <-respChan:
121                 if dr.Header.Action == action {
122                         respBody = dr.Body
123                 } else if dr.Header.Action == ActionError {
124                         err = errors.New(string(dr.Body))
125                 } else {
126                         err = fmt.Errorf("unexpected response action %v", dr.Header.Action)
127                 }
128         case err = <-writeErr:
129                 err = fmt.Errorf("write error: %w", err)
130         case <-ctx.Done():
131                 err = ctx.Err()
132         }
133         return
134 }