func (t *Torrent) onWebRtcConn(
c datachannel.ReadWriteCloser,
- initiatedLocally bool, // Whether we offered first, or they did.
+ dcc webtorrent.DataChannelContext,
) {
defer c.Close()
- pc, err := t.cl.handshakesConnection(context.Background(), webrtcNetConn{c}, t, false, nil, "webrtc")
+ pc, err := t.cl.handshakesConnection(
+ context.Background(),
+ webrtcNetConn{c, dcc},
+ t,
+ false,
+ webrtcNetAddr{dcc.Remote},
+ webrtcNetwork,
+ )
if err != nil {
t.logger.Printf("error in handshaking webrtc connection: %v", err)
}
- if initiatedLocally {
+ if dcc.LocalOffered {
pc.Discovery = PeerSourceTracker
} else {
pc.Discovery = PeerSourceIncoming
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
case "ws", "wss":
- wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn)}
+ wst := websocketTracker{*u, webtorrent.NewClient(t.cl.peerID, t.infoHash, t.onWebRtcConn, t.logger)}
go func() {
- err := wst.Client.Run(t.announceRequest(tracker.Started))
+ err := wst.Client.Run(t.announceRequest(tracker.Started), u.String())
if err != nil {
- t.logger.Printf("error running websocket tracker announcer: %v", err)
+ t.logger.WithValues(log.Error).Printf("error running websocket tracker announcer: %v", err)
}
}()
return wst
"time"
"github.com/pion/datachannel"
+ "github.com/pion/webrtc/v2"
+
+ "github.com/anacrolix/torrent/webtorrent"
)
+const webrtcNetwork = "webrtc"
+
type webrtcNetConn struct {
datachannel.ReadWriteCloser
+ webtorrent.DataChannelContext
}
type webrtcNetAddr struct {
+ webrtc.SessionDescription
}
func (webrtcNetAddr) Network() string {
- return "webrtc"
+ return webrtcNetwork
}
-func (webrtcNetAddr) String() string {
- return ""
+func (me webrtcNetAddr) String() string {
+ // TODO: What can I show here that's more like other protocols?
+ return "<WebRTC>"
}
-func (w webrtcNetConn) LocalAddr() net.Addr {
- return webrtcNetAddr{}
+func (me webrtcNetConn) LocalAddr() net.Addr {
+ return webrtcNetAddr{me.Local}
}
-func (w webrtcNetConn) RemoteAddr() net.Addr {
- return webrtcNetAddr{}
+func (me webrtcNetConn) RemoteAddr() net.Addr {
+ return webrtcNetAddr{me.Remote}
}
func (w webrtcNetConn) SetDeadline(t time.Time) error {
"github.com/pion/webrtc/v2"
)
-const (
- trackerURL = `wss://tracker.openwebtorrent.com/` // For simplicity
-)
-
// Client represents the webtorrent client
type Client struct {
lock sync.Mutex
peerIDBinary string
infoHashBinary string
- offeredPeers map[string]Peer // OfferID to Peer
+ outboundOffers map[string]outboundOffer // OfferID to outboundOffer
tracker *websocket.Conn
- onConn func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
+ onConn onDataChannelOpen
+ logger log.Logger
}
-// Peer represents a remote peer
-type Peer struct {
- peerID string
- transport *Transport
+// outboundOffer represents an outstanding offer.
+type outboundOffer struct {
+ originalOffer webrtc.SessionDescription
+ transport *Transport
}
func binaryToJsonString(b []byte) string {
return string(seq)
}
-type onDataChannelOpen func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
+type DataChannelContext struct {
+ Local, Remote webrtc.SessionDescription
+ LocalOffered bool
+}
+
+type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
-func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen) *Client {
+func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *Client {
return &Client{
- offeredPeers: make(map[string]Peer),
+ outboundOffers: make(map[string]outboundOffer),
peerIDBinary: binaryToJsonString(peerId[:]),
infoHashBinary: binaryToJsonString(infoHash[:]),
onConn: onConn,
+ logger: logger,
}
}
-func (c *Client) Run(ar tracker.AnnounceRequest) error {
- t, _, err := websocket.DefaultDialer.Dial(trackerURL, nil)
+func (c *Client) Run(ar tracker.AnnounceRequest, url string) error {
+ t, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
- return fmt.Errorf("failed to dial tracker: %v", err)
+ return fmt.Errorf("failed to dial tracker: %w", err)
}
defer t.Close()
+ c.logger.WithValues(log.Info).Printf("dialed tracker %q", url)
c.tracker = t
- go c.announce(ar)
- c.trackerReadLoop()
-
- return nil
+ go func() {
+ err := c.announce(ar)
+ if err != nil {
+ c.logger.WithValues(log.Error).Printf("error announcing: %v", err)
+ }
+ }()
+ return c.trackerReadLoop()
}
func (c *Client) announce(request tracker.AnnounceRequest) error {
- transpot, offer, err := NewTransport()
+ transport, offer, err := NewTransport()
if err != nil {
return fmt.Errorf("failed to create transport: %w", err)
}
if err != nil {
return fmt.Errorf("failed to generate bytes: %w", err)
}
- // OfferID := randOfferID.ToStringHex()
offerIDBinary := randOfferID.ToStringLatin1()
c.lock.Lock()
- c.offeredPeers[offerIDBinary] = Peer{transport: transpot}
+ c.outboundOffers[offerIDBinary] = outboundOffer{
+ transport: transport,
+ originalOffer: offer,
+ }
c.lock.Unlock()
req := AnnounceRequest{
if err != nil {
return fmt.Errorf("read error: %w", err)
}
- log.Printf("recv: %q", message)
+ c.logger.WithValues(log.Debug).Printf("received message from tracker: %q", message)
var ar AnnounceResponse
if err := json.Unmarshal(message, &ar); err != nil {
}
switch {
case ar.Offer != nil:
- t, answer, err := NewTransportFromOffer(*ar.Offer, func(dc datachannel.ReadWriteCloser) {
- c.onConn(dc, false)
- })
+ _, answer, err := NewTransportFromOffer(*ar.Offer, c.onConn)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
c.lock.Unlock()
}
c.lock.Unlock()
-
- // Do something with the peer
- _ = Peer{peerID: ar.PeerID, transport: t}
case ar.Answer != nil:
c.lock.Lock()
- peer, ok := c.offeredPeers[ar.OfferID]
+ offer, ok := c.outboundOffers[ar.OfferID]
c.lock.Unlock()
if !ok {
- log.Printf("could not find peer for offer %q", ar.OfferID)
+ c.logger.WithValues(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
continue
}
log.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
- err = peer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
- c.onConn(dc, true)
+ err = offer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
+ c.onConn(dc, DataChannelContext{
+ Local: offer.originalOffer,
+ Remote: *ar.Answer,
+ LocalOffered: true,
+ })
})
if err != nil {
return fmt.Errorf("failed to sent answer: %v", err)
// NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
// be announced.
-func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) (*Transport, webrtc.SessionDescription, error) {
+func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen onDataChannelOpen) (*Transport, webrtc.SessionDescription, error) {
peerConnection, err := newPeerConnection()
if err != nil {
return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err)
})
t := &Transport{pc: peerConnection}
- peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
- fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
- t.lock.Lock()
- t.dc = d
- t.lock.Unlock()
- t.handleOpen(onOpen)
- })
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
if err != nil {
return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
}
+ peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
+ fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
+ t.lock.Lock()
+ t.dc = d
+ t.lock.Unlock()
+ t.handleOpen(func(dc datachannel.ReadWriteCloser) {
+ onOpen(dc, DataChannelContext{answer, offer, false})
+ })
+ })
err = peerConnection.SetLocalDescription(answer)
if err != nil {
return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)