8087255a1f5756266cd66b332b4394c079b04870
[perl-cwmp.git] / lib / CWMP / Queue.pm
1 package CWMP::Queue;
2
3 use strict;
4 use warnings;
5
6
7 use base qw/Class::Accessor/;
8 __PACKAGE__->mk_accessors( qw/
9 id
10 debug
11
12 / );
13
14 #use Carp qw/confess/;
15 use Data::Dump qw/dump/;
16 use File::Spec;
17 use File::Path qw/mkpath/;
18 use IPC::DirQueue;
19 use YAML qw/Dump/;
20 use Carp qw/confess/;
21
22 =head1 NAME
23
24 CWMP::Queue - implement commands queue for CPE
25
26 =head1 METHODS
27
28 =head2 new
29
30   my $obj = CWMP::Queue->new({
31         id => 'CPE_serial_number',
32         debug => 1
33   });
34
35 =cut
36
37 sub new {
38         my $class = shift;
39         my $self = $class->SUPER::new( @_ );
40
41         die "need id" unless $self->id;
42
43         warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
44
45         my $dir = File::Spec->catfile('queue',$self->id);
46
47         if ( ! -e $dir ) {
48                 mkpath $dir || die "can't create $dir: $!";
49                 print "created new queue $dir\n";
50         }
51
52         my $id = $self->id;
53
54         if ( ! defined( $self->{dq}->{$id} ) ) {
55                 $self->{dq}->{$id} = IPC::DirQueue->new({
56                         dir => $dir,
57                         ordered => 1,
58                         queue_fanout => 0,
59                 });
60                 warn "## created queue object for CPE $id path $dir\n" if $self->debug;
61         }
62
63         return $self;
64 }
65
66 =head2 enqueue
67
68   $q->enqueue(
69         'CommandToDispatch', {
70                 'foo.bar.baz' => 42,
71         }
72   );
73
74 =cut
75
76 sub enqueue {
77         my $self = shift;
78
79         my $id = $self->id;
80         my ( $dispatch, $args ) = @_;
81
82         warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
83         
84         $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
85 }
86
87 =head2 dequeue
88
89   my $job = $q->dequeue;
90   my ( $dispatch, $args ) = $job->dispatch;
91   # after dispatch is processed
92   $job->finish;
93
94 =cut
95
96 sub dequeue {
97         my $self = shift;
98
99         my $id = $self->id;
100
101         my $job = $self->{dq}->{$id}->pickup_queued_job();
102         return unless defined $job;
103
104         warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug;
105
106         return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
107 }
108
109 =head2 dq
110
111 Accessor to C<IPC::DirQueue> object
112
113   my $dq = $q->dq;
114
115 =cut
116
117 sub dq {
118         my $self = shift;
119         return $self->{dq}->{$self->id};
120 }
121
122 package CWMP::Queue::Job;
123
124 =head1 CWMP::Queue::Job
125
126 Single queued job
127
128 =cut
129
130 use base qw/Class::Accessor/;
131 __PACKAGE__->mk_accessors( qw/
132 job
133 debug
134 / );
135
136 use YAML qw/LoadFile/;
137 use Data::Dump qw/dump/;
138
139 =head2 dispatch
140
141   my ( $dispatch, $args ) = $job->dispatch;
142
143 =cut
144
145 sub dispatch {
146         my $self = shift;
147         my $path = $self->job->get_data_path || die "get_data_path?";
148         my $data = LoadFile( $path ) || die "can't read $path: $!";
149         warn "## dispatch returns ",dump($data),"\n" if $self->debug;
150         return ( $data->{dispatch}, $data->{args} );
151 }
152
153 =head2 finish
154
155 Finish job and remove it from queue
156
157   $job->finish;
158
159 =cut
160
161 sub finish {
162         my $self = shift;
163         $self->job->finish;
164 }
165
166 1;