defer c.close()
c.Discovery = ps
c.trusted = trusted
- cl.runHandshookConn(c, t)
+ t.runHandshookConnLoggingErr(c)
}
// The port number for incoming peer connections. 0 if the client isn't listening.
torrent.Add("received handshake for loaded torrent", 1)
cl.lock()
defer cl.unlock()
- cl.runHandshookConn(c, t)
+ t.runHandshookConnLoggingErr(c)
}
// Client lock must be held before entering this.
-func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) {
+func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
c.setTorrent(t)
if c.PeerID == cl.peerID {
if c.outgoing {
// address, we won't record the remote address as a doppleganger. Instead, the initiator
// can record *us* as the doppleganger.
}
- return
+ return errors.New("local and remote peer ids are the same")
}
c.conn.SetWriteDeadline(time.Time{})
c.r = deadlineReader{c.conn, c.r}
torrent.Add("completed handshake over ipv6", 1)
}
if err := t.addConnection(c); err != nil {
- log.Fmsg("error adding connection: %s", err).AddValues(c).SetLevel(log.Debug).Log(t.logger)
- return
+ return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(c)
go c.writer(time.Minute)
cl.sendInitialMessages(c, t)
- err := c.mainReadLoop()
- if err != nil && cl.config.Debug {
- cl.logger.Printf("error during connection main read loop: %s", err)
- }
+ return fmt.Errorf("main read loop: %w", c.mainReadLoop())
}
// See the order given in Transmission's tr_peerMsgsNew.
fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
)
if err != nil {
- t.logger.Printf("error in handshaking webrtc connection: %v", err)
+ t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
+ return
}
if dcc.LocalOffered {
pc.Discovery = PeerSourceTracker
}
t.cl.lock()
defer t.cl.unlock()
- t.cl.runHandshookConn(pc, t)
+ err = t.cl.runHandshookConn(pc, t)
+ t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
+}
+
+func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
+ err := t.cl.runHandshookConn(pc, t)
+ if err != nil || logAll {
+ t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+ }
+}
+
+func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
+ t.logRunHandshookConn(pc, false, log.Debug)
}
func (t *Torrent) startScrapingTracker(_url string) {
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
case "ws", "wss":
- wst := websocketTracker{*u, webtorrent.NewTrackerClient(t.cl.peerID, t.infoHash, t.onWebRtcConn,
- t.logger.WithText(func(m log.Msg) string {
- return fmt.Sprintf("%q: %v", u.String(), m.Text())
- }).WithDefaultLevel(log.Debug))}
- ar := t.announceRequest(tracker.Started)
+ wst := websocketTracker{
+ *u,
+ &webtorrent.TrackerClient{
+ Url: u.String(),
+ GetAnnounceRequest: func(event tracker.AnnounceEvent) tracker.AnnounceRequest {
+ t.cl.lock()
+ defer t.cl.unlock()
+ return t.announceRequest(event)
+ },
+ PeerId: t.cl.peerID,
+ InfoHash: t.infoHash,
+ OnConn: t.onWebRtcConn,
+ Logger: t.logger.WithText(func(m log.Msg) string {
+ return fmt.Sprintf("%q: %v", u.String(), m.Text())
+ }).WithDefaultLevel(log.Debug),
+ },
+ }
go func() {
- err := wst.TrackerClient.Run(ar, u.String())
+ err := wst.TrackerClient.Run()
if err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf(
"error running websocket tracker announcer for %q: %v",
- u.String(), err)
+ u.String(), err,
+ )
}
}()
return wst
"encoding/json"
"fmt"
"sync"
+ "time"
"github.com/anacrolix/log"
// Client represents the webtorrent client
type TrackerClient struct {
+ Url string
+ GetAnnounceRequest func(tracker.AnnounceEvent) tracker.AnnounceRequest
+ PeerId [20]byte
+ InfoHash [20]byte
+ OnConn onDataChannelOpen
+ Logger log.Logger
+
lock sync.Mutex
- peerIDBinary string
- infoHashBinary string
outboundOffers map[string]outboundOffer // OfferID to outboundOffer
- tracker *websocket.Conn
- onConn onDataChannelOpen
- logger log.Logger
+ wsConn *websocket.Conn
+}
+
+func (me *TrackerClient) peerIdBinary() string {
+ return binaryToJsonString(me.PeerId[:])
+}
+
+func (me *TrackerClient) infoHashBinary() string {
+ return binaryToJsonString(me.InfoHash[:])
}
// outboundOffer represents an outstanding offer.
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
-func NewTrackerClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *TrackerClient {
- return &TrackerClient{
- outboundOffers: make(map[string]outboundOffer),
- peerIDBinary: binaryToJsonString(peerId[:]),
- infoHashBinary: binaryToJsonString(infoHash[:]),
- onConn: onConn,
- logger: logger,
- }
-}
-
-func (c *TrackerClient) Run(ar tracker.AnnounceRequest, url string) error {
- t, _, err := websocket.DefaultDialer.Dial(url, nil)
+func (tc *TrackerClient) doWebsocket() error {
+ c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
if err != nil {
- return fmt.Errorf("failed to dial tracker: %w", err)
+ return fmt.Errorf("dialing tracker: %w", err)
}
- defer t.Close()
- c.logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", url)
- c.tracker = t
-
+ defer c.Close()
+ tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
+ tc.wsConn = c
go func() {
- err := c.announce(ar)
+ err := tc.announce(tracker.Started)
if err != nil {
- c.logger.WithDefaultLevel(log.Error).Printf("error announcing: %v", err)
+ tc.Logger.WithDefaultLevel(log.Error).Printf("error in initial announce: %v", err)
}
}()
- err = c.trackerReadLoop(c.tracker)
- c.lock.Lock()
- c.lock.Unlock()
- c.closeUnusedOffers()
+ err = tc.trackerReadLoop(tc.wsConn)
+ tc.lock.Lock()
+ tc.closeUnusedOffers()
+ tc.lock.Unlock()
return err
}
-func (c *TrackerClient) closeUnusedOffers() {
- for _, offer := range c.outboundOffers {
+func (tc *TrackerClient) Run() error {
+ for {
+ err := tc.doWebsocket()
+ tc.Logger.Printf("websocket instance ended: %v", err)
+ time.Sleep(time.Minute)
+ }
+}
+
+func (tc *TrackerClient) closeUnusedOffers() {
+ for _, offer := range tc.outboundOffers {
if offer.answered {
continue
}
offer.peerConnection.Close()
}
+ tc.outboundOffers = nil
}
-func (c *TrackerClient) announce(request tracker.AnnounceRequest) error {
+func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
var randOfferId [20]byte
_, err := rand.Read(randOfferId[:])
if err != nil {
return fmt.Errorf("creating offer: %w", err)
}
+ request := tc.GetAnnounceRequest(event)
+
req := AnnounceRequest{
Numwant: 1, // If higher we need to create equal amount of offers.
Uploaded: request.Uploaded,
Downloaded: request.Downloaded,
Left: request.Left,
- Event: "started",
+ Event: request.Event.String(),
Action: "announce",
- InfoHash: c.infoHashBinary,
- PeerID: c.peerIDBinary,
+ InfoHash: tc.infoHashBinary(),
+ PeerID: tc.peerIdBinary(),
Offers: []Offer{{
OfferID: offerIDBinary,
Offer: offer,
return fmt.Errorf("marshalling request: %w", err)
}
- c.lock.Lock()
- defer c.lock.Unlock()
+ tc.lock.Lock()
+ defer tc.lock.Unlock()
- err = c.tracker.WriteMessage(websocket.TextMessage, data)
+ err = tc.wsConn.WriteMessage(websocket.TextMessage, data)
if err != nil {
pc.Close()
return fmt.Errorf("write AnnounceRequest: %w", err)
}
-
- c.outboundOffers[offerIDBinary] = outboundOffer{
+ if tc.outboundOffers == nil {
+ tc.outboundOffers = make(map[string]outboundOffer)
+ }
+ tc.outboundOffers[offerIDBinary] = outboundOffer{
peerConnection: pc,
dataChannel: dc,
originalOffer: offer,
return nil
}
-func (c *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
+func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
for {
_, message, err := tracker.ReadMessage()
if err != nil {
return fmt.Errorf("read message error: %w", err)
}
- c.logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
+ tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
var ar AnnounceResponse
if err := json.Unmarshal(message, &ar); err != nil {
- c.logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
+ tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
continue
}
- if ar.InfoHash != c.infoHashBinary {
- c.logger.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash)
+ if ar.InfoHash != tc.infoHashBinary() {
+ tc.Logger.Printf("announce response for different hash: expected %q got %q", tc.infoHashBinary(), ar.InfoHash)
continue
}
switch {
case ar.Offer != nil:
- answer, err := getAnswerForOffer(*ar.Offer, c.onConn, ar.OfferID)
+ answer, err := getAnswerForOffer(*ar.Offer, tc.OnConn, ar.OfferID)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
req := AnnounceResponse{
Action: "announce",
- InfoHash: c.infoHashBinary,
- PeerID: c.peerIDBinary,
+ InfoHash: tc.infoHashBinary(),
+ PeerID: tc.peerIdBinary(),
ToPeerID: ar.PeerID,
Answer: &answer,
OfferID: ar.OfferID,
return fmt.Errorf("failed to marshal request: %w", err)
}
- c.lock.Lock()
+ tc.lock.Lock()
err = tracker.WriteMessage(websocket.TextMessage, data)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
- c.lock.Unlock()
+ tc.lock.Unlock()
}
- c.lock.Unlock()
+ tc.lock.Unlock()
case ar.Answer != nil:
- c.lock.Lock()
- offer, ok := c.outboundOffers[ar.OfferID]
- c.lock.Unlock()
- if !ok {
- c.logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
- continue
- }
- c.logger.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
- err = offer.setAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
- c.onConn(dc, DataChannelContext{
- Local: offer.originalOffer,
- Remote: *ar.Answer,
- OfferId: ar.OfferID,
- LocalOffered: true,
- })
- })
- if err != nil {
- return fmt.Errorf("failed to sent answer: %w", err)
- }
- offer.answered = true
+ tc.handleAnswer(ar.OfferID, *ar.Answer)
}
}
}
+
+func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
+ tc.lock.Lock()
+ defer tc.lock.Unlock()
+ offer, ok := tc.outboundOffers[offerId]
+ if !ok {
+ tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
+ return
+ }
+ tc.Logger.Printf("offer %q got answer %v", offerId, answer)
+ err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
+ tc.OnConn(dc, DataChannelContext{
+ Local: offer.originalOffer,
+ Remote: answer,
+ OfferId: offerId,
+ LocalOffered: true,
+ })
+ })
+ if err != nil {
+ tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+ return
+ }
+ delete(tc.outboundOffers, offerId)
+ go tc.announce(tracker.None)
+}