From 80b1560de3a47bc139adb06aa5f7510ceccfe49b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 28 Feb 2024 19:19:16 +1100 Subject: [PATCH] Announce to both v1 and v2 swarms --- client.go | 46 ++++++--- client_test.go | 12 ++- cmd/torrent2/main.go | 3 +- file.go | 1 + issue97_test.go | 2 + ltep_test.go | 2 + merkle/merkle.go | 3 +- metainfo/bep52.go | 1 + metainfo/file-tree.go | 6 +- metainfo/fileinfo.go | 3 +- metainfo/magnet-v2.go | 2 +- metainfo/magnet_test.go | 4 +- metainfo/metainfo_test.go | 2 +- peerconn.go | 3 + peerconn_test.go | 4 +- pexconn_test.go | 3 +- piece.go | 6 +- reader.go | 4 +- requesting.go | 2 +- spec.go | 10 +- t.go | 6 +- test_test.go | 2 +- torrent.go | 163 ++++++++++++++++++++++++------- torrent_test.go | 6 +- tracker_scraper.go | 8 +- types/infohash-v2/infohash-v2.go | 6 +- types/infohash/infohash.go | 4 + 27 files changed, 227 insertions(+), 87 deletions(-) diff --git a/client.go b/client.go index cc472ecd..4b8e9df1 100644 --- a/client.go +++ b/client.go @@ -43,11 +43,11 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" - infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/types/infohash" + infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "github.com/anacrolix/torrent/webtorrent" ) @@ -156,7 +156,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice)) fmt.Fprintln(w) sort.Slice(torrentsSlice, func(l, r int) bool { - return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString() + return torrentsSlice[l].canonicalShortInfohash().AsString() < torrentsSlice[r].canonicalShortInfohash().AsString() }) for _, t := range torrentsSlice { if t.name() == "" { @@ -306,14 +306,18 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { cl.websocketTrackers = websocketTrackers{ PeerId: cl.peerID, Logger: cl.logger, - GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { + GetAnnounceRequest: func( + event tracker.AnnounceEvent, infoHash [20]byte, + ) ( + tracker.AnnounceRequest, error, + ) { cl.lock() defer cl.unlock() t, ok := cl.torrents[infoHash] if !ok { return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") } - return t.announceRequest(event), nil + return t.announceRequest(event, infoHash), nil }, Proxy: cl.config.HTTPProxy, WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, @@ -903,16 +907,15 @@ func (cl *Client) incomingPeerPort() int { return cl.LocalPort() } -func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error { +func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) (err error) { if c.headerEncrypted { var rw io.ReadWriter - var err error rw, c.cryptoMethod, err = mse.InitiateHandshake( struct { io.Reader io.Writer }{c.r, c.w}, - t.infoHash[:], + t.canonicalShortInfohash().Bytes(), nil, cl.config.CryptoProvides, ) @@ -921,14 +924,19 @@ func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error { return fmt.Errorf("header obfuscation handshake: %w", err) } } - ih, err := cl.connBtHandshake(c, &t.infoHash) + ih, err := cl.connBtHandshake(c, t.canonicalShortInfohash()) if err != nil { return fmt.Errorf("bittorrent protocol handshake: %w", err) } - if ih != t.infoHash { - return errors.New("bittorrent protocol handshake: peer infohash didn't match") + if g.Some(ih) == t.infoHash { + return nil } - return nil + if t.infoHashV2.Ok && *t.infoHashV2.Value.ToShort() == ih { + c.v2 = true + return nil + } + err = errors.New("bittorrent protocol handshake: peer infohash didn't match") + return } // Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code @@ -1285,6 +1293,13 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( // Return a Torrent ready for insertion into a Client. func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { + var v1InfoHash g.Option[infohash.T] + if !opts.InfoHash.IsZero() { + v1InfoHash.Set(opts.InfoHash) + } + if !v1InfoHash.Ok && !opts.InfoHashV2.Ok { + panic("v1 infohash must be nonzero or v2 infohash must be set") + } // use provided storage, if provided storageClient := cl.defaultStorage if opts.Storage != nil { @@ -1293,7 +1308,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { t = &Torrent{ cl: cl, - infoHash: opts.InfoHash, + infoHash: v1InfoHash, infoHashV2: opts.InfoHashV2, peers: prioritizedPeers{ om: gbtree.New(32), @@ -1344,10 +1359,13 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo return cl.AddTorrentInfoHashWithStorage(infoHash, nil) } -// Adds a torrent by InfoHash with a custom Storage implementation. +// Deprecated. Adds a torrent by InfoHash with a custom Storage implementation. // If the torrent already exists then this Storage is ignored and the // existing torrent returned with `new` set to `false` -func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) { +func (cl *Client) AddTorrentInfoHashWithStorage( + infoHash metainfo.Hash, + specStorage storage.ClientImpl, +) (t *Torrent, new bool) { cl.lock() defer cl.unlock() t, ok := cl.torrents[infoHash] diff --git a/client_test.go b/client_test.go index 5463d41c..781f04a9 100644 --- a/client_test.go +++ b/client_test.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "io" + "math/rand" "net" "net/netip" "os" @@ -131,9 +132,9 @@ func TestAddDropManyTorrents(t *testing.T) { cl, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer cl.Close() - for i := 0; i < 1000; i += 1 { + for i := range 1000 { var spec TorrentSpec - binary.PutVarint(spec.InfoHash[:], int64(i)) + binary.PutVarint(spec.InfoHash[:], int64(i+1)) tt, new, err := cl.AddTorrentSpec(&spec) assert.NoError(t, err) assert.True(t, new) @@ -155,6 +156,7 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) { require.NoError(t, err) defer cl.Close() spec := TorrentSpec{} + rand.Read(spec.InfoHash[:]) T, new, _ := cl.AddTorrentSpec(&spec) if !new { t.FailNow() @@ -587,6 +589,7 @@ func TestPeerInvalidHave(t *testing.T) { } func TestPieceCompletedInStorageButNotClient(t *testing.T) { + c := qt.New(t) greetingTempDir, greetingMetainfo := testutil.GreetingTestTorrent() defer os.RemoveAll(greetingTempDir) cfg := TestingConfig(t) @@ -594,9 +597,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) { seeder, err := NewClient(TestingConfig(t)) require.NoError(t, err) defer seeder.Close() - seeder.AddTorrentSpec(&TorrentSpec{ + _, new, err := seeder.AddTorrentSpec(&TorrentSpec{ InfoBytes: greetingMetainfo.InfoBytes, + InfoHash: greetingMetainfo.HashInfoBytes(), }) + c.Check(err, qt.IsNil) + c.Check(new, qt.IsTrue) } // Check that when the listen port is 0, all the protocols listened on have diff --git a/cmd/torrent2/main.go b/cmd/torrent2/main.go index 412e57c2..0d57f013 100644 --- a/cmd/torrent2/main.go +++ b/cmd/torrent2/main.go @@ -4,8 +4,9 @@ package main import ( - "github.com/anacrolix/torrent/metainfo" "os" + + "github.com/anacrolix/torrent/metainfo" ) type argError struct { diff --git a/file.go b/file.go index 3a53adaa..ae802f13 100644 --- a/file.go +++ b/file.go @@ -2,6 +2,7 @@ package torrent import ( "crypto/sha256" + "github.com/RoaringBitmap/roaring" g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" diff --git a/issue97_test.go b/issue97_test.go index ee8107c6..9e14f047 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -19,6 +19,8 @@ func TestHashPieceAfterStorageClosed(t *testing.T) { logger: log.Default, chunkSize: defaultChunkSize, } + tt.infoHash.Ok = true + tt.infoHash.Value[0] = 1 mi := testutil.GreetingMetaInfo() info, err := mi.UnmarshalInfo() require.NoError(t, err) diff --git a/ltep_test.go b/ltep_test.go index 0a95a868..37300f09 100644 --- a/ltep_test.go +++ b/ltep_test.go @@ -1,6 +1,7 @@ package torrent_test import ( + "math/rand" "strconv" "testing" @@ -113,6 +114,7 @@ func TestUserLtep(t *testing.T) { c.Assert(err, qt.IsNil) defer cl2.Close() addOpts := AddTorrentOpts{} + rand.Read(addOpts.InfoHash[:]) t1, _ := cl1.AddTorrentOpt(addOpts) t2, _ := cl2.AddTorrentOpt(addOpts) defer testutil.ExportStatusWriter(cl1, "cl1", t)() diff --git a/merkle/merkle.go b/merkle/merkle.go index a6667cb4..ab54af6a 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -3,8 +3,9 @@ package merkle import ( "crypto/sha256" "fmt" - g "github.com/anacrolix/generics" "math/bits" + + g "github.com/anacrolix/generics" ) // The leaf block size for BitTorrent v2 Merkle trees. diff --git a/metainfo/bep52.go b/metainfo/bep52.go index 8bdd19de..0291d653 100644 --- a/metainfo/bep52.go +++ b/metainfo/bep52.go @@ -2,6 +2,7 @@ package metainfo import ( "fmt" + "github.com/anacrolix/torrent/merkle" ) diff --git a/metainfo/file-tree.go b/metainfo/file-tree.go index 3fcc4331..bfb7229e 100644 --- a/metainfo/file-tree.go +++ b/metainfo/file-tree.go @@ -1,10 +1,12 @@ package metainfo import ( + "sort" + g "github.com/anacrolix/generics" - "github.com/anacrolix/torrent/bencode" "golang.org/x/exp/maps" - "sort" + + "github.com/anacrolix/torrent/bencode" ) const FileTreePropertiesKey = "" diff --git a/metainfo/fileinfo.go b/metainfo/fileinfo.go index 82e1c94f..66ee2da8 100644 --- a/metainfo/fileinfo.go +++ b/metainfo/fileinfo.go @@ -1,8 +1,9 @@ package metainfo import ( - g "github.com/anacrolix/generics" "strings" + + g "github.com/anacrolix/generics" ) // Information specific to a single file inside the MetaInfo structure. diff --git a/metainfo/magnet-v2.go b/metainfo/magnet-v2.go index a6c2c8b4..322c624d 100644 --- a/metainfo/magnet-v2.go +++ b/metainfo/magnet-v2.go @@ -7,9 +7,9 @@ import ( "net/url" "strings" + g "github.com/anacrolix/generics" "github.com/multiformats/go-multihash" - g "github.com/anacrolix/generics" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" ) diff --git a/metainfo/magnet_test.go b/metainfo/magnet_test.go index ba13ac88..25475098 100644 --- a/metainfo/magnet_test.go +++ b/metainfo/magnet_test.go @@ -2,10 +2,10 @@ package metainfo import ( "encoding/hex" - "github.com/davecgh/go-spew/spew" - qt "github.com/frankban/quicktest" "testing" + "github.com/davecgh/go-spew/spew" + qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/metainfo/metainfo_test.go b/metainfo/metainfo_test.go index 09a88e50..9f0f9f5f 100644 --- a/metainfo/metainfo_test.go +++ b/metainfo/metainfo_test.go @@ -1,7 +1,6 @@ package metainfo import ( - "github.com/davecgh/go-spew/spew" "io" "os" "path" @@ -10,6 +9,7 @@ import ( "testing" "github.com/anacrolix/missinggo/v2" + "github.com/davecgh/go-spew/spew" qt "github.com/frankban/quicktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/peerconn.go b/peerconn.go index ded9b483..59a31ad4 100644 --- a/peerconn.go +++ b/peerconn.go @@ -36,6 +36,9 @@ import ( type PeerConn struct { Peer + // BEP 52 + v2 bool + // A string that should identify the PeerConn's net.Conn endpoints. The net.Conn could // be wrapping WebRTC, uTP, or TCP etc. Used in writing the conn status for peers. connString string diff --git a/peerconn_test.go b/peerconn_test.go index 70f9017b..7dc28bd1 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -29,7 +29,7 @@ func TestSendBitfieldThenHave(t *testing.T) { cl.initLogger() qtc := qt.New(t) c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) - c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) + c.setTorrent(cl.newTorrentForTesting()) err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}) qtc.Assert(err, qt.IsNil) r, w := io.Pipe() @@ -98,7 +98,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { }) cl.initLogger() ts := &torrentStorage{} - t := cl.newTorrent(metainfo.Hash{}, nil) + t := cl.newTorrentForTesting() t.initialPieceCheckDisabled = true require.NoError(b, t.setInfo(&metainfo.Info{ Pieces: make([]byte, 20), diff --git a/pexconn_test.go b/pexconn_test.go index b8be73e8..02da7d0a 100644 --- a/pexconn_test.go +++ b/pexconn_test.go @@ -7,7 +7,6 @@ import ( "github.com/anacrolix/dht/v2/krpc" "github.com/stretchr/testify/require" - "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -15,7 +14,7 @@ func TestPexConnState(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() - torrent := cl.newTorrent(metainfo.Hash{}, nil) + torrent := cl.newTorrentForTesting() addr := &net.TCPAddr{IP: net.IPv6loopback, Port: 4747} c := cl.newConnection(nil, newConnectionOpts{ remoteAddr: addr, diff --git a/piece.go b/piece.go index 09696536..607e2f88 100644 --- a/piece.go +++ b/piece.go @@ -2,16 +2,16 @@ package torrent import ( "fmt" - g "github.com/anacrolix/generics" - infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "sync" "github.com/anacrolix/chansync" + g "github.com/anacrolix/generics" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" + infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" ) type Piece struct { @@ -48,7 +48,7 @@ type Piece struct { } func (p *Piece) String() string { - return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index) + return fmt.Sprintf("%s/%d", p.t.canonicalShortInfohash().HexString(), p.index) } func (p *Piece) Info() metainfo.Piece { diff --git a/reader.go b/reader.go index 4b20206c..039fd99c 100644 --- a/reader.go +++ b/reader.go @@ -261,8 +261,8 @@ func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, er } // TODO: Just reset pieces in the readahead window. This might help // prevent thrashing with small caches and file and piece priorities. - r.log(log.Fstr("error reading torrent %s piece %d offset %d, %d bytes: %v", - r.t.infoHash.HexString(), firstPieceIndex, firstPieceOffset, len(b1), err)) + r.log(log.Fstr("error reading piece %d offset %d, %d bytes: %v", + firstPieceIndex, firstPieceOffset, len(b1), err)) if !r.t.updatePieceCompletion(firstPieceIndex) { r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex)) } diff --git a/requesting.go b/requesting.go index 1440207d..b48fc79f 100644 --- a/requesting.go +++ b/requesting.go @@ -189,7 +189,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { input, t.getPieceRequestOrder(), func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { - if ih != t.infoHash { + if ih != *t.canonicalShortInfohash() { return } if !p.peerHasPiece(pieceIndex) { diff --git a/spec.go b/spec.go index 29d30adc..49a1104e 100644 --- a/spec.go +++ b/spec.go @@ -2,12 +2,13 @@ package torrent import ( "fmt" + g "github.com/anacrolix/generics" - infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/storage" + infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" ) // Specifies a new torrent for adding to a client, or additions to an existing Torrent. There are @@ -46,14 +47,15 @@ type TorrentSpec struct { } func TorrentSpecFromMagnetUri(uri string) (spec *TorrentSpec, err error) { - m, err := metainfo.ParseMagnetUri(uri) + m, err := metainfo.ParseMagnetV2Uri(uri) if err != nil { return } spec = &TorrentSpec{ Trackers: [][]string{m.Trackers}, DisplayName: m.DisplayName, - InfoHash: m.InfoHash, + InfoHash: m.InfoHash.UnwrapOrZeroValue(), + InfoHashV2: m.V2InfoHash, Webseeds: m.Params["ws"], Sources: append(m.Params["xs"], m.Params["as"]...), PeerAddrs: m.Params["x.pe"], // BEP 9 @@ -74,7 +76,7 @@ func TorrentSpecFromMetaInfoErr(mi *metainfo.MetaInfo) (*TorrentSpec, error) { if info.HasV2() { v2Infohash.Set(infohash_v2.HashBytes(mi.InfoBytes)) if !info.HasV1() { - v1Ih = v2Infohash.Value.ToShort() + v1Ih = *v2Infohash.Value.ToShort() } } diff --git a/t.go b/t.go index 83ca5a90..dda65d36 100644 --- a/t.go +++ b/t.go @@ -13,7 +13,7 @@ import ( // The Torrent's infohash. This is fixed and cannot change. It uniquely identifies a torrent. func (t *Torrent) InfoHash() metainfo.Hash { - return t.infoHash + return *t.canonicalShortInfohash() } // Returns a channel that is closed when the info (.Info()) for the torrent has become available. @@ -100,7 +100,7 @@ func (t *Torrent) Drop() { defer wg.Wait() t.cl.lock() defer t.cl.unlock() - err := t.cl.dropTorrent(t.infoHash, &wg) + err := t.cl.dropTorrent(*t.canonicalShortInfohash(), &wg) if err != nil { panic(err) } @@ -254,7 +254,7 @@ func (t *Torrent) DownloadAll() { func (t *Torrent) String() string { s := t.name() if s == "" { - return t.infoHash.HexString() + return t.canonicalShortInfohash().HexString() } else { return strconv.Quote(s) } diff --git a/test_test.go b/test_test.go index 6babc911..11ebb3b3 100644 --- a/test_test.go +++ b/test_test.go @@ -19,5 +19,5 @@ func newTestingClient(t testing.TB) *Client { } func (cl *Client) newTorrentForTesting() *Torrent { - return cl.newTorrent(metainfo.Hash{}, nil) + return cl.newTorrent(metainfo.Hash{1}, nil) } diff --git a/torrent.go b/torrent.go index f15459b5..3c925ae5 100644 --- a/torrent.go +++ b/torrent.go @@ -7,9 +7,6 @@ import ( "crypto/sha1" "errors" "fmt" - "github.com/anacrolix/torrent/merkle" - "github.com/anacrolix/torrent/types/infohash" - infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "hash" "io" "math/rand" @@ -36,10 +33,12 @@ import ( "github.com/anacrolix/sync" "github.com/pion/datachannel" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/check" "github.com/anacrolix/torrent/internal/nestedmaps" + "github.com/anacrolix/torrent/merkle" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" @@ -47,6 +46,8 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" typedRoaring "github.com/anacrolix/torrent/typed-roaring" + "github.com/anacrolix/torrent/types/infohash" + infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" "github.com/anacrolix/torrent/webseed" "github.com/anacrolix/torrent/webtorrent" ) @@ -68,7 +69,7 @@ type Torrent struct { closed chansync.SetOnce onClose []func() - infoHash metainfo.Hash + infoHash g.Option[metainfo.Hash] infoHashV2 g.Option[infohash_v2.T] pieces []Piece @@ -118,7 +119,7 @@ type Torrent struct { // Whether we want to know more peers. wantPeersEvent missinggo.Event // An announcer for each tracker URL. - trackerAnnouncers map[string]torrentTrackerAnnouncer + trackerAnnouncers map[torrentTrackerAnnouncerKey]torrentTrackerAnnouncer // How many times we've initiated a DHT announce. TODO: Move into stats. numDHTAnnounces int @@ -177,6 +178,11 @@ type Torrent struct { disableTriggers bool } +type torrentTrackerAnnouncerKey struct { + shortInfohash [20]byte + url string +} + type outgoingConnAttemptKey = *PeerInfo func (t *Torrent) length() int64 { @@ -496,7 +502,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { } if t.storageOpener != nil { var err error - t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash) + t.storage, err = t.storageOpener.OpenTorrent(info, *t.canonicalShortInfohash()) if err != nil { return fmt.Errorf("error opening torrent storage: %s", err) } @@ -515,7 +521,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey { return request_strategy.PieceRequestOrderKey{ - InfoHash: t.infoHash, + InfoHash: *t.canonicalShortInfohash(), Index: i, } } @@ -550,18 +556,23 @@ func (t *Torrent) onSetInfo() { // Called when metadata for a torrent becomes available. func (t *Torrent) setInfoBytesLocked(b []byte) error { - v2Hash := infohash_v2.HashBytes(b) - v1Hash := infohash.HashBytes(b) - v2Short := v2Hash.ToShort() - if v2Short != t.infoHash && v1Hash != t.infoHash { - return errors.New("info bytes have wrong hash") + if t.infoHash.Ok && infohash.HashBytes(b) != t.infoHash.Value { + return errors.New("info bytes have wrong v1 hash") + } + var v2Hash g.Option[infohash_v2.T] + if t.infoHashV2.Ok { + v2Hash.Set(infohash_v2.HashBytes(b)) + if v2Hash.Value != t.infoHashV2.Value { + return errors.New("info bytes have wrong v2 hash") + } } var info metainfo.Info if err := bencode.Unmarshal(b, &info); err != nil { return fmt.Errorf("error unmarshalling info bytes: %s", err) } - if info.HasV2() { - t.infoHashV2.Set(v2Hash) + if !t.infoHashV2.Ok && info.HasV2() { + v2Hash.Set(infohash_v2.HashBytes(b)) + t.infoHashV2.Set(v2Hash.Unwrap()) } t.metadataBytes = b t.metadataCompletedChunks = nil @@ -622,7 +633,7 @@ func (t *Torrent) name() string { if t.displayName != "" { return t.displayName } - return "infohash:" + t.infoHash.HexString() + return "infohash:" + t.canonicalShortInfohash().HexString() } func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) { @@ -738,7 +749,12 @@ func (psr PieceStateRun) String() (ret string) { } func (t *Torrent) writeStatus(w io.Writer) { - fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString()) + if t.infoHash.Ok { + fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.Value.HexString()) + } + if t.infoHashV2.Ok { + fmt.Fprintf(w, "Infohash v2: %s\n", t.infoHashV2.Value.HexString()) + } fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize()) if !t.haveInfo() { fmt.Fprintf(w, "Metadata have: ") @@ -1789,14 +1805,14 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) { t.logRunHandshookConn(pc, false, log.Debug) } -func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer { - wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash) - // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for - // the same info hash before the old one is cleaned up. +func (t *Torrent) startWebsocketAnnouncer(u url.URL, shortInfohash [20]byte) torrentTrackerAnnouncer { + wtc, release := t.cl.websocketTrackers.Get(u.String(), shortInfohash) + // This needs to run before the Torrent is dropped from the Client, to prevent a new + // webtorrent.TrackerClient for the same info hash before the old one is cleaned up. t.onClose = append(t.onClose, release) wst := websocketTrackerStatus{u, wtc} go func() { - err := wtc.Announce(tracker.Started, t.infoHash) + err := wtc.Announce(tracker.Started, shortInfohash) if err != nil { t.logger.WithDefaultLevel(log.Warning).Printf( "error in initial announce to %q: %v", @@ -1826,7 +1842,20 @@ func (t *Torrent) startScrapingTracker(_url string) { t.startScrapingTracker(u.String()) return } - if _, ok := t.trackerAnnouncers[_url]; ok { + if t.infoHash.Ok { + t.startScrapingTrackerWithInfohash(u, _url, t.infoHash.Value) + } + if t.infoHashV2.Ok { + t.startScrapingTrackerWithInfohash(u, _url, *t.infoHashV2.Value.ToShort()) + } +} + +func (t *Torrent) startScrapingTrackerWithInfohash(u *url.URL, urlStr string, shortInfohash [20]byte) { + announcerKey := torrentTrackerAnnouncerKey{ + shortInfohash: shortInfohash, + url: urlStr, + } + if _, ok := t.trackerAnnouncers[announcerKey]; ok { return } sl := func() torrentTrackerAnnouncer { @@ -1835,7 +1864,7 @@ func (t *Torrent) startScrapingTracker(_url string) { if t.cl.config.DisableWebtorrent { return nil } - return t.startWebsocketAnnouncer(*u) + return t.startWebsocketAnnouncer(*u, shortInfohash) case "udp4": if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 { return nil @@ -1846,6 +1875,7 @@ func (t *Torrent) startScrapingTracker(_url string) { } } newAnnouncer := &trackerScraper{ + shortInfohash: shortInfohash, u: *u, t: t, lookupTrackerIp: t.cl.config.LookupTrackerIp, @@ -1856,10 +1886,10 @@ func (t *Torrent) startScrapingTracker(_url string) { if sl == nil { return } - if t.trackerAnnouncers == nil { - t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer) + g.MakeMapIfNil(&t.trackerAnnouncers) + if g.MapInsert(t.trackerAnnouncers, announcerKey, sl).Ok { + panic("tracker announcer already exists") } - t.trackerAnnouncers[_url] = sl } // Adds and starts tracker scrapers for tracker URLs that aren't already @@ -1878,21 +1908,26 @@ func (t *Torrent) startMissingTrackerScrapers() { // Returns an AnnounceRequest with fields filled out to defaults and current // values. -func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest { +func (t *Torrent) announceRequest( + event tracker.AnnounceEvent, + shortInfohash [20]byte, +) tracker.AnnounceRequest { // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's // dependent on the network in use. return tracker.AnnounceRequest{ Event: event, NumWant: func() int32 { if t.wantPeers() && len(t.cl.dialers) > 0 { - return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764 + // Windozer has UDP packet limit. See: + // https://github.com/anacrolix/torrent/issues/764 + return 200 } else { return 0 } }(), Port: uint16(t.cl.incomingPeerPort()), PeerId: t.cl.peerID, - InfoHash: t.infoHash, + InfoHash: shortInfohash, Key: t.cl.announceKey(), // The following are vaguely described in BEP 3. @@ -1931,22 +1966,62 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) { } // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the -// announce ends. stop will force the announce to end. +// announce ends. stop will force the announce to end. This interface is really old-school, and +// calls a private one that is much more modern. Both v1 and v2 info hashes are announced if they +// exist. func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) { - ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true) - if err != nil { - return + var ihs [][20]byte + t.cl.lock() + t.eachShortInfohash(func(short [20]byte) { + ihs = append(ihs, short) + }) + t.cl.unlock() + ctx, stop := context.WithCancel(context.Background()) + eg, ctx := errgroup.WithContext(ctx) + for _, ih := range ihs { + var ann DhtAnnounce + ann, err = s.Announce(ih, t.cl.incomingPeerPort(), true) + if err != nil { + stop() + return + } + eg.Go(func() error { + return t.dhtAnnounceConsumer(ctx, ann) + }) } _done := make(chan struct{}) done = _done - stop = ps.Close go func() { - t.consumeDhtAnnouncePeers(ps.Peers()) - close(_done) + defer stop() + defer close(_done) + // Won't this race? + err = eg.Wait() }() return } +// Announce using the provided DHT server. Peers are consumed automatically. done is closed when the +// announce ends. stop will force the announce to end. +func (t *Torrent) dhtAnnounceConsumer( + ctx context.Context, + ps DhtAnnounce, +) ( + err error, +) { + defer ps.Close() + done := make(chan struct{}) + go func() { + defer close(done) + t.consumeDhtAnnouncePeers(ps.Peers()) + }() + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-done: + return nil + } +} + func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error { _, stop, err := t.AnnounceToDht(s) if err != nil { @@ -3015,3 +3090,19 @@ func (t *Torrent) getDialTimeoutUnlocked() time.Duration { defer cl.rUnlock() return t.dialTimeout() } + +func (t *Torrent) canonicalShortInfohash() *infohash.T { + if t.infoHash.Ok { + return &t.infoHash.Value + } + return t.infoHashV2.UnwrapPtr().ToShort() +} + +func (t *Torrent) eachShortInfohash(each func(short [20]byte)) { + if t.infoHash.Ok { + each(t.infoHash.Value) + } + if t.infoHashV2.Ok { + each(*t.infoHashV2.Value.ToShort()) + } +} diff --git a/torrent_test.go b/torrent_test.go index 808947e9..15c2a4fc 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -71,8 +71,10 @@ func TestAppendToCopySlice(t *testing.T) { func TestTorrentString(t *testing.T) { tor := &Torrent{} + tor.infoHash.Ok = true + tor.infoHash.Value[0] = 1 s := tor.InfoHash().HexString() - if s != "0000000000000000000000000000000000000000" { + if s != "0100000000000000000000000000000000000000" { t.FailNow() } } @@ -87,7 +89,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) { ) cl := &Client{config: TestingConfig(b)} cl.initLogger() - t := cl.newTorrent(metainfo.Hash{}, nil) + t := cl.newTorrentForTesting() require.NoError(b, t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*numPieces), PieceLength: pieceLength, diff --git a/tracker_scraper.go b/tracker_scraper.go index 863838ac..0668c912 100644 --- a/tracker_scraper.go +++ b/tracker_scraper.go @@ -18,6 +18,7 @@ import ( // Announces a torrent to a tracker at regular intervals, when peers are // required. type trackerScraper struct { + shortInfohash [20]byte u url.URL t *Torrent lastAnnounce trackerAnnounceResult @@ -117,7 +118,10 @@ func (me *trackerScraper) trackerUrl(ip net.IP) string { // Return how long to wait before trying again. For most errors, we return 5 // minutes, a relatively quick turn around for DNS changes. -func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEvent) (ret trackerAnnounceResult) { +func (me *trackerScraper) announce( + ctx context.Context, + event tracker.AnnounceEvent, +) (ret trackerAnnounceResult) { defer func() { ret.Completed = time.Now() }() @@ -146,7 +150,7 @@ func (me *trackerScraper) announce(ctx context.Context, event tracker.AnnounceEv return } me.t.cl.rLock() - req := me.t.announceRequest(event) + req := me.t.announceRequest(event, me.shortInfohash) me.t.cl.rUnlock() // The default timeout works well as backpressure on concurrent access to the tracker. Since // we're passing our own Context now, we will include that timeout ourselves to maintain similar diff --git a/types/infohash-v2/infohash-v2.go b/types/infohash-v2/infohash-v2.go index 02ddd1d8..c9d00732 100644 --- a/types/infohash-v2/infohash-v2.go +++ b/types/infohash-v2/infohash-v2.go @@ -5,6 +5,7 @@ import ( "encoding" "encoding/hex" "fmt" + "unsafe" "github.com/multiformats/go-multihash" @@ -56,9 +57,8 @@ func (t *T) FromHexString(s string) (err error) { } // Truncates the hash to 20 bytes for use in auxiliary interfaces, like DHT and trackers. -func (t *T) ToShort() (short infohash.T) { - copy(short[:], t[:]) - return +func (t *T) ToShort() (short *infohash.T) { + return (*infohash.T)(unsafe.Pointer(t)) } var ( diff --git a/types/infohash/infohash.go b/types/infohash/infohash.go index c90f7451..30f68a24 100644 --- a/types/infohash/infohash.go +++ b/types/infohash/infohash.go @@ -52,6 +52,10 @@ func (t *T) FromHexString(s string) (err error) { return } +func (t *T) IsZero() bool { + return *t == T{} +} + var ( _ encoding.TextUnmarshaler = (*T)(nil) _ encoding.TextMarshaler = T{} -- 2.48.1