]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Big tidy up of webtorrent code
authorMatt Joiner <anacrolix@gmail.com>
Tue, 7 Apr 2020 04:30:27 +0000 (14:30 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 7 Apr 2020 04:30:27 +0000 (14:30 +1000)
torrent.go
webrtc.go
webtorrent/client.go
webtorrent/transport.go

index 0cbc7917aafcdafd54dd24c5fefc2c59bc48ae32..81730ef8c6a07a5a0827b16241194da1cd5c2e1a 100644 (file)
@@ -1266,14 +1266,21 @@ func (t *Torrent) seeding() bool {
 
 func (t *Torrent) onWebRtcConn(
        c datachannel.ReadWriteCloser,
-       initiatedLocally bool, // Whether we offered first, or they did.
+       dcc webtorrent.DataChannelContext,
 ) {
        defer c.Close()
-       pc, err := t.cl.handshakesConnection(context.Background(), webrtcNetConn{c}, t, false, nil, "webrtc")
+       pc, err := t.cl.handshakesConnection(
+               context.Background(),
+               webrtcNetConn{c, dcc},
+               t,
+               false,
+               webrtcNetAddr{dcc.Remote},
+               webrtcNetwork,
+       )
        if err != nil {
                t.logger.Printf("error in handshaking webrtc connection: %v", err)
        }
-       if initiatedLocally {
+       if dcc.LocalOffered {
                pc.Discovery = PeerSourceTracker
        } else {
                pc.Discovery = PeerSourceIncoming
@@ -1309,11 +1316,11 @@ func (t *Torrent) startScrapingTracker(_url string) {
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
                case "ws", "wss":
-                       wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn)}
+                       wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn, t.logger)}
                        go func() {
-                               err := wst.Client.Run(t.announceRequest(tracker.Started))
+                               err := wst.Client.Run(t.announceRequest(tracker.Started), u.String())
                                if err != nil {
-                                       t.logger.Printf("error running websocket tracker announcer: %v", err)
+                                       t.logger.WithValues(log.Error).Printf("error running websocket tracker announcer: %v", err)
                                }
                        }()
                        return wst
index d805b54bd3fd5864d9090b2d15ec756e2081f4a2..8afe7db3060db10bf7fac7a537e1a80ef8bdcc59 100644 (file)
--- a/webrtc.go
+++ b/webrtc.go
@@ -5,29 +5,37 @@ import (
        "time"
 
        "github.com/pion/datachannel"
+       "github.com/pion/webrtc/v2"
+
+       "github.com/anacrolix/torrent/webtorrent"
 )
 
+const webrtcNetwork = "webrtc"
+
 type webrtcNetConn struct {
        datachannel.ReadWriteCloser
+       webtorrent.DataChannelContext
 }
 
 type webrtcNetAddr struct {
+       webrtc.SessionDescription
 }
 
 func (webrtcNetAddr) Network() string {
-       return "webrtc"
+       return webrtcNetwork
 }
 
-func (webrtcNetAddr) String() string {
-       return ""
+func (me webrtcNetAddr) String() string {
+       // TODO: What can I show here that's more like other protocols?
+       return "<WebRTC>"
 }
 
-func (w webrtcNetConn) LocalAddr() net.Addr {
-       return webrtcNetAddr{}
+func (me webrtcNetConn) LocalAddr() net.Addr {
+       return webrtcNetAddr{me.Local}
 }
 
-func (w webrtcNetConn) RemoteAddr() net.Addr {
-       return webrtcNetAddr{}
+func (me webrtcNetConn) RemoteAddr() net.Addr {
+       return webrtcNetAddr{me.Remote}
 }
 
 func (w webrtcNetConn) SetDeadline(t time.Time) error {
index d4dd604954caf6fc690cd8e0163196bb1c814ef8..72c1d046aa7155417a18db5a516f4ec73049d330 100644 (file)
@@ -14,24 +14,21 @@ import (
        "github.com/pion/webrtc/v2"
 )
 
-const (
-       trackerURL = `wss://tracker.openwebtorrent.com/` // For simplicity
-)
-
 // Client represents the webtorrent client
 type Client struct {
        lock           sync.Mutex
        peerIDBinary   string
        infoHashBinary string
-       offeredPeers   map[string]Peer // OfferID to Peer
+       outboundOffers map[string]outboundOffer // OfferID to outboundOffer
        tracker        *websocket.Conn
-       onConn         func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
+       onConn         onDataChannelOpen
+       logger         log.Logger
 }
 
-// Peer represents a remote peer
-type Peer struct {
-       peerID    string
-       transport *Transport
+// outboundOffer represents an outstanding offer.
+type outboundOffer struct {
+       originalOffer webrtc.SessionDescription
+       transport     *Transport
 }
 
 func binaryToJsonString(b []byte) string {
@@ -42,33 +39,43 @@ func binaryToJsonString(b []byte) string {
        return string(seq)
 }
 
-type onDataChannelOpen func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
+type DataChannelContext struct {
+       Local, Remote webrtc.SessionDescription
+       LocalOffered  bool
+}
+
+type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
 
-func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen) *Client {
+func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *Client {
        return &Client{
-               offeredPeers:   make(map[string]Peer),
+               outboundOffers: make(map[string]outboundOffer),
                peerIDBinary:   binaryToJsonString(peerId[:]),
                infoHashBinary: binaryToJsonString(infoHash[:]),
                onConn:         onConn,
+               logger:         logger,
        }
 }
 
-func (c *Client) Run(ar tracker.AnnounceRequest) error {
-       t, _, err := websocket.DefaultDialer.Dial(trackerURL, nil)
+func (c *Client) Run(ar tracker.AnnounceRequest, url string) error {
+       t, _, err := websocket.DefaultDialer.Dial(url, nil)
        if err != nil {
-               return fmt.Errorf("failed to dial tracker: %v", err)
+               return fmt.Errorf("failed to dial tracker: %w", err)
        }
        defer t.Close()
+       c.logger.WithValues(log.Info).Printf("dialed tracker %q", url)
        c.tracker = t
 
-       go c.announce(ar)
-       c.trackerReadLoop()
-
-       return nil
+       go func() {
+               err := c.announce(ar)
+               if err != nil {
+                       c.logger.WithValues(log.Error).Printf("error announcing: %v", err)
+               }
+       }()
+       return c.trackerReadLoop()
 }
 
 func (c *Client) announce(request tracker.AnnounceRequest) error {
-       transpot, offer, err := NewTransport()
+       transport, offer, err := NewTransport()
        if err != nil {
                return fmt.Errorf("failed to create transport: %w", err)
        }
@@ -77,11 +84,13 @@ func (c *Client) announce(request tracker.AnnounceRequest) error {
        if err != nil {
                return fmt.Errorf("failed to generate bytes: %w", err)
        }
-       // OfferID := randOfferID.ToStringHex()
        offerIDBinary := randOfferID.ToStringLatin1()
 
        c.lock.Lock()
-       c.offeredPeers[offerIDBinary] = Peer{transport: transpot}
+       c.outboundOffers[offerIDBinary] = outboundOffer{
+               transport:     transport,
+               originalOffer: offer,
+       }
        c.lock.Unlock()
 
        req := AnnounceRequest{
@@ -124,7 +133,7 @@ func (c *Client) trackerReadLoop() error {
                if err != nil {
                        return fmt.Errorf("read error: %w", err)
                }
-               log.Printf("recv: %q", message)
+               c.logger.WithValues(log.Debug).Printf("received message from tracker: %q", message)
 
                var ar AnnounceResponse
                if err := json.Unmarshal(message, &ar); err != nil {
@@ -137,9 +146,7 @@ func (c *Client) trackerReadLoop() error {
                }
                switch {
                case ar.Offer != nil:
-                       t, answer, err := NewTransportFromOffer(*ar.Offer, func(dc datachannel.ReadWriteCloser) {
-                               c.onConn(dc, false)
-                       })
+                       _, answer, err := NewTransportFromOffer(*ar.Offer, c.onConn)
                        if err != nil {
                                return fmt.Errorf("write AnnounceResponse: %w", err)
                        }
@@ -164,20 +171,21 @@ func (c *Client) trackerReadLoop() error {
                                c.lock.Unlock()
                        }
                        c.lock.Unlock()
-
-                       // Do something with the peer
-                       _ = Peer{peerID: ar.PeerID, transport: t}
                case ar.Answer != nil:
                        c.lock.Lock()
-                       peer, ok := c.offeredPeers[ar.OfferID]
+                       offer, ok := c.outboundOffers[ar.OfferID]
                        c.lock.Unlock()
                        if !ok {
-                               log.Printf("could not find peer for offer %q", ar.OfferID)
+                               c.logger.WithValues(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
                                continue
                        }
                        log.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
-                       err = peer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
-                               c.onConn(dc, true)
+                       err = offer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
+                               c.onConn(dc, DataChannelContext{
+                                       Local:        offer.originalOffer,
+                                       Remote:       *ar.Answer,
+                                       LocalOffered: true,
+                               })
                        })
                        if err != nil {
                                return fmt.Errorf("failed to sent answer: %v", err)
index 0bb5701dd58031448aabe89a557ff2b34c81ff36..efaf3157011a3d7427e27919c17a9a1041f99183 100644 (file)
@@ -67,7 +67,7 @@ func NewTransport() (*Transport, webrtc.SessionDescription, error) {
 
 // NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
 // be announced.
-func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) (*Transport, webrtc.SessionDescription, error) {
+func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannelOpen) (*Transport, webrtc.SessionDescription, error) {
        peerConnection, err := newPeerConnection()
        if err != nil {
                return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err)
@@ -77,13 +77,6 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachan
        })
 
        t := &Transport{pc: peerConnection}
-       peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
-               fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
-               t.lock.Lock()
-               t.dc = d
-               t.lock.Unlock()
-               t.handleOpen(onOpen)
-       })
 
        err = peerConnection.SetRemoteDescription(offer)
        if err != nil {
@@ -93,6 +86,15 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachan
        if err != nil {
                return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
        }
+       peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
+               fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
+               t.lock.Lock()
+               t.dc = d
+               t.lock.Unlock()
+               t.handleOpen(func(dc datachannel.ReadWriteCloser) {
+                       onOpen(dc, DataChannelContext{answer, offer, false})
+               })
+       })
        err = peerConnection.SetLocalDescription(answer)
        if err != nil {
                return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)