From: Matt Joiner Date: Wed, 6 Mar 2024 07:12:49 +0000 (+1100) Subject: Handle v2 Torrents added by short infohash only X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=b119e2016e821021ece732498b96baa519673b0c;p=btrtrc.git Handle v2 Torrents added by short infohash only --- diff --git a/client.go b/client.go index 4b8e9df1..64b053fc 100644 --- a/client.go +++ b/client.go @@ -78,7 +78,12 @@ type Client struct { // through legitimate channels. dopplegangerAddrs map[string]struct{} badPeerIPs map[netip.Addr]struct{} - torrents map[InfoHash]*Torrent + // All Torrents once. + torrents map[*Torrent]struct{} + // All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the + // info has been obtained, there's no knowing if an infohash belongs to v1 or v2. + torrentsByShortHash map[InfoHash]*Torrent + pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder acceptLimiter map[ipStr]int @@ -200,7 +205,9 @@ func (cl *Client) announceKey() int32 { func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg g.MakeMap(&cl.dopplegangerAddrs) - cl.torrents = make(map[metainfo.Hash]*Torrent) + g.MakeMap(&cl.torrentsByShortHash) + g.MakeMap(&cl.torrents) + cl.torrentsByShortHash = make(map[metainfo.Hash]*Torrent) cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist @@ -313,7 +320,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { ) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if !ok { return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") } @@ -326,7 +333,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[dcc.InfoHash] + t, ok := cl.torrentsByShortHash[dcc.InfoHash] if !ok { cl.logger.WithDefaultLevel(log.Warning).Printf( "got webrtc conn for unloaded torrent with infohash %x", @@ -352,7 +359,7 @@ func (cl *Client) AddDialer(d Dialer) { cl.lock() defer cl.unlock() cl.dialers = append(cl.dialers, d) - for _, t := range cl.torrents { + for t := range cl.torrents { t.openNewConns() } } @@ -448,7 +455,7 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) { func (cl *Client) Close() (errs []error) { var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() - for _, t := range cl.torrents { + for t := range cl.torrents { err := t.close(&closeGroup) if err != nil { errs = append(errs, err) @@ -480,7 +487,7 @@ func (cl *Client) wantConns() bool { if cl.config.AlwaysWantConns { return true } - for _, t := range cl.torrents { + for t := range cl.torrents { if t.wantIncomingConns() { return true } @@ -609,14 +616,10 @@ func (cl *Client) incomingConnection(nc net.Conn) { func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) { cl.rLock() defer cl.rUnlock() - t, ok = cl.torrents[ih] + t, ok = cl.torrentsByShortHash[ih] return } -func (cl *Client) torrent(ih metainfo.Hash) *Torrent { - return cl.torrents[ih] -} - type DialResult struct { Conn net.Conn Dialer Dialer @@ -686,13 +689,13 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingC if cl.numHalfOpen < 0 { panic("should not be possible") } - for _, t := range cl.torrents { + for t := range cl.torrents { t.openNewConns() } } func (cl *Client) countHalfOpenFromTorrents() (count int) { - for _, t := range cl.torrents { + for t := range cl.torrents { count += t.numHalfOpenAttempts() } return @@ -946,18 +949,18 @@ func (cl *Client) forSkeys(f func([]byte) bool) { defer cl.rUnlock() if false { // Emulate the bug from #114 var firstIh InfoHash - for ih := range cl.torrents { + for ih := range cl.torrentsByShortHash { firstIh = ih break } - for range cl.torrents { + for range cl.torrentsByShortHash { if !f(firstIh[:]) { break } } return } - for ih := range cl.torrents { + for ih := range cl.torrentsByShortHash { if !f(ih[:]) { break } @@ -975,7 +978,12 @@ func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter { func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) { defer perf.ScopeTimerErr(&err)() var rw io.ReadWriter - rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector) + rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption( + c.rw(), + cl.handshakeReceiverSecretKeys(), + cl.config.HeaderObfuscationPolicy, + cl.config.CryptoSelector, + ) c.setRW(rw) if err == nil || err == mse.ErrNoSecretKeyMatch { if c.headerEncrypted { @@ -1001,7 +1009,7 @@ func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) { return nil, fmt.Errorf("during bt handshake: %w", err) } cl.lock() - t = cl.torrents[ih] + t = cl.torrentsByShortHash[ih] cl.unlock() return } @@ -1368,7 +1376,7 @@ func (cl *Client) AddTorrentInfoHashWithStorage( ) (t *Torrent, new bool) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if ok { return } @@ -1380,7 +1388,8 @@ func (cl *Client) AddTorrentInfoHashWithStorage( go t.dhtAnnouncer(s) } }) - cl.torrents[infoHash] = t + cl.torrentsByShortHash[infoHash] = t + cl.torrents[t] = struct{}{} cl.clearAcceptLimits() t.updateWantPeersEvent() // Tickle Client.waitAccept, new torrent may want conns. @@ -1394,10 +1403,16 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { infoHash := opts.InfoHash cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if ok { return } + if opts.InfoHashV2.Ok { + t, ok = cl.torrentsByShortHash[*opts.InfoHashV2.Value.ToShort()] + if ok { + return + } + } new = true t = cl.newTorrentOpt(opts) @@ -1406,7 +1421,8 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { go t.dhtAnnouncer(s) } }) - cl.torrents[infoHash] = t + cl.torrentsByShortHash[infoHash] = t + cl.torrents[t] = struct{}{} t.setInfoBytesLocked(opts.InfoBytes) cl.clearAcceptLimits() t.updateWantPeersEvent() @@ -1484,19 +1500,17 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { return t.AddPieceLayers(spec.PieceLayers) } -func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) { - t, ok := cl.torrents[infoHash] - if !ok { - err = fmt.Errorf("no such torrent") - return - } +func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) { + t.eachShortInfohash(func(short [20]byte) { + delete(cl.torrentsByShortHash, short) + }) err = t.close(wg) - delete(cl.torrents, infoHash) + delete(cl.torrents, t) return } func (cl *Client) allTorrentsCompleted() bool { - for _, t := range cl.torrents { + for t := range cl.torrents { if !t.haveInfo() { return false } @@ -1529,7 +1543,7 @@ func (cl *Client) Torrents() []*Torrent { } func (cl *Client) torrentsAsSlice() (ret []*Torrent) { - for _, t := range cl.torrents { + for t := range cl.torrents { ret = append(ret, t) } return @@ -1593,7 +1607,7 @@ func (cl *Client) banPeerIP(ip net.IP) { panic(ip) } g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{}) - for _, t := range cl.torrents { + for t := range cl.torrents { t.iterPeers(func(p *Peer) { if p.remoteIp().Equal(ip) { t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip) @@ -1662,7 +1676,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) { cl.lock() defer cl.unlock() - t := cl.torrent(ih) + t := cl.torrentsByShortHash[ih] if t == nil { return } diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 661769f3..43acca13 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -59,8 +59,9 @@ func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input reques } else { return requestStrategyInputMultiTorrent{ requestStrategyInputCommon: cl.getRequestStrategyInputCommon(), - torrents: cl.torrents, - capFunc: primaryTorrent.storage.Capacity, + // TODO: Check this is an appropriate key + torrents: cl.torrentsByShortHash, + capFunc: primaryTorrent.storage.Capacity, } } } diff --git a/request-strategy/order.go b/request-strategy/order.go index 67adc858..477c44de 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -53,8 +53,8 @@ func GetRequestablePieces( var allTorrentsUnverifiedBytes int64 pro.tree.Scan(func(_i pieceRequestOrderItem) bool { ih := _i.key.InfoHash - var t Torrent = input.Torrent(ih) - var piece Piece = t.Piece(_i.key.Index) + var t = input.Torrent(ih) + var piece = t.Piece(_i.key.Index) pieceLength := t.PieceLength() if storageLeft != nil { if *storageLeft < pieceLength { diff --git a/t.go b/t.go index dda65d36..836ebc00 100644 --- a/t.go +++ b/t.go @@ -100,7 +100,7 @@ func (t *Torrent) Drop() { defer wg.Wait() t.cl.lock() defer t.cl.unlock() - err := t.cl.dropTorrent(*t.canonicalShortInfohash(), &wg) + err := t.cl.dropTorrent(t, &wg) if err != nil { panic(err) } diff --git a/torrent.go b/torrent.go index a361c4ed..b63b09fa 100644 --- a/torrent.go +++ b/torrent.go @@ -560,25 +560,58 @@ func (t *Torrent) onSetInfo() { }) } -// Called when metadata for a torrent becomes available. -func (t *Torrent) setInfoBytesLocked(b []byte) error { - if t.infoHash.Ok && infohash.HashBytes(b) != t.infoHash.Value { - return errors.New("info bytes have wrong v1 hash") - } - var v2Hash g.Option[infohash_v2.T] - if t.infoHashV2.Ok { - v2Hash.Set(infohash_v2.HashBytes(b)) - if v2Hash.Value != t.infoHashV2.Value { - return errors.New("info bytes have wrong v2 hash") +// Checks the info bytes hash to expected values. Fills in any missing infohashes. +func (t *Torrent) hashInfoBytes(b []byte, info *metainfo.Info) error { + v1Hash := infohash.HashBytes(b) + v2Hash := infohash_v2.HashBytes(b) + cl := t.cl + if t.infoHash.Ok && !t.infoHashV2.Ok { + if v1Hash == t.infoHash.Value { + if info.HasV2() { + t.infoHashV2.Set(v2Hash) + cl.torrentsByShortHash[*v2Hash.ToShort()] = t + } + } else if *v2Hash.ToShort() == t.infoHash.Value { + if !info.HasV2() { + return errors.New("invalid v2 info") + } + t.infoHashV2.Set(v2Hash) + if info.HasV1() { + cl.torrentsByShortHash[v1Hash] = t + } + } + } else if t.infoHash.Ok && t.infoHashV2.Ok { + if v1Hash != t.infoHash.Value { + return errors.New("incorrect v1 infohash") + } + if v2Hash != t.infoHashV2.Value { + return errors.New("incorrect v2 infohash") + } + } else if !t.infoHash.Ok && t.infoHashV2.Ok { + if v2Hash != t.infoHashV2.Value { + return errors.New("incorrect v2 infohash") } + if info.HasV1() { + t.infoHash.Set(v1Hash) + cl.torrentsByShortHash[v1Hash] = t + } + } else { + panic("no expected infohashes") } + return nil +} + +// Called when metadata for a torrent becomes available. +func (t *Torrent) setInfoBytesLocked(b []byte) (err error) { var info metainfo.Info - if err := bencode.Unmarshal(b, &info); err != nil { - return fmt.Errorf("error unmarshalling info bytes: %s", err) + err = bencode.Unmarshal(b, &info) + if err != nil { + err = fmt.Errorf("unmarshalling info bytes: %w", err) + return } - if !t.infoHashV2.Ok && info.HasV2() { - v2Hash.Set(infohash_v2.HashBytes(b)) - t.infoHashV2.Set(v2Hash.Unwrap()) + err = t.hashInfoBytes(b, &info) + if err != nil { + return } t.metadataBytes = b t.metadataCompletedChunks = nil