r206@brr: dpavlin | 2007-11-12 23:02:21 +0100
[perl-cwmp.git] / lib / CWMP / Queue.pm
1 package CWMP::Queue;
2
3 use strict;
4 use warnings;
5
6
7 use base qw/Class::Accessor/;
# Generate the get/set accessors used throughout this package:
#   id    - identifier of the CPE this queue belongs to
#   debug - when true, methods emit diagnostic warnings
__PACKAGE__->mk_accessors( qw/id debug/ );
13
14 #use Carp qw/confess/;
15 use Data::Dump qw/dump/;
16 use File::Spec;
17 use File::Path qw/mkpath/;
18 use IPC::DirQueue;
19 use YAML qw/Dump/;
20 use Carp qw/confess/;
21
22 =head1 NAME
23
24 CWMP::Queue - implement commands queue for CPE
25
26 =head1 METHODS
27
=head2 new

  my $obj = CWMP::Queue->new({
        id => 'CPE_serial_number',
        debug => 1
  });

Creates the queue object for a single CPE.

C<id> is required and must be a true value; it names the directory
F<queue/$id> (relative to the current working directory) which backs the
queue. The directory is created if it does not exist yet.

C<debug> is optional; when true, diagnostic messages are emitted with C<warn>.

Dies if C<id> is missing or if the queue directory cannot be created.

=cut

sub new {
    my $class = shift;
    my $self = $class->SUPER::new( @_ );

    die "need id" unless $self->id;

    warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;

    my $id = $self->id;

    # NOTE(review): $id is used verbatim as a path component. If it can come
    # from an untrusted CPE, it should be validated by the caller (e.g. no
    # '/' or '..') -- confirm where id originates.
    my $dir = File::Spec->catdir( 'queue', $id );

    # Test for a directory (not mere existence) so that a stray plain file
    # with the same name is reported here rather than later on.
    if ( ! -d $dir ) {
        # The legacy mkpath() interface croaks on failure, so trap that and
        # re-raise with our own message. The original `mkpath $dir || die`
        # applied `||` to $dir, which made the die unreachable.
        eval { mkpath( $dir ); 1 }
            or die "can't create $dir: $@";
        print "created new queue $dir\n";
    }

    # Guarded in case a dq entry was already supplied through the
    # constructor arguments.
    if ( ! defined( $self->{dq}->{$id} ) ) {
        $self->{dq}->{$id} = IPC::DirQueue->new({
            dir => $dir,
            ordered => 1,
            queue_fanout => 0,
        });
        warn "## created queue object for CPE $id path $dir\n" if $self->debug;
    }

    return $self;
}
65
=head2 enqueue

  $q->enqueue(
        'foo.bar.baz' => 42,
  );

Serializes all arguments with C<YAML::Dump> and stores the resulting string
as one job in the queue of this CPE. The return value is whatever the
underlying C<enqueue_string> call returns.

=cut

sub enqueue {
    my ( $self, @payload ) = @_;

    my $cpe_id = $self->id;

    if ( $self->debug ) {
        warn "## enqueue( $cpe_id, ", dump( @payload ), " )\n";
    }

    # One directory queue object is kept per CPE id (set up in new).
    my $queue = $self->{dq}->{$cpe_id};

    return $queue->enqueue_string( Dump( @payload ) );
}
83
=head2 dequeue

  my $job = $q->dequeue;
  my $dispatch = $job->dispatch;
  # after dispatch is processed
  $job->finish;

Picks up the next queued job for this CPE and returns it wrapped in a
C<CWMP::Queue::Job> object. Returns nothing (empty list / undef) when
no job was picked up.

=cut

sub dequeue {
    my ($self) = @_;

    my $cpe_id = $self->id;
    my $picked = $self->{dq}->{$cpe_id}->pickup_queued_job();

    # Nothing was handed out by the queue: signal that with a bare return.
    defined $picked or return;

    if ( $self->debug ) {
        warn "## dequeue( $cpe_id ) = ", dump( $picked ), " )\n";
    }

    # Wrap the raw queue job so callers get dispatch() and finish().
    return CWMP::Queue::Job->new({ job => $picked });
}
105
package CWMP::Queue::Job;

# Thin wrapper around a job object picked up from the directory queue.
# It is constructed by CWMP::Queue::dequeue with a single field:
#   job - the underlying queue job; must provide get_data_path and finish

use base qw/Class::Accessor/;
__PACKAGE__->mk_accessors( qw/
job
/ );

use YAML qw/LoadFile/;

# dispatch
#
# Loads the YAML payload stored in the job's data file and returns it.
#
# LoadFile is called in scalar context, so one value is returned even if
# the file holds several YAML documents (YAML.pm is documented to return
# the last one in that case -- verify against the installed YAML version).
#
# Dies if the job has no data path, if the file cannot be loaded, or if
# it yields no defined value.
sub dispatch {
    my $self = shift;
    my $path = $self->job->get_data_path || die "get_data_path?";

    # Trap load failures so the error names the file. The failure reason is
    # carried by the exception ($@), not by $!.
    my $data;
    eval { $data = LoadFile( $path ); 1 }
        or die "can't read $path: $@";

    # Only an undefined result is an error. A false but defined payload
    # (0 or '') is valid data and used to die by mistake under
    # `LoadFile(...) || die`.
    die "can't read $path: no data loaded" unless defined $data;

    return $data;
}

# finish
#
# Marks the underlying queue job as finished by delegating to its finish
# method; returns whatever that call returns. Call it after the payload
# obtained from dispatch has been processed.
sub finish {
    my $self = shift;
    return $self->job->finish;
}
125
126 1;