"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
"github.com/pion/datachannel"
+ "github.com/pion/webrtc/v3"
"golang.org/x/sync/errgroup"
"github.com/anacrolix/torrent/bencode"
)
}
+type webRtcStatsReports map[string]webrtc.StatsReport
+
+func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports {
+ stats := make(map[string]webRtcStatsReports)
+ trackersMap := t.cl.websocketTrackers.clients
+ for i, trackerClient := range trackersMap {
+ ts := trackerClient.RtcPeerConnStats()
+ stats[i] = ts
+ }
+ return stats
+}
+
type requestState struct {
peer *Peer
when time.Time
WebsocketTrackerHttpHeader func() http.Header
ICEServers []webrtc.ICEServer
+
+ rtcPeerConns map[string]*wrappedPeerConnection
}
func (me *TrackerClient) Stats() TrackerClientStats {
return fmt.Errorf("creating offer: %w", err)
}
+ // save the leecher peer connections
+ tc.storePeerConnection(fmt.Sprintf("%x", randOfferId[:]), pc)
+
+ pc.OnClose(func() {
+ delete(tc.rtcPeerConns, offerIDBinary)
+ })
+
tc.Logger.Levelf(log.Debug, "announcing offer")
- err = tc.announce(event, infoHash, []outboundOffer{
- {
- offerId: offerIDBinary,
- outboundOfferValue: outboundOfferValue{
- originalOffer: offer,
- peerConnection: pc,
- infoHash: infoHash,
- dataChannel: dc,
- },
- },
+ err = tc.announce(event, infoHash, []outboundOffer{{
+ offerId: offerIDBinary,
+ outboundOfferValue: outboundOfferValue{
+ originalOffer: offer,
+ peerConnection: pc,
+ infoHash: infoHash,
+ dataChannel: dc,
+ }},
})
if err != nil {
dc.Close()
return nil
}
+// Calculate the stats for all the peer connections the moment they are requested.
+// As the stats will change over the life of a peer connection, this ensures that
+// the updated values are returned.
+func (tc *TrackerClient) RtcPeerConnStats() map[string]webrtc.StatsReport {
+ tc.mu.Lock()
+ defer tc.mu.Unlock()
+ sr := make(map[string]webrtc.StatsReport)
+ for id, pc := range tc.rtcPeerConns {
+ sr[id] = GetPeerConnStats(pc)
+ }
+ return sr
+}
+
func (tc *TrackerClient) writeMessage(data []byte) error {
for tc.wsConn == nil {
if tc.closed {
if err != nil {
return fmt.Errorf("creating answering peer connection: %w", err)
}
+
+ // save the seeder peer connections
+ tc.storePeerConnection(fmt.Sprintf("%x", offerContext.Id[:]), peerConnection)
+
response := AnnounceResponse{
Action: "announce",
InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash)
}
+
+func (tc *TrackerClient) storePeerConnection(offerId string, pc *wrappedPeerConnection) {
+ tc.mu.Lock()
+ defer tc.mu.Unlock()
+ if tc.rtcPeerConns == nil {
+ tc.rtcPeerConns = make(map[string]*wrappedPeerConnection)
+ }
+ tc.rtcPeerConns[offerId] = pc
+}
pproffd.CloseWrapper
span trace.Span
ctx context.Context
+
+ onCloseHandler func()
}
func (me *wrappedPeerConnection) Close() error {
me.closeMu.Lock()
defer me.closeMu.Unlock()
+
+ me.onClose()
+
err := me.CloseWrapper.Close()
me.span.End()
return err
}
+func (me *wrappedPeerConnection) OnClose(f func()) {
+ me.closeMu.Lock()
+ defer me.closeMu.Unlock()
+ me.onCloseHandler = f
+}
+
+func (me *wrappedPeerConnection) onClose() {
+ handler := me.onCloseHandler
+
+ if handler != nil {
+ handler()
+ }
+}
+
func newPeerConnection(logger log.Logger, iceServers []webrtc.ICEServer) (*wrappedPeerConnection, error) {
newPeerConnectionMu.Lock()
defer newPeerConnectionMu.Unlock()