[freeside-commits] branch master updated. dc0f1291d98ac8f8d5b5da9465ab282f36c90034

Mark Wells mark at 420.am
Thu May 22 22:21:30 PDT 2014


The branch, master has been updated
       via  dc0f1291d98ac8f8d5b5da9465ab282f36c90034 (commit)
      from  9b20f80d7bfe5ad53c1870c64b74dca87afa7761 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit dc0f1291d98ac8f8d5b5da9465ab282f36c90034
Author: Mark Wells <mark at freeside.biz>
Date:   Thu May 22 22:21:25 2014 -0700

    fix bad race conditions in parallel schema upgrade, #29163

diff --git a/FS/bin/freeside-upgrade b/FS/bin/freeside-upgrade
index 8c87ab2..b2cd3db 100755
--- a/FS/bin/freeside-upgrade
+++ b/FS/bin/freeside-upgrade
@@ -182,54 +182,111 @@ if ( $opt_c ) {
 
 }
 
+my $MAX_HANDLES; # undef for now, set it if you want a limit
+
 if ( $DRY_RUN ) {
   print
     join(";\n", @statements ). ";\n";
   exit;
-} else {
-
-  my @clones = ();
-  foreach my $statement ( @statements ) {
-
-    if ( $opt_a ) {
-
-      my $clone = '';
-      until ( $clone = $dbh->clone ) {
-        sleep 60; #too many database connections?  wait and retry
-      }
-      until ( $clone->do( $statement, {pg_async=>PG_ASYNC} ) ) {
-        sleep 60; #too many ... running queries?  wait and retry
-      }
-      warn "$statement\n";
-      push @clones, $clone;
-
+} elsif ( $opt_a ) {
+
+  my @phases = map { [] } 0..4;
+  my $fsupgrade_idx = 1;
+  my %idx_map;
+  foreach (@statements) {
+    if ( /^ *(CREATE|ALTER) +TABLE/ ) {
+      # phase 0: CREATE TABLE, ALTER TABLE
+      push @{ $phases[0] }, $_;
+    } elsif ( /^ *ALTER +INDEX.* RENAME TO dbs_temp(\d+)/ ) {
+      # phase 1: rename index to dbs_temp%d
+      # (see DBIx::DBSchema::Table)
+      # but in this case, uniqueify all the dbs_temps.  This method only works
+      # because they are in the right order to begin with...
+      my $dbstemp_idx = $1;
+      s/dbs_temp$dbstemp_idx/fsupgrade_temp$fsupgrade_idx/;
+      $idx_map{ $dbstemp_idx } = $fsupgrade_idx;
+      push @{ $phases[1] }, $_;
+      $fsupgrade_idx++;
+    } elsif ( /^ *(CREATE|DROP)( +UNIQUE)? +INDEX/ ) {
+      # phase 2: create/drop indices
+      push @{ $phases[2] }, $_;
+    } elsif ( /^ *ALTER +INDEX +dbs_temp(\d+) +RENAME/ ) {
+      # phase 3: rename temp indices back to real ones
+      my $dbstemp_idx = $1;
+      my $mapped_idx = $idx_map{ $dbstemp_idx }
+        or die "unable to remap dbs_temp$1 RENAME statement";
+      s/dbs_temp$dbstemp_idx/fsupgrade_temp$mapped_idx/;
+      push @{ $phases[3] }, $_;
     } else {
-      warn "$statement\n";
-      $dbh->do( $statement )
-        or die "Error: ". $dbh->errstr. "\n executing: $statement";
+      # phase 4: everything else (CREATE SEQUENCE, SELECT SETVAL, etc.)
+      push @{ $phases[4] }, $_;
     }
-
   }
-
-  warn "Waiting for all schema changes to complete\n" if @clones; # && $DEBUG;
-  while ( @clones ) {
-    my @newclones = ();
-    foreach my $clone ( @clones ) {
-      if ( $clone->pg_ready ) {
-        $clone->pg_result or die $clone->errstr;
-        $clone->commit    or die $clone->errstr;
-      } else {
-        push @newclones, $clone;
+  my $i = 0;
+  my @busy = ();
+  my @free = ();
+  foreach my $phase (@phases) {
+    warn "Starting schema changes, phase $i...\n";
+    while (@$phase or @busy) {
+      # check status of all running tasks
+      my @newbusy;
+      my $failed_clone;
+      for my $clone (@busy) {
+        if ( $clone->pg_ready ) {
+          # then clean it up
+          my $rv = $clone->pg_result && $clone->commit;
+          $failed_clone = $clone if !$rv;
+          push @free, $clone;
+        } else {
+          push @newbusy, $clone;
+        }
       }
-    }
-    @clones = @newclones;
-    sleep 30 if @clones;
-  }
+      if ( $failed_clone ) {
+        my $errstr = $failed_clone->errstr;
+        foreach my $clone (@newbusy, $failed_clone) {
+          $clone->pg_cancel if $clone->{pg_async_status} == 1;
+          $clone->disconnect;
+        }
+        die "$errstr\n";
+      }
+      @busy = @newbusy;
+      if (my $statement = $phase->[0]) {
+        my $clone;
+        if ( @free ) {
+          $clone = shift(@free);
+        } elsif ( !$MAX_HANDLES or 
+                  scalar(@free) + scalar(@busy) < $MAX_HANDLES ) {
+          $clone = $dbh->clone; # this will fail if over the server limit
+        }
+
+        if ( $clone ) {
+          my $rv = $clone->do($statement, {pg_async => PG_ASYNC});
+          if ( $rv ) {
+            warn "$statement\n";
+            shift @{ $phase }; # and actually take the statement off the queue
+            push @busy, $clone;
+          } # else I don't know, wait and retry
+        } # else too many handles, wait and retry
+      } elsif (@busy) {
+        # all statements are dispatched
+        warn "Waiting for phase $i to complete\n";
+        sleep 30;
+      }
+    } # while @$phase or @busy
+    $i++;
+  } # foreach $phase
+  warn "Schema changes complete.\n";
 
 #  warn "Pre-schema change upgrades completed in ". (time-$start). " seconds\n"; # if $DEBUG;
 #  $start = time;
 
 #  dbdef->update_schema( dbdef_dist(datasrc), $dbh );
+} else { # normal case, run statements sequentially
+  foreach my $statement ( @statements ) {
+    warn "$statement\n";
+    $dbh->do( $statement )
+      or die "Error: ". $dbh->errstr. "\n executing: $statement";
+  }
 }
 
 warn "Schema upgrade completed in ". (time-$start). " seconds\n"; # if $DEBUG;

-----------------------------------------------------------------------

Summary of changes:
 FS/bin/freeside-upgrade |  127 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 92 insertions(+), 35 deletions(-)




More information about the freeside-commits mailing list