X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=lib%2FCWMP%2FQueue.pm;h=87a7e54e5ea480ea750cb076007c540447e2d41c;hb=5ab6475a4ac2a732d2cbd13780eb4d629d4c4691;hp=014f5dbbf3fe39f3852c0ce47f2af91dfe97142b;hpb=b7d41b116145fb3ae84f32e493ef7a7b1fad4da7;p=perl-cwmp.git diff --git a/lib/CWMP/Queue.pm b/lib/CWMP/Queue.pm index 014f5db..87a7e54 100644 --- a/lib/CWMP/Queue.pm +++ b/lib/CWMP/Queue.pm @@ -7,6 +7,8 @@ use warnings; use base qw/Class::Accessor/; __PACKAGE__->mk_accessors( qw/ id +dir +clean debug / ); @@ -14,10 +16,13 @@ debug #use Carp qw/confess/; use Data::Dump qw/dump/; use File::Spec; -use File::Path qw/mkpath/; +use File::Path qw/mkpath rmtree/; use IPC::DirQueue; +use YAML::Syck qw/Dump/; use Carp qw/confess/; +#use Devel::LeakTrace::Fast; + =head1 NAME CWMP::Queue - implement commands queue for CPE @@ -28,6 +33,8 @@ CWMP::Queue - implement commands queue for CPE my $obj = CWMP::Queue->new({ id => 'CPE_serial_number', + dir => 'queue', + clean => 1, debug => 1 }); @@ -41,7 +48,12 @@ sub new { warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug; - my $dir = File::Spec->catfile('queue',$self->id); + my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id ); + + if ( -e $dir && $self->clean ) { + rmtree $dir || die "can't remove $dir: $!"; + warn "## clean $dir\n" if $self->debug; + } if ( ! -e $dir ) { mkpath $dir || die "can't create $dir: $!"; @@ -65,7 +77,9 @@ sub new { =head2 enqueue $q->enqueue( - 'foo.bar.baz' => 42, + 'CommandToDispatch', { + 'foo.bar.baz' => 42, + } ); =cut @@ -74,16 +88,19 @@ sub enqueue { my $self = shift; my $id = $self->id; - my $data = {@_} || confess "need data"; - - warn "## enqueue( $id, ", dump( $data ), " )\n" if $self->debug; + my ( $dispatch, $args ) = @_; - $self->{dq}->{$id}->enqueue_string( $id, $data ); + warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug; + + $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) ); } =head2 dequeue - my $data = $q->dequeue; + my $job = $q->dequeue; + my ( $dispatch, $args ) = $job->dispatch; + # after dispatch is processed + $job->finish; =cut @@ -92,11 +109,70 @@ sub dequeue { my $id = $self->id; - my $data = $self->{dq}->{$id}->pickup_queued_job(); - return unless defined $data; + my $job = $self->{dq}->{$id}->pickup_queued_job(); + return unless defined $job; - warn "## dequeue( $id ) = ", dump( $data ), " )\n" if $self->debug; + warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug; - return $data->{metadata}; + return CWMP::Queue::Job->new({ job => $job, debug => $self->debug }); } + +=head2 dq + +Accessor to C object + + my $dq = $q->dq; + +=cut + +sub dq { + my $self = shift; + return $self->{dq}->{$self->id}; +} + +package CWMP::Queue::Job; + +=head1 CWMP::Queue::Job + +Single queued job + +=cut + +use base qw/Class::Accessor/; +__PACKAGE__->mk_accessors( qw/ +job +debug +/ ); + +use YAML qw/LoadFile/; +use Data::Dump qw/dump/; + +=head2 dispatch + + my ( $dispatch, $args ) = $job->dispatch; + +=cut + +sub dispatch { + my $self = shift; + my $path = $self->job->get_data_path || die "get_data_path?"; + my $data = LoadFile( $path ) || die "can't read $path: $!"; + warn "## dispatch returns ",dump($data),"\n" if $self->debug; + return ( $data->{dispatch}, $data->{args} ); +} + +=head2 finish + +Finish job and remove it from queue + + $job->finish; + +=cut + +sub finish { + my $self = shift; + $self->job->finish; + return 1; +} + 1;