move CPE specific stuff back into CWMP::Vendor
[perl-cwmp.git] / lib / CWMP / Queue.pm
1 package CWMP::Queue;
2
3 use strict;
4 use warnings;
5
6
7 use base qw/Class::Accessor/;
8 __PACKAGE__->mk_accessors( qw/
9 id
10 dir
11 clean
12 debug
13
14 / );
15
16 #use Carp qw/confess/;
17 use Data::Dump qw/dump/;
18 use File::Spec;
19 use File::Path qw/mkpath rmtree/;
20 use IPC::DirQueue;
21 use YAML::Syck qw/Dump/;
22 use Carp qw/confess/;
23
24 #use Devel::LeakTrace::Fast;
25
26 =head1 NAME
27
28 CWMP::Queue - implement commands queue for CPE
29
30 =head1 METHODS
31
32 =head2 new
33
34   my $obj = CWMP::Queue->new({
35         id => 'CPE_serial_number',
36         dir => 'queue',
37         clean => 1,
38         debug => 1
39   });
40
41 =cut
42
43 sub new {
44         my $class = shift;
45         my $self = $class->SUPER::new( @_ );
46
47         die "need id" unless $self->id;
48
49         warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
50
51         my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id );
52
53         if ( -e $dir && $self->clean ) {
54                 rmtree $dir || die "can't remove $dir: $!";
55                 warn "## clean $dir\n" if $self->debug;
56         }
57
58         if ( ! -e $dir ) {
59                 mkpath $dir || die "can't create $dir: $!";
60                 print "created new queue $dir\n";
61         }
62
63         my $id = $self->id;
64
65         if ( ! defined( $self->{dq}->{$id} ) ) {
66                 $self->{dq}->{$id} = IPC::DirQueue->new({
67                         dir => $dir,
68                         ordered => 1,
69                         queue_fanout => 0,
70                 });
71                 warn "## created queue object for CPE $id path $dir\n" if $self->debug;
72         }
73
74         return $self;
75 }
76
77 =head2 enqueue
78
79   $q->enqueue(
80         'CommandToDispatch', {
81                 'foo.bar.baz' => 42,
82         }
83   );
84
85 =cut
86
87 sub enqueue {
88         my $self = shift;
89
90         my $id = $self->id;
91         my ( $dispatch, $args ) = @_;
92
93         warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
94         
95         $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
96 }
97
98 =head2 dequeue
99
100   my $job = $q->dequeue;
101   my ( $dispatch, $args ) = $job->dispatch;
102   # after dispatch is processed
103   $job->finish;
104
105 =cut
106
107 sub dequeue {
108         my $self = shift;
109
110         my $id = $self->id;
111
112         my $job = $self->{dq}->{$id}->pickup_queued_job();
113         return unless defined $job;
114
115         warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug;
116
117         return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
118 }
119
120 =head2 dq
121
122 Accessor to C<IPC::DirQueue> object
123
124   my $dq = $q->dq;
125
126 =cut
127
128 sub dq {
129         my $self = shift;
130         return $self->{dq}->{$self->id};
131 }
132
133 package CWMP::Queue::Job;
134
135 =head1 CWMP::Queue::Job
136
137 Single queued job
138
139 =cut
140
141 use base qw/Class::Accessor/;
142 __PACKAGE__->mk_accessors( qw/
143 job
144 debug
145 / );
146
147 use YAML qw/LoadFile/;
148 use Data::Dump qw/dump/;
149
150 =head2 dispatch
151
152   my ( $dispatch, $args ) = $job->dispatch;
153
154 =cut
155
156 sub dispatch {
157         my $self = shift;
158         my $path = $self->job->get_data_path || die "get_data_path?";
159         my $data = LoadFile( $path ) || die "can't read $path: $!";
160         warn "## dispatch returns ",dump($data),"\n" if $self->debug;
161         return ( $data->{dispatch}, $data->{args} );
162 }
163
164 =head2 finish
165
166 Finish job and remove it from queue
167
168   $job->finish;
169
170 =cut
171
172 sub finish {
173         my $self = shift;
174         $self->job->finish;
175         return 1;
176 }
177
178 1;