[freeside-commits] branch master updated. 2b740ea3cbb32a3fb3906546552503d9d3cca590

Mark Wells mark at 420.am
Sat May 19 14:57:41 PDT 2012


The branch, master has been updated
       via  2b740ea3cbb32a3fb3906546552503d9d3cca590 (commit)
      from  bec96a5b94e6c2484a48ed2d4300a1294fa80de6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 2b740ea3cbb32a3fb3906546552503d9d3cca590
Author: Mark Wells <mark at freeside.biz>
Date:   Sat May 19 14:57:29 2012 -0700

    paged search to conserve memory in CDR processing, #16723

diff --git a/FS/FS/ClientAPI/MyAccount.pm b/FS/FS/ClientAPI/MyAccount.pm
index a07e345..e79fbfc 100644
--- a/FS/FS/ClientAPI/MyAccount.pm
+++ b/FS/FS/ClientAPI/MyAccount.pm
@@ -1913,6 +1913,8 @@ sub list_support_usage {
 
 sub _list_cdr_usage {
   # XXX CDR type support...
+  # XXX any way to do a paged search on this?
+  # we have to return the results all at once...
   my($svc_phone, $begin, $end, %opt) = @_;
   map [ $_->downstream_csv(%opt, 'keeparray' => 1) ],
     $svc_phone->get_cdrs( 'begin'=>$begin, 'end'=>$end, );
diff --git a/FS/FS/PagedSearch.pm b/FS/FS/PagedSearch.pm
new file mode 100644
index 0000000..09d05c4
--- /dev/null
+++ b/FS/FS/PagedSearch.pm
@@ -0,0 +1,189 @@
+package FS::PagedSearch;
+
+use strict;
+use vars qw($DEBUG $default_limit @EXPORT_OK);
+use base qw( Exporter );
+use FS::Record qw(qsearch dbdef);
+use Data::Dumper;
+
+$DEBUG = 0;
+$default_limit = 100;
+
+ at EXPORT_OK = 'psearch';
+
+=head1 NAME
+
+FS::PagedSearch - Iterator for querying large data sets
+
+=head1 SYNOPSIS
+
+use FS::PagedSearch qw(psearch);
+
+my $search = psearch('table', { field => 'value' ... });
+$search->limit(100); #optional
+while ( my $row = $search->fetch ) {
+...
+}
+
+=head1 SUBROUTINES
+
+=over 4
+
+=item psearch ARGUMENTS
+
+A wrapper around L<FS::Record::qsearch>.  Accepts all the same arguments 
+as qsearch, except for the arrayref union query mode, and returns an 
+FS::PagedSearch object to access the rows of the query one at a time.  
+If the query doesn't contain an ORDER BY clause already, it will be ordered
+by the table's primary key.
+
+=cut
+
+sub psearch {
+  # deep-copy qsearch args
+  my $q;
+  if ( ref($_[0]) eq 'ARRAY' ) {
+    die "union query not supported with psearch"; #yet
+  }
+  elsif ( ref($_[0]) eq 'HASH' ) {
+    %$q = %{ $_[0] };
+  }
+  else {
+    $q = {
+      'table'     => shift,
+      'hashref'   => shift,
+      'select'    => shift,
+      'extra_sql' => shift,
+      'cache_obj' => shift,
+      'addl_from' => shift,
+    };
+  }
+  warn Dumper($q) if $DEBUG > 1;
+
+  # clean up query
+  my $dbdef = dbdef->table($q->{table});
+  # qsearch just appends order_by to extra_sql, so do that ourselves
+  $q->{extra_sql} ||= '';
+  $q->{extra_sql} .= ' '.$q->{order_by} if $q->{order_by};
+  $q->{order_by} = '';
+  # and impose an ordering if needed
+  if ( not $q->{extra_sql} =~ /order by/i ) {
+    $q->{extra_sql} .= ' ORDER BY '.$dbdef->primary_key;
+  }
+  # and then we'll use order_by for LIMIT/OFFSET
+
+  my $self = {
+    query     => $q,
+    buffer    => [],
+    offset    => 0,
+    limit     => $default_limit,
+    increment => 1,
+  };
+  bless $self, 'FS::PagedSearch';
+
+  $self;
+}
+
+=back
+
+=head1 METHODS
+
+=over 4
+
+=item fetch
+
+Fetch the next row from the search results and remove it from the buffer.
+Returns undef if there are no more rows.
+
+=cut
+
+sub fetch {
+  my $self = shift;
+  my $b = $self->{buffer};
+  $self->refill if @$b == 0;
+  $self->{offset} += $self->{increment} if @$b;
+  return shift @$b;
+}
+
+=item adjust ROWS
+
+Add ROWS to the offset counter.  This won't cause rows to be skipped in the
+current buffer but will affect the starting point of the next refill.
+
+=cut
+
+sub adjust {
+  my $self = shift;
+  my $r = shift;
+  $self->{offset} += $r;
+}
+
+=item limit [ VALUE ]
+
+Set/get the number of rows to retrieve per page.  The default is 100.
+
+=cut
+
+sub limit {
+  my $self = shift;
+  my $new_limit = shift;
+  if ( defined($new_limit) ) {
+    $self->{limit} = $new_limit;
+  }
+  $self->{limit};
+}
+
+=item increment [ VALUE ]
+
+Set/get the number of rows to increment the offset for each row that's
+retrieved.  Defaults to 1.  If the rows are being modified in a way that 
+removes them from the result set of the query, it's probably wise to set 
+this to zero.  Setting it to anything else is probably nonsense.
+
+=cut
+
+sub increment {
+  my $self = shift;
+  my $new_inc = shift;
+  if ( defined($new_inc) ) {
+    $self->{increment} = $new_inc;
+  }
+  $self->{increment};
+}
+
+
+=item refill
+
+Run the query, skipping a number of rows set by the row offset, and replace 
+the contents of the buffer with the result.  If there are no more rows, 
+this will just empty the buffer.  Called automatically as needed; don't call 
+this from outside.
+
+=cut
+
+sub refill {
+  my $self = shift;
+  my $b = $self->{buffer};
+  warn "refilling (limit ".$self->{limit}.", offset ".$self->{offset}.")\n"
+    if $DEBUG;
+  warn "discarding ".scalar(@$b)." rows\n" if $DEBUG and @$b;
+  if ( $self->{limit} > 0 ) {
+    $self->{query}->{order_by} = 'LIMIT ' . $self->{limit} . 
+                                 ' OFFSET ' . $self->{offset};
+  }
+  @$b = qsearch( $self->{query} );
+  my $rows = scalar @$b;
+  warn "$rows returned\n" if $DEBUG;
+
+  $rows;
+}
+
+=back
+
+=head1 SEE ALSO
+
+L<FS::Record>
+
+=cut
+
+1;
diff --git a/FS/FS/part_pkg/voip_cdr.pm b/FS/FS/part_pkg/voip_cdr.pm
index aaad974..8c3d80d 100644
--- a/FS/FS/part_pkg/voip_cdr.pm
+++ b/FS/FS/part_pkg/voip_cdr.pm
@@ -401,9 +401,10 @@ sub calc_usage {
     #my @invoice_details_sort;
 
     #first rate any outstanding CDRs not yet rated
-    foreach my $cdr (
-      $svc_x->get_cdrs( %options )
-    ) {
+    my $cdr_search = $svc_x->psearch_cdrs(%options);
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0); # because we're changing their status as we go
+    while ( my $cdr = $cdr_search->fetch ) {
 
       my $error = $cdr->rate(
         'part_pkg'                          => $self,
@@ -414,14 +415,19 @@ sub calc_usage {
       );
       die $error if $error; #??
 
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+      # it was skipped without changing status, so increment the 
+      # offset so that we don't re-fetch it on refill
+
     } # $cdr
 
     #then add details to invoices & get a total
     $options{'status'} = 'rated';
 
-    foreach my $cdr (
-      $svc_x->get_cdrs( %options ) 
-    ) {
+    $cdr_search = $svc_x->psearch_cdrs(%options);
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0);
+    while ( my $cdr = $cdr_search->fetch ) {
       my $error;
       # at this point we officially Do Not Care about the rating method
       if ( $included_calls > 0 ) {
@@ -436,7 +442,9 @@ sub calc_usage {
       }
       die $error if $error;
       $formatter->append($cdr);
-    }
+
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq 'rated';
+    } #$cdr
   }
 
   $formatter->finish; #writes into $details
diff --git a/FS/FS/part_pkg/voip_inbound.pm b/FS/FS/part_pkg/voip_inbound.pm
index f4e5183..ecc4f47 100644
--- a/FS/FS/part_pkg/voip_inbound.pm
+++ b/FS/FS/part_pkg/voip_inbound.pm
@@ -227,13 +227,15 @@ sub calc_usage {
   ) {
     my $svc_phone = $cust_svc->svc_x;
 
-    foreach my $cdr ( $svc_phone->get_cdrs(
+    my $cdr_search = $svc_phone->psearch_cdrs(
       'inbound'        => 1,
       'default_prefix' => $self->option('default_prefix'),
       'status'         => '', # unprocessed only
       'for_update'     => 1,
-      )
-    ) {
+    );
+    $cdr_search->limit(1000);
+    $cdr_search->increment(0);
+    while ( my $cdr = $cdr_search->fetch ) {
 
       my $reason = $self->check_chargable( $cdr,
                                            'option_cache' => \%opt_cache,
@@ -310,6 +312,8 @@ sub calc_usage {
       die $error if $error;
       $formatter->append($cdr);
 
+      $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
     } #$cdr
   } # $cust_svc
 #  unshift @$details, { format => 'C',
diff --git a/FS/FS/part_pkg/voip_tiered.pm b/FS/FS/part_pkg/voip_tiered.pm
index e5dcf6d..d8d74c1 100644
--- a/FS/FS/part_pkg/voip_tiered.pm
+++ b/FS/FS/part_pkg/voip_tiered.pm
@@ -132,9 +132,11 @@ sub calc_usage {
 
       $options{'inbound'} = ( $pass eq 'inbound' );
 
-      foreach my $cdr (
-        $svc_x->get_cdrs( %options )
-      ) {
+      my $cdr_search = $svc_x->psearch_cdrs(%options);
+      $cdr_search->limit(1000);
+      $cdr_search->increment(0);
+      while ( my $cdr = $cdr_search->fetch ) {
+
         if ( $DEBUG > 1 ) {
           warn "rating CDR $cdr\n".
                join('', map { "  $_ => ". $cdr->{$_}. "\n" } keys %$cdr );
@@ -173,6 +175,8 @@ sub calc_usage {
 
         $total += $charge_min;
 
+        $cdr_search->adjust(1) if $cdr->freesidestatus eq '';
+
       } # $cdr
 
     } # $pass
@@ -213,9 +217,10 @@ sub calc_usage {
       # tell the formatter what we're sending it
       $formatter->inbound($options{'inbound'});
 
-      foreach my $cdr (
-        $svc_x->get_cdrs( %options )
-      ) {
+      my $cdr_search = $svc_x->psearch_cdrs(%options);
+      $cdr_search->limit(1000);
+      $cdr_search->increment(0);
+      while ( my $cdr = $cdr_search->fetch ) {
 
         my $object = $options{'inbound'}
                        ? $cdr->cdr_termination( 1 ) #1: inbound
@@ -242,6 +247,8 @@ sub calc_usage {
 
         $formatter->append($cdr);
 
+        $cdr_search->adjust(1) if $cdr->freesidestatus eq 'processing-tiered';
+
       } # $cdr
 
     } # $pass
diff --git a/FS/FS/svc_pbx.pm b/FS/FS/svc_pbx.pm
index f8b9605..4182a13 100644
--- a/FS/FS/svc_pbx.pm
+++ b/FS/FS/svc_pbx.pm
@@ -3,6 +3,7 @@ package FS::svc_pbx;
 use strict;
 use base qw( FS::svc_External_Common );
 use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
 use FS::Conf;
 use FS::cust_svc;
 use FS::svc_phone;
@@ -259,11 +260,13 @@ sub _check_duplicate {
   return '';
 }
 
-=item get_cdrs
+=item psearch_cdrs OPTIONS
 
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this 
-service.  By default, "associated with" means that the "charged_party" field of
-the CDR matches the "title" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records 
+associated with this service.  By default, "associated with" means that 
+the "charged_party" field of the CDR matches the "title" field of the 
+service.  To access the CDRs themselves, call "->fetch" on the resulting
+object.
 
 =over 2
 
@@ -295,7 +298,7 @@ to allow title to indicate a range of IP addresses.
 
 =cut
 
-sub get_cdrs {
+sub psearch_cdrs {
   my($self, %options) = @_;
   my %hash = ();
   my @where = ();
@@ -343,15 +346,26 @@ sub get_cdrs {
   my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where )
     if @where;
 
-  my @cdrs =
-    qsearch( {
+  psearch( {
       'table'      => 'cdr',
       'hashref'    => \%hash,
       'extra_sql'  => $extra_sql,
       'order_by'   => "ORDER BY startdate $for_update",
-    } );
+  } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a 
+single list.  Arguments are the same as for psearch_cdrs.  This can take
+an unreasonably large amount of memory and is best avoided.
 
-  @cdrs;
+=cut
+
+sub get_cdrs {
+  my $self = shift;
+  my $psearch = $self->psearch_cdrs($_);
+  qsearch ( $psearch->{query} )
 }
 
 =back
diff --git a/FS/FS/svc_phone.pm b/FS/FS/svc_phone.pm
index b395ea6..1296c1e 100644
--- a/FS/FS/svc_phone.pm
+++ b/FS/FS/svc_phone.pm
@@ -7,6 +7,7 @@ use Data::Dumper;
 use Scalar::Util qw( blessed );
 use FS::Conf;
 use FS::Record qw( qsearch qsearchs dbh );
+use FS::PagedSearch qw( psearch );
 use FS::Msgcat qw(gettext);
 use FS::part_svc;
 use FS::phone_device;
@@ -648,11 +649,13 @@ sub cust_location_or_main {
   $cust_pkg ? $cust_pkg->cust_location_or_main : '';
 }
 
-=item get_cdrs
+=item psearch_cdrs OPTIONS
 
-Returns a set of Call Detail Records (see L<FS::cdr>) associated with this 
-service.  By default, "associated with" means that either the "src" or the 
-"charged_party" field of the CDR matches the "phonenum" field of the service.
+Returns a paged search (L<FS::PagedSearch>) for Call Detail Records 
+associated with this service.  By default, "associated with" means that 
+either the "src" or the "charged_party" field of the CDR matches the 
+"phonenum" field of the service.  To access the CDRs themselves, call
+"->fetch" on the resulting object.
 
 =over 2
 
@@ -676,11 +679,16 @@ with the chosen prefix.
 
 =item by_svcnum: not supported for svc_phone
 
+=item billsec_sum: Instead of returning all of the CDRs, return a single
+record (as an L<FS::cdr> object) with the sum of the 'billsec' field over 
+the entire result set.
+
 =back
 
 =cut
 
-sub get_cdrs {
+sub psearch_cdrs {
+
   my($self, %options) = @_;
   my @fields;
   my %hash;
@@ -739,18 +747,30 @@ sub get_cdrs {
 
   my $extra_sql = ( keys(%hash) ? ' AND ' : ' WHERE ' ). join(' AND ', @where );
 
-  my @cdrs =
-    qsearch( {
+  psearch( {
       'table'      => 'cdr',
       'hashref'    => \%hash,
       'extra_sql'  => $extra_sql,
       'order_by'   => $options{'billsec_sum'} ? '' : "ORDER BY startdate $for_update",
       'select'     => $options{'billsec_sum'} ? 'sum(billsec) as billsec_sum' : '*',
-    } );
+  } );
+}
+
+=item get_cdrs (DEPRECATED)
+
+Like psearch_cdrs, but returns all the L<FS::cdr> objects at once, in a 
+single list.  Arguments are the same as for psearch_cdrs.  This can take 
+an unreasonably large amount of memory and is best avoided.
 
-  @cdrs;
+=cut
+
+sub get_cdrs {
+  my $self = shift;
+  my $psearch = $self->psearch_cdrs(@_);
+  qsearch ( $psearch->{query} )
 }
 
+
 =back
 
 =head1 BUGS

-----------------------------------------------------------------------

Summary of changes:
 FS/FS/ClientAPI/MyAccount.pm   |    2 +
 FS/FS/PagedSearch.pm           |  189 ++++++++++++++++++++++++++++++++++++++++
 FS/FS/part_pkg/voip_cdr.pm     |   22 +++--
 FS/FS/part_pkg/voip_inbound.pm |   10 ++-
 FS/FS/part_pkg/voip_tiered.pm  |   19 +++--
 FS/FS/svc_pbx.pm               |   32 +++++--
 FS/FS/svc_phone.pm             |   38 ++++++--
 7 files changed, 278 insertions(+), 34 deletions(-)
 create mode 100644 FS/FS/PagedSearch.pm




More information about the freeside-commits mailing list