[freeside-commits] branch FREESIDE_3_BRANCH updated. 38c28f5486c5fb02d1d6edfebf878b10a2a9239e
Mark Wells
mark at 420.am
Thu May 22 22:21:30 PDT 2014
The branch, FREESIDE_3_BRANCH has been updated
via 38c28f5486c5fb02d1d6edfebf878b10a2a9239e (commit)
from 1b4b00291c912647c6225f62c2e2be05b9be1f8f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit 38c28f5486c5fb02d1d6edfebf878b10a2a9239e
Author: Mark Wells <mark at freeside.biz>
Date: Thu May 22 22:21:10 2014 -0700
fix bad race conditions in parallel schema upgrade, #29163
diff --git a/FS/bin/freeside-upgrade b/FS/bin/freeside-upgrade
index 6c9c240..c3d070e 100755
--- a/FS/bin/freeside-upgrade
+++ b/FS/bin/freeside-upgrade
@@ -182,54 +182,111 @@ if ( $opt_c ) {
}
+my $MAX_HANDLES; # undef for now, set it if you want a limit
+
if ( $DRY_RUN ) {
print
join(";\n", @statements ). ";\n";
exit;
-} else {
-
- my @clones = ();
- foreach my $statement ( @statements ) {
-
- if ( $opt_a ) {
-
- my $clone = '';
- until ( $clone = $dbh->clone ) {
- sleep 60; #too many database connections? wait and retry
- }
- until ( $clone->do( $statement, {pg_async=>PG_ASYNC} ) ) {
- sleep 60; #too many ... running queries? wait and retry
- }
- warn "$statement\n";
- push @clones, $clone;
-
+} elsif ( $opt_a ) {
+
+ my @phases = map { [] } 0..4;
+ my $fsupgrade_idx = 1;
+ my %idx_map;
+ foreach (@statements) {
+ if ( /^ *(CREATE|ALTER) +TABLE/ ) {
+ # phase 0: CREATE TABLE, ALTER TABLE
+ push @{ $phases[0] }, $_;
+ } elsif ( /^ *ALTER +INDEX.* RENAME TO dbs_temp(\d+)/ ) {
+ # phase 1: rename index to dbs_temp%d
+ # (see DBIx::DBSchema::Table)
+ # but in this case, uniqueify all the dbs_temps. This method only works
+ # because they are in the right order to begin with...
+ my $dbstemp_idx = $1;
+ s/dbs_temp$dbstemp_idx/fsupgrade_temp$fsupgrade_idx/;
+ $idx_map{ $dbstemp_idx } = $fsupgrade_idx;
+ push @{ $phases[1] }, $_;
+ $fsupgrade_idx++;
+ } elsif ( /^ *(CREATE|DROP)( +UNIQUE)? +INDEX/ ) {
+ # phase 2: create/drop indices
+ push @{ $phases[2] }, $_;
+ } elsif ( /^ *ALTER +INDEX +dbs_temp(\d+) +RENAME/ ) {
+ # phase 3: rename temp indices back to real ones
+ my $dbstemp_idx = $1;
+ my $mapped_idx = $idx_map{ $dbstemp_idx }
+ or die "unable to remap dbs_temp$1 RENAME statement";
+ s/dbs_temp$dbstemp_idx/fsupgrade_temp$mapped_idx/;
+ push @{ $phases[3] }, $_;
} else {
- warn "$statement\n";
- $dbh->do( $statement )
- or die "Error: ". $dbh->errstr. "\n executing: $statement";
+ # phase 4: everything else (CREATE SEQUENCE, SELECT SETVAL, etc.)
+ push @{ $phases[4] }, $_;
}
-
}
-
- warn "Waiting for all schema changes to complete\n" if @clones; # && $DEBUG;
- while ( @clones ) {
- my @newclones = ();
- foreach my $clone ( @clones ) {
- if ( $clone->pg_ready ) {
- $clone->pg_result or die $clone->errstr;
- $clone->commit or die $clone->errstr;
- } else {
- push @newclones, $clone;
+ my $i = 0;
+ my @busy = ();
+ my @free = ();
+ foreach my $phase (@phases) {
+ warn "Starting schema changes, phase $i...\n";
+ while (@$phase or @busy) {
+ # check status of all running tasks
+ my @newbusy;
+ my $failed_clone;
+ for my $clone (@busy) {
+ if ( $clone->pg_ready ) {
+ # then clean it up
+ my $rv = $clone->pg_result && $clone->commit;
+ $failed_clone = $clone if !$rv;
+ push @free, $clone;
+ } else {
+ push @newbusy, $clone;
+ }
}
- }
- @clones = @newclones;
- sleep 30 if @clones;
- }
+ if ( $failed_clone ) {
+ my $errstr = $failed_clone->errstr;
+ foreach my $clone (@newbusy, $failed_clone) {
+ $clone->pg_cancel if $clone->{pg_async_status} == 1;
+ $clone->disconnect;
+ }
+ die "$errstr\n";
+ }
+ @busy = @newbusy;
+ if (my $statement = $phase->[0]) {
+ my $clone;
+ if ( @free ) {
+ $clone = shift(@free);
+ } elsif ( !$MAX_HANDLES or
+ scalar(@free) + scalar(@busy) < $MAX_HANDLES ) {
+ $clone = $dbh->clone; # this will fail if over the server limit
+ }
+
+ if ( $clone ) {
+ my $rv = $clone->do($statement, {pg_async => PG_ASYNC});
+ if ( $rv ) {
+ warn "$statement\n";
+ shift @{ $phase }; # and actually take the statement off the queue
+ push @busy, $clone;
+ } # else I don't know, wait and retry
+ } # else too many handles, wait and retry
+ } elsif (@busy) {
+ # all statements are dispatched
+ warn "Waiting for phase $i to complete\n";
+ sleep 30;
+ }
+ } # while @$phase or @busy
+ $i++;
+ } # foreach $phase
+ warn "Schema changes complete.\n";
# warn "Pre-schema change upgrades completed in ". (time-$start). " seconds\n"; # if $DEBUG;
# $start = time;
# dbdef->update_schema( dbdef_dist(datasrc), $dbh );
+} else { # normal case, run statements sequentially
+ foreach my $statement ( @statements ) {
+ warn "$statement\n";
+ $dbh->do( $statement )
+ or die "Error: ". $dbh->errstr. "\n executing: $statement";
+ }
}
warn "Schema upgrade completed in ". (time-$start). " seconds\n"; # if $DEBUG;
-----------------------------------------------------------------------
Summary of changes:
FS/bin/freeside-upgrade | 127 ++++++++++++++++++++++++++++++++++-------------
1 files changed, 92 insertions(+), 35 deletions(-)
More information about the freeside-commits
mailing list