14 "github.com/anacrolix/dht/v2/krpc"
17 // Client interacts with UDP trackers via its Writer and Dispatcher. It has no knowledge of
18 // connection specifics.
22 connIdIssued time.Time
23 Dispatcher *Dispatcher
27 func (cl *Client) Announce(
28 ctx context.Context, req AnnounceRequest, opts Options,
29 // Decides whether the response body is IPv6 or IPv4, see BEP 15.
30 ipv6 func(net.Addr) bool,
32 respHdr AnnounceResponseHeader,
33 // A slice of krpc.NodeAddr, likely wrapped in an appropriate unmarshalling wrapper.
34 peers AnnounceResponsePeers,
37 respBody, addr, err := cl.request(ctx, ActionAnnounce, append(mustMarshal(req), opts.Encode()...))
41 r := bytes.NewBuffer(respBody)
42 err = Read(r, &respHdr)
44 err = fmt.Errorf("reading response header: %w", err)
48 peers = &krpc.CompactIPv6NodeAddrs{}
50 peers = &krpc.CompactIPv4NodeAddrs{}
52 err = peers.UnmarshalBinary(r.Bytes())
54 err = fmt.Errorf("reading response peers: %w", err)
59 // There's no way to pass options in a scrape, since we don't when the request body ends.
60 func (cl *Client) Scrape(
61 ctx context.Context, ihs []InfoHash,
63 out ScrapeResponse, err error,
65 respBody, _, err := cl.request(ctx, ActionScrape, mustMarshal(ScrapeRequest(ihs)))
69 r := bytes.NewBuffer(respBody)
71 var item ScrapeInfohashResult
76 out = append(out, item)
78 if len(out) > len(ihs) {
79 err = fmt.Errorf("got %v results but expected %v", len(out), len(ihs))
85 func (cl *Client) connect(ctx context.Context) (err error) {
86 // We could get fancier here and use RWMutex, and even fire off the connection asynchronously
87 // and provide a grace period while it resolves.
90 if !cl.connIdIssued.IsZero() && time.Since(cl.connIdIssued) < time.Minute {
93 respBody, _, err := cl.request(ctx, ActionConnect, nil)
97 var connResp ConnectionResponse
98 err = binary.Read(bytes.NewReader(respBody), binary.BigEndian, &connResp)
102 cl.connId = connResp.ConnectionId
103 cl.connIdIssued = time.Now()
107 func (cl *Client) connIdForRequest(ctx context.Context, action Action) (id ConnectionId, err error) {
108 if action == ActionConnect {
109 id = ConnectRequestConnectionId
112 err = cl.connect(ctx)
120 func (cl *Client) requestWriter(ctx context.Context, action Action, body []byte, tId TransactionId) (err error) {
123 var connId ConnectionId
124 connId, err = cl.connIdForRequest(ctx, action)
129 err = binary.Write(&buf, binary.BigEndian, RequestHeader{
130 ConnectionId: connId,
138 _, err = cl.Writer.Write(buf.Bytes())
145 case <-time.After(timeout(n)):
150 func (cl *Client) request(ctx context.Context, action Action, body []byte) (respBody []byte, addr net.Addr, err error) {
151 respChan := make(chan DispatchedResponse, 1)
152 t := cl.Dispatcher.NewTransaction(func(dr DispatchedResponse) {
156 ctx, cancel := context.WithCancel(ctx)
158 writeErr := make(chan error, 1)
160 writeErr <- cl.requestWriter(ctx, action, body, t.Id())
163 case dr := <-respChan:
164 if dr.Header.Action == action {
167 } else if dr.Header.Action == ActionError {
168 err = errors.New(string(dr.Body))
170 err = fmt.Errorf("unexpected response action %v", dr.Header.Action)
172 case err = <-writeErr:
173 err = fmt.Errorf("write error: %w", err)