13 "github.com/anacrolix/dht/v2/krpc"
16 // Client interacts with UDP trackers via its Writer and Dispatcher. It has no knowledge of
17 // connection specifics.
21 connIdIssued time.Time
22 Dispatcher *Dispatcher
26 func (cl *Client) Announce(
27 ctx context.Context, req AnnounceRequest, opts Options,
28 // Decides whether the response body is IPv6 or IPv4, see BEP 15.
29 ipv6 func(net.Addr) bool,
31 respHdr AnnounceResponseHeader,
32 // A slice of krpc.NodeAddr, likely wrapped in an appropriate unmarshalling wrapper.
33 peers AnnounceResponsePeers,
36 respBody, addr, err := cl.request(ctx, ActionAnnounce, append(mustMarshal(req), opts.Encode()...))
40 r := bytes.NewBuffer(respBody)
41 err = Read(r, &respHdr)
43 err = fmt.Errorf("reading response header: %w", err)
47 peers = &krpc.CompactIPv6NodeAddrs{}
49 peers = &krpc.CompactIPv4NodeAddrs{}
51 err = peers.UnmarshalBinary(r.Bytes())
53 err = fmt.Errorf("reading response peers: %w", err)
58 // There's no way to pass options in a scrape, since we don't know where the request body ends.
59 func (cl *Client) Scrape(
60 ctx context.Context, ihs []InfoHash,
62 out ScrapeResponse, err error,
64 respBody, _, err := cl.request(ctx, ActionScrape, mustMarshal(ScrapeRequest(ihs)))
68 r := bytes.NewBuffer(respBody)
70 var item ScrapeInfohashResult
75 out = append(out, item)
77 if len(out) > len(ihs) {
78 err = fmt.Errorf("got %v results but expected %v", len(out), len(ihs))
84 func (cl *Client) connect(ctx context.Context) (err error) {
85 // We could get fancier here and use RWMutex, and even fire off the connection asynchronously
86 // and provide a grace period while it resolves.
89 if !cl.connIdIssued.IsZero() && time.Since(cl.connIdIssued) < time.Minute {
92 respBody, _, err := cl.request(ctx, ActionConnect, nil)
96 var connResp ConnectionResponse
97 err = binary.Read(bytes.NewReader(respBody), binary.BigEndian, &connResp)
101 cl.connId = connResp.ConnectionId
102 cl.connIdIssued = time.Now()
106 func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id ConnectionId, err error) {
107 if action == ActionConnect {
108 id = ConnectRequestConnectionId
111 err = cl.connect(ctx)
119 func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
122 var connId ConnectionId
123 connId, err = cl.connIdForRequest(ctx, action)
128 err = Write(&buf, RequestHeader{
129 ConnectionId: connId,
137 _, err = cl.Writer.Write(buf.Bytes())
144 case <-time.After(timeout(n)):
149 func (cl *Client) request(ctx context.Context, action Action, body []byte) (respBody []byte, addr net.Addr, err error) {
150 respChan := make(chan DispatchedResponse, 1)
151 t := cl.Dispatcher.NewTransaction(func(dr DispatchedResponse) {
155 ctx, cancel := context.WithCancel(ctx)
157 writeErr := make(chan error, 1)
159 writeErr <- cl.requestWriter(ctx, action, body, t.Id())
162 case dr := <-respChan:
163 if dr.Header.Action == action {
166 } else if dr.Header.Action == ActionError {
167 // I've seen "Connection ID mismatch.^@" in less and other tools, I think they're just
168 // not handling a trailing \x00 nicely.
169 err = fmt.Errorf("error response: %#q", dr.Body)
171 err = fmt.Errorf("unexpected response action %v", dr.Header.Action)
173 case err = <-writeErr:
174 err = fmt.Errorf("write error: %w", err)