package webtorrent
import (
+ "context"
"crypto/rand"
"encoding/json"
"fmt"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/trace"
"sync"
"time"
type outboundOffer struct {
originalOffer webrtc.SessionDescription
peerConnection *wrappedPeerConnection
- dataChannel *webrtc.DataChannel
infoHash [20]byte
}
InfoHash [20]byte
// This is private as some methods might not be appropriate with data channel context.
peerConnection *wrappedPeerConnection
+ span trace.Span
+ ctx context.Context
}
func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
}
offerIDBinary := binaryToJsonString(randOfferId[:])
- pc, dc, offer, err := newOffer()
+ pc, offer, err := newOffer(tc.Logger)
if err != nil {
return fmt.Errorf("creating offer: %w", err)
}
}
tc.outboundOffers[offerIDBinary] = outboundOffer{
peerConnection: pc,
- dataChannel: dc,
originalOffer: offer,
infoHash: infoHash,
}
infoHash [20]byte,
peerId string,
) error {
- peerConnection, answer, err := newAnsweringPeerConnection(offer)
+ peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
return fmt.Errorf("writing response: %w", err)
}
timer := time.AfterFunc(30*time.Second, func() {
+ peerConnection.span.SetStatus(codes.Error, "answer timeout")
metrics.Add("answering peer connections timed out", 1)
peerConnection.Close()
})
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
- setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
+ ctx, span := dataChannelStarted(peerConnection.ctx, d)
+ setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
LocalOffered: false,
InfoHash: infoHash,
peerConnection: peerConnection,
+ ctx: ctx,
+ span: span,
})
})
})
}
// tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
metrics.Add("outbound offers answered", 1)
- err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
- metrics.Add("outbound offers answered with datachannel", 1)
- tc.mu.Lock()
- tc.stats.ConvertedOutboundConns++
- tc.mu.Unlock()
- tc.OnConn(dc, DataChannelContext{
- Local: offer.originalOffer,
- Remote: answer,
- OfferId: offerId,
- LocalOffered: true,
- InfoHash: offer.infoHash,
- peerConnection: offer.peerConnection,
+ // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
+ // initiating?
+ dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+ if err != nil {
+ err = fmt.Errorf("creating data channel: %w", err)
+ tc.Logger.LevelPrint(log.Error, err)
+ offer.peerConnection.span.RecordError(err)
+ offer.peerConnection.Close()
+ goto deleteOffer
+ }
+ {
+ ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
+ setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
+ metrics.Add("outbound offers answered with datachannel", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedOutboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(dc, DataChannelContext{
+ Local: offer.originalOffer,
+ Remote: answer,
+ OfferId: offerId,
+ LocalOffered: true,
+ InfoHash: offer.infoHash,
+ peerConnection: offer.peerConnection,
+ ctx: ctx,
+ span: span,
+ })
})
- })
+ }
+ err = offer.peerConnection.SetRemoteDescription(answer)
if err != nil {
- tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+ err = fmt.Errorf("using outbound offer answer: %w", err)
+ offer.peerConnection.span.RecordError(err)
+ dataChannel.Close()
+ tc.Logger.WithDefaultLevel(log.Error).Print(err)
return
}
+deleteOffer:
delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash)
}