10 "github.com/anacrolix/log"
12 "github.com/anacrolix/torrent/tracker"
13 "github.com/gorilla/websocket"
14 "github.com/pion/datachannel"
15 "github.com/pion/webrtc/v2"
// TrackerClientStats holds counters describing a TrackerClient's activity.
// Only the two conversion counters below are visible in this excerpt.
18 type TrackerClientStats struct {
// ConvertedInboundConns is incremented in handleOffer, inside the callback
// passed to setDataChannelOnOpen for an answering peer connection.
20 ConvertedInboundConns int64
// ConvertedOutboundConns is incremented in handleAnswer, inside the callback
// passed to setAnswer for one of our outbound offers.
21 ConvertedOutboundConns int64
24 // TrackerClient represents the webtorrent client. It talks to a tracker over
// a websocket (wsConn) and remembers the offers it has announced in
// outboundOffers. Some fields (for example Url, PeerId and Logger, which the
// methods below use) are declared on lines not visible in this excerpt.
25 type TrackerClient struct {
// GetAnnounceRequest supplies the announce parameters for an infohash; it is
// called by Announce, which copies fields such as Uploaded, Downloaded and
// Event from the result.
27 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
// OnConn is invoked with each data channel produced by handleOffer or
// handleAnswer, together with a DataChannelContext.
29 OnConn onDataChannelOpen
34 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
// wsConn is the websocket connection to the tracker. It may be nil: both Close
// and writeMessage check for that.
35 wsConn *websocket.Conn
// stats accumulates the counters described by TrackerClientStats.
37 stats TrackerClientStats
// pingTicker is created in Run; doWebsocket sends a websocket ping on each tick.
38 pingTicker *time.Ticker
// Stats returns a TrackerClientStats value. The method body is not visible in
// this excerpt; presumably it returns a copy of the client's stats field —
// TODO confirm.
41 func (me *TrackerClient) Stats() TrackerClientStats {
// peerIdBinary returns the client's PeerId bytes encoded by binaryToJsonString.
// Announce and handleOffer use the result as the PeerID field of the messages
// they send to the tracker.
47 func (me *TrackerClient) peerIdBinary() string {
48 return binaryToJsonString(me.PeerId[:])
51 // outboundOffer represents an outstanding offer: one that Announce has sent to
// the tracker and stored in TrackerClient.outboundOffers. handleAnswer removes
// the entry once an answer is applied; announceOffers and closeUnusedOffers
// close the peer connections of offers that are being discarded.
// Other code in this file also reads an infoHash field, which is declared on a
// line not visible in this excerpt.
52 type outboundOffer struct {
// originalOffer is our own session description; handleAnswer passes it to
// OnConn as DataChannelContext.Local.
53 originalOffer webrtc.SessionDescription
// peerConnection is the connection created for this offer; it is closed when
// the offer is discarded.
54 peerConnection *wrappedPeerConnection
// dataChannel is the data channel associated with the offer.
55 dataChannel *webrtc.DataChannel
// DataChannelContext carries details about a data channel handed to OnConn.
// Further fields exist on lines not visible in this excerpt (handleAnswer sets
// an InfoHash field, for instance).
59 type DataChannelContext struct {
// Local is set to our own offer in handleAnswer. Remote presumably holds the
// peer's session description — TODO confirm against the elided lines.
60 Local, Remote webrtc.SessionDescription
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives
// the data channel as a datachannel.ReadWriteCloser together with the
// DataChannelContext describing it.
66 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket runs a single websocket session against the tracker at tc.Url:
// it dials, logs the connection, sends a ping on every pingTicker tick, and
// hands the connection to trackerReadLoop. Several lines of this function
// (error checks, the goroutine/select around the ping, cleanup) are not visible
// in this excerpt.
68 func (tc *TrackerClient) doWebsocket() error {
69 metrics.Add("websocket dials", 1)
71 c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
// A dial failure is wrapped and returned to the caller.
73 return fmt.Errorf("dialing tracker: %w", err)
76 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
// closeChan presumably tells the ping loop to stop when this session ends —
// TODO confirm against the elided lines.
82 closeChan := make(chan struct{})
// Each tick of the shared ticker sends an empty websocket ping on this
// connection.
86 case <-tc.pingTicker.C:
87 err := c.WriteMessage(websocket.PingMessage, []byte{})
// Process tracker messages; the read loop's error becomes this session's error.
97 err = tc.trackerReadLoop(tc.wsConn)
// Run drives the tracker connection. It creates the 60-second ping ticker,
// calls doWebsocket, logs when a websocket instance ends, and then sleeps for
// one minute. The loop and exit conditions around these steps are not visible
// in this excerpt; presumably the sleep is a back-off before reconnecting —
// TODO confirm.
105 func (tc *TrackerClient) Run() error {
106 tc.pingTicker = time.NewTicker(60 * time.Second)
111 err := tc.doWebsocket()
// level is chosen on a line not shown here.
118 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
119 time.Sleep(time.Minute)
// Close shuts the client down. From the visible lines: it checks whether a
// websocket connection exists (what it does with it is elided) and closes all
// outstanding offers via closeUnusedOffers.
126 func (tc *TrackerClient) Close() error {
129 if tc.wsConn != nil {
132 tc.closeUnusedOffers()
// announceOffers takes over the current outboundOffers map, closes the peer
// connection of every offer in it, and re-announces each offer's infohash with
// a tracker.Started event, one goroutine per offer.
139 func (tc *TrackerClient) announceOffers() {
141 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
142 // is nil. Take ownership of outboundOffers here.
144 offers := tc.outboundOffers
145 tc.outboundOffers = nil
152 // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
153 // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
154 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
155 for _, offer := range offers {
156 // TODO: Capture the errors? Are we even in a position to do anything with them?
157 offer.peerConnection.Close()
158 // Use goroutine here to allow read loop to start and ensure the buffer drains.
// NOTE(review): the error returned by Announce is dropped in this goroutine.
159 go tc.Announce(tracker.Started, offer.infoHash)
// closeUnusedOffers closes the peer connection of every offer still held in
// outboundOffers and then drops the map by setting it to nil. Errors from
// Close are ignored.
// NOTE(review): no locking is visible in this function; presumably the caller
// holds the client's lock — TODO confirm.
163 func (tc *TrackerClient) closeUnusedOffers() {
164 for _, offer := range tc.outboundOffers {
165 offer.peerConnection.Close()
167 tc.outboundOffers = nil
// Announce sends an announce for infoHash to the tracker together with a fresh
// WebRTC offer. It generates a random 20-byte offer ID, creates the offer,
// obtains the announce parameters from GetAnnounceRequest, writes the request
// as JSON through writeMessage, and records the offer in outboundOffers under
// the offer ID. An error from any step is wrapped and returned. Some lines
// (error checks, locking, cleanup) are not visible in this excerpt.
170 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
171 metrics.Add("outbound announces", 1)
// The offer ID is 20 random bytes; handleAnswer later looks the offer up by it.
172 var randOfferId [20]byte
173 _, err := rand.Read(randOfferId[:])
175 return fmt.Errorf("generating offer_id bytes: %w", err)
177 offerIDBinary := binaryToJsonString(randOfferId[:])
179 pc, dc, offer, err := newOffer()
181 return fmt.Errorf("creating offer: %w", err)
184 request, err := tc.GetAnnounceRequest(event, infoHash)
187 return fmt.Errorf("getting announce parameters: %w", err)
// The event sent to the tracker is the one in the GetAnnounceRequest result.
190 req := AnnounceRequest{
191 Numwant: 1, // If higher we need to create equal amount of offers.
192 Uploaded: request.Uploaded,
193 Downloaded: request.Downloaded,
195 Event: request.Event.String(),
197 InfoHash: binaryToJsonString(infoHash[:]),
198 PeerID: tc.peerIdBinary(),
200 OfferID: offerIDBinary,
205 data, err := json.Marshal(req)
208 return fmt.Errorf("marshalling request: %w", err)
213 err = tc.writeMessage(data)
216 return fmt.Errorf("write AnnounceRequest: %w", err)
// The map is created lazily; announceOffers and closeUnusedOffers reset it to nil.
218 if tc.outboundOffers == nil {
219 tc.outboundOffers = make(map[string]outboundOffer)
221 tc.outboundOffers[offerIDBinary] = outboundOffer{
224 originalOffer: offer,
// writeMessage sends data to the tracker as a websocket text message. While
// wsConn is nil it stays in a loop, from which it can return a "closed" error;
// the condition for that return and whatever waiting the loop does are on
// lines not visible in this excerpt.
230 func (tc *TrackerClient) writeMessage(data []byte) error {
231 for tc.wsConn == nil {
// %T prints the type of tc, so the message reads "*<pkg>.TrackerClient closed".
233 return fmt.Errorf("%T closed", tc)
237 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the tracker websocket, decodes each as
// an AnnounceResponse, and dispatches it: a message carrying an offer goes to
// handleOffer, one carrying an answer goes to handleAnswer. A read error is
// wrapped and returned; unmarshalling and info_hash decoding failures are only
// logged at warning level.
// NOTE(review): the parameter name "tracker" shadows the imported tracker
// package inside this function.
240 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
242 _, message, err := tracker.ReadMessage()
244 return fmt.Errorf("read message error: %w", err)
246 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
248 var ar AnnounceResponse
249 if err := json.Unmarshal(message, &ar); err != nil {
250 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
254 case ar.Offer != nil:
255 ih, err := jsonStringToInfoHash(ar.InfoHash)
257 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
// NOTE(review): handleOffer returns an error, which is discarded here.
260 tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
261 case ar.Answer != nil:
262 tc.handleAnswer(ar.OfferID, *ar.Answer)
// handleOffer answers an offer relayed by the tracker. It creates an answering
// peer connection for the offer, sends the answer back to the tracker as a
// JSON AnnounceResponse, and, when the remote side opens a data channel, hands
// that channel to OnConn. The peer connection is closed if marshalling or
// writing the response fails, and a 30-second timer closes it as well (the
// lines that would stop the timer are not visible in this excerpt). An error
// from any step is wrapped and returned. Part of the parameter list and
// several other lines are not visible in this excerpt.
267 func (tc *TrackerClient) handleOffer(
268 offer webrtc.SessionDescription,
273 peerConnection, answer, err := newAnsweringPeerConnection(offer)
// Name the step that actually failed: nothing has been written to the tracker
// at this point, so the previous "write AnnounceResponse" text was misleading.
275 return fmt.Errorf("creating answering peer connection: %w", err)
277 response := AnnounceResponse{
279 InfoHash: binaryToJsonString(infoHash[:]),
280 PeerID: tc.peerIdBinary(),
285 data, err := json.Marshal(response)
// From here on the peer connection exists, so every failure path closes it.
287 peerConnection.Close()
288 return fmt.Errorf("marshalling response: %w", err)
292 if err := tc.writeMessage(data); err != nil {
293 peerConnection.Close()
294 return fmt.Errorf("writing response: %w", err)
// Give the remote peer 30 seconds before the connection is closed and counted
// as timed out.
296 timer := time.AfterFunc(30*time.Second, func() {
297 metrics.Add("answering peer connections timed out", 1)
298 peerConnection.Close()
300 peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
301 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
303 metrics.Add("answering peer connection conversions", 1)
305 tc.stats.ConvertedInboundConns++
307 tc.OnConn(dc, DataChannelContext{
// handleAnswer applies an answer relayed by the tracker to the outbound offer
// stored under offerId. An unknown offer ID is logged at warning level. The
// answer is passed to the offer's setAnswer together with a callback that
// counts the conversion and hands the data channel to OnConn, with our
// original offer as the Local description. A setAnswer error is logged at
// warning level. In the visible lines the offer is then removed from
// outboundOffers and a new announce (tracker.None) for the same infohash is
// started in a goroutine; whether that also happens on the error path depends
// on lines not visible in this excerpt.
319 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
322 offer, ok := tc.outboundOffers[offerId]
324 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
327 //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
328 metrics.Add("outbound offers answered", 1)
329 err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
330 metrics.Add("outbound offers answered with datachannel", 1)
332 tc.stats.ConvertedOutboundConns++
334 tc.OnConn(dc, DataChannelContext{
335 Local: offer.originalOffer,
339 InfoHash: offer.infoHash,
343 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
// An offer is used at most once: drop it and announce again so a fresh offer
// replaces it. The error returned by Announce is dropped in this goroutine.
346 delete(tc.outboundOffers, offerId)
347 go tc.Announce(tracker.None, offer.infoHash)