client.go | 82 +++++++++++++++++++++++++++++++---------------------- request-strategy-impls.go | 5 +++-- request-strategy/order.go | 4 ++-- t.go | 2 +- torrent.go | 63 ++++++++++++++++++++++++++++++++++++++++------------- diff --git a/client.go b/client.go index 4b8e9df1ff58b28b5048952d33856672638523ea..64b053fce260741095b2da51cc4bde2a9cb05d8d 100644 --- a/client.go +++ b/client.go @@ -78,7 +78,12 @@ // include ourselves if we end up trying to connect to our own address // through legitimate channels. dopplegangerAddrs map[string]struct{} badPeerIPs map[netip.Addr]struct{} - torrents map[InfoHash]*Torrent + // All Torrents once. + torrents map[*Torrent]struct{} + // All Torrents by their short infohashes (v1 if valid, and truncated v2 if valid). Unless the + // info has been obtained, there's no knowing if an infohash belongs to v1 or v2. + torrentsByShortHash map[InfoHash]*Torrent + pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder acceptLimiter map[ipStr]int @@ -200,7 +205,9 @@ // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil. func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg g.MakeMap(&cl.dopplegangerAddrs) - cl.torrents = make(map[metainfo.Hash]*Torrent) + g.MakeMap(&cl.torrentsByShortHash) + g.MakeMap(&cl.torrents) + cl.torrentsByShortHash = make(map[metainfo.Hash]*Torrent) cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist @@ -313,7 +320,7 @@ tracker.AnnounceRequest, error, ) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if !ok { return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client") } @@ -326,7 +333,7 @@ DialContext: cl.config.TrackerDialContext, OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[dcc.InfoHash] + t, ok := cl.torrentsByShortHash[dcc.InfoHash] if !ok { cl.logger.WithDefaultLevel(log.Warning).Printf( "got webrtc conn for unloaded torrent with infohash %x", @@ -352,7 +359,7 @@ func (cl *Client) AddDialer(d Dialer) { cl.lock() defer cl.unlock() cl.dialers = append(cl.dialers, d) - for _, t := range cl.torrents { + for t := range cl.torrents { t.openNewConns() } } @@ -448,7 +455,7 @@ // Stops the client. All connections to peers are closed and all activity will come to a halt. func (cl *Client) Close() (errs []error) { var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() - for _, t := range cl.torrents { + for t := range cl.torrents { err := t.close(&closeGroup) if err != nil { errs = append(errs, err) @@ -480,7 +487,7 @@ func (cl *Client) wantConns() bool { if cl.config.AlwaysWantConns { return true } - for _, t := range cl.torrents { + for t := range cl.torrents { if t.wantIncomingConns() { return true } @@ -609,12 +616,8 @@ // Returns a handle to the given torrent, if it's present in the client. func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) { cl.rLock() defer cl.rUnlock() - t, ok = cl.torrents[ih] + t, ok = cl.torrentsByShortHash[ih] return -} - -func (cl *Client) torrent(ih metainfo.Hash) *Torrent { - return cl.torrents[ih] } type DialResult struct { @@ -686,13 +689,13 @@ cl.numHalfOpen-- if cl.numHalfOpen < 0 { panic("should not be possible") } - for _, t := range cl.torrents { + for t := range cl.torrents { t.openNewConns() } } func (cl *Client) countHalfOpenFromTorrents() (count int) { - for _, t := range cl.torrents { + for t := range cl.torrents { count += t.numHalfOpenAttempts() } return @@ -946,18 +949,18 @@ cl.rLock() defer cl.rUnlock() if false { // Emulate the bug from #114 var firstIh InfoHash - for ih := range cl.torrents { + for ih := range cl.torrentsByShortHash { firstIh = ih break } - for range cl.torrents { + for range cl.torrentsByShortHash { if !f(firstIh[:]) { break } } return } - for ih := range cl.torrents { + for ih := range cl.torrentsByShortHash { if !f(ih[:]) { break } @@ -975,7 +978,12 @@ // Do encryption and bittorrent handshakes as receiver. func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) { defer perf.ScopeTimerErr(&err)() var rw io.ReadWriter - rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector) + rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption( + c.rw(), + cl.handshakeReceiverSecretKeys(), + cl.config.HeaderObfuscationPolicy, + cl.config.CryptoSelector, + ) c.setRW(rw) if err == nil || err == mse.ErrNoSecretKeyMatch { if c.headerEncrypted { @@ -1001,7 +1009,7 @@ if err != nil { return nil, fmt.Errorf("during bt handshake: %w", err) } cl.lock() - t = cl.torrents[ih] + t = cl.torrentsByShortHash[ih] cl.unlock() return } @@ -1368,7 +1376,7 @@ specStorage storage.ClientImpl, ) (t *Torrent, new bool) { cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if ok { return } @@ -1380,7 +1388,8 @@ if cl.config.PeriodicallyAnnounceTorrentsToDht { go t.dhtAnnouncer(s) } }) - cl.torrents[infoHash] = t + cl.torrentsByShortHash[infoHash] = t + cl.torrents[t] = struct{}{} cl.clearAcceptLimits() t.updateWantPeersEvent() // Tickle Client.waitAccept, new torrent may want conns. @@ -1394,10 +1403,16 @@ func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { infoHash := opts.InfoHash cl.lock() defer cl.unlock() - t, ok := cl.torrents[infoHash] + t, ok := cl.torrentsByShortHash[infoHash] if ok { return } + if opts.InfoHashV2.Ok { + t, ok = cl.torrentsByShortHash[*opts.InfoHashV2.Value.ToShort()] + if ok { + return + } + } new = true t = cl.newTorrentOpt(opts) @@ -1406,7 +1421,8 @@ if cl.config.PeriodicallyAnnounceTorrentsToDht { go t.dhtAnnouncer(s) } }) - cl.torrents[infoHash] = t + cl.torrentsByShortHash[infoHash] = t + cl.torrents[t] = struct{}{} t.setInfoBytesLocked(opts.InfoBytes) cl.clearAcceptLimits() t.updateWantPeersEvent() @@ -1484,19 +1500,17 @@ t.dataUploadDisallowed = spec.DisallowDataUpload return t.AddPieceLayers(spec.PieceLayers) } -func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) { - t, ok := cl.torrents[infoHash] - if !ok { - err = fmt.Errorf("no such torrent") - return - } +func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) { + t.eachShortInfohash(func(short [20]byte) { + delete(cl.torrentsByShortHash, short) + }) err = t.close(wg) - delete(cl.torrents, infoHash) + delete(cl.torrents, t) return } func (cl *Client) allTorrentsCompleted() bool { - for _, t := range cl.torrents { + for t := range cl.torrents { if !t.haveInfo() { return false } @@ -1529,7 +1543,7 @@ return cl.torrentsAsSlice() } func (cl *Client) torrentsAsSlice() (ret []*Torrent) { - for _, t := range cl.torrents { + for t := range cl.torrents { ret = append(ret, t) } return @@ -1593,7 +1607,7 @@ if !ok { panic(ip) } g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{}) - for _, t := range cl.torrents { + for t := range cl.torrents { t.iterPeers(func(p *Peer) { if p.remoteIp().Equal(ip) { t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip) @@ -1662,7 +1676,7 @@ func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portOk bool) { cl.lock() defer cl.unlock() - t := cl.torrent(ih) + t := cl.torrentsByShortHash[ih] if t == nil { return } diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 661769f3c1a94d5767397444d455d273785e8726..43acca13433242faa736dfdf7c416320d68bdccf 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -59,8 +59,9 @@ } } else { return requestStrategyInputMultiTorrent{ requestStrategyInputCommon: cl.getRequestStrategyInputCommon(), - torrents: cl.torrents, - capFunc: primaryTorrent.storage.Capacity, + // TODO: Check this is an appropriate key + torrents: cl.torrentsByShortHash, + capFunc: primaryTorrent.storage.Capacity, } } } diff --git a/request-strategy/order.go b/request-strategy/order.go index 67adc8587b9c5a8a0d1c2a2afb06bebd089ad5b8..477c44de51ebd1f391eeb8a11af0274cbaa07922 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -53,8 +53,8 @@ } var allTorrentsUnverifiedBytes int64 pro.tree.Scan(func(_i pieceRequestOrderItem) bool { ih := _i.key.InfoHash - var t Torrent = input.Torrent(ih) - var piece Piece = t.Piece(_i.key.Index) + var t = input.Torrent(ih) + var piece = t.Piece(_i.key.Index) pieceLength := t.PieceLength() if storageLeft != nil { if *storageLeft < pieceLength { diff --git a/t.go b/t.go index dda65d363ec6020e57de8d7680b06a0b403de178..836ebc00749a296f69fb1128bddbe8303b7547ff 100644 --- a/t.go +++ b/t.go @@ -100,7 +100,7 @@ var wg sync.WaitGroup defer wg.Wait() t.cl.lock() defer t.cl.unlock() - err := t.cl.dropTorrent(*t.canonicalShortInfohash(), &wg) + err := t.cl.dropTorrent(t, &wg) if err != nil { panic(err) } diff --git a/torrent.go b/torrent.go index a361c4edd64037bfda43b053e44bbc3614698404..b63b09fa2726045bb04c3e69dfac02864dd076ab 100644 --- a/torrent.go +++ b/torrent.go @@ -560,25 +560,58 @@ p.updateRequests("onSetInfo") }) } -// Called when metadata for a torrent becomes available. -func (t *Torrent) setInfoBytesLocked(b []byte) error { - if t.infoHash.Ok && infohash.HashBytes(b) != t.infoHash.Value { - return errors.New("info bytes have wrong v1 hash") - } - var v2Hash g.Option[infohash_v2.T] - if t.infoHashV2.Ok { - v2Hash.Set(infohash_v2.HashBytes(b)) - if v2Hash.Value != t.infoHashV2.Value { - return errors.New("info bytes have wrong v2 hash") +// Checks the info bytes hash to expected values. Fills in any missing infohashes. +func (t *Torrent) hashInfoBytes(b []byte, info *metainfo.Info) error { + v1Hash := infohash.HashBytes(b) + v2Hash := infohash_v2.HashBytes(b) + cl := t.cl + if t.infoHash.Ok && !t.infoHashV2.Ok { + if v1Hash == t.infoHash.Value { + if info.HasV2() { + t.infoHashV2.Set(v2Hash) + cl.torrentsByShortHash[*v2Hash.ToShort()] = t + } + } else if *v2Hash.ToShort() == t.infoHash.Value { + if !info.HasV2() { + return errors.New("invalid v2 info") + } + t.infoHashV2.Set(v2Hash) + if info.HasV1() { + cl.torrentsByShortHash[v1Hash] = t + } + } + } else if t.infoHash.Ok && t.infoHashV2.Ok { + if v1Hash != t.infoHash.Value { + return errors.New("incorrect v1 infohash") + } + if v2Hash != t.infoHashV2.Value { + return errors.New("incorrect v2 infohash") + } + } else if !t.infoHash.Ok && t.infoHashV2.Ok { + if v2Hash != t.infoHashV2.Value { + return errors.New("incorrect v2 infohash") + } + if info.HasV1() { + t.infoHash.Set(v1Hash) + cl.torrentsByShortHash[v1Hash] = t } + } else { + panic("no expected infohashes") } + return nil +} + +// Called when metadata for a torrent becomes available. +func (t *Torrent) setInfoBytesLocked(b []byte) (err error) { var info metainfo.Info - if err := bencode.Unmarshal(b, &info); err != nil { - return fmt.Errorf("error unmarshalling info bytes: %s", err) + err = bencode.Unmarshal(b, &info) + if err != nil { + err = fmt.Errorf("unmarshalling info bytes: %w", err) + return } - if !t.infoHashV2.Ok && info.HasV2() { - v2Hash.Set(infohash_v2.HashBytes(b)) - t.infoHashV2.Set(v2Hash.Unwrap()) + err = t.hashInfoBytes(b, &info) + if err != nil { + return } t.metadataBytes = b t.metadataCompletedChunks = nil