X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=lib%2FPublicInbox%2FSearchIdx.pm;h=446cfb0d50afd8be723a24c4f6f5751de80ba548;hb=2f5bb422a75f38e9c2fdca5d526fd8832ca9de45;hp=1c107282e86da4a0def28967544b49b2e0082d23;hpb=3f2d9e30255824da52bca422d07e97e0859500e2;p=public-inbox.git diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 1c107282..446cfb0d 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -9,9 +9,8 @@ package PublicInbox::SearchIdx; use strict; use warnings; -use Fcntl qw(:flock :DEFAULT); +use base qw(PublicInbox::Search PublicInbox::Lock); use PublicInbox::MIME; -use base qw(PublicInbox::Search); use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/; use PublicInbox::MsgIter; use Carp qw(croak); @@ -19,7 +18,6 @@ use POSIX qw(strftime); require PublicInbox::Git; use constant { - MAX_MID_SIZE => 244, # max term size (Xapian limitation) - length('Q') PERM_UMASK => 0, OLD_PERM_GROUP => 1, OLD_PERM_EVERYBODY => 2, @@ -97,7 +95,7 @@ sub _xdb_release { my ($self) = @_; my $xdb = delete $self->{xdb} or croak 'not acquired'; $xdb->close; - _lock_release($self) if $self->{creat}; + $self->lock_release if $self->{creat}; undef; } @@ -108,33 +106,13 @@ sub _xdb_acquire { my $flag = Search::Xapian::DB_OPEN; if ($self->{creat}) { require File::Path; - _lock_acquire($self); + $self->lock_acquire; File::Path::mkpath($dir); $flag = Search::Xapian::DB_CREATE_OR_OPEN; } $self->{xdb} = Search::Xapian::WritableDatabase->new($dir, $flag); } -# we only acquire the flock if creating or reindexing; -# PublicInbox::Import already has the lock on its own. -sub _lock_acquire { - my ($self) = @_; - croak 'already locked' if $self->{lockfh}; - my $lock_path = $self->{lock_path} or return; - sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or - die "failed to open lock $lock_path: $!\n"; - flock($lockfh, LOCK_EX) or die "lock failed: $!\n"; - $self->{lockfh} = $lockfh; -} - -sub _lock_release { - my ($self) = @_; - return unless $self->{lock_path}; - my $lockfh = delete $self->{lockfh} or croak 'not locked'; - flock($lockfh, LOCK_UN) or die "unlock failed: $!\n"; - close $lockfh or die "close failed: $!\n"; -} - sub add_val ($$$) { my ($doc, $col, $num) = @_; $num = Search::Xapian::sortable_serialise($num); @@ -156,7 +134,9 @@ sub add_values ($$) { my $lines = $values->[PublicInbox::Search::LINES]; add_val($doc, PublicInbox::Search::LINES, $lines); - my $yyyymmdd = strftime('%Y%m%d', gmtime($ts)); + my $ds = $values->[PublicInbox::Search::DS]; + add_val($doc, PublicInbox::Search::DS, $ds); + my $yyyymmdd = strftime('%Y%m%d', gmtime($ds)); add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd); } @@ -175,14 +155,19 @@ sub index_users ($$) { $tg->increase_termpos; } -sub index_text_inc ($$$) { - my ($tg, $text, $pfx) = @_; +sub index_diff_inc ($$$$) { + my ($tg, $text, $pfx, $xnq) = @_; + if (@$xnq) { + $tg->index_text(join("\n", @$xnq), 1, 'XNQ'); + $tg->increase_termpos; + @$xnq = (); + } $tg->index_text($text, 1, $pfx); $tg->increase_termpos; } sub index_old_diff_fn { - my ($tg, $seen, $fa, $fb) = @_; + my ($tg, $seen, $fa, $fb, $xnq) = @_; # no renames or space support for traditional diffs, # find the number of leading common paths to strip: @@ -192,7 +177,9 @@ sub index_old_diff_fn { $fa = join('/', @fa); $fb = join('/', @fb); if ($fa eq $fb) { - index_text_inc($tg, $fa,'XDFN') unless $seen->{$fa}++; + unless ($seen->{$fa}++) { + index_diff_inc($tg, $fa, 'XDFN', $xnq); + } return 1; } shift @fa; @@ -205,40 +192,46 @@ sub index_diff ($$$) { my ($tg, $lines, $doc) = @_; my %seen; my $in_diff; + my @xnq; + my $xnq = \@xnq; foreach (@$lines) { if ($in_diff && s/^ //) { # diff context - index_text_inc($tg, $_, 'XDFCTX'); + index_diff_inc($tg, $_, 'XDFCTX', $xnq); } elsif (/^-- $/) { # email signature begins $in_diff = undef; } elsif (m!^diff --git ("?a/.+) ("?b/.+)\z!) { my ($fa, $fb) = ($1, $2); my $fn = (split('/', git_unquote($fa), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $fn = (split('/', git_unquote($fb), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; # traditional diff: } elsif (m/^diff -(.+) (\S+) (\S+)$/) { my ($opt, $fa, $fb) = ($1, $2, $3); + push @xnq, $_; # only support unified: next unless $opt =~ /[uU]/; - $in_diff = index_old_diff_fn($tg, \%seen, $fa, $fb); + $in_diff = index_old_diff_fn($tg, \%seen, $fa, $fb, + $xnq); } elsif (m!^--- ("?a/.+)!) { my $fn = (split('/', git_unquote($1), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; } elsif (m!^\+\+\+ ("?b/.+)!) { my $fn = (split('/', git_unquote($1), 2))[1]; - index_text_inc($tg, $fn, 'XDFN') unless $seen{$fn}++; + $seen{$fn}++ or index_diff_inc($tg, $fn, 'XDFN', $xnq); $in_diff = 1; } elsif (/^--- (\S+)/) { $in_diff = $1; + push @xnq, $_; } elsif (defined $in_diff && /^\+\+\+ (\S+)/) { - $in_diff = index_old_diff_fn($tg, \%seen, $in_diff, $1); + $in_diff = index_old_diff_fn($tg, \%seen, $in_diff, $1, + $xnq); } elsif ($in_diff && s/^\+//) { # diff added - index_text_inc($tg, $_, 'XDFB'); + index_diff_inc($tg, $_, 'XDFB', $xnq); } elsif ($in_diff && s/^-//) { # diff removed - index_text_inc($tg, $_, 'XDFA'); + index_diff_inc($tg, $_, 'XDFA', $xnq); } elsif (m!^index ([a-f0-9]+)\.\.([a-f0-9]+)!) { my ($ba, $bb) = ($1, $2); index_git_blob_id($doc, 'XDFPRE', $ba); @@ -248,39 +241,50 @@ sub index_diff ($$$) { # traditional diff w/o -p } elsif (/^@@ (?:\S+) (?:\S+) @@\s*(\S+.*)$/) { # hunk header context - index_text_inc($tg, $1, 'XDFHH'); + index_diff_inc($tg, $1, 'XDFHH', $xnq); # ignore the following lines: - } elsif (/^(?:dis)similarity index/) { - } elsif (/^(?:old|new) mode/) { - } elsif (/^(?:deleted|new) file mode/) { - } elsif (/^(?:copy|rename) (?:from|to) /) { - } elsif (/^(?:dis)?similarity index /) { - } elsif (/^\\ No newline at end of file/) { - } elsif (/^Binary files .* differ/) { + } elsif (/^(?:dis)similarity index/ || + /^(?:old|new) mode/ || + /^(?:deleted|new) file mode/ || + /^(?:copy|rename) (?:from|to) / || + /^(?:dis)?similarity index / || + /^\\ No newline at end of file/ || + /^Binary files .* differ/) { + push @xnq, $_; } elsif ($_ eq '') { $in_diff = undef; } else { + push @xnq, $_; warn "non-diff line: $_\n" if DEBUG && $_ ne ''; $in_diff = undef; } } + + $tg->index_text(join("\n", @xnq), 1, 'XNQ'); + $tg->increase_termpos; } sub index_body ($$$) { my ($tg, $lines, $doc) = @_; my $txt = join("\n", @$lines); - $tg->index_text($txt, !!$doc, $doc ? 'XNQ' : 'XQUOT'); - $tg->increase_termpos; - # does it look like a diff? - if ($doc && $txt =~ /^(?:diff|---|\+\+\+) /ms) { - $txt = undef; - index_diff($tg, $lines, $doc); + if ($doc) { + # does it look like a diff? + if ($txt =~ /^(?:diff|---|\+\+\+) /ms) { + $txt = undef; + index_diff($tg, $lines, $doc); + } else { + $tg->index_text($txt, 1, 'XNQ'); + } + } else { + $tg->index_text($txt, 0, 'XQUOT'); } + $tg->increase_termpos; @$lines = (); } sub add_message { - my ($self, $mime, $bytes, $num, $blob) = @_; # mime = Email::MIME object + # mime = Email::MIME object + my ($self, $mime, $bytes, $num, $oid, $mid0) = @_; my $doc_id; my $mids = mids($mime->header_obj); my $skel = $self->{skeleton}; @@ -288,22 +292,15 @@ sub add_message { eval { my $smsg = PublicInbox::SearchMsg->new($mime); my $doc = $smsg->{doc}; - foreach my $mid (@$mids) { - # FIXME: may be abused to prevent archival - length($mid) > MAX_MID_SIZE and - die 'Message-ID too long'; - $doc->add_term('Q' . $mid); - } my $subj = $smsg->subject; my $xpath; if ($subj ne '') { $xpath = $self->subject_path($subj); $xpath = id_compress($xpath); - $doc->add_term('XPATH' . $xpath); } my $lines = $mime->body_raw =~ tr!\n!\n!; - my @values = ($smsg->ts, $num, $bytes, $lines); + my @values = ($smsg->ds, $num, $bytes, $lines, $smsg->ts); add_values($doc, \@values); my $tg = $self->term_generator; @@ -355,26 +352,33 @@ sub add_message { # populates smsg->references for smsg->to_doc_data my $refs = parse_references($smsg); - my $data = $smsg->to_doc_data($blob); + $mid0 = $mids->[0] unless defined $mid0; # v1 compatibility + my $data = $smsg->to_doc_data($oid, $mid0); foreach my $mid (@$mids) { $tg->index_text($mid, 1, 'XM'); } $doc->set_data($data); if (my $altid = $self->{-altid}) { foreach my $alt (@$altid) { + my $pfx = $alt->{xprefix}; foreach my $mid (@$mids) { my $id = $alt->mid2alt($mid); next unless defined $id; - $doc->add_term($alt->{xprefix} . $id); + $doc->add_boolean_term($pfx . $id); } } } + + $self->delete_article($num) if defined $num; # for reindexing if ($skel) { push @values, $mids, $xpath, $data; $skel->index_skeleton(\@values); + $doc->add_boolean_term('Q' . $_) foreach @$mids; + $doc->add_boolean_term('XNUM' . $num) if defined $num; $doc_id = $self->{xdb}->add_document($doc); } else { - $doc_id = link_and_save($self, $doc, $mids, $refs); + $doc_id = link_and_save($self, $doc, $mids, $refs, + $num, $xpath); } }; @@ -385,29 +389,73 @@ sub add_message { $doc_id; } -# returns deleted doc_id on success, undef on missing +sub batch_do { + my ($self, $termval, $cb) = @_; + my $batch_size = 1000; # don't let @ids grow too large to avoid OOM + while (1) { + my ($head, $tail) = $self->find_doc_ids($termval); + return if $head == $tail; + my @ids; + for (; $head != $tail && @ids < $batch_size; $head->inc) { + push @ids, $head->get_docid; + } + $cb->(\@ids); + } +} + sub remove_message { my ($self, $mid) = @_; my $db = $self->{xdb}; - my $doc_id; + my $called; $mid = mid_clean($mid); eval { - my ($head, $tail) = $self->find_doc_ids('Q' . $mid); - if ($head->equal($tail)) { - warn "cannot remove non-existent <$mid>\n"; - } - for (; $head != $tail; $head->inc) { - my $docid = $head->get_docid; - $db->delete_document($docid); - } + batch_do($self, 'Q' . $mid, sub { + my ($ids) = @_; + $db->delete_document($_) for @$ids; + $called = 1; + }); }; - if ($@) { warn "failed to remove message <$mid>: $@\n"; - return undef; + } elsif (!$called) { + warn "cannot remove non-existent <$mid>\n"; } - $doc_id; +} + +sub delete_article { + my ($self, $num) = @_; + my $ndel = 0; + batch_do($self, 'XNUM' . $num, sub { + my ($ids) = @_; + $ndel += scalar @$ids; + $self->{xdb}->delete_document($_) for @$ids; + }); +} + +# MID is a hint in V2 +sub remove_by_oid { + my ($self, $oid, $mid) = @_; + my $db = $self->{xdb}; + + # XXX careful, we cannot use batch_do here since we conditionally + # delete documents based on other factors, so we cannot call + # find_doc_ids twice. + my ($head, $tail) = $self->find_doc_ids('Q' . $mid); + return if $head == $tail; + + # there is only ONE element in @delete unless we + # have bugs in our v2writable deduplication check + my @delete; + for (; $head != $tail; $head->inc) { + my $docid = $head->get_docid; + my $doc = $db->get_document($docid); + my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); + $smsg->load_expand; + push(@delete, $docid) if $smsg->{blob} eq $oid; + } + $db->delete_document($_) foreach @delete; + scalar(@delete); } sub term_generator { # write-only @@ -445,9 +493,9 @@ sub parse_references ($) { my %mids = map { $_ => 1 } @{mids($hdr)}; my @keep; foreach my $ref (@$refs) { - # FIXME: this is an archive-prevention vector like X-No-Archive - if (length($ref) > MAX_MID_SIZE) { + if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) { warn "References: <$ref> too long, ignoring\n"; + next; } next if $mids{$ref}; push @keep, $ref; @@ -475,16 +523,21 @@ sub link_doc { } else { $tid = defined $old_tid ? $old_tid : $self->next_thread_id; } - $doc->add_term('G' . $tid); + $doc->add_boolean_term('G' . $tid); $tid; } sub link_and_save { - my ($self, $doc, $mids, $refs) = @_; + my ($self, $doc, $mids, $refs, $num, $xpath) = @_; my $db = $self->{xdb}; my $old_tid; my $doc_id; + $doc->add_boolean_term('XNUM' . $num) if defined $num; + $doc->add_boolean_term('XPATH' . $xpath) if defined $xpath; + $doc->add_boolean_term('Q' . $_) foreach @$mids; + my $vivified = 0; + $self->{skel} and die "Should not have read-only skel here\n";; foreach my $mid (@$mids) { $self->each_smsg_by_mid($mid, sub { my ($cur) = @_; @@ -508,6 +561,10 @@ sub link_and_save { 1; }); } + if ($vivified > 1) { + my $id = '<'.join('> <', @$mids).'>'; + warn "BUG: vivified multiple ($vivified) ghosts for $id\n"; + } # not really important, but we return any vivified ghost docid, here: return $doc_id if defined $doc_id; link_doc($self, $doc, $refs, $old_tid); @@ -756,9 +813,9 @@ sub create_ghost { my $tid = $self->next_thread_id; my $doc = Search::Xapian::Document->new; - $doc->add_term('Q' . $mid); - $doc->add_term('G' . $tid); - $doc->add_term('T' . 'ghost'); + $doc->add_boolean_term('Q' . $mid); + $doc->add_boolean_term('G' . $tid); + $doc->add_boolean_term('T' . 'ghost'); my $smsg = PublicInbox::SearchMsg->wrap($doc, $mid); $self->{xdb}->add_document($doc); @@ -770,29 +827,22 @@ sub merge_threads { my ($self, $winner_tid, $loser_tid) = @_; return if $winner_tid == $loser_tid; my $db = $self->{xdb}; - - my $batch_size = 1000; # don't let @ids grow too large to avoid OOM - while (1) { - my ($head, $tail) = $self->find_doc_ids('G' . $loser_tid); - return if $head == $tail; - my @ids; - for (; $head != $tail && @ids < $batch_size; $head->inc) { - push @ids, $head->get_docid; - } - foreach my $docid (@ids) { + batch_do($self, 'G' . $loser_tid, sub { + my ($ids) = @_; + foreach my $docid (@$ids) { my $doc = $db->get_document($docid); $doc->remove_term('G' . $loser_tid); - $doc->add_term('G' . $winner_tid); + $doc->add_boolean_term('G' . $winner_tid); $db->replace_document($docid, $doc); } - } + }); } sub _read_git_config_perm { my ($self) = @_; my @cmd = qw(config); if ($self->{version} == 2) { - push @cmd, "--file=$self->{mainrepo}/inbox-config"; + push @cmd, "--file=$self->{mainrepo}/all.git/config"; } my $fh = $self->{git}->popen(@cmd, 'core.sharedRepository'); local $/ = "\n"; @@ -856,17 +906,59 @@ sub DESTROY { # remote_* subs are only used by SearchIdxPart and SearchIdxSkeleton sub remote_commit { my ($self) = @_; - print { $self->{w} } "commit\n" or die "failed to write commit: $!"; + if (my $w = $self->{w}) { + print $w "commit\n" or die "failed to write commit: $!"; + } else { + $self->commit_txn_lazy; + if (my $skel = $self->{skeleton}) { + $skel->commit_txn_lazy; + } + } } sub remote_close { my ($self) = @_; - my $pid = delete $self->{pid} or die "no process to wait on\n"; - my $w = delete $self->{w} or die "no pipe to write to\n"; - print $w "close\n" or die "failed to write to pid:$pid: $!\n"; - close $w or die "failed to close pipe for pid:$pid: $!\n"; - waitpid($pid, 0) == $pid or die "remote process did not finish"; - $? == 0 or die ref($self)." pid:$pid exited with: $?"; + if (my $w = delete $self->{w}) { + my $pid = delete $self->{pid} or die "no process to wait on\n"; + print $w "close\n" or die "failed to write to pid:$pid: $!\n"; + close $w or die "failed to close pipe for pid:$pid: $!\n"; + waitpid($pid, 0) == $pid or die "remote process did not finish"; + $? == 0 or die ref($self)." pid:$pid exited with: $?"; + } else { + die "transaction in progress $self\n" if $self->{txn}; + $self->_xdb_release if $self->{xdb}; + } +} + +sub remote_remove { + my ($self, $oid, $mid) = @_; + if (my $w = $self->{w}) { + # triggers remove_by_oid in partition or skeleton + print $w "D $oid $mid\n" or die "failed to write remove $!"; + } else { + $self->begin_txn_lazy; + $self->remove_by_oid($oid, $mid); + } +} + +sub begin_txn_lazy { + my ($self) = @_; + return if $self->{txn}; + my $xdb = $self->{xdb} || $self->_xdb_acquire; + $xdb->begin_transaction; + $self->{txn} = 1; +} + +sub commit_txn_lazy { + my ($self) = @_; + delete $self->{txn} or return; + $self->{xdb}->commit_transaction; +} + +sub worker_done { + my ($self) = @_; + die "$$ $0 xdb not released\n" if $self->{xdb}; + die "$$ $0 still in transaction\n" if $self->{txn}; } 1;