From: Matt Joiner Date: Tue, 7 Apr 2020 04:30:27 +0000 (+1000) Subject: Big tidy up of webtorrent code X-Git-Tag: v1.16.0~85^2~11 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=6f2c65fe334a8565b606829431a378a0c94f71d3;p=btrtrc.git Big tidy up of webtorrent code --- diff --git a/torrent.go b/torrent.go index 0cbc7917..81730ef8 100644 --- a/torrent.go +++ b/torrent.go @@ -1266,14 +1266,21 @@ func (t *Torrent) seeding() bool { func (t *Torrent) onWebRtcConn( c datachannel.ReadWriteCloser, - initiatedLocally bool, // Whether we offered first, or they did. + dcc webtorrent.DataChannelContext, ) { defer c.Close() - pc, err := t.cl.handshakesConnection(context.Background(), webrtcNetConn{c}, t, false, nil, "webrtc") + pc, err := t.cl.handshakesConnection( + context.Background(), + webrtcNetConn{c, dcc}, + t, + false, + webrtcNetAddr{dcc.Remote}, + webrtcNetwork, + ) if err != nil { t.logger.Printf("error in handshaking webrtc connection: %v", err) } - if initiatedLocally { + if dcc.LocalOffered { pc.Discovery = PeerSourceTracker } else { pc.Discovery = PeerSourceIncoming @@ -1309,11 +1316,11 @@ func (t *Torrent) startScrapingTracker(_url string) { sl := func() torrentTrackerAnnouncer { switch u.Scheme { case "ws", "wss": - wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn)} + wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn, t.logger)} go func() { - err := wst.Client.Run(t.announceRequest(tracker.Started)) + err := wst.Client.Run(t.announceRequest(tracker.Started), u.String()) if err != nil { - t.logger.Printf("error running websocket tracker announcer: %v", err) + t.logger.WithValues(log.Error).Printf("error running websocket tracker announcer: %v", err) } }() return wst diff --git a/webrtc.go b/webrtc.go index d805b54b..8afe7db3 100644 --- a/webrtc.go +++ b/webrtc.go @@ -5,29 +5,37 @@ import ( "time" "github.com/pion/datachannel" + "github.com/pion/webrtc/v2" + + "github.com/anacrolix/torrent/webtorrent" ) +const webrtcNetwork = "webrtc" + type webrtcNetConn struct { datachannel.ReadWriteCloser + webtorrent.DataChannelContext } type webrtcNetAddr struct { + webrtc.SessionDescription } func (webrtcNetAddr) Network() string { - return "webrtc" + return webrtcNetwork } -func (webrtcNetAddr) String() string { - return "" +func (me webrtcNetAddr) String() string { + // TODO: What can I show here that's more like other protocols? + return "" } -func (w webrtcNetConn) LocalAddr() net.Addr { - return webrtcNetAddr{} +func (me webrtcNetConn) LocalAddr() net.Addr { + return webrtcNetAddr{me.Local} } -func (w webrtcNetConn) RemoteAddr() net.Addr { - return webrtcNetAddr{} +func (me webrtcNetConn) RemoteAddr() net.Addr { + return webrtcNetAddr{me.Remote} } func (w webrtcNetConn) SetDeadline(t time.Time) error { diff --git a/webtorrent/client.go b/webtorrent/client.go index d4dd6049..72c1d046 100644 --- a/webtorrent/client.go +++ b/webtorrent/client.go @@ -14,24 +14,21 @@ import ( "github.com/pion/webrtc/v2" ) -const ( - trackerURL = `wss://tracker.openwebtorrent.com/` // For simplicity -) - // Client represents the webtorrent client type Client struct { lock sync.Mutex peerIDBinary string infoHashBinary string - offeredPeers map[string]Peer // OfferID to Peer + outboundOffers map[string]outboundOffer // OfferID to outboundOffer tracker *websocket.Conn - onConn func(_ datachannel.ReadWriteCloser, initiatedLocally bool) + onConn onDataChannelOpen + logger log.Logger } -// Peer represents a remote peer -type Peer struct { - peerID string - transport *Transport +// outboundOffer represents an outstanding offer. +type outboundOffer struct { + originalOffer webrtc.SessionDescription + transport *Transport } func binaryToJsonString(b []byte) string { @@ -42,33 +39,43 @@ func binaryToJsonString(b []byte) string { return string(seq) } -type onDataChannelOpen func(_ datachannel.ReadWriteCloser, initiatedLocally bool) +type DataChannelContext struct { + Local, Remote webrtc.SessionDescription + LocalOffered bool +} + +type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext) -func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen) *Client { +func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *Client { return &Client{ - offeredPeers: make(map[string]Peer), + outboundOffers: make(map[string]outboundOffer), peerIDBinary: binaryToJsonString(peerId[:]), infoHashBinary: binaryToJsonString(infoHash[:]), onConn: onConn, + logger: logger, } } -func (c *Client) Run(ar tracker.AnnounceRequest) error { - t, _, err := websocket.DefaultDialer.Dial(trackerURL, nil) +func (c *Client) Run(ar tracker.AnnounceRequest, url string) error { + t, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { - return fmt.Errorf("failed to dial tracker: %v", err) + return fmt.Errorf("failed to dial tracker: %w", err) } defer t.Close() + c.logger.WithValues(log.Info).Printf("dialed tracker %q", url) c.tracker = t - go c.announce(ar) - c.trackerReadLoop() - - return nil + go func() { + err := c.announce(ar) + if err != nil { + c.logger.WithValues(log.Error).Printf("error announcing: %v", err) + } + }() + return c.trackerReadLoop() } func (c *Client) announce(request tracker.AnnounceRequest) error { - transpot, offer, err := NewTransport() + transport, offer, err := NewTransport() if err != nil { return fmt.Errorf("failed to create transport: %w", err) } @@ -77,11 +84,13 @@ func (c *Client) announce(request tracker.AnnounceRequest) error { if err != nil { return fmt.Errorf("failed to generate bytes: %w", err) } - // OfferID := randOfferID.ToStringHex() offerIDBinary := randOfferID.ToStringLatin1() c.lock.Lock() - c.offeredPeers[offerIDBinary] = Peer{transport: transpot} + c.outboundOffers[offerIDBinary] = outboundOffer{ + transport: transport, + originalOffer: offer, + } c.lock.Unlock() req := AnnounceRequest{ @@ -124,7 +133,7 @@ func (c *Client) trackerReadLoop() error { if err != nil { return fmt.Errorf("read error: %w", err) } - log.Printf("recv: %q", message) + c.logger.WithValues(log.Debug).Printf("received message from tracker: %q", message) var ar AnnounceResponse if err := json.Unmarshal(message, &ar); err != nil { @@ -137,9 +146,7 @@ func (c *Client) trackerReadLoop() error { } switch { case ar.Offer != nil: - t, answer, err := NewTransportFromOffer(*ar.Offer, func(dc datachannel.ReadWriteCloser) { - c.onConn(dc, false) - }) + _, answer, err := NewTransportFromOffer(*ar.Offer, c.onConn) if err != nil { return fmt.Errorf("write AnnounceResponse: %w", err) } @@ -164,20 +171,21 @@ func (c *Client) trackerReadLoop() error { c.lock.Unlock() } c.lock.Unlock() - - // Do something with the peer - _ = Peer{peerID: ar.PeerID, transport: t} case ar.Answer != nil: c.lock.Lock() - peer, ok := c.offeredPeers[ar.OfferID] + offer, ok := c.outboundOffers[ar.OfferID] c.lock.Unlock() if !ok { - log.Printf("could not find peer for offer %q", ar.OfferID) + c.logger.WithValues(log.Warning).Printf("could not find offer for id %q", ar.OfferID) continue } log.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer) - err = peer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) { - c.onConn(dc, true) + err = offer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) { + c.onConn(dc, DataChannelContext{ + Local: offer.originalOffer, + Remote: *ar.Answer, + LocalOffered: true, + }) }) if err != nil { return fmt.Errorf("failed to sent answer: %v", err) diff --git a/webtorrent/transport.go b/webtorrent/transport.go index 0bb5701d..efaf3157 100644 --- a/webtorrent/transport.go +++ b/webtorrent/transport.go @@ -67,7 +67,7 @@ func NewTransport() (*Transport, webrtc.SessionDescription, error) { // NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to // be announced. -func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) (*Transport, webrtc.SessionDescription, error) { +func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannelOpen) (*Transport, webrtc.SessionDescription, error) { peerConnection, err := newPeerConnection() if err != nil { return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err) @@ -77,13 +77,6 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachan }) t := &Transport{pc: peerConnection} - peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { - fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID()) - t.lock.Lock() - t.dc = d - t.lock.Unlock() - t.handleOpen(onOpen) - }) err = peerConnection.SetRemoteDescription(offer) if err != nil { @@ -93,6 +86,15 @@ func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachan if err != nil { return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err) } + peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { + fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID()) + t.lock.Lock() + t.dc = d + t.lock.Unlock() + t.handleOpen(func(dc datachannel.ReadWriteCloser) { + onOpen(dc, DataChannelContext{answer, offer, false}) + }) + }) err = peerConnection.SetLocalDescription(answer) if err != nil { return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)