1 package udpTrackerServer
13 "github.com/anacrolix/dht/v2/krpc"
14 "github.com/anacrolix/generics"
15 "github.com/anacrolix/log"
16 "go.opentelemetry.io/otel"
17 "go.opentelemetry.io/otel/codes"
19 "github.com/anacrolix/torrent/tracker"
20 "github.com/anacrolix/torrent/tracker/udp"
// ConnectionTrackerAddr is the key type under which connection IDs are tracked.
// The handlers in this file pass source.String() (the textual form of the
// request's source address) for it.
23 type ConnectionTrackerAddr = string
// ConnectionTracker stores connection IDs per client address and answers
// whether a given (address, ID) pair is known.
25 type ConnectionTracker interface {
// Add records id for addr. handleConnect calls it with a freshly generated
// connection ID before replying to the client.
26 Add(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) error
// Check reports whether id is acceptable for addr. handleAnnounce calls it
// before decoding the announce body; a non-nil error is wrapped and reported
// as a failure to check the connection ID.
27 Check(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) (bool, error)
// InfoHash is an alias for a 20-byte array.
// NOTE(review): presumably a torrent info-hash; it is not referenced by any
// other line visible in this listing — confirm against callers.
30 type InfoHash = [20]byte
// AnnounceTracker is an alias that exposes tracker.AnnounceTracker under this
// package's name.
32 type AnnounceTracker = tracker.AnnounceTracker
// ConnTracker records connection IDs issued by handleConnect and is consulted
// by handleAnnounce to validate the ID a client presents.
35 ConnTracker ConnectionTracker
// SendResponse transmits an encoded response to addr and returns the number of
// bytes written. Both handlers call it with the full response buffer and the
// request's source address.
36 SendResponse func(data []byte, addr net.Addr) (int, error)
// Announce serves announce requests; handleAnnounce calls its Serve method.
37 Announce *tracker.AnnounceHandler
// RequestSourceAddr is the network address a request was received from. The
// handlers send their responses back to it and use its String() form as the
// connection-tracking key.
40 type RequestSourceAddr = net.Addr
// tracer is the OpenTelemetry tracer, named "torrent.tracker.udp", used to
// start the span in HandleRequest.
42 var tracer = otel.Tracer("torrent.tracker.udp")
// HandleRequest processes a single tracker request received from source.
//
// It starts a "Server.HandleRequest" tracing span, reads a udp.RequestHeader
// from the request, and dispatches on the header's action: connect requests go
// to handleConnect and announce requests go to handleAnnounce. An error from
// the chosen handler is wrapped with the action it was handling, and the
// span's status is set to an error carrying that message.
//
// family is forwarded to handleAnnounce, where it selects the address family
// of the peers returned.
44 func (me *Server) HandleRequest(
46 family udp.AddrFamily,
47 source RequestSourceAddr,
50 ctx, span := tracer.Start(ctx, "Server.HandleRequest")
// Mark the span as failed, using the error text as the status description.
54 span.SetStatus(codes.Error, err.Error())
55 
// handleAnnounce answers an announce request from source.
//
// Visible steps: the presented connection ID is checked against ConnTracker
// for the source address; the announce request body is decoded from r; the
// source address is parsed into a netip.AddrPort; the announce is served via
// me.Announce.Serve; the returned peers are converted to krpc.NodeAddr values
// for addrFamily; and a response consisting of a udp.ResponseHeader, a
// udp.AnnounceResponseHeader and the compact peer encoding is sent back to
// source through me.SendResponse.
79 func (me *Server) handleAnnounce(
81 addrFamily udp.AddrFamily,
82 source RequestSourceAddr,
83 connId udp.ConnectionId,
84 tid udp.TransactionId,
87 // Should we set a timeout of 10s or something for the entire response, so that we give up if a
// Validate the connection ID the client sent, keyed by the source address string.
90 ok, err := me.ConnTracker.Check(ctx, source.String(), connId)
92 err = fmt.Errorf("checking conn id: %w", err)
// NOTE(review): presumably returned when Check reports false — the guard is
// not visible in this listing.
96 return fmt.Errorf("incorrect connection id: %x", connId)
// Decode the announce request that follows the request header in r.
98 var req udp.AnnounceRequest
99 err = udp.Read(r, &req)
103 // TODO: This should be done asynchronously to responding to the announce.
// The announce handler takes the client's address as a netip.AddrPort, so the
// net.Addr is converted by parsing its string form.
104 announceAddr, err := netip.ParseAddrPort(source.String())
106 err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err)
// Limit the number of peers requested: 50 by default, raised to 150 when the
// request's address family is IPv4.
109 opts := tracker.GetPeersOpts{MaxCount: generics.Some[uint](50)}
110 if addrFamily == udp.AddrFamilyIpv4 {
111 opts.MaxCount = generics.Some[uint](150)
113 res := me.Announce.Serve(ctx, req, announceAddr, opts)
// Convert the returned peers into node addresses, pre-sizing the slice to the
// number of peers.
117 nodeAddrs := make([]krpc.NodeAddr, 0, len(res.Peers))
118 for _, p := range res.Peers {
123 case udp.AddrFamilyIpv4:
// Tests whether the peer's address, after unmapping any IPv4-in-IPv6 form, is
// not IPv4. NOTE(review): presumably such peers are skipped — the branch body
// is not visible here.
124 if !p.Addr().Unmap().Is4() {
127 ipBuf := p.Addr().As4()
129 case udp.AddrFamilyIpv6:
130 ipBuf := p.Addr().As16()
133 nodeAddrs = append(nodeAddrs, krpc.NodeAddr{
// Build the response in buf: the common response header first, then the
// announce-specific header.
139 err = udp.Write(&buf, udp.ResponseHeader{
140 Action: udp.ActionAnnounce,
146 err = udp.Write(&buf, udp.AnnounceResponseHeader{
// Falls back to 5 * 60 when the announce result carries no interval.
// NOTE(review): presumably seconds (i.e. five minutes) — confirm against the
// udp.AnnounceResponseHeader definition.
147 Interval: res.Interval.UnwrapOr(5 * 60),
// Encode the peers in compact binary form for the request's address family.
152 b, err := udp.GetNodeAddrsCompactMarshaler(nodeAddrs, addrFamily).MarshalBinary()
154 err = fmt.Errorf("marshalling compact node addrs: %w", err)
// Send the assembled response back to the requester.
158 n, err := me.SendResponse(buf.Bytes(), source)
// NOTE(review): presumably set when n is smaller than the buffer length — the
// comparison is not visible in this listing.
163 err = io.ErrShortWrite
// handleConnect answers a connect request from source.
//
// It generates a new connection ID, records it in ConnTracker under the source
// address string, and sends a response made of a udp.ResponseHeader (action
// connect) followed by a udp.ConnectionResponse carrying the new ID. A failure
// to record the ID is wrapped as "recording conn id".
168 func (me *Server) handleConnect(ctx context.Context, source RequestSourceAddr, tid udp.TransactionId) error {
169 connId := randomConnectionId()
// Record the ID first so that a later announce presenting it can be checked.
170 err := me.ConnTracker.Add(ctx, source.String(), connId)
172 err = fmt.Errorf("recording conn id: %w", err)
// NOTE(review): the results of the two udp.Write calls below are discarded,
// whereas handleAnnounce assigns the results of its udp.Write calls to err.
// Consider checking them here as well.
176 udp.Write(&buf, udp.ResponseHeader{
177 Action: udp.ActionConnect,
180 udp.Write(&buf, udp.ConnectionResponse{connId})
// Send the assembled response back to the requester.
181 n, err := me.SendResponse(buf.Bytes(), source)
// NOTE(review): presumably set when n is smaller than the buffer length — the
// comparison is not visible in this listing.
186 err = io.ErrShortWrite
// randomConnectionId returns a new connection ID built from random bytes: it
// fills b using rand.Read and interprets the bytes as a big-endian uint64.
// NOTE(review): which rand package is imported, and how a read error is
// handled, are not visible in this listing.
191 func randomConnectionId() udp.ConnectionId {
193 _, err := rand.Read(b[:])
197 return binary.BigEndian.Uint64(b[:])
// RunSimple serves requests arriving on pc using s.
//
// It derives a cancellable context from ctx, reads a packet from pc into b,
// and passes the received bytes (b[:n]) together with the sender's address and
// family to s.HandleRequest. A handler error is logged along with the request
// size and sender address.
// NOTE(review): the loop structure, the size of b and the conditions under
// which the function returns are not visible in this listing.
200 func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error {
201 ctx, cancel := context.WithCancel(ctx)
205 n, addr, err := pc.ReadFrom(b[:])
// Only the first n bytes of b hold the packet just received.
210 err := s.HandleRequest(ctx, family, addr, b[:n])
212 log.Printf("error handling %v byte request from %v: %v", n, addr, err)