use File::Spec;
use File::Path qw/mkpath/;
use IPC::DirQueue;
+use YAML qw/Dump/;
use Carp qw/confess/;
=head1 NAME
=head2 enqueue
$q->enqueue(
- 'foo.bar.baz' => 42,
+ 'CommandToDispatch', {
+ 'foo.bar.baz' => 42,
+ }
);
=cut
my $self = shift;
my $id = $self->id;
- my $data = {@_} || confess "need data";
-
- warn "## enqueue( $id, ", dump( $data ), " )\n" if $self->debug;
+ my ( $dispatch, $args ) = @_;
- $self->{dq}->{$id}->enqueue_string( $id, $data );
+ warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
+
+ $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
}
=head2 dequeue
- my $data = $q->dequeue;
+ my $job = $q->dequeue;
+ my ( $dispatch, $args ) = $job->dispatch;
+ # after dispatch is processed
+ $job->finish;
=cut
my $id = $self->id;
- my $data = $self->{dq}->{$id}->pickup_queued_job();
- return unless defined $data;
+ my $job = $self->{dq}->{$id}->pickup_queued_job();
+ return unless defined $job;
+
+ warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug;
+
+ return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
+}
+
+=head2 dq
+
+Accessor to C<IPC::DirQueue> object
+
+ my $dq = $q->dq;
+
+=cut
+
+sub dq {
+ my $self = shift;
+ return $self->{dq}->{$self->id};
+}
+
+package CWMP::Queue::Job;
+
+=head1 CWMP::Queue::Job
+
+Single queued job
+
+=cut
+
+use base qw/Class::Accessor/;
+__PACKAGE__->mk_accessors( qw/
+job
+debug
+/ );
+
+use YAML qw/LoadFile/;
+use Data::Dump qw/dump/;
+
+=head2 dispatch
- warn "## dequeue( $id ) = ", dump( $data ), " )\n" if $self->debug;
+ my ( $dispatch, $args ) = $job->dispatch;
- return $data->{metadata};
+=cut
+
+sub dispatch {
+ my $self = shift;
+ my $path = $self->job->get_data_path || die "get_data_path?";
+ my $data = LoadFile( $path ) || die "can't read $path: $!";
+ warn "## dispatch returns ",dump($data),"\n" if $self->debug;
+ return ( $data->{dispatch}, $data->{args} );
}
+
+=head2 finish
+
+Finish job and remove it from queue
+
+ $job->finish;
+
+=cut
+
+sub finish {
+ my $self = shift;
+ $self->job->finish;
+}
+
1;