]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Expose WebRTC peerconn stats (#983)
authorMarco Vidonis <31407403+marcovidonis@users.noreply.github.com>
Thu, 3 Oct 2024 14:46:35 +0000 (15:46 +0100)
committerGitHub <noreply@github.com>
Thu, 3 Oct 2024 14:46:35 +0000 (00:46 +1000)
* add WebRTC peer connection transport stats

* save all peer connections in tracker-client and make stats available when seeding

* make offer ID keys into readable strings

* handle unsupported Peer Conn stats on WASM

torrent.go
webtorrent/peer-conn-stats.go [new file with mode: 0644]
webtorrent/peer-conn-stats_js.go [new file with mode: 0644]
webtorrent/tracker-client.go
webtorrent/transport.go

index 12886061173096f6ef39c4253d5df41932be2bcf..7435c9b6c2a4ac2c0a444cd3d8430a3d20749544 100644 (file)
@@ -33,6 +33,7 @@ import (
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
        "github.com/pion/datachannel"
+       "github.com/pion/webrtc/v3"
        "golang.org/x/sync/errgroup"
 
        "github.com/anacrolix/torrent/bencode"
@@ -3002,6 +3003,18 @@ func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
        )
 }
 
+type webRtcStatsReports map[string]webrtc.StatsReport
+
+func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports {
+       stats := make(map[string]webRtcStatsReports)
+       trackersMap := t.cl.websocketTrackers.clients
+       for i, trackerClient := range trackersMap {
+               ts := trackerClient.RtcPeerConnStats()
+               stats[i] = ts
+       }
+       return stats
+}
+
 type requestState struct {
        peer *Peer
        when time.Time
diff --git a/webtorrent/peer-conn-stats.go b/webtorrent/peer-conn-stats.go
new file mode 100644 (file)
index 0000000..9073986
--- /dev/null
@@ -0,0 +1,15 @@
+//go:build !js
+// +build !js
+
+package webtorrent
+
+import (
+       "github.com/pion/webrtc/v3"
+)
+
+func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) {
+       if pc != nil {
+               stats = pc.GetStats()
+       }
+       return
+}
diff --git a/webtorrent/peer-conn-stats_js.go b/webtorrent/peer-conn-stats_js.go
new file mode 100644 (file)
index 0000000..7f770b1
--- /dev/null
@@ -0,0 +1,13 @@
+//go:build js && wasm
+// +build js,wasm
+
+package webtorrent
+
+import (
+       "github.com/pion/webrtc/v3"
+)
+
+// webrtc.PeerConnection.GetStats() is not currently supported for WASM. Return empty stats.
+func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) {
+       return
+}
index 8b307c7beacb22667f771698c5ea638616f34b6d..cc0c65849b3f92f334106f09e0a10a01d24da5ff 100644 (file)
@@ -44,6 +44,8 @@ type TrackerClient struct {
 
        WebsocketTrackerHttpHeader func() http.Header
        ICEServers                 []webrtc.ICEServer
+
+       rtcPeerConns map[string]*wrappedPeerConnection
 }
 
 func (me *TrackerClient) Stats() TrackerClientStats {
@@ -234,17 +236,22 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
                return fmt.Errorf("creating offer: %w", err)
        }
 
+       // save the leecher peer connections
+       tc.storePeerConnection(fmt.Sprintf("%x", randOfferId[:]), pc)
+
+       pc.OnClose(func() {
+               delete(tc.rtcPeerConns, offerIDBinary)
+       })
+
        tc.Logger.Levelf(log.Debug, "announcing offer")
-       err = tc.announce(event, infoHash, []outboundOffer{
-               {
-                       offerId: offerIDBinary,
-                       outboundOfferValue: outboundOfferValue{
-                               originalOffer:  offer,
-                               peerConnection: pc,
-                               infoHash:       infoHash,
-                               dataChannel:    dc,
-                       },
-               },
+       err = tc.announce(event, infoHash, []outboundOffer{{
+               offerId: offerIDBinary,
+               outboundOfferValue: outboundOfferValue{
+                       originalOffer:  offer,
+                       peerConnection: pc,
+                       infoHash:       infoHash,
+                       dataChannel:    dc,
+               }},
        })
        if err != nil {
                dc.Close()
@@ -293,6 +300,19 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte
        return nil
 }
 
+// Calculate the stats for all the peer connections the moment they are requested.
+// As the stats will change over the life of a peer connection, this ensures that
+// the updated values are returned.
+func (tc *TrackerClient) RtcPeerConnStats() map[string]webrtc.StatsReport {
+       tc.mu.Lock()
+       defer tc.mu.Unlock()
+       sr := make(map[string]webrtc.StatsReport)
+       for id, pc := range tc.rtcPeerConns {
+               sr[id] = GetPeerConnStats(pc)
+       }
+       return sr
+}
+
 func (tc *TrackerClient) writeMessage(data []byte) error {
        for tc.wsConn == nil {
                if tc.closed {
@@ -359,6 +379,10 @@ func (tc *TrackerClient) handleOffer(
        if err != nil {
                return fmt.Errorf("creating answering peer connection: %w", err)
        }
+
+       // save the seeder peer connections
+       tc.storePeerConnection(fmt.Sprintf("%x", offerContext.Id[:]), peerConnection)
+
        response := AnnounceResponse{
                Action:   "announce",
                InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
@@ -401,3 +425,12 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
        delete(tc.outboundOffers, offerId)
        go tc.Announce(tracker.None, offer.infoHash)
 }
+
+func (tc *TrackerClient) storePeerConnection(offerId string, pc *wrappedPeerConnection) {
+       tc.mu.Lock()
+       defer tc.mu.Unlock()
+       if tc.rtcPeerConns == nil {
+               tc.rtcPeerConns = make(map[string]*wrappedPeerConnection)
+       }
+       tc.rtcPeerConns[offerId] = pc
+}
index 75f1376299e060ac7b622ff57bcb265ed7b3efcb..6231fd8d6627a16ab7f183c0753766fde1abf1cd 100644 (file)
@@ -38,16 +38,35 @@ type wrappedPeerConnection struct {
        pproffd.CloseWrapper
        span trace.Span
        ctx  context.Context
+
+       onCloseHandler func()
 }
 
 func (me *wrappedPeerConnection) Close() error {
        me.closeMu.Lock()
        defer me.closeMu.Unlock()
+
+       me.onClose()
+
        err := me.CloseWrapper.Close()
        me.span.End()
        return err
 }
 
+func (me *wrappedPeerConnection) OnClose(f func()) {
+       me.closeMu.Lock()
+       defer me.closeMu.Unlock()
+       me.onCloseHandler = f
+}
+
+func (me *wrappedPeerConnection) onClose() {
+       handler := me.onCloseHandler
+
+       if handler != nil {
+               handler()
+       }
+}
+
 func newPeerConnection(logger log.Logger, iceServers []webrtc.ICEServer) (*wrappedPeerConnection, error) {
        newPeerConnectionMu.Lock()
        defer newPeerConnectionMu.Unlock()