]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Log webrtc connection errors and rework webtorrent.TrackerClient
authorMatt Joiner <anacrolix@gmail.com>
Mon, 20 Apr 2020 03:42:25 +0000 (13:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 20 Apr 2020 03:42:25 +0000 (13:42 +1000)
client.go
torrent.go
webtorrent/tracker_client.go

index f1e6c14ef6fd0b3bf7a52951a70066c43d4e957d..c4e81190e8b0778418f12b77de588eda3709a262 100644 (file)
--- a/client.go
+++ b/client.go
@@ -713,7 +713,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, t
        defer c.close()
        c.Discovery = ps
        c.trusted = trusted
-       cl.runHandshookConn(c, t)
+       t.runHandshookConnLoggingErr(c)
 }
 
 // The port number for incoming peer connections. 0 if the client isn't listening.
@@ -851,11 +851,11 @@ func (cl *Client) runReceivedConn(c *PeerConn) {
        torrent.Add("received handshake for loaded torrent", 1)
        cl.lock()
        defer cl.unlock()
-       cl.runHandshookConn(c, t)
+       t.runHandshookConnLoggingErr(c)
 }
 
 // Client lock must be held before entering this.
-func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
+func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        c.setTorrent(t)
        if c.PeerID == cl.peerID {
                if c.outgoing {
@@ -867,7 +867,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
                        // address, we won't record the remote address as a doppleganger. Instead, the initiator
                        // can record *us* as the doppleganger.
                }
-               return
+               return errors.New("local and remote peer ids are the same")
        }
        c.conn.SetWriteDeadline(time.Time{})
        c.r = deadlineReader{c.conn, c.r}
@@ -876,16 +876,12 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
                torrent.Add("completed handshake over ipv6", 1)
        }
        if err := t.addConnection(c); err != nil {
-               log.Fmsg("error adding connection: %s", err).AddValues(c).SetLevel(log.Debug).Log(t.logger)
-               return
+               return fmt.Errorf("adding connection: %w", err)
        }
        defer t.dropConnection(c)
        go c.writer(time.Minute)
        cl.sendInitialMessages(c, t)
-       err := c.mainReadLoop()
-       if err != nil && cl.config.Debug {
-               cl.logger.Printf("error during connection main read loop: %s", err)
-       }
+       return fmt.Errorf("main read loop: %w", c.mainReadLoop())
 }
 
 // See the order given in Transmission's tr_peerMsgsNew.
index 998a05bc042afd950b6719995a0f75b7800019b0..63952351622d1fb00e5cd94f50fe1f4dc791f97a 100644 (file)
@@ -1297,7 +1297,8 @@ func (t *Torrent) onWebRtcConn(
                fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
        )
        if err != nil {
-               t.logger.Printf("error in handshaking webrtc connection: %v", err)
+               t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
+               return
        }
        if dcc.LocalOffered {
                pc.Discovery = PeerSourceTracker
@@ -1306,7 +1307,19 @@ func (t *Torrent) onWebRtcConn(
        }
        t.cl.lock()
        defer t.cl.unlock()
-       t.cl.runHandshookConn(pc, t)
+       err = t.cl.runHandshookConn(pc, t)
+       t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
+}
+
+func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
+       err := t.cl.runHandshookConn(pc, t)
+       if err != nil || logAll {
+               t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+       }
+}
+
+func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
+       t.logRunHandshookConn(pc, false, log.Debug)
 }
 
 func (t *Torrent) startScrapingTracker(_url string) {
@@ -1335,17 +1348,30 @@ func (t *Torrent) startScrapingTracker(_url string) {
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
                case "ws", "wss":
-                       wst := websocketTracker{*u, webtorrent.NewTrackerClient(t.cl.peerID, t.infoHash, t.onWebRtcConn,
-                               t.logger.WithText(func(m log.Msg) string {
-                                       return fmt.Sprintf("%q: %v", u.String(), m.Text())
-                               }).WithDefaultLevel(log.Debug))}
-                       ar := t.announceRequest(tracker.Started)
+                       wst := websocketTracker{
+                               *u,
+                               &webtorrent.TrackerClient{
+                                       Url: u.String(),
+                                       GetAnnounceRequest: func(event tracker.AnnounceEvent) tracker.AnnounceRequest {
+                                               t.cl.lock()
+                                               defer t.cl.unlock()
+                                               return t.announceRequest(event)
+                                       },
+                                       PeerId:   t.cl.peerID,
+                                       InfoHash: t.infoHash,
+                                       OnConn:   t.onWebRtcConn,
+                                       Logger: t.logger.WithText(func(m log.Msg) string {
+                                               return fmt.Sprintf("%q: %v", u.String(), m.Text())
+                                       }).WithDefaultLevel(log.Debug),
+                               },
+                       }
                        go func() {
-                               err := wst.TrackerClient.Run(ar, u.String())
+                               err := wst.TrackerClient.Run()
                                if err != nil {
                                        t.logger.WithDefaultLevel(log.Warning).Printf(
                                                "error running websocket tracker announcer for %q: %v",
-                                               u.String(), err)
+                                               u.String(), err,
+                                       )
                                }
                        }()
                        return wst
index efa7c8d2255b17dd6ba4ccc5e2a4292ed699980e..13df300b88eea900286cf4199b06657e4194dbec 100644 (file)
@@ -5,6 +5,7 @@ import (
        "encoding/json"
        "fmt"
        "sync"
+       "time"
 
        "github.com/anacrolix/log"
 
@@ -16,13 +17,24 @@ import (
 
 // Client represents the webtorrent client
 type TrackerClient struct {
+       Url                string
+       GetAnnounceRequest func(tracker.AnnounceEvent) tracker.AnnounceRequest
+       PeerId             [20]byte
+       InfoHash           [20]byte
+       OnConn             onDataChannelOpen
+       Logger             log.Logger
+
        lock           sync.Mutex
-       peerIDBinary   string
-       infoHashBinary string
        outboundOffers map[string]outboundOffer // OfferID to outboundOffer
-       tracker        *websocket.Conn
-       onConn         onDataChannelOpen
-       logger         log.Logger
+       wsConn         *websocket.Conn
+}
+
+func (me *TrackerClient) peerIdBinary() string {
+       return binaryToJsonString(me.PeerId[:])
+}
+
+func (me *TrackerClient) infoHashBinary() string {
+       return binaryToJsonString(me.InfoHash[:])
 }
 
 // outboundOffer represents an outstanding offer.
@@ -43,48 +55,46 @@ type DataChannelContext struct {
 
 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
 
-func NewTrackerClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *TrackerClient {
-       return &TrackerClient{
-               outboundOffers: make(map[string]outboundOffer),
-               peerIDBinary:   binaryToJsonString(peerId[:]),
-               infoHashBinary: binaryToJsonString(infoHash[:]),
-               onConn:         onConn,
-               logger:         logger,
-       }
-}
-
-func (c *TrackerClient) Run(ar tracker.AnnounceRequest, url string) error {
-       t, _, err := websocket.DefaultDialer.Dial(url, nil)
+func (tc *TrackerClient) doWebsocket() error {
+       c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
        if err != nil {
-               return fmt.Errorf("failed to dial tracker: %w", err)
+               return fmt.Errorf("dialing tracker: %w", err)
        }
-       defer t.Close()
-       c.logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", url)
-       c.tracker = t
-
+       defer c.Close()
+       tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
+       tc.wsConn = c
        go func() {
-               err := c.announce(ar)
+               err := tc.announce(tracker.Started)
                if err != nil {
-                       c.logger.WithDefaultLevel(log.Error).Printf("error announcing: %v", err)
+                       tc.Logger.WithDefaultLevel(log.Error).Printf("error in initial announce: %v", err)
                }
        }()
-       err = c.trackerReadLoop(c.tracker)
-       c.lock.Lock()
-       c.lock.Unlock()
-       c.closeUnusedOffers()
+       err = tc.trackerReadLoop(tc.wsConn)
+       tc.lock.Lock()
+       tc.closeUnusedOffers()
+       tc.lock.Unlock()
        return err
 }
 
-func (c *TrackerClient) closeUnusedOffers() {
-       for _, offer := range c.outboundOffers {
+func (tc *TrackerClient) Run() error {
+       for {
+               err := tc.doWebsocket()
+               tc.Logger.Printf("websocket instance ended: %v", err)
+               time.Sleep(time.Minute)
+       }
+}
+
+func (tc *TrackerClient) closeUnusedOffers() {
+       for _, offer := range tc.outboundOffers {
                if offer.answered {
                        continue
                }
                offer.peerConnection.Close()
        }
+       tc.outboundOffers = nil
 }
 
-func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
+func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
        var randOfferId [20]byte
        _, err := rand.Read(randOfferId[:])
        if err != nil {
@@ -97,15 +107,17 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
                return fmt.Errorf("creating offer: %w", err)
        }
 
+       request := tc.GetAnnounceRequest(event)
+
        req := AnnounceRequest{
                Numwant:    1, // If higher we need to create equal amount of offers.
                Uploaded:   request.Uploaded,
                Downloaded: request.Downloaded,
                Left:       request.Left,
-               Event:      "started",
+               Event:      request.Event.String(),
                Action:     "announce",
-               InfoHash:   c.infoHashBinary,
-               PeerID:     c.peerIDBinary,
+               InfoHash:   tc.infoHashBinary(),
+               PeerID:     tc.peerIdBinary(),
                Offers: []Offer{{
                        OfferID: offerIDBinary,
                        Offer:   offer,
@@ -117,16 +129,18 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
                return fmt.Errorf("marshalling request: %w", err)
        }
 
-       c.lock.Lock()
-       defer c.lock.Unlock()
+       tc.lock.Lock()
+       defer tc.lock.Unlock()
 
-       err = c.tracker.WriteMessage(websocket.TextMessage, data)
+       err = tc.wsConn.WriteMessage(websocket.TextMessage, data)
        if err != nil {
                pc.Close()
                return fmt.Errorf("write AnnounceRequest: %w", err)
        }
-
-       c.outboundOffers[offerIDBinary] = outboundOffer{
+       if tc.outboundOffers == nil {
+               tc.outboundOffers = make(map[string]outboundOffer)
+       }
+       tc.outboundOffers[offerIDBinary] = outboundOffer{
                peerConnection: pc,
                dataChannel:    dc,
                originalOffer:  offer,
@@ -134,34 +148,34 @@ func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
        return nil
 }
 
-func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
+func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
        for {
                _, message, err := tracker.ReadMessage()
                if err != nil {
                        return fmt.Errorf("read message error: %w", err)
                }
-               c.logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
+               tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
 
                var ar AnnounceResponse
                if err := json.Unmarshal(message, &ar); err != nil {
-                       c.logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
+                       tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
                        continue
                }
-               if ar.InfoHash != c.infoHashBinary {
-                       c.logger.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash)
+               if ar.InfoHash != tc.infoHashBinary() {
+                       tc.Logger.Printf("announce response for different hash: expected %q got %q", tc.infoHashBinary(), ar.InfoHash)
                        continue
                }
                switch {
                case ar.Offer != nil:
-                       answer, err := getAnswerForOffer(*ar.Offer, c.onConn, ar.OfferID)
+                       answer, err := getAnswerForOffer(*ar.Offer, tc.OnConn, ar.OfferID)
                        if err != nil {
                                return fmt.Errorf("write AnnounceResponse: %w", err)
                        }
 
                        req := AnnounceResponse{
                                Action:   "announce",
-                               InfoHash: c.infoHashBinary,
-                               PeerID:   c.peerIDBinary,
+                               InfoHash: tc.infoHashBinary(),
+                               PeerID:   tc.peerIdBinary(),
                                ToPeerID: ar.PeerID,
                                Answer:   &answer,
                                OfferID:  ar.OfferID,
@@ -171,34 +185,40 @@ func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
                                return fmt.Errorf("failed to marshal request: %w", err)
                        }
 
-                       c.lock.Lock()
+                       tc.lock.Lock()
                        err = tracker.WriteMessage(websocket.TextMessage, data)
                        if err != nil {
                                return fmt.Errorf("write AnnounceResponse: %w", err)
-                               c.lock.Unlock()
+                               tc.lock.Unlock()
                        }
-                       c.lock.Unlock()
+                       tc.lock.Unlock()
                case ar.Answer != nil:
-                       c.lock.Lock()
-                       offer, ok := c.outboundOffers[ar.OfferID]
-                       c.lock.Unlock()
-                       if !ok {
-                               c.logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
-                               continue
-                       }
-                       c.logger.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
-                       err = offer.setAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
-                               c.onConn(dc, DataChannelContext{
-                                       Local:        offer.originalOffer,
-                                       Remote:       *ar.Answer,
-                                       OfferId:      ar.OfferID,
-                                       LocalOffered: true,
-                               })
-                       })
-                       if err != nil {
-                               return fmt.Errorf("failed to sent answer: %w", err)
-                       }
-                       offer.answered = true
+                       tc.handleAnswer(ar.OfferID, *ar.Answer)
                }
        }
 }
+
+func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
+       tc.lock.Lock()
+       defer tc.lock.Unlock()
+       offer, ok := tc.outboundOffers[offerId]
+       if !ok {
+               tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
+               return
+       }
+       tc.Logger.Printf("offer %q got answer %v", offerId, answer)
+       err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
+               tc.OnConn(dc, DataChannelContext{
+                       Local:        offer.originalOffer,
+                       Remote:       answer,
+                       OfferId:      offerId,
+                       LocalOffered: true,
+               })
+       })
+       if err != nil {
+               tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+               return
+       }
+       delete(tc.outboundOffers, offerId)
+       go tc.announce(tracker.None)
+}