13 "github.com/anacrolix/dht/v2/krpc"
16 // Client interacts with UDP trackers via its Writer and Dispatcher. It has no knowledge of
17 // connection specifics.
21 connIdIssued time.Time
23 shouldReconnectOverride func() bool
25 Dispatcher *Dispatcher
29 func (cl *Client) Announce(
30 ctx context.Context, req AnnounceRequest, opts Options,
31 // Decides whether the response body is IPv6 or IPv4, see BEP 15.
32 ipv6 func(net.Addr) bool,
34 respHdr AnnounceResponseHeader,
35 // A slice of krpc.NodeAddr, likely wrapped in an appropriate unmarshalling wrapper.
36 peers AnnounceResponsePeers,
39 respBody, addr, err := cl.request(ctx, ActionAnnounce, append(mustMarshal(req), opts.Encode()...))
43 r := bytes.NewBuffer(respBody)
44 err = Read(r, &respHdr)
46 err = fmt.Errorf("reading response header: %w", err)
50 peers = &krpc.CompactIPv6NodeAddrs{}
52 peers = &krpc.CompactIPv4NodeAddrs{}
54 err = peers.UnmarshalBinary(r.Bytes())
56 err = fmt.Errorf("reading response peers: %w", err)
61 // There's no way to pass options in a scrape, since we don't know when the request body ends.
62 func (cl *Client) Scrape(
63 ctx context.Context, ihs []InfoHash,
65 out ScrapeResponse, err error,
67 respBody, _, err := cl.request(ctx, ActionScrape, mustMarshal(ScrapeRequest(ihs)))
71 r := bytes.NewBuffer(respBody)
73 var item ScrapeInfohashResult
78 out = append(out, item)
80 if len(out) > len(ihs) {
81 err = fmt.Errorf("got %v results but expected %v", len(out), len(ihs))
87 func (cl *Client) shouldReconnectDefault() bool {
88 return cl.connIdIssued.IsZero() || time.Since(cl.connIdIssued) >= time.Minute
91 func (cl *Client) shouldReconnect() bool {
92 if cl.shouldReconnectOverride != nil {
93 return cl.shouldReconnectOverride()
95 return cl.shouldReconnectDefault()
98 func (cl *Client) connect(ctx context.Context) (err error) {
99 if !cl.shouldReconnect() {
102 return cl.doConnectRoundTrip(ctx)
105 // This just does the connect request and updates local state if it succeeds.
106 func (cl *Client) doConnectRoundTrip(ctx context.Context) (err error) {
107 respBody, _, err := cl.request(ctx, ActionConnect, nil)
111 var connResp ConnectionResponse
112 err = binary.Read(bytes.NewReader(respBody), binary.BigEndian, &connResp)
116 cl.connId = connResp.ConnectionId
117 cl.connIdIssued = time.Now()
118 //log.Printf("conn id set to %x", cl.connId)
122 func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id ConnectionId, err error) {
123 if action == ActionConnect {
124 id = ConnectRequestConnectionId
127 err = cl.connect(ctx)
135 func (cl *Client) writeRequest(
136 ctx context.Context, action Action, body []byte, tId TransactionId, buf *bytes.Buffer,
140 var connId ConnectionId
141 if action == ActionConnect {
142 connId = ConnectRequestConnectionId
144 // We lock here while establishing a connection ID, and then ensuring that the request is
145 // written before allowing the connection ID to change again. This is to ensure the server
146 // doesn't assign us another ID before we've sent this request. Note that this doesn't allow
147 // for us to return if the context is cancelled while we wait to obtain a new ID.
150 connId, err = cl.connIdForRequest(ctx, action)
156 err = Write(buf, RequestHeader{
157 ConnectionId: connId,
165 _, err = cl.Writer.Write(buf.Bytes())
166 //log.Printf("sent request with conn id %x", connId)
170 func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
173 err = cl.writeRequest(ctx, action, body, tId, &buf)
180 case <-time.After(timeout(n)):
185 const ConnectionIdMissmatchNul = "Connection ID missmatch.\x00"
187 type ErrorResponse struct {
191 func (me ErrorResponse) Error() string {
192 return fmt.Sprintf("error response: %#q", me.Message)
195 func (cl *Client) request(ctx context.Context, action Action, body []byte) (respBody []byte, addr net.Addr, err error) {
196 respChan := make(chan DispatchedResponse, 1)
197 t := cl.Dispatcher.NewTransaction(func(dr DispatchedResponse) {
201 ctx, cancel := context.WithCancel(ctx)
203 writeErr := make(chan error, 1)
205 writeErr <- cl.requestWriter(ctx, action, body, t.Id())
208 case dr := <-respChan:
209 if dr.Header.Action == action {
212 } else if dr.Header.Action == ActionError {
213 // udp://tracker.torrent.eu.org:451/announce frequently returns "Connection ID
215 err = ErrorResponse{Message: string(dr.Body)}
217 err = fmt.Errorf("unexpected response action %v", dr.Header.Action)
219 case err = <-writeErr:
220 err = fmt.Errorf("write error: %w", err)