From 3a656a26676c23ee845dcc5b810e1f7f06005b06 Mon Sep 17 00:00:00 2001 From: Marco Vidonis <31407403+marcovidonis@users.noreply.github.com> Date: Tue, 18 Mar 2025 23:29:32 +0000 Subject: [PATCH] Expose torrent Peer status updates (#987) * handle connection to torrent peer status update messages * basic observer framework * connects to a valid tracker * added observer channel for announce status * set up Peer Connection status Observers * add PeerConn test: connection established * added Observers factory method * Added Event to AnnounceStatus, with embedded TrackerStatus * state updates must be non-blocking * add unit tests on PeerConn Observer status reading * add test and debug log on dropped connection * add PeerID check to test --------- Co-authored-by: Parker Whittle * do not support webtorrent/transport_test on wasm * make AnnounceStatus InfoHash into a HexString * replace MakeMapIfNilAndSet * add StatusUpdated to callbacks * replace Observers on Peer Conn status with callbacks * replace tracker status updates with callbacks * replace tracker announce status updates with callbacks * remove references to observers * test callbacks in client-peerconn_test * add check that all callbacks were called in peer connection test * test callbacks in client-tracker_test --------- Co-authored-by: Parker Whittle --- callbacks.go | 24 ++++ client-peerconn_test.go | 188 ++++++++++++++++++++++++++++++ client-tracker_test.go | 220 +++++++++++++++++++++++++++++++++++ client.go | 20 +++- peerconn.go | 6 + testing.go | 11 ++ torrent.go | 10 ++ webtorrent/tracker-client.go | 15 ++- webtorrent/transport_test.go | 3 + wstracker.go | 41 ++++++- 10 files changed, 535 insertions(+), 3 deletions(-) create mode 100644 client-peerconn_test.go create mode 100644 client-tracker_test.go diff --git a/callbacks.go b/callbacks.go index 0c66bc50..664ba166 100644 --- a/callbacks.go +++ b/callbacks.go @@ -34,6 +34,10 @@ type Callbacks struct { // handshake has not yet occurred. This is a good time to alter the supported extension // protocols. PeerConnAdded []func(*PeerConn) + + // Sends status event updates. Useful to inform the user of specific events as they happen, + // for logging or to action on. + StatusUpdated []func(StatusUpdatedEvent) } type ReceivedUsefulDataEvent = PeerMessageEvent @@ -54,3 +58,23 @@ type PeerConnReadExtensionMessageEvent struct { ExtensionNumber pp.ExtensionNumber Payload []byte } + +type StatusUpdatedEvent struct { + Event StatusEvent `json:"event"` + Error error `json:"error"` + // The following fields may or may not be populated depending on the event. + PeerId PeerID `json:"peer_id"` + Url string `json:"url"` + InfoHash string `json:"info_hash"` +} + +type StatusEvent string + +const ( + PeerConnected StatusEvent = "peer_connected" + PeerDisconnected StatusEvent = "peer_disconnected" + TrackerConnected StatusEvent = "tracker_connected" + TrackerDisconnected StatusEvent = "tracker_disconnected" + TrackerAnnounceSuccessful StatusEvent = "tracker_announce_successful" + TrackerAnnounceError StatusEvent = "tracker_announce_error" +) diff --git a/client-peerconn_test.go b/client-peerconn_test.go new file mode 100644 index 00000000..0e6b5372 --- /dev/null +++ b/client-peerconn_test.go @@ -0,0 +1,188 @@ +package torrent + +import ( + "io" + "os" + "testing" + "testing/iotest" + + "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestPeerConnEstablished(t *testing.T) { + var expectedPeerId PeerID + missinggo.CopyExact(&expectedPeerId, "12345123451234512345") + + gotPeerConnectedEvt := false + gotPeerDisconnectedEvt := false + ps := testClientTransferParams{ + ConfigureSeeder: ConfigureClient{ + Config: func(cfg *ClientConfig) { + cfg.PeerID = "12345123451234512345" + }, + }, + ConfigureLeecher: ConfigureClient{ + Config: func(cfg *ClientConfig) { + // cfg.DisableUTP = true + cfg.DisableTCP = true + cfg.Debug = false + cfg.DisableTrackers = true + cfg.EstablishedConnsPerTorrent = 1 + cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, + func(e StatusUpdatedEvent) { + if e.Event == PeerConnected { + gotPeerConnectedEvt = true + require.Equal(t, expectedPeerId, e.PeerId) + require.NoError(t, e.Error) + } + }, + func(e StatusUpdatedEvent) { + if e.Event == PeerDisconnected { + gotPeerDisconnectedEvt = true + require.Equal(t, expectedPeerId, e.PeerId) + require.NoError(t, e.Error) + } + }, + ) + }, + }, + } + + testClientTransfer(t, ps) + // double check that the callbacks were called + require.True(t, gotPeerConnectedEvt) + require.True(t, gotPeerDisconnectedEvt) +} + +type ConfigureClient struct { + Config func(cfg *ClientConfig) + Client func(cl *Client) +} + +type testClientTransferParams struct { + SeederUploadRateLimiter *rate.Limiter + LeecherDownloadRateLimiter *rate.Limiter + ConfigureSeeder ConfigureClient + ConfigureLeecher ConfigureClient + + LeecherStartsWithoutMetadata bool +} + +// Simplified version of testClientTransfer found in test/leecher-storage.go. +// Could not import and reuse that function due to circular dependencies between modules. +func testClientTransfer(t *testing.T, ps testClientTransferParams) { + greetingTempDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingTempDir) + // Create seeder and a Torrent. + cfg := TestingConfig(t) + cfg.Seed = true + // Some test instances don't like this being on, even when there's no cache involved. + cfg.DropMutuallyCompletePeers = false + if ps.SeederUploadRateLimiter != nil { + cfg.UploadRateLimiter = ps.SeederUploadRateLimiter + } + cfg.DataDir = greetingTempDir + if ps.ConfigureSeeder.Config != nil { + ps.ConfigureSeeder.Config(cfg) + } + seeder, err := NewClient(cfg) + require.NoError(t, err) + if ps.ConfigureSeeder.Client != nil { + ps.ConfigureSeeder.Client(seeder) + } + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + defer seeder.Close() + <-seederTorrent.Complete().On() + + // Create leecher and a Torrent. + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + // See the seeder client config comment. + cfg.DropMutuallyCompletePeers = false + cfg.DataDir = leecherDataDir + if ps.LeecherDownloadRateLimiter != nil { + cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter + } + cfg.Seed = false + if ps.ConfigureLeecher.Config != nil { + ps.ConfigureLeecher.Config(cfg) + } + leecher, err := NewClient(cfg) + require.NoError(t, err) + defer leecher.Close() + if ps.ConfigureLeecher.Client != nil { + ps.ConfigureLeecher.Client(leecher) + } + leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + if ps.LeecherStartsWithoutMetadata { + ret.InfoBytes = nil + } + return + }()) + require.NoError(t, err) + assert.False(t, leecherTorrent.Complete().Bool()) + assert.True(t, new) + + added := leecherTorrent.AddClientPeer(seeder) + assert.False(t, leecherTorrent.Seeding()) + // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they + // should be sitting idle until we demand data. + if !ps.LeecherStartsWithoutMetadata { + assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers) + } + if ps.LeecherStartsWithoutMetadata { + <-leecherTorrent.GotInfo() + } + r := leecherTorrent.NewReader() + defer r.Close() + go leecherTorrent.SetInfoBytes(mi.InfoBytes) + + assertReadAllGreeting(t, r) + <-leecherTorrent.Complete().On() + assert.NotEmpty(t, seederTorrent.PeerConns()) + leecherPeerConns := leecherTorrent.PeerConns() + if cfg.DropMutuallyCompletePeers { + // I don't think we can assume it will be empty already, due to timing. + // assert.Empty(t, leecherPeerConns) + } else { + assert.NotEmpty(t, leecherPeerConns) + } + foundSeeder := false + for _, pc := range leecherPeerConns { + completed := pc.PeerPieces().GetCardinality() + t.Logf("peer conn %v has %v completed pieces", pc, completed) + if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) { + foundSeeder = true + } + } + if !foundSeeder { + t.Errorf("didn't find seeder amongst leecher peer conns") + } + + seederStats := seederTorrent.Stats() + assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) + assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) + + leecherStats := leecherTorrent.Stats() + assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) + assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) + + // Try reading through again for the cases where the torrent data size + // exceeds the size of the cache. + assertReadAllGreeting(t, r) +} + +func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { + pos, err := r.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.EqualValues(t, 0, pos) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) +} diff --git a/client-tracker_test.go b/client-tracker_test.go new file mode 100644 index 00000000..0bc63e34 --- /dev/null +++ b/client-tracker_test.go @@ -0,0 +1,220 @@ +package torrent + +import ( + "errors" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/tracker" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +func TestClientInvalidTracker(t *testing.T) { + timeout := time.NewTimer(3 * time.Second) + receivedStatusUpdate := make(chan bool) + gotTrackerDisconnectedEvt := false + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) { + if e.Event == TrackerAnnounceError { + // ignore + return + } + if e.Event == TrackerDisconnected { + gotTrackerDisconnectedEvt = true + require.Equal(t, "ws://test.invalid:4242", e.Url) + require.Error(t, e.Error) + } + receivedStatusUpdate <- true + }) + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {"ws://test.invalid:4242"}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + select { + case <-timeout.C: + case <-receivedStatusUpdate: + } + require.True(t, gotTrackerDisconnectedEvt) + to.Drop() +} + +var upgrader = websocket.Upgrader{} + +func testtracker(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer c.Close() + for { + _, _, err := c.ReadMessage() + if err != nil { + break + } + //err = c.WriteMessage(mt, message) + //if err != nil { + // break + //} + } +} + +func TestClientValidTrackerConn(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + timeout := time.NewTimer(3 * time.Second) + receivedStatusUpdate := make(chan bool) + gotTrackerConnectedEvt := false + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) { + if e.Event == TrackerConnected { + gotTrackerConnectedEvt = true + require.Equal(t, trackerUrl, e.Url) + require.NoError(t, e.Error) + } + receivedStatusUpdate <- true + }) + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + select { + case <-timeout.C: + case <-receivedStatusUpdate: + } + require.True(t, gotTrackerConnectedEvt) + to.Drop() +} + +func TestClientAnnounceFailure(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + timeout := time.NewTimer(3 * time.Second) + receivedStatusUpdate := make(chan bool) + gotTrackerAnnounceErrorEvt := false + cfg := TestingConfig(t) + cfg.DisableTrackers = false + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { + return tracker.AnnounceRequest{}, errors.New("test error") + } + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) { + if e.Event == TrackerConnected { + // ignore + return + } + if e.Event == TrackerAnnounceError { + gotTrackerAnnounceErrorEvt = true + require.Equal(t, trackerUrl, e.Url) + require.Equal(t, to.InfoHash().HexString(), e.InfoHash) + require.Error(t, e.Error) + require.Equal(t, "test error", e.Error.Error()) + } + receivedStatusUpdate <- true + }) + + select { + case <-timeout.C: + case <-receivedStatusUpdate: + } + require.True(t, gotTrackerAnnounceErrorEvt) + to.Drop() +} + +func TestClientAnnounceSuccess(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + timeout := time.NewTimer(3 * time.Second) + receivedStatusUpdate := make(chan bool) + gotTrackerAnnounceSuccessfulEvt := false + cfg := TestingConfig(t) + cfg.DisableTrackers = false + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) { + if e.Event == TrackerConnected { + // ignore + return + } + if e.Event == TrackerAnnounceSuccessful { + gotTrackerAnnounceSuccessfulEvt = true + require.Equal(t, trackerUrl, e.Url) + require.Equal(t, to.InfoHash().HexString(), e.InfoHash) + require.NoError(t, e.Error) + } + receivedStatusUpdate <- true + }) + + select { + case <-timeout.C: + case <-receivedStatusUpdate: + } + require.True(t, gotTrackerAnnounceSuccessfulEvt) + to.Drop() +} + +func startTestTracker() (*httptest.Server, string) { + s := httptest.NewServer(http.HandlerFunc(testtracker)) + trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http") + return s, trackerUrl +} diff --git a/client.go b/client.go index fe3af239..9001fce1 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,8 @@ import ( "strconv" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -47,7 +49,6 @@ import ( "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/types/infohash" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" - "github.com/anacrolix/torrent/webtorrent" ) // Clients contain zero or more Torrents. A Client manages a blocklist, the @@ -332,6 +333,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, ICEServers: cl.ICEServers(), DialContext: cl.config.TrackerDialContext, + callbacks: &cl.config.Callbacks, OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() @@ -741,6 +743,7 @@ func doProtocolHandshakeOnDialResult( cl := t.cl nc := dr.Conn addrIpPort, _ := tryIpPortFromNetAddr(addr) + c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ @@ -1128,8 +1131,23 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error { pc.startMessageWriter() pc.sendInitialMessages() pc.initUpdateRequestsTimer() + + for _, cb := range pc.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerConnected, + PeerId: pc.PeerID, + }) + } + err := pc.mainReadLoop() if err != nil { + for _, cb := range pc.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerDisconnected, + Error: err, + PeerId: pc.PeerID, + }) + } return fmt.Errorf("main read loop: %w", err) } return nil diff --git a/peerconn.go b/peerconn.go index 65800dd6..ad069a29 100644 --- a/peerconn.go +++ b/peerconn.go @@ -32,6 +32,12 @@ import ( utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) +type PeerStatus struct { + Id PeerID + Ok bool + Err string // see https://github.com/golang/go/issues/5161 +} + // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer diff --git a/testing.go b/testing.go index a4909273..1ab74c7a 100644 --- a/testing.go +++ b/testing.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/stretchr/testify/require" "testing" "time" @@ -35,3 +36,13 @@ func TestingConfig(t testing.TB) *ClientConfig { //}) return cfg } + +func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} { + select { + case s := <-channel: + return s + case <-time.After(duration): + require.Fail(t, "Timeout reading observer channel.") + } + return nil +} diff --git a/torrent.go b/torrent.go index cbd2c050..8f7e41fa 100644 --- a/torrent.go +++ b/torrent.go @@ -1804,6 +1804,15 @@ func (t *Torrent) assertPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() c.close() + + for _, cb := range c.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: PeerDisconnected, + PeerId: c.PeerID, + }) + } + t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID) + if t.deletePeerConn(c) { t.openNewConns() } @@ -1864,6 +1873,7 @@ func (t *Torrent) onWebRtcConn( return } localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) + pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index ec2232d1..cf02b629 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/types/infohash" ) type TrackerClientStats struct { @@ -45,6 +46,12 @@ type TrackerClient struct { ICEServers []webrtc.ICEServer rtcPeerConns map[string]*wrappedPeerConnection + + // callbacks + OnConnected func(error) + OnDisconnected func(error) + OnAnnounceSuccessful func(ih string) + OnAnnounceError func(ih string, err error) } func (me *TrackerClient) Stats() TrackerClientStats { @@ -99,6 +106,7 @@ func (tc *TrackerClient) doWebsocket() error { c, _, err := tc.Dialer.Dial(tc.Url, header) if err != nil { + tc.OnDisconnected(err) return fmt.Errorf("dialing tracker: %w", err) } defer c.Close() @@ -125,6 +133,7 @@ func (tc *TrackerClient) doWebsocket() error { } } }() + tc.OnConnected(nil) err = tc.trackerReadLoop(tc.wsConn) close(closeChan) tc.mu.Lock() @@ -262,6 +271,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error { request, err := tc.GetAnnounceRequest(event, infoHash) if err != nil { + tc.OnAnnounceError(infohash.T(infoHash).HexString(), err) return fmt.Errorf("getting announce parameters: %w", err) } @@ -291,10 +301,13 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte defer tc.mu.Unlock() err = tc.writeMessage(data) if err != nil { + tc.OnAnnounceError(infohash.T(infoHash).HexString(), err) return fmt.Errorf("write AnnounceRequest: %w", err) } + tc.OnAnnounceSuccessful(infohash.T(infoHash).HexString()) + g.MakeMapIfNil(&tc.outboundOffers) for _, offer := range offers { - g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue) + g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue) } return nil } diff --git a/webtorrent/transport_test.go b/webtorrent/transport_test.go index dcb170a0..753339d3 100644 --- a/webtorrent/transport_test.go +++ b/webtorrent/transport_test.go @@ -1,3 +1,6 @@ +//go:build !js +// +build !js + package webtorrent import ( diff --git a/wstracker.go b/wstracker.go index 0e71e4e4..ed5526db 100644 --- a/wstracker.go +++ b/wstracker.go @@ -8,13 +8,14 @@ import ( "net/url" "sync" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/log" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - "github.com/anacrolix/torrent/webtorrent" ) type websocketTrackerStatus struct { @@ -49,6 +50,7 @@ type websocketTrackers struct { DialContext func(ctx context.Context, network, addr string) (net.Conn, error) WebsocketTrackerHttpHeader func() netHttp.Header ICEServers []webrtc.ICEServer + callbacks *Callbacks } func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.TrackerClient, func()) { @@ -75,6 +77,43 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra ), WebsocketTrackerHttpHeader: me.WebsocketTrackerHttpHeader, ICEServers: me.ICEServers, + OnConnected: func(err error) { + for _, cb := range me.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: TrackerConnected, + Url: url, + Error: err, + }) + } + }, + OnDisconnected: func(err error) { + for _, cb := range me.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: TrackerDisconnected, + Url: url, + Error: err, + }) + } + }, + OnAnnounceSuccessful: func(ih string) { + for _, cb := range me.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: TrackerAnnounceSuccessful, + Url: url, + InfoHash: ih, + }) + } + }, + OnAnnounceError: func(ih string, err error) { + for _, cb := range me.callbacks.StatusUpdated { + cb(StatusUpdatedEvent{ + Event: TrackerAnnounceError, + Url: url, + Error: err, + InfoHash: ih, + }) + } + }, }, } value.TrackerClient.Start(func(err error) { -- 2.48.1