X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FCWMP%2FQueue.pm;h=87a7e54e5ea480ea750cb076007c540447e2d41c;hb=df6a32df87becf896cccaae4b24cb88461391ffe;hp=bf4595e76c0a7e5fbe44326e878f9fb6cf475afa;hpb=fec1acf5d1dfce1a661535d2b40b0e9d97de6005;p=perl-cwmp.git diff --git a/lib/CWMP/Queue.pm b/lib/CWMP/Queue.pm index bf4595e..87a7e54 100644 --- a/lib/CWMP/Queue.pm +++ b/lib/CWMP/Queue.pm @@ -7,6 +7,8 @@ use warnings; use base qw/Class::Accessor/; __PACKAGE__->mk_accessors( qw/ id +dir +clean debug / ); @@ -14,11 +16,13 @@ debug #use Carp qw/confess/; use Data::Dump qw/dump/; use File::Spec; -use File::Path qw/mkpath/; +use File::Path qw/mkpath rmtree/; use IPC::DirQueue; -use YAML qw/Dump/; +use YAML::Syck qw/Dump/; use Carp qw/confess/; +#use Devel::LeakTrace::Fast; + =head1 NAME CWMP::Queue - implement commands queue for CPE @@ -29,6 +33,8 @@ CWMP::Queue - implement commands queue for CPE my $obj = CWMP::Queue->new({ id => 'CPE_serial_number', + dir => 'queue', + clean => 1, debug => 1 }); @@ -42,7 +48,12 @@ sub new { warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug; - my $dir = File::Spec->catfile('queue',$self->id); + my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id ); + + if ( -e $dir && $self->clean ) { + rmtree $dir || die "can't remove $dir: $!"; + warn "## clean $dir\n" if $self->debug; + } if ( ! -e $dir ) { mkpath $dir || die "can't create $dir: $!"; @@ -66,7 +77,9 @@ sub new { =head2 enqueue $q->enqueue( - 'foo.bar.baz' => 42, + 'CommandToDispatch', { + 'foo.bar.baz' => 42, + } ); =cut @@ -75,16 +88,17 @@ sub enqueue { my $self = shift; my $id = $self->id; + my ( $dispatch, $args ) = @_; + + warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug; - warn "## enqueue( $id, ", dump( @_ ), " )\n" if $self->debug; - - $self->{dq}->{$id}->enqueue_string( Dump( @_ ) ); + $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) ); } =head2 dequeue my $job = $q->dequeue; - my $dispatch = $job->dispatch; + my ( $dispatch, $args ) = $job->dispatch; # after dispatch is processed $job->finish; @@ -98,29 +112,67 @@ sub dequeue { my $job = $self->{dq}->{$id}->pickup_queued_job(); return unless defined $job; - warn "## dequeue( $id ) = ", dump( $job ), " )\n" if $self->debug; + warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug; - return CWMP::Queue::Job->new({ job => $job }); + return CWMP::Queue::Job->new({ job => $job, debug => $self->debug }); +} + +=head2 dq + +Accessor to C object + + my $dq = $q->dq; + +=cut + +sub dq { + my $self = shift; + return $self->{dq}->{$self->id}; } package CWMP::Queue::Job; +=head1 CWMP::Queue::Job + +Single queued job + +=cut + use base qw/Class::Accessor/; __PACKAGE__->mk_accessors( qw/ job +debug / ); use YAML qw/LoadFile/; +use Data::Dump qw/dump/; + +=head2 dispatch + + my ( $dispatch, $args ) = $job->dispatch; + +=cut sub dispatch { my $self = shift; my $path = $self->job->get_data_path || die "get_data_path?"; - return LoadFile( $path ) || die "can't read $path: $!"; + my $data = LoadFile( $path ) || die "can't read $path: $!"; + warn "## dispatch returns ",dump($data),"\n" if $self->debug; + return ( $data->{dispatch}, $data->{args} ); } +=head2 finish + +Finish job and remove it from queue + + $job->finish; + +=cut + sub finish { my $self = shift; $self->job->finish; + return 1; } 1;