// handshake has not yet occurred. This is a good time to alter the supported extension
// protocols.
PeerConnAdded []func(*PeerConn)
+
+ // Sends status event updates. Useful to inform the user of specific events as they happen,
+ // for logging or to action on.
+ StatusUpdated []func(StatusUpdatedEvent)
}
type ReceivedUsefulDataEvent = PeerMessageEvent
ExtensionNumber pp.ExtensionNumber
Payload []byte
}
+
+type StatusUpdatedEvent struct {
+ Event StatusEvent `json:"event"`
+ Error error `json:"error"`
+ // The following fields may or may not be populated depending on the event.
+ PeerId PeerID `json:"peer_id"`
+ Url string `json:"url"`
+ InfoHash string `json:"info_hash"`
+}
+
+type StatusEvent string
+
+const (
+ PeerConnected StatusEvent = "peer_connected"
+ PeerDisconnected StatusEvent = "peer_disconnected"
+ TrackerConnected StatusEvent = "tracker_connected"
+ TrackerDisconnected StatusEvent = "tracker_disconnected"
+ TrackerAnnounceSuccessful StatusEvent = "tracker_announce_successful"
+ TrackerAnnounceError StatusEvent = "tracker_announce_error"
+)
--- /dev/null
+package torrent
+
+import (
+ "io"
+ "os"
+ "testing"
+ "testing/iotest"
+
+ "github.com/anacrolix/missinggo/v2"
+ "github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/torrent/internal/testutil"
+ "github.com/frankban/quicktest"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/time/rate"
+)
+
+func TestPeerConnEstablished(t *testing.T) {
+ var expectedPeerId PeerID
+ missinggo.CopyExact(&expectedPeerId, "12345123451234512345")
+
+ gotPeerConnectedEvt := false
+ gotPeerDisconnectedEvt := false
+ ps := testClientTransferParams{
+ ConfigureSeeder: ConfigureClient{
+ Config: func(cfg *ClientConfig) {
+ cfg.PeerID = "12345123451234512345"
+ },
+ },
+ ConfigureLeecher: ConfigureClient{
+ Config: func(cfg *ClientConfig) {
+ // cfg.DisableUTP = true
+ cfg.DisableTCP = true
+ cfg.Debug = false
+ cfg.DisableTrackers = true
+ cfg.EstablishedConnsPerTorrent = 1
+ cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated,
+ func(e StatusUpdatedEvent) {
+ if e.Event == PeerConnected {
+ gotPeerConnectedEvt = true
+ require.Equal(t, expectedPeerId, e.PeerId)
+ require.NoError(t, e.Error)
+ }
+ },
+ func(e StatusUpdatedEvent) {
+ if e.Event == PeerDisconnected {
+ gotPeerDisconnectedEvt = true
+ require.Equal(t, expectedPeerId, e.PeerId)
+ require.NoError(t, e.Error)
+ }
+ },
+ )
+ },
+ },
+ }
+
+ testClientTransfer(t, ps)
+ // double check that the callbacks were called
+ require.True(t, gotPeerConnectedEvt)
+ require.True(t, gotPeerDisconnectedEvt)
+}
+
+type ConfigureClient struct {
+ Config func(cfg *ClientConfig)
+ Client func(cl *Client)
+}
+
+type testClientTransferParams struct {
+ SeederUploadRateLimiter *rate.Limiter
+ LeecherDownloadRateLimiter *rate.Limiter
+ ConfigureSeeder ConfigureClient
+ ConfigureLeecher ConfigureClient
+
+ LeecherStartsWithoutMetadata bool
+}
+
+// Simplified version of testClientTransfer found in test/leecher-storage.go.
+// Could not import and reuse that function due to circular dependencies between modules.
+func testClientTransfer(t *testing.T, ps testClientTransferParams) {
+ greetingTempDir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(greetingTempDir)
+ // Create seeder and a Torrent.
+ cfg := TestingConfig(t)
+ cfg.Seed = true
+ // Some test instances don't like this being on, even when there's no cache involved.
+ cfg.DropMutuallyCompletePeers = false
+ if ps.SeederUploadRateLimiter != nil {
+ cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
+ }
+ cfg.DataDir = greetingTempDir
+ if ps.ConfigureSeeder.Config != nil {
+ ps.ConfigureSeeder.Config(cfg)
+ }
+ seeder, err := NewClient(cfg)
+ require.NoError(t, err)
+ if ps.ConfigureSeeder.Client != nil {
+ ps.ConfigureSeeder.Client(seeder)
+ }
+ seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
+ defer seeder.Close()
+ <-seederTorrent.Complete().On()
+
+ // Create leecher and a Torrent.
+ leecherDataDir := t.TempDir()
+ cfg = TestingConfig(t)
+ // See the seeder client config comment.
+ cfg.DropMutuallyCompletePeers = false
+ cfg.DataDir = leecherDataDir
+ if ps.LeecherDownloadRateLimiter != nil {
+ cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
+ }
+ cfg.Seed = false
+ if ps.ConfigureLeecher.Config != nil {
+ ps.ConfigureLeecher.Config(cfg)
+ }
+ leecher, err := NewClient(cfg)
+ require.NoError(t, err)
+ defer leecher.Close()
+ if ps.ConfigureLeecher.Client != nil {
+ ps.ConfigureLeecher.Client(leecher)
+ }
+ leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
+ ret = TorrentSpecFromMetaInfo(mi)
+ ret.ChunkSize = 2
+ if ps.LeecherStartsWithoutMetadata {
+ ret.InfoBytes = nil
+ }
+ return
+ }())
+ require.NoError(t, err)
+ assert.False(t, leecherTorrent.Complete().Bool())
+ assert.True(t, new)
+
+ added := leecherTorrent.AddClientPeer(seeder)
+ assert.False(t, leecherTorrent.Seeding())
+ // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they
+ // should be sitting idle until we demand data.
+ if !ps.LeecherStartsWithoutMetadata {
+ assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers)
+ }
+ if ps.LeecherStartsWithoutMetadata {
+ <-leecherTorrent.GotInfo()
+ }
+ r := leecherTorrent.NewReader()
+ defer r.Close()
+ go leecherTorrent.SetInfoBytes(mi.InfoBytes)
+
+ assertReadAllGreeting(t, r)
+ <-leecherTorrent.Complete().On()
+ assert.NotEmpty(t, seederTorrent.PeerConns())
+ leecherPeerConns := leecherTorrent.PeerConns()
+ if cfg.DropMutuallyCompletePeers {
+ // I don't think we can assume it will be empty already, due to timing.
+ // assert.Empty(t, leecherPeerConns)
+ } else {
+ assert.NotEmpty(t, leecherPeerConns)
+ }
+ foundSeeder := false
+ for _, pc := range leecherPeerConns {
+ completed := pc.PeerPieces().GetCardinality()
+ t.Logf("peer conn %v has %v completed pieces", pc, completed)
+ if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
+ foundSeeder = true
+ }
+ }
+ if !foundSeeder {
+ t.Errorf("didn't find seeder amongst leecher peer conns")
+ }
+
+ seederStats := seederTorrent.Stats()
+ assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+ assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+ leecherStats := leecherTorrent.Stats()
+ assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+ assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
+ // Try reading through again for the cases where the torrent data size
+ // exceeds the size of the cache.
+ assertReadAllGreeting(t, r)
+}
+
+func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) {
+ pos, err := r.Seek(0, io.SeekStart)
+ assert.NoError(t, err)
+ assert.EqualValues(t, 0, pos)
+ quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil)
+}
--- /dev/null
+package torrent
+
+import (
+ "errors"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/anacrolix/torrent/internal/testutil"
+ "github.com/anacrolix/torrent/tracker"
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/require"
+)
+
+func TestClientInvalidTracker(t *testing.T) {
+ timeout := time.NewTimer(3 * time.Second)
+ receivedStatusUpdate := make(chan bool)
+ gotTrackerDisconnectedEvt := false
+ cfg := TestingConfig(t)
+ cfg.DisableTrackers = false
+ cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+ if e.Event == TrackerAnnounceError {
+ // ignore
+ return
+ }
+ if e.Event == TrackerDisconnected {
+ gotTrackerDisconnectedEvt = true
+ require.Equal(t, "ws://test.invalid:4242", e.Url)
+ require.Error(t, e.Error)
+ }
+ receivedStatusUpdate <- true
+ })
+
+ cl, err := NewClient(cfg)
+ require.NoError(t, err)
+ defer cl.Close()
+
+ dir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(dir)
+
+ mi.AnnounceList = [][]string{
+ {"ws://test.invalid:4242"},
+ }
+
+ to, err := cl.AddTorrent(mi)
+ require.NoError(t, err)
+
+ select {
+ case <-timeout.C:
+ case <-receivedStatusUpdate:
+ }
+ require.True(t, gotTrackerDisconnectedEvt)
+ to.Drop()
+}
+
+var upgrader = websocket.Upgrader{}
+
+func testtracker(w http.ResponseWriter, r *http.Request) {
+ c, err := upgrader.Upgrade(w, r, nil)
+ if err != nil {
+ return
+ }
+ defer c.Close()
+ for {
+ _, _, err := c.ReadMessage()
+ if err != nil {
+ break
+ }
+ //err = c.WriteMessage(mt, message)
+ //if err != nil {
+ // break
+ //}
+ }
+}
+
+func TestClientValidTrackerConn(t *testing.T) {
+ s, trackerUrl := startTestTracker()
+ defer s.Close()
+
+ timeout := time.NewTimer(3 * time.Second)
+ receivedStatusUpdate := make(chan bool)
+ gotTrackerConnectedEvt := false
+ cfg := TestingConfig(t)
+ cfg.DisableTrackers = false
+ cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+ if e.Event == TrackerConnected {
+ gotTrackerConnectedEvt = true
+ require.Equal(t, trackerUrl, e.Url)
+ require.NoError(t, e.Error)
+ }
+ receivedStatusUpdate <- true
+ })
+
+ cl, err := NewClient(cfg)
+ require.NoError(t, err)
+ defer cl.Close()
+
+ dir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(dir)
+
+ mi.AnnounceList = [][]string{
+ {trackerUrl},
+ }
+
+ to, err := cl.AddTorrent(mi)
+ require.NoError(t, err)
+
+ select {
+ case <-timeout.C:
+ case <-receivedStatusUpdate:
+ }
+ require.True(t, gotTrackerConnectedEvt)
+ to.Drop()
+}
+
+func TestClientAnnounceFailure(t *testing.T) {
+ s, trackerUrl := startTestTracker()
+ defer s.Close()
+
+ timeout := time.NewTimer(3 * time.Second)
+ receivedStatusUpdate := make(chan bool)
+ gotTrackerAnnounceErrorEvt := false
+ cfg := TestingConfig(t)
+ cfg.DisableTrackers = false
+
+ cl, err := NewClient(cfg)
+ require.NoError(t, err)
+ defer cl.Close()
+
+ cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
+ return tracker.AnnounceRequest{}, errors.New("test error")
+ }
+
+ dir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(dir)
+
+ mi.AnnounceList = [][]string{
+ {trackerUrl},
+ }
+
+ to, err := cl.AddTorrent(mi)
+ require.NoError(t, err)
+
+ cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+ if e.Event == TrackerConnected {
+ // ignore
+ return
+ }
+ if e.Event == TrackerAnnounceError {
+ gotTrackerAnnounceErrorEvt = true
+ require.Equal(t, trackerUrl, e.Url)
+ require.Equal(t, to.InfoHash().HexString(), e.InfoHash)
+ require.Error(t, e.Error)
+ require.Equal(t, "test error", e.Error.Error())
+ }
+ receivedStatusUpdate <- true
+ })
+
+ select {
+ case <-timeout.C:
+ case <-receivedStatusUpdate:
+ }
+ require.True(t, gotTrackerAnnounceErrorEvt)
+ to.Drop()
+}
+
+func TestClientAnnounceSuccess(t *testing.T) {
+ s, trackerUrl := startTestTracker()
+ defer s.Close()
+
+ timeout := time.NewTimer(3 * time.Second)
+ receivedStatusUpdate := make(chan bool)
+ gotTrackerAnnounceSuccessfulEvt := false
+ cfg := TestingConfig(t)
+ cfg.DisableTrackers = false
+
+ cl, err := NewClient(cfg)
+ require.NoError(t, err)
+ defer cl.Close()
+
+ dir, mi := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(dir)
+
+ mi.AnnounceList = [][]string{
+ {trackerUrl},
+ }
+
+ to, err := cl.AddTorrent(mi)
+ require.NoError(t, err)
+
+ cfg.Callbacks.StatusUpdated = append(cfg.Callbacks.StatusUpdated, func(e StatusUpdatedEvent) {
+ if e.Event == TrackerConnected {
+ // ignore
+ return
+ }
+ if e.Event == TrackerAnnounceSuccessful {
+ gotTrackerAnnounceSuccessfulEvt = true
+ require.Equal(t, trackerUrl, e.Url)
+ require.Equal(t, to.InfoHash().HexString(), e.InfoHash)
+ require.NoError(t, e.Error)
+ }
+ receivedStatusUpdate <- true
+ })
+
+ select {
+ case <-timeout.C:
+ case <-receivedStatusUpdate:
+ }
+ require.True(t, gotTrackerAnnounceSuccessfulEvt)
+ to.Drop()
+}
+
+func startTestTracker() (*httptest.Server, string) {
+ s := httptest.NewServer(http.HandlerFunc(testtracker))
+ trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http")
+ return s, trackerUrl
+}
"strconv"
"time"
+ "github.com/anacrolix/torrent/webtorrent"
+
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/types/infohash"
infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
- "github.com/anacrolix/torrent/webtorrent"
)
// Clients contain zero or more Torrents. A Client manages a blocklist, the
WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
ICEServers: cl.ICEServers(),
DialContext: cl.config.TrackerDialContext,
+ callbacks: &cl.config.Callbacks,
OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
cl.lock()
defer cl.unlock()
cl := t.cl
nc := dr.Conn
addrIpPort, _ := tryIpPortFromNetAddr(addr)
+
c, err = cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
newConnectionOpts{
pc.startMessageWriter()
pc.sendInitialMessages()
pc.initUpdateRequestsTimer()
+
+ for _, cb := range pc.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: PeerConnected,
+ PeerId: pc.PeerID,
+ })
+ }
+
err := pc.mainReadLoop()
if err != nil {
+ for _, cb := range pc.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: PeerDisconnected,
+ Error: err,
+ PeerId: pc.PeerID,
+ })
+ }
return fmt.Errorf("main read loop: %w", err)
}
return nil
utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
)
+type PeerStatus struct {
+ Id PeerID
+ Ok bool
+ Err string // see https://github.com/golang/go/issues/5161
+}
+
// Maintains the state of a BitTorrent-protocol based connection with a peer.
type PeerConn struct {
Peer
package torrent
import (
+ "github.com/stretchr/testify/require"
"testing"
"time"
//})
return cfg
}
+
+func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} {
+ select {
+ case s := <-channel:
+ return s
+ case <-time.After(duration):
+ require.Fail(t, "Timeout reading observer channel.")
+ }
+ return nil
+}
func (t *Torrent) dropConnection(c *PeerConn) {
t.cl.event.Broadcast()
c.close()
+
+ for _, cb := range c.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: PeerDisconnected,
+ PeerId: c.PeerID,
+ })
+ }
+ t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID)
+
if t.deletePeerConn(c) {
t.openNewConns()
}
return
}
localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
+
pc, err := t.cl.initiateProtocolHandshakes(
context.Background(),
netConn,
"go.opentelemetry.io/otel/trace"
"github.com/anacrolix/torrent/tracker"
+ "github.com/anacrolix/torrent/types/infohash"
)
type TrackerClientStats struct {
ICEServers []webrtc.ICEServer
rtcPeerConns map[string]*wrappedPeerConnection
+
+ // callbacks
+ OnConnected func(error)
+ OnDisconnected func(error)
+ OnAnnounceSuccessful func(ih string)
+ OnAnnounceError func(ih string, err error)
}
func (me *TrackerClient) Stats() TrackerClientStats {
c, _, err := tc.Dialer.Dial(tc.Url, header)
if err != nil {
+ tc.OnDisconnected(err)
return fmt.Errorf("dialing tracker: %w", err)
}
defer c.Close()
}
}
}()
+ tc.OnConnected(nil)
err = tc.trackerReadLoop(tc.wsConn)
close(closeChan)
tc.mu.Lock()
func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
request, err := tc.GetAnnounceRequest(event, infoHash)
if err != nil {
+ tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
return fmt.Errorf("getting announce parameters: %w", err)
}
defer tc.mu.Unlock()
err = tc.writeMessage(data)
if err != nil {
+ tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
return fmt.Errorf("write AnnounceRequest: %w", err)
}
+ tc.OnAnnounceSuccessful(infohash.T(infoHash).HexString())
+ g.MakeMapIfNil(&tc.outboundOffers)
for _, offer := range offers {
- g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
+ g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
}
return nil
}
+//go:build !js
+// +build !js
+
package webtorrent
import (
"net/url"
"sync"
+ "github.com/anacrolix/torrent/webtorrent"
+
"github.com/anacrolix/log"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v4"
"github.com/anacrolix/torrent/tracker"
httpTracker "github.com/anacrolix/torrent/tracker/http"
- "github.com/anacrolix/torrent/webtorrent"
)
type websocketTrackerStatus struct {
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
WebsocketTrackerHttpHeader func() netHttp.Header
ICEServers []webrtc.ICEServer
+ callbacks *Callbacks
}
func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.TrackerClient, func()) {
),
WebsocketTrackerHttpHeader: me.WebsocketTrackerHttpHeader,
ICEServers: me.ICEServers,
+ OnConnected: func(err error) {
+ for _, cb := range me.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: TrackerConnected,
+ Url: url,
+ Error: err,
+ })
+ }
+ },
+ OnDisconnected: func(err error) {
+ for _, cb := range me.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: TrackerDisconnected,
+ Url: url,
+ Error: err,
+ })
+ }
+ },
+ OnAnnounceSuccessful: func(ih string) {
+ for _, cb := range me.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: TrackerAnnounceSuccessful,
+ Url: url,
+ InfoHash: ih,
+ })
+ }
+ },
+ OnAnnounceError: func(ih string, err error) {
+ for _, cb := range me.callbacks.StatusUpdated {
+ cb(StatusUpdatedEvent{
+ Event: TrackerAnnounceError,
+ Url: url,
+ Error: err,
+ InfoHash: ih,
+ })
+ }
+ },
},
}
value.TrackerClient.Start(func(err error) {