projects
/
perl-cwmp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fix for ERROR: no ID in state
[perl-cwmp.git]
/
lib
/
CWMP
/
Queue.pm
diff --git
a/lib/CWMP/Queue.pm
b/lib/CWMP/Queue.pm
index
bf4595e
..
87a7e54
100644
(file)
--- a/
lib/CWMP/Queue.pm
+++ b/
lib/CWMP/Queue.pm
@@
-7,6
+7,8
@@
use warnings;
use base qw/Class::Accessor/;
__PACKAGE__->mk_accessors( qw/
id
use base qw/Class::Accessor/;
__PACKAGE__->mk_accessors( qw/
id
+dir
+clean
debug
/ );
debug
/ );
@@
-14,11
+16,13
@@
debug
#use Carp qw/confess/;
use Data::Dump qw/dump/;
use File::Spec;
#use Carp qw/confess/;
use Data::Dump qw/dump/;
use File::Spec;
-use File::Path qw/mkpath/;
+use File::Path qw/mkpath
rmtree
/;
use IPC::DirQueue;
use IPC::DirQueue;
-use YAML qw/Dump/;
+use YAML
::Syck
qw/Dump/;
use Carp qw/confess/;
use Carp qw/confess/;
+#use Devel::LeakTrace::Fast;
+
=head1 NAME
CWMP::Queue - implement commands queue for CPE
=head1 NAME
CWMP::Queue - implement commands queue for CPE
@@
-29,6
+33,8
@@
CWMP::Queue - implement commands queue for CPE
my $obj = CWMP::Queue->new({
id => 'CPE_serial_number',
my $obj = CWMP::Queue->new({
id => 'CPE_serial_number',
+ dir => 'queue',
+ clean => 1,
debug => 1
});
debug => 1
});
@@
-42,7
+48,12
@@
sub new {
warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
- my $dir = File::Spec->catfile('queue',$self->id);
+ my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id );
+
+ if ( -e $dir && $self->clean ) {
+ rmtree $dir || die "can't remove $dir: $!";
+ warn "## clean $dir\n" if $self->debug;
+ }
if ( ! -e $dir ) {
mkpath $dir || die "can't create $dir: $!";
if ( ! -e $dir ) {
mkpath $dir || die "can't create $dir: $!";
@@
-66,7
+77,9
@@
sub new {
=head2 enqueue
$q->enqueue(
=head2 enqueue
$q->enqueue(
- 'foo.bar.baz' => 42,
+ 'CommandToDispatch', {
+ 'foo.bar.baz' => 42,
+ }
);
=cut
);
=cut
@@
-75,16
+88,17
@@
sub enqueue {
my $self = shift;
my $id = $self->id;
my $self = shift;
my $id = $self->id;
+ my ( $dispatch, $args ) = @_;
+
+ warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
- warn "## enqueue( $id, ", dump( @_ ), " )\n" if $self->debug;
-
- $self->{dq}->{$id}->enqueue_string( Dump( @_ ) );
+ $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
}
=head2 dequeue
my $job = $q->dequeue;
}
=head2 dequeue
my $job = $q->dequeue;
- my
$dispatch
= $job->dispatch;
+ my
( $dispatch, $args )
= $job->dispatch;
# after dispatch is processed
$job->finish;
# after dispatch is processed
$job->finish;
@@
-98,29
+112,67
@@
sub dequeue {
my $job = $self->{dq}->{$id}->pickup_queued_job();
return unless defined $job;
my $job = $self->{dq}->{$id}->pickup_queued_job();
return unless defined $job;
- warn "## dequeue
( $id )
= ", dump( $job ), " )\n" if $self->debug;
+ warn "## dequeue
for $id
= ", dump( $job ), " )\n" if $self->debug;
- return CWMP::Queue::Job->new({ job => $job });
+ return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
+}
+
+=head2 dq
+
+Accessor to C<IPC::DirQueue> object
+
+ my $dq = $q->dq;
+
+=cut
+
+sub dq {
+ my $self = shift;
+ return $self->{dq}->{$self->id};
}
package CWMP::Queue::Job;
}
package CWMP::Queue::Job;
+=head1 CWMP::Queue::Job
+
+Single queued job
+
+=cut
+
use base qw/Class::Accessor/;
__PACKAGE__->mk_accessors( qw/
job
use base qw/Class::Accessor/;
__PACKAGE__->mk_accessors( qw/
job
+debug
/ );
use YAML qw/LoadFile/;
/ );
use YAML qw/LoadFile/;
+use Data::Dump qw/dump/;
+
+=head2 dispatch
+
+ my ( $dispatch, $args ) = $job->dispatch;
+
+=cut
sub dispatch {
my $self = shift;
my $path = $self->job->get_data_path || die "get_data_path?";
sub dispatch {
my $self = shift;
my $path = $self->job->get_data_path || die "get_data_path?";
- return LoadFile( $path ) || die "can't read $path: $!";
+ my $data = LoadFile( $path ) || die "can't read $path: $!";
+ warn "## dispatch returns ",dump($data),"\n" if $self->debug;
+ return ( $data->{dispatch}, $data->{args} );
}
}
+=head2 finish
+
+Finish job and remove it from queue
+
+ $job->finish;
+
+=cut
+
sub finish {
my $self = shift;
$self->job->finish;
sub finish {
my $self = shift;
$self->job->finish;
+ return 1;
}
1;
}
1;