1 package udpTrackerServer
13 "github.com/anacrolix/dht/v2/krpc"
14 "github.com/anacrolix/generics"
15 "github.com/anacrolix/log"
16 "go.opentelemetry.io/otel"
18 "github.com/anacrolix/torrent/tracker"
19 "github.com/anacrolix/torrent/tracker/udp"
22 type ConnectionTrackerAddr = string
24 type ConnectionTracker interface {
25 Add(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) error
26 Check(ctx context.Context, addr ConnectionTrackerAddr, id udp.ConnectionId) (bool, error)
29 type InfoHash = [20]byte
31 type AnnounceTracker = tracker.AnnounceTracker
34 ConnTracker ConnectionTracker
35 SendResponse func(data []byte, addr net.Addr) (int, error)
36 Announce *tracker.AnnounceHandler
39 type RequestSourceAddr = net.Addr
41 var tracer = otel.Tracer("torrent.tracker.udp")
43 func (me *Server) HandleRequest(
45 family udp.AddrFamily,
46 source RequestSourceAddr,
49 ctx, span := tracer.Start(ctx, "Server.HandleRequest")
51 var h udp.RequestHeader
54 err := udp.Read(&r, &h)
56 err = fmt.Errorf("reading request header: %w", err)
60 case udp.ActionConnect:
61 err = me.handleConnect(ctx, source, h.TransactionId)
62 case udp.ActionAnnounce:
63 err = me.handleAnnounce(ctx, family, source, h.ConnectionId, h.TransactionId, &r)
65 err = fmt.Errorf("unimplemented")
68 err = fmt.Errorf("handling action %v: %w", h.Action, err)
73 func (me *Server) handleAnnounce(
75 addrFamily udp.AddrFamily,
76 source RequestSourceAddr,
77 connId udp.ConnectionId,
78 tid udp.TransactionId,
81 // Should we set a timeout of 10s or something for the entire response, so that we give up if a
84 ok, err := me.ConnTracker.Check(ctx, source.String(), connId)
86 err = fmt.Errorf("checking conn id: %w", err)
90 return fmt.Errorf("incorrect connection id: %x", connId)
92 var req udp.AnnounceRequest
93 err = udp.Read(r, &req)
97 // TODO: This should be done asynchronously to responding to the announce.
98 announceAddr, err := netip.ParseAddrPort(source.String())
100 err = fmt.Errorf("converting source net.Addr to AnnounceAddr: %w", err)
103 opts := tracker.GetPeersOpts{MaxCount: generics.Some[uint](50)}
104 if addrFamily == udp.AddrFamilyIpv4 {
105 opts.MaxCount = generics.Some[uint](150)
107 res := me.Announce.Serve(ctx, req, announceAddr, opts)
111 nodeAddrs := make([]krpc.NodeAddr, 0, len(res.Peers))
112 for _, p := range res.Peers {
117 case udp.AddrFamilyIpv4:
118 if !p.Addr().Unmap().Is4() {
121 ipBuf := p.Addr().As4()
123 case udp.AddrFamilyIpv6:
124 ipBuf := p.Addr().As16()
127 nodeAddrs = append(nodeAddrs, krpc.NodeAddr{
133 err = udp.Write(&buf, udp.ResponseHeader{
134 Action: udp.ActionAnnounce,
140 err = udp.Write(&buf, udp.AnnounceResponseHeader{
141 Interval: res.Interval.UnwrapOr(5 * 60),
146 b, err := udp.GetNodeAddrsCompactMarshaler(nodeAddrs, addrFamily).MarshalBinary()
148 err = fmt.Errorf("marshalling compact node addrs: %w", err)
152 n, err := me.SendResponse(buf.Bytes(), source)
157 err = io.ErrShortWrite
162 func (me *Server) handleConnect(ctx context.Context, source RequestSourceAddr, tid udp.TransactionId) error {
163 connId := randomConnectionId()
164 err := me.ConnTracker.Add(ctx, source.String(), connId)
166 err = fmt.Errorf("recording conn id: %w", err)
170 udp.Write(&buf, udp.ResponseHeader{
171 Action: udp.ActionConnect,
174 udp.Write(&buf, udp.ConnectionResponse{connId})
175 n, err := me.SendResponse(buf.Bytes(), source)
180 err = io.ErrShortWrite
185 func randomConnectionId() udp.ConnectionId {
187 _, err := rand.Read(b[:])
191 return binary.BigEndian.Uint64(b[:])
194 func RunSimple(ctx context.Context, s *Server, pc net.PacketConn, family udp.AddrFamily) error {
195 ctx, cancel := context.WithCancel(ctx)
199 n, addr, err := pc.ReadFrom(b[:])
204 err := s.HandleRequest(ctx, family, addr, b[:n])
206 log.Printf("error handling %v byte request from %v: %v", n, addr, err)