From c074b30d9419dd8e15a7a213381b5b6a6d4a993b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 20 Apr 2020 13:42:25 +1000 Subject: [PATCH] Log webrtc connection errors and rework webtorrent.TrackerClient --- client.go | 16 ++-- torrent.go | 44 ++++++++-- webtorrent/tracker_client.go | 160 ++++++++++++++++++++--------------- 3 files changed, 131 insertions(+), 89 deletions(-) diff --git a/client.go b/client.go index f1e6c14e..c4e81190 100644 --- a/client.go +++ b/client.go @@ -713,7 +713,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, t defer c.close() c.Discovery = ps c.trusted = trusted - cl.runHandshookConn(c, t) + t.runHandshookConnLoggingErr(c) } // The port number for incoming peer connections. 0 if the client isn't listening. @@ -851,11 +851,11 @@ func (cl *Client) runReceivedConn(c *PeerConn) { torrent.Add("received handshake for loaded torrent", 1) cl.lock() defer cl.unlock() - cl.runHandshookConn(c, t) + t.runHandshookConnLoggingErr(c) } // Client lock must be held before entering this. -func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) { +func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { c.setTorrent(t) if c.PeerID == cl.peerID { if c.outgoing { @@ -867,7 +867,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) { // address, we won't record the remote address as a doppleganger. Instead, the initiator // can record *us* as the doppleganger. } - return + return errors.New("local and remote peer ids are the same") } c.conn.SetWriteDeadline(time.Time{}) c.r = deadlineReader{c.conn, c.r} @@ -876,16 +876,12 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) { torrent.Add("completed handshake over ipv6", 1) } if err := t.addConnection(c); err != nil { - log.Fmsg("error adding connection: %s", err).AddValues(c).SetLevel(log.Debug).Log(t.logger) - return + return fmt.Errorf("adding connection: %w", err) } defer t.dropConnection(c) go c.writer(time.Minute) cl.sendInitialMessages(c, t) - err := c.mainReadLoop() - if err != nil && cl.config.Debug { - cl.logger.Printf("error during connection main read loop: %s", err) - } + return fmt.Errorf("main read loop: %w", c.mainReadLoop()) } // See the order given in Transmission's tr_peerMsgsNew. diff --git a/torrent.go b/torrent.go index 998a05bc..63952351 100644 --- a/torrent.go +++ b/torrent.go @@ -1297,7 +1297,8 @@ func (t *Torrent) onWebRtcConn( fmt.Sprintf("webrtc offer_id %x", dcc.OfferId), ) if err != nil { - t.logger.Printf("error in handshaking webrtc connection: %v", err) + t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err) + return } if dcc.LocalOffered { pc.Discovery = PeerSourceTracker @@ -1306,7 +1307,19 @@ func (t *Torrent) onWebRtcConn( } t.cl.lock() defer t.cl.unlock() - t.cl.runHandshookConn(pc, t) + err = t.cl.runHandshookConn(pc, t) + t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err) +} + +func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) { + err := t.cl.runHandshookConn(pc, t) + if err != nil || logAll { + t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err) + } +} + +func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) { + t.logRunHandshookConn(pc, false, log.Debug) } func (t *Torrent) startScrapingTracker(_url string) { @@ -1335,17 +1348,30 @@ func (t *Torrent) startScrapingTracker(_url string) { sl := func() torrentTrackerAnnouncer { switch u.Scheme { case "ws", "wss": - wst := websocketTracker{*u, webtorrent.NewTrackerClient(t.cl.peerID, t.infoHash, t.onWebRtcConn, - t.logger.WithText(func(m log.Msg) string { - return fmt.Sprintf("%q: %v", u.String(), m.Text()) - }).WithDefaultLevel(log.Debug))} - ar := t.announceRequest(tracker.Started) + wst := websocketTracker{ + *u, + &webtorrent.TrackerClient{ + Url: u.String(), + GetAnnounceRequest: func(event tracker.AnnounceEvent) tracker.AnnounceRequest { + t.cl.lock() + defer t.cl.unlock() + return t.announceRequest(event) + }, + PeerId: t.cl.peerID, + InfoHash: t.infoHash, + OnConn: t.onWebRtcConn, + Logger: t.logger.WithText(func(m log.Msg) string { + return fmt.Sprintf("%q: %v", u.String(), m.Text()) + }).WithDefaultLevel(log.Debug), + }, + } go func() { - err := wst.TrackerClient.Run(ar, u.String()) + err := wst.TrackerClient.Run() if err != nil { t.logger.WithDefaultLevel(log.Warning).Printf( "error running websocket tracker announcer for %q: %v", - u.String(), err) + u.String(), err, + ) } }() return wst diff --git a/webtorrent/tracker_client.go b/webtorrent/tracker_client.go index efa7c8d2..13df300b 100644 --- a/webtorrent/tracker_client.go +++ b/webtorrent/tracker_client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/anacrolix/log" @@ -16,13 +17,24 @@ import ( // Client represents the webtorrent client type TrackerClient struct { + Url string + GetAnnounceRequest func(tracker.AnnounceEvent) tracker.AnnounceRequest + PeerId [20]byte + InfoHash [20]byte + OnConn onDataChannelOpen + Logger log.Logger + lock sync.Mutex - peerIDBinary string - infoHashBinary string outboundOffers map[string]outboundOffer // OfferID to outboundOffer - tracker *websocket.Conn - onConn onDataChannelOpen - logger log.Logger + wsConn *websocket.Conn +} + +func (me *TrackerClient) peerIdBinary() string { + return binaryToJsonString(me.PeerId[:]) +} + +func (me *TrackerClient) infoHashBinary() string { + return binaryToJsonString(me.InfoHash[:]) } // outboundOffer represents an outstanding offer. @@ -43,48 +55,46 @@ type DataChannelContext struct { type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext) -func NewTrackerClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *TrackerClient { - return &TrackerClient{ - outboundOffers: make(map[string]outboundOffer), - peerIDBinary: binaryToJsonString(peerId[:]), - infoHashBinary: binaryToJsonString(infoHash[:]), - onConn: onConn, - logger: logger, - } -} - -func (c *TrackerClient) Run(ar tracker.AnnounceRequest, url string) error { - t, _, err := websocket.DefaultDialer.Dial(url, nil) +func (tc *TrackerClient) doWebsocket() error { + c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil) if err != nil { - return fmt.Errorf("failed to dial tracker: %w", err) + return fmt.Errorf("dialing tracker: %w", err) } - defer t.Close() - c.logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", url) - c.tracker = t - + defer c.Close() + tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url) + tc.wsConn = c go func() { - err := c.announce(ar) + err := tc.announce(tracker.Started) if err != nil { - c.logger.WithDefaultLevel(log.Error).Printf("error announcing: %v", err) + tc.Logger.WithDefaultLevel(log.Error).Printf("error in initial announce: %v", err) } }() - err = c.trackerReadLoop(c.tracker) - c.lock.Lock() - c.lock.Unlock() - c.closeUnusedOffers() + err = tc.trackerReadLoop(tc.wsConn) + tc.lock.Lock() + tc.closeUnusedOffers() + tc.lock.Unlock() return err } -func (c *TrackerClient) closeUnusedOffers() { - for _, offer := range c.outboundOffers { +func (tc *TrackerClient) Run() error { + for { + err := tc.doWebsocket() + tc.Logger.Printf("websocket instance ended: %v", err) + time.Sleep(time.Minute) + } +} + +func (tc *TrackerClient) closeUnusedOffers() { + for _, offer := range tc.outboundOffers { if offer.answered { continue } offer.peerConnection.Close() } + tc.outboundOffers = nil } -func (c *TrackerClient) announce(request tracker.AnnounceRequest) error { +func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error { var randOfferId [20]byte _, err := rand.Read(randOfferId[:]) if err != nil { @@ -97,15 +107,17 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error { return fmt.Errorf("creating offer: %w", err) } + request := tc.GetAnnounceRequest(event) + req := AnnounceRequest{ Numwant: 1, // If higher we need to create equal amount of offers. Uploaded: request.Uploaded, Downloaded: request.Downloaded, Left: request.Left, - Event: "started", + Event: request.Event.String(), Action: "announce", - InfoHash: c.infoHashBinary, - PeerID: c.peerIDBinary, + InfoHash: tc.infoHashBinary(), + PeerID: tc.peerIdBinary(), Offers: []Offer{{ OfferID: offerIDBinary, Offer: offer, @@ -117,16 +129,18 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error { return fmt.Errorf("marshalling request: %w", err) } - c.lock.Lock() - defer c.lock.Unlock() + tc.lock.Lock() + defer tc.lock.Unlock() - err = c.tracker.WriteMessage(websocket.TextMessage, data) + err = tc.wsConn.WriteMessage(websocket.TextMessage, data) if err != nil { pc.Close() return fmt.Errorf("write AnnounceRequest: %w", err) } - - c.outboundOffers[offerIDBinary] = outboundOffer{ + if tc.outboundOffers == nil { + tc.outboundOffers = make(map[string]outboundOffer) + } + tc.outboundOffers[offerIDBinary] = outboundOffer{ peerConnection: pc, dataChannel: dc, originalOffer: offer, @@ -134,34 +148,34 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error { return nil } -func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error { +func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error { for { _, message, err := tracker.ReadMessage() if err != nil { return fmt.Errorf("read message error: %w", err) } - c.logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message) + tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message) var ar AnnounceResponse if err := json.Unmarshal(message, &ar); err != nil { - c.logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err) + tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err) continue } - if ar.InfoHash != c.infoHashBinary { - c.logger.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash) + if ar.InfoHash != tc.infoHashBinary() { + tc.Logger.Printf("announce response for different hash: expected %q got %q", tc.infoHashBinary(), ar.InfoHash) continue } switch { case ar.Offer != nil: - answer, err := getAnswerForOffer(*ar.Offer, c.onConn, ar.OfferID) + answer, err := getAnswerForOffer(*ar.Offer, tc.OnConn, ar.OfferID) if err != nil { return fmt.Errorf("write AnnounceResponse: %w", err) } req := AnnounceResponse{ Action: "announce", - InfoHash: c.infoHashBinary, - PeerID: c.peerIDBinary, + InfoHash: tc.infoHashBinary(), + PeerID: tc.peerIdBinary(), ToPeerID: ar.PeerID, Answer: &answer, OfferID: ar.OfferID, @@ -171,34 +185,40 @@ func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error { return fmt.Errorf("failed to marshal request: %w", err) } - c.lock.Lock() + tc.lock.Lock() err = tracker.WriteMessage(websocket.TextMessage, data) if err != nil { return fmt.Errorf("write AnnounceResponse: %w", err) - c.lock.Unlock() + tc.lock.Unlock() } - c.lock.Unlock() + tc.lock.Unlock() case ar.Answer != nil: - c.lock.Lock() - offer, ok := c.outboundOffers[ar.OfferID] - c.lock.Unlock() - if !ok { - c.logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", ar.OfferID) - continue - } - c.logger.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer) - err = offer.setAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) { - c.onConn(dc, DataChannelContext{ - Local: offer.originalOffer, - Remote: *ar.Answer, - OfferId: ar.OfferID, - LocalOffered: true, - }) - }) - if err != nil { - return fmt.Errorf("failed to sent answer: %w", err) - } - offer.answered = true + tc.handleAnswer(ar.OfferID, *ar.Answer) } } } + +func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) { + tc.lock.Lock() + defer tc.lock.Unlock() + offer, ok := tc.outboundOffers[offerId] + if !ok { + tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId) + return + } + tc.Logger.Printf("offer %q got answer %v", offerId, answer) + err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) { + tc.OnConn(dc, DataChannelContext{ + Local: offer.originalOffer, + Remote: answer, + OfferId: offerId, + LocalOffered: true, + }) + }) + if err != nil { + tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err) + return + } + delete(tc.outboundOffers, offerId) + go tc.announce(tracker.None) +} -- 2.48.1