]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Expose torrent Peer status updates (#987)
authorMarco Vidonis <31407403+marcovidonis@users.noreply.github.com>
Tue, 18 Mar 2025 23:29:32 +0000 (23:29 +0000)
committerGitHub <noreply@github.com>
Tue, 18 Mar 2025 23:29:32 +0000 (10:29 +1100)
* handle connection to torrent peer status update messages

* basic observer framework
* connects to a valid tracker
* added observer channel for announce status
* set up Peer Connection status Observers
* add PeerConn test: connection established
* added Observers factory method
* Added Event to AnnounceStatus, with embedded TrackerStatus
* state updates must be non-blocking
* add unit tests on PeerConn Observer status reading
* add test and debug log on dropped connection
* add PeerID check to test

---------

Co-authored-by: Parker Whittle <pwhittle@medicom.us>
* do not support webtorrent/transport_test on wasm

* make AnnounceStatus InfoHash into a HexString

* replace MakeMapIfNilAndSet

* add StatusUpdated to callbacks

* replace Observers on Peer Conn status with callbacks

* replace tracker status updates with callbacks

* replace tracker announce status updates with callbacks

* remove references to observers

* test callbacks in client-peerconn_test

* add check that all callbacks were called in peer connection test

* test callbacks in client-tracker_test

---------

Co-authored-by: Parker Whittle <pwhittle@medicom.us>
callbacks.go
client-peerconn_test.go [new file with mode: 0644]
client-tracker_test.go [new file with mode: 0644]
client.go
peerconn.go
testing.go
torrent.go
webtorrent/tracker-client.go
webtorrent/transport_test.go
wstracker.go

index 0c66bc50f78c7ca1e59f9721bc4a2edbd7b37dab..664ba166171c3015ed77c51fc89d04fdbe4b4021 100644 (file)
@@ -34,6 +34,10 @@ type Callbacks struct {
        // handshake has not yet occurred. This is a good time to alter the supported extension
        // protocols.
        PeerConnAdded []func(*PeerConn)
+
+       // Sends status event updates. Useful to inform the user of specific events as they happen,
+       // for logging or to action on.
+       StatusUpdated []func(StatusUpdatedEvent)
 }
 
 type ReceivedUsefulDataEvent = PeerMessageEvent
@@ -54,3 +58,23 @@ type PeerConnReadExtensionMessageEvent struct {
        ExtensionNumber pp.ExtensionNumber
        Payload         []byte
 }
+
+type StatusUpdatedEvent struct {
+       Event StatusEvent `json:"event"`
+       Error error       `json:"error"`
+       // The following fields may or may not be populated depending on the event.
+       PeerId   PeerID `json:"peer_id"`
+       Url      string `json:"url"`
+       InfoHash string `json:"info_hash"`
+}
+
+type StatusEvent string
+
+const (
+       PeerConnected             StatusEvent = "peer_connected"
+       PeerDisconnected          StatusEvent = "peer_disconnected"
+       TrackerConnected          StatusEvent = "tracker_connected"
+       TrackerDisconnected       StatusEvent = "tracker_disconnected"
+       TrackerAnnounceSuccessful StatusEvent = "tracker_announce_successful"
+       TrackerAnnounceError      StatusEvent = "tracker_announce_error"
+)
diff --git a/client-peerconn_test.go b/client-peerconn_test.go
new file mode 100644 (file)
index 0000000..0e6b537
--- /dev/null
@@ -0,0 +1,188 @@
+package torrent
+
+import (
+       "io"
+       "os"
+       "testing"
+       "testing/iotest"
+
+       "github.com/anacrolix/missinggo/v2"
+       "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/torrent/internal/testutil"
+       "github.com/frankban/quicktest"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "golang.org/x/time/rate"
+)
+
+func TestPeerConnEstablished(t *testing.T) {
+       var expectedPeerId PeerID
+       missinggo.CopyExact(&expectedPeerId, "12345123451234512345")
+
+       gotPeerConnectedEvt := false
+       gotPeerDisconnectedEvt := false
+       ps := testClientTransferParams{
+               ConfigureSeeder: ConfigureClient{
+                       Config: func(cfg *ClientConfig) {
+                               cfg.PeerID = "12345123451234512345"
+                       },
+               },
+               ConfigureLeecher: ConfigureClient{
+                       Config: func(cfg *ClientConfig) {
+                               // cfg.DisableUTP = true
+                               cfg.DisableTCP = true
+                               cfg.Debug = false
+                               cfg.DisableTrackers = true
+                               cfg.EstablishedConnsPerTorrent = 1
+                               cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated,
+                                       func(e StatusUpdatedEvent) {
+                                               if e.Event == PeerConnected {
+                                                       gotPeerConnectedEvt = true
+                                                       require.Equal(t, expectedPeerId, e.PeerId)
+                                                       require.NoError(t, e.Error)
+                                               }
+                                       },
+                                       func(e StatusUpdatedEvent) {
+                                               if e.Event == PeerDisconnected {
+                                                       gotPeerDisconnectedEvt = true
+                                                       require.Equal(t, expectedPeerId, e.PeerId)
+                                                       require.NoError(t, e.Error)
+                                               }
+                                       },
+                               )
+                       },
+               },
+       }
+
+       testClientTransfer(t, ps)
+       // double check that the callbacks were called
+       require.True(t, gotPeerConnectedEvt)
+       require.True(t, gotPeerDisconnectedEvt)
+}
+
+type ConfigureClient struct {
+       Config func(cfg *ClientConfig)
+       Client func(cl *Client)
+}
+
+type testClientTransferParams struct {
+       SeederUploadRateLimiter    *rate.Limiter
+       LeecherDownloadRateLimiter *rate.Limiter
+       ConfigureSeeder            ConfigureClient
+       ConfigureLeecher           ConfigureClient
+
+       LeecherStartsWithoutMetadata bool
+}
+
+// Simplified version of testClientTransfer found in test/leecher-storage.go.
+// Could not import and reuse that function due to circular dependencies between modules.
+func testClientTransfer(t *testing.T, ps testClientTransferParams) {
+       greetingTempDir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(greetingTempDir)
+       // Create seeder and a Torrent.
+       cfg := TestingConfig(t)
+       cfg.Seed = true
+       // Some test instances don't like this being on, even when there's no cache involved.
+       cfg.DropMutuallyCompletePeers = false
+       if ps.SeederUploadRateLimiter != nil {
+               cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+       }
+       cfg.DataDir = greetingTempDir
+       if ps.ConfigureSeeder.Config != nil {
+               ps.ConfigureSeeder.Config(cfg)
+       }
+       seeder, err := NewClient(cfg)
+       require.NoError(t, err)
+       if ps.ConfigureSeeder.Client != nil {
+               ps.ConfigureSeeder.Client(seeder)
+       }
+       seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
+       defer seeder.Close()
+       <-seederTorrent.Complete().On()
+
+       // Create leecher and a Torrent.
+       leecherDataDir := t.TempDir()
+       cfg = TestingConfig(t)
+       // See the seeder client config comment.
+       cfg.DropMutuallyCompletePeers = false
+       cfg.DataDir = leecherDataDir
+       if ps.LeecherDownloadRateLimiter != nil {
+               cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+       }
+       cfg.Seed = false
+       if ps.ConfigureLeecher.Config != nil {
+               ps.ConfigureLeecher.Config(cfg)
+       }
+       leecher, err := NewClient(cfg)
+       require.NoError(t, err)
+       defer leecher.Close()
+       if ps.ConfigureLeecher.Client != nil {
+               ps.ConfigureLeecher.Client(leecher)
+       }
+       leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
+               ret = TorrentSpecFromMetaInfo(mi)
+               ret.ChunkSize = 2
+               if ps.LeecherStartsWithoutMetadata {
+                       ret.InfoBytes = nil
+               }
+               return
+       }())
+       require.NoError(t, err)
+       assert.False(t, leecherTorrent.Complete().Bool())
+       assert.True(t, new)
+
+       added := leecherTorrent.AddClientPeer(seeder)
+       assert.False(t, leecherTorrent.Seeding())
+       // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
+       // should be sitting idle until we demand data.
+       if !ps.LeecherStartsWithoutMetadata {
+               assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
+       }
+       if ps.LeecherStartsWithoutMetadata {
+               <-leecherTorrent.GotInfo()
+       }
+       r := leecherTorrent.NewReader()
+       defer r.Close()
+       go leecherTorrent.SetInfoBytes(mi.InfoBytes)
+
+       assertReadAllGreeting(t, r)
+       <-leecherTorrent.Complete().On()
+       assert.NotEmpty(t, seederTorrent.PeerConns())
+       leecherPeerConns := leecherTorrent.PeerConns()
+       if cfg.DropMutuallyCompletePeers {
+               // I don't think we can assume it will be empty already, due to timing.
+               // assert.Empty(t, leecherPeerConns)
+       } else {
+               assert.NotEmpty(t, leecherPeerConns)
+       }
+       foundSeeder := false
+       for _, pc := range leecherPeerConns {
+               completed := pc.PeerPieces().GetCardinality()
+               t.Logf("peer conn %v has %v completed pieces", pc, completed)
+               if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
+                       foundSeeder = true
+               }
+       }
+       if !foundSeeder {
+               t.Errorf("didn't find seeder amongst leecher peer conns")
+       }
+
+       seederStats := seederTorrent.Stats()
+       assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+       assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+       leecherStats := leecherTorrent.Stats()
+       assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+       assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
+       // Try reading through again for the cases where the torrent data size
+       // exceeds the size of the cache.
+       assertReadAllGreeting(t, r)
+}
+
+func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
+       pos, err := r.Seek(0, io.SeekStart)
+       assert.NoError(t, err)
+       assert.EqualValues(t, 0, pos)
+       quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
+}
diff --git a/client-tracker_test.go b/client-tracker_test.go
new file mode 100644 (file)
index 0000000..0bc63e3
--- /dev/null
@@ -0,0 +1,220 @@
+package torrent
+
+import (
+       "errors"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/anacrolix/torrent/internal/testutil"
+       "github.com/anacrolix/torrent/tracker"
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/require"
+)
+
+func TestClientInvalidTracker(t *testing.T) {
+       timeout := time.NewTimer(3 * time.Second)
+       receivedStatusUpdate := make(chan bool)
+       gotTrackerDisconnectedEvt := false
+       cfg := TestingConfig(t)
+       cfg.DisableTrackers = false
+       cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+               if e.Event == TrackerAnnounceError {
+                       // ignore
+                       return
+               }
+               if e.Event == TrackerDisconnected {
+                       gotTrackerDisconnectedEvt = true
+                       require.Equal(t, "ws://test.invalid:4242", e.Url)
+                       require.Error(t, e.Error)
+               }
+               receivedStatusUpdate <- true
+       })
+
+       cl, err := NewClient(cfg)
+       require.NoError(t, err)
+       defer cl.Close()
+
+       dir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(dir)
+
+       mi.AnnounceList = [][]string{
+               {"ws://test.invalid:4242"},
+       }
+
+       to, err := cl.AddTorrent(mi)
+       require.NoError(t, err)
+
+       select {
+       case <-timeout.C:
+       case <-receivedStatusUpdate:
+       }
+       require.True(t, gotTrackerDisconnectedEvt)
+       to.Drop()
+}
+
+var upgrader = websocket.Upgrader{}
+
+func testtracker(w http.ResponseWriter, r *http.Request) {
+       c, err := upgrader.Upgrade(w, r, nil)
+       if err != nil {
+               return
+       }
+       defer c.Close()
+       for {
+               _, _, err := c.ReadMessage()
+               if err != nil {
+                       break
+               }
+               //err = c.WriteMessage(mt, message)
+               //if err != nil {
+               //      break
+               //}
+       }
+}
+
+func TestClientValidTrackerConn(t *testing.T) {
+       s, trackerUrl := startTestTracker()
+       defer s.Close()
+
+       timeout := time.NewTimer(3 * time.Second)
+       receivedStatusUpdate := make(chan bool)
+       gotTrackerConnectedEvt := false
+       cfg := TestingConfig(t)
+       cfg.DisableTrackers = false
+       cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+               if e.Event == TrackerConnected {
+                       gotTrackerConnectedEvt = true
+                       require.Equal(t, trackerUrl, e.Url)
+                       require.NoError(t, e.Error)
+               }
+               receivedStatusUpdate <- true
+       })
+
+       cl, err := NewClient(cfg)
+       require.NoError(t, err)
+       defer cl.Close()
+
+       dir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(dir)
+
+       mi.AnnounceList = [][]string{
+               {trackerUrl},
+       }
+
+       to, err := cl.AddTorrent(mi)
+       require.NoError(t, err)
+
+       select {
+       case <-timeout.C:
+       case <-receivedStatusUpdate:
+       }
+       require.True(t, gotTrackerConnectedEvt)
+       to.Drop()
+}
+
+func TestClientAnnounceFailure(t *testing.T) {
+       s, trackerUrl := startTestTracker()
+       defer s.Close()
+
+       timeout := time.NewTimer(3 * time.Second)
+       receivedStatusUpdate := make(chan bool)
+       gotTrackerAnnounceErrorEvt := false
+       cfg := TestingConfig(t)
+       cfg.DisableTrackers = false
+
+       cl, err := NewClient(cfg)
+       require.NoError(t, err)
+       defer cl.Close()
+
+       cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
+               return tracker.AnnounceRequest{}, errors.New("test error")
+       }
+
+       dir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(dir)
+
+       mi.AnnounceList = [][]string{
+               {trackerUrl},
+       }
+
+       to, err := cl.AddTorrent(mi)
+       require.NoError(t, err)
+
+       cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+               if e.Event == TrackerConnected {
+                       // ignore
+                       return
+               }
+               if e.Event == TrackerAnnounceError {
+                       gotTrackerAnnounceErrorEvt = true
+                       require.Equal(t, trackerUrl, e.Url)
+                       require.Equal(t, to.InfoHash().HexString(), e.InfoHash)
+                       require.Error(t, e.Error)
+                       require.Equal(t, "test error", e.Error.Error())
+               }
+               receivedStatusUpdate <- true
+       })
+
+       select {
+       case <-timeout.C:
+       case <-receivedStatusUpdate:
+       }
+       require.True(t, gotTrackerAnnounceErrorEvt)
+       to.Drop()
+}
+
+func TestClientAnnounceSuccess(t *testing.T) {
+       s, trackerUrl := startTestTracker()
+       defer s.Close()
+
+       timeout := time.NewTimer(3 * time.Second)
+       receivedStatusUpdate := make(chan bool)
+       gotTrackerAnnounceSuccessfulEvt := false
+       cfg := TestingConfig(t)
+       cfg.DisableTrackers = false
+
+       cl, err := NewClient(cfg)
+       require.NoError(t, err)
+       defer cl.Close()
+
+       dir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(dir)
+
+       mi.AnnounceList = [][]string{
+               {trackerUrl},
+       }
+
+       to, err := cl.AddTorrent(mi)
+       require.NoError(t, err)
+
+       cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+               if e.Event == TrackerConnected {
+                       // ignore
+                       return
+               }
+               if e.Event == TrackerAnnounceSuccessful {
+                       gotTrackerAnnounceSuccessfulEvt = true
+                       require.Equal(t, trackerUrl, e.Url)
+                       require.Equal(t, to.InfoHash().HexString(), e.InfoHash)
+                       require.NoError(t, e.Error)
+               }
+               receivedStatusUpdate <- true
+       })
+
+       select {
+       case <-timeout.C:
+       case <-receivedStatusUpdate:
+       }
+       require.True(t, gotTrackerAnnounceSuccessfulEvt)
+       to.Drop()
+}
+
+func startTestTracker() (*httptest.Server, string) {
+       s := httptest.NewServer(http.HandlerFunc(testtracker))
+       trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http")
+       return s, trackerUrl
+}
index fe3af23997e5a6649faf9385e50a7d8bd41f03b0..9001fce1f99ac856254c43f84f9685b30bb18f40 100644 (file)
--- a/client.go
+++ b/client.go
@@ -18,6 +18,8 @@ import (
        "strconv"
        "time"
 
+       "github.com/anacrolix/torrent/webtorrent"
+
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
@@ -47,7 +49,6 @@ import (
        "github.com/anacrolix/torrent/tracker"
        "github.com/anacrolix/torrent/types/infohash"
        infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
-       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
@@ -332,6 +333,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
                ICEServers:                 cl.ICEServers(),
                DialContext:                cl.config.TrackerDialContext,
+               callbacks:                  &cl.config.Callbacks,
                OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
                        cl.lock()
                        defer cl.unlock()
@@ -741,6 +743,7 @@ func doProtocolHandshakeOnDialResult(
        cl := t.cl
        nc := dr.Conn
        addrIpPort, _ := tryIpPortFromNetAddr(addr)
+
        c, err = cl.initiateProtocolHandshakes(
                context.Background(), nc, t, obfuscatedHeader,
                newConnectionOpts{
@@ -1128,8 +1131,23 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error {
        pc.startMessageWriter()
        pc.sendInitialMessages()
        pc.initUpdateRequestsTimer()
+
+       for _, cb := range pc.callbacks.StatusUpdated {
+               cb(StatusUpdatedEvent{
+                       Event:  PeerConnected,
+                       PeerId: pc.PeerID,
+               })
+       }
+
        err := pc.mainReadLoop()
        if err != nil {
+               for _, cb := range pc.callbacks.StatusUpdated {
+                       cb(StatusUpdatedEvent{
+                               Event:  PeerDisconnected,
+                               Error:  err,
+                               PeerId: pc.PeerID,
+                       })
+               }
                return fmt.Errorf("main read loop: %w", err)
        }
        return nil
index 65800dd695490ddbb35f50070b3ab58b327cc5cc..ad069a293dc416b9d65a76476bdb02357c0c6d41 100644 (file)
@@ -32,6 +32,12 @@ import (
        utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
 )
 
+type PeerStatus struct {
+       Id  PeerID
+       Ok  bool
+       Err string // see https://github.com/golang/go/issues/5161
+}
+
 // Maintains the state of a BitTorrent-protocol based connection with a peer.
 type PeerConn struct {
        Peer
index a49092730814fa8632f8db3c2f3c0caa083b2b32..1ab74c7a83f3975b2c264e77e8bae27c605c66a7 100644 (file)
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "github.com/stretchr/testify/require"
        "testing"
        "time"
 
@@ -35,3 +36,13 @@ func TestingConfig(t testing.TB) *ClientConfig {
        //})
        return cfg
 }
+
+func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} {
+       select {
+       case s := <-channel:
+               return s
+       case <-time.After(duration):
+               require.Fail(t, "Timeout reading observer channel.")
+       }
+       return nil
+}
index cbd2c05006b8ff73cd878afb30200200039b8941..8f7e41fa24f9076d83b834064ae1b172291f7f42 100644 (file)
@@ -1804,6 +1804,15 @@ func (t *Torrent) assertPendingRequests() {
 func (t *Torrent) dropConnection(c *PeerConn) {
        t.cl.event.Broadcast()
        c.close()
+
+       for _, cb := range c.callbacks.StatusUpdated {
+               cb(StatusUpdatedEvent{
+                       Event:  PeerDisconnected,
+                       PeerId: c.PeerID,
+               })
+       }
+       t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID)
+
        if t.deletePeerConn(c) {
                t.openNewConns()
        }
@@ -1864,6 +1873,7 @@ func (t *Torrent) onWebRtcConn(
                return
        }
        localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
+
        pc, err := t.cl.initiateProtocolHandshakes(
                context.Background(),
                netConn,
index ec2232d132cb1d941d03138d62b290f808a8f008..cf02b629b546ce44fbbfdd3a788a7df3a60023d4 100644 (file)
@@ -16,6 +16,7 @@ import (
        "go.opentelemetry.io/otel/trace"
 
        "github.com/anacrolix/torrent/tracker"
+       "github.com/anacrolix/torrent/types/infohash"
 )
 
 type TrackerClientStats struct {
@@ -45,6 +46,12 @@ type TrackerClient struct {
        ICEServers                 []webrtc.ICEServer
 
        rtcPeerConns map[string]*wrappedPeerConnection
+
+       // callbacks
+       OnConnected          func(error)
+       OnDisconnected       func(error)
+       OnAnnounceSuccessful func(ih string)
+       OnAnnounceError      func(ih string, err error)
 }
 
 func (me *TrackerClient) Stats() TrackerClientStats {
@@ -99,6 +106,7 @@ func (tc *TrackerClient) doWebsocket() error {
 
        c, _, err := tc.Dialer.Dial(tc.Url, header)
        if err != nil {
+               tc.OnDisconnected(err)
                return fmt.Errorf("dialing tracker: %w", err)
        }
        defer c.Close()
@@ -125,6 +133,7 @@ func (tc *TrackerClient) doWebsocket() error {
                        }
                }
        }()
+       tc.OnConnected(nil)
        err = tc.trackerReadLoop(tc.wsConn)
        close(closeChan)
        tc.mu.Lock()
@@ -262,6 +271,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
 func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
        request, err := tc.GetAnnounceRequest(event, infoHash)
        if err != nil {
+               tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
                return fmt.Errorf("getting announce parameters: %w", err)
        }
 
@@ -291,10 +301,13 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte
        defer tc.mu.Unlock()
        err = tc.writeMessage(data)
        if err != nil {
+               tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
                return fmt.Errorf("write AnnounceRequest: %w", err)
        }
+       tc.OnAnnounceSuccessful(infohash.T(infoHash).HexString())
+       g.MakeMapIfNil(&tc.outboundOffers)
        for _, offer := range offers {
-               g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
+               g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
        }
        return nil
 }
index dcb170a06159f8d7896fc52f05e71e6430f9372c..753339d303f5a850ec06ca1bea8ee3f97a180b78 100644 (file)
@@ -1,3 +1,6 @@
+//go:build !js
+// +build !js
+
 package webtorrent
 
 import (
index 0e71e4e4a00f4d3f711b3ed87b3daf7eeadf5954..ed5526db150d6b7e0cec349269b4279a94ede5e3 100644 (file)
@@ -8,13 +8,14 @@ import (
        "net/url"
        "sync"
 
+       "github.com/anacrolix/torrent/webtorrent"
+
        "github.com/anacrolix/log"
        "github.com/gorilla/websocket"
        "github.com/pion/webrtc/v4"
 
        "github.com/anacrolix/torrent/tracker"
        httpTracker "github.com/anacrolix/torrent/tracker/http"
-       "github.com/anacrolix/torrent/webtorrent"
 )
 
 type websocketTrackerStatus struct {
@@ -49,6 +50,7 @@ type websocketTrackers struct {
        DialContext                func(ctx context.Context, network, addr string) (net.Conn, error)
        WebsocketTrackerHttpHeader func() netHttp.Header
        ICEServers                 []webrtc.ICEServer
+       callbacks                  *Callbacks
 }
 
 func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.TrackerClient, func()) {
@@ -75,6 +77,43 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra
                                ),
                                WebsocketTrackerHttpHeader: me.WebsocketTrackerHttpHeader,
                                ICEServers:                 me.ICEServers,
+                               OnConnected: func(err error) {
+                                       for _, cb := range me.callbacks.StatusUpdated {
+                                               cb(StatusUpdatedEvent{
+                                                       Event: TrackerConnected,
+                                                       Url:   url,
+                                                       Error: err,
+                                               })
+                                       }
+                               },
+                               OnDisconnected: func(err error) {
+                                       for _, cb := range me.callbacks.StatusUpdated {
+                                               cb(StatusUpdatedEvent{
+                                                       Event: TrackerDisconnected,
+                                                       Url:   url,
+                                                       Error: err,
+                                               })
+                                       }
+                               },
+                               OnAnnounceSuccessful: func(ih string) {
+                                       for _, cb := range me.callbacks.StatusUpdated {
+                                               cb(StatusUpdatedEvent{
+                                                       Event:    TrackerAnnounceSuccessful,
+                                                       Url:      url,
+                                                       InfoHash: ih,
+                                               })
+                                       }
+                               },
+                               OnAnnounceError: func(ih string, err error) {
+                                       for _, cb := range me.callbacks.StatusUpdated {
+                                               cb(StatusUpdatedEvent{
+                                                       Event:    TrackerAnnounceError,
+                                                       Url:      url,
+                                                       Error:    err,
+                                                       InfoHash: ih,
+                                               })
+                                       }
+                               },
                        },
                }
                value.TrackerClient.Start(func(err error) {