]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Merge branch 'webtorrent'
[btrtrc.git] / torrent.go
index d29c9a923525eff29ac18a0d15f699fcc8550a07..69a0c667f08fbf113fd11d8c9d6f52e840ab63f7 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        "container/heap"
+       "context"
        "crypto/sha1"
        "errors"
        "fmt"
@@ -14,6 +15,7 @@ import (
        "unsafe"
 
        "github.com/davecgh/go-spew/spew"
+       "github.com/pion/datachannel"
 
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/log"
@@ -29,6 +31,7 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
        "github.com/anacrolix/torrent/tracker"
+       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Maintains state of torrent within a Client. Many methods should not be called before the info is
@@ -90,7 +93,7 @@ type Torrent struct {
        peers          prioritizedPeers
        wantPeersEvent missinggo.Event
        // An announcer for each tracker URL.
-       trackerAnnouncers map[string]*trackerScraper
+       trackerAnnouncers map[string]torrentTrackerAnnouncer
        // How many times we've initiated a DHT announce. TODO: Move into stats.
        numDHTAnnounces int
 
@@ -593,9 +596,9 @@ func (t *Torrent) writeStatus(w io.Writer) {
        func() {
                tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
                fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
-               for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
-                       lu := l.u
-                       ru := r.u
+               for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
+                       lu := l.URL()
+                       ru := r.URL()
                        var luns, runs url.URL = lu, ru
                        luns.Scheme = ""
                        runs.Scheme = ""
@@ -603,7 +606,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
                        ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
                        ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
                        return ml.Less()
-               }).([]*trackerScraper) {
+               }).([]torrentTrackerAnnouncer) {
                        fmt.Fprintf(tw, "    %s\n", ta.statusLine())
                }
                tw.Flush()
@@ -1268,6 +1271,33 @@ func (t *Torrent) seeding() bool {
        return true
 }
 
+func (t *Torrent) onWebRtcConn(
+       c datachannel.ReadWriteCloser,
+       dcc webtorrent.DataChannelContext,
+) {
+       defer c.Close()
+       pc, err := t.cl.handshakesConnection(
+               context.Background(),
+               webrtcNetConn{c, dcc},
+               t,
+               false,
+               webrtcNetAddr{dcc.Remote},
+               webrtcNetwork,
+               fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
+       )
+       if err != nil {
+               t.logger.Printf("error in handshaking webrtc connection: %v", err)
+       }
+       if dcc.LocalOffered {
+               pc.Discovery = PeerSourceTracker
+       } else {
+               pc.Discovery = PeerSourceIncoming
+       }
+       t.cl.lock()
+       defer t.cl.unlock()
+       t.cl.runHandshookConn(pc, t)
+}
+
 func (t *Torrent) startScrapingTracker(_url string) {
        if _url == "" {
                return
@@ -1288,24 +1318,44 @@ func (t *Torrent) startScrapingTracker(_url string) {
                t.startScrapingTracker(u.String())
                return
        }
-       if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
-               return
-       }
-       if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
-               return
-       }
        if _, ok := t.trackerAnnouncers[_url]; ok {
                return
        }
-       newAnnouncer := &trackerScraper{
-               u: *u,
-               t: t,
-       }
+       sl := func() torrentTrackerAnnouncer {
+               switch u.Scheme {
+               case "ws", "wss":
+                       wst := websocketTracker{*u, webtorrent.NewTrackerClient(t.cl.peerID, t.infoHash, t.onWebRtcConn,
+                               t.logger.WithText(func(m log.Msg) string {
+                                       return fmt.Sprintf("%q: %v", u.String(), m.Text())
+                               }).WithValues(log.Debug))}
+                       ar := t.announceRequest(tracker.Started)
+                       go func() {
+                               err := wst.TrackerClient.Run(ar, u.String())
+                               if err != nil {
+                                       t.logger.WithValues(log.Error).Printf(
+                                               "error running websocket tracker announcer for %q: %v",
+                                               u.String(), err)
+                               }
+                       }()
+                       return wst
+               }
+               if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
+                       return nil
+               }
+               if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
+                       return nil
+               }
+               newAnnouncer := &trackerScraper{
+                       u: *u,
+                       t: t,
+               }
+               go newAnnouncer.Run()
+               return newAnnouncer
+       }()
        if t.trackerAnnouncers == nil {
-               t.trackerAnnouncers = make(map[string]*trackerScraper)
+               t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
        }
-       t.trackerAnnouncers[_url] = newAnnouncer
-       go newAnnouncer.Run()
+       t.trackerAnnouncers[_url] = sl
 }
 
 // Adds and starts tracker scrapers for tracker URLs that aren't already