my $debug = 0;
my $store_path = './';
my $store_plugin = 'YAML';
-my $protocol_dump = 0;
GetOptions(
'debug+' => \$debug,
'port=i' => \$port,
'store-path=s' => \$store_path,
'store-plugin=s' => \$store_plugin,
- 'protocol-dump!' => \$protocol_dump,
);
-my $queue;
-
-if ( $protocol_dump ) {
-
- warn "generating dump of xml protocol with CPE\n";
-
- $queue = [
- 'GetRPCMethods',
- 'GetParameterNames',
-# [ 'GetParameterNames', 'InternetGatewayDevice.DeviceInfo.SerialNumber', 0 ],
-# [ 'GetParameterNames', 'InternetGatewayDevice.DeviceInfo.', 1 ],
- [ 'GetParameterValues',
- 'InternetGatewayDevice.DeviceInfo.SerialNumber',
- 'InternetGatewayDevice.DeviceInfo.VendorConfigFile.',
- 'InternetGatewayDevice.DeviceInfo.X_000E50_Country',
- ],
- [ 'SetParameterValues',
- 'InternetGatewayDevice.DeviceInfo.ProvisioningCode' => 'test provision',
-# 'InternetGatewayDevice.DeviceInfo.X_000E50_Country' => 1,
- ],
-# 'Reboot',
- ];
-};
-
my $server = CWMP::Server->new({
port => $port,
debug => $debug,
},
debug => $debug,
- default_queue => [ $queue ],
});
$server->run();
--- /dev/null
+#!/usr/bin/perl -w
+
+# cpe-queue.pl
+#
+# 11/12/2007 10:03:53 PM CET <>
+
+use strict;
+
+use lib './lib';
+use CWMP::Queue;
+use Getopt::Long;
+
+my $debug = 0;
+my $protocol_dump = 1;
+
+GetOptions(
+ 'debug+' => \$debug,
+ 'protocol-dump!' => \$protocol_dump,
+);
+
+my $id = shift @ARGV || die "usage: $0 CPE_id [--protocol-dump]\n";
+
+$id =~ s!^.*queue/+!!;
+$id =~ s!/+$!!; #!
+
+die "ID isn't valid: $id\n" unless $id =~ m/^\w+$/;
+
+my $q = CWMP::Queue->new({ id => $id, debug => $debug });
+
+if ( $protocol_dump ) {
+
+ warn "generating dump of xml protocol with CPE\n";
+
+ $q->enqueue( 'GetRPCMethods' );
+ $q->enqueue( 'GetParameterNames' );
+
+# $q->enqueue( 'GetParameterNames', 'InternetGatewayDevice.DeviceInfo.SerialNumber', 0 );
+# $q->enqueue( 'GetParameterNames', 'InternetGatewayDevice.DeviceInfo.', 1 );
+
+ $q->enqueue( 'GetParameterValues',
+ 'InternetGatewayDevice.DeviceInfo.SerialNumber',
+ 'InternetGatewayDevice.DeviceInfo.VendorConfigFile.',
+ 'InternetGatewayDevice.DeviceInfo.X_000E50_Country',
+ );
+ $q->enqueue( 'SetParameterValues',
+ 'InternetGatewayDevice.DeviceInfo.ProvisioningCode' => 'test provision',
+# 'InternetGatewayDevice.DeviceInfo.X_000E50_Country' => 1,
+ );
+
+# $q->enqueue( 'Reboot' );
+
+}
+
use File::Spec;
use File::Path qw/mkpath/;
use IPC::DirQueue;
+use YAML qw/Dump/;
use Carp qw/confess/;
=head1 NAME
my $self = shift;
my $id = $self->id;
- my $data = {@_} || confess "need data";
-
- warn "## enqueue( $id, ", dump( $data ), " )\n" if $self->debug;
-
- $self->{dq}->{$id}->enqueue_string( $id, $data );
+
+ warn "## enqueue( $id, ", dump( @_ ), " )\n" if $self->debug;
+
+ $self->{dq}->{$id}->enqueue_string( Dump( @_ ) );
}
=head2 dequeue
- my $data = $q->dequeue;
+ my $job = $q->dequeue;
+ my $dispatch = $job->dispatch;
+ # after dispatch is processed
+ $job->finish;
=cut
my $id = $self->id;
- my $data = $self->{dq}->{$id}->pickup_queued_job();
- return unless defined $data;
+ my $job = $self->{dq}->{$id}->pickup_queued_job();
+ return unless defined $job;
- warn "## dequeue( $id ) = ", dump( $data ), " )\n" if $self->debug;
+ warn "## dequeue( $id ) = ", dump( $job ), " )\n" if $self->debug;
- return $data->{metadata};
+ return CWMP::Queue::Job->new({ job => $job });
}
+
+package CWMP::Queue::Job;
+
+use base qw/Class::Accessor/;
+__PACKAGE__->mk_accessors( qw/
+job
+/ );
+
+use YAML qw/LoadFile/;
+
+sub dispatch {
+ my $self = shift;
+ my $path = $self->job->get_data_path || die "get_data_path?";
+ return LoadFile( $path ) || die "can't read $path: $!";
+}
+
+sub finish {
+ my $self = shift;
+ $self->job->finish;
+}
+
1;
__PACKAGE__->mk_accessors( qw/
port
store
-default_queue
background
debug
/ );
use CWMP::Session;
+use CWMP::Queue;
use Carp qw/confess/;
use Data::Dump qw/dump/;
module => 'DBMDeep',
path => 'var/',
},
- default_queue => [ qw/GetRPCMethods GetParameterNames/ ],
background => 1,
debug => 1
});
hash with key C<module> with value C<DBMDeep> if L<CWMP::Store::DBMDeep>
is used. Other parametars are optional.
-=item default_queue
-
-commands which will be issued to every CPE on connect
-
=back
=cut
CWMP::Server::Helper->new({
proto => 'tcp',
port => $self->port,
- default_queue => $self->default_queue,
store => $self->store,
debug => $self->debug,
background => $self->background,
}
# new multi-value options
- foreach my $p ( qw/ default_queue / ) {
- $prop->{ $p } ||= [];
- $template->{ $p } = $prop->{ $p };
- }
+# foreach my $p ( qw/ default_queue / ) {
+# $prop->{ $p } ||= [];
+# $template->{ $p } = $prop->{ $p };
+# }
}
my $sock = $prop->{client};
confess "no sock in ", ref( $self ) unless $sock;
- warn "default CPE queue ", dump( $prop->{default_queue} ), "\n" if defined($prop->{default_queue});
-
eval {
my $session = CWMP::Session->new({
sock => $sock,
- queue => $prop->{default_queue},
store => $prop->{store},
debug => $prop->{debug},
}) || confess "can't create session";
sock
state
-queue
store
/ );
my $server = CWMP::Session->new({
sock => $io_socket_object,
store => 'state.db',
- queue => [
- 'GetRPCMethods',
- [ 'GetParameterValyes', 'InternetGatewayDevice.DeviceInfo.SerialNumber', 0 ],
- ],
debug => 1,
});
)."\r\n");
$sock->send( "Set-Cookie: ID=" . $state->{ID} . "; path=/\r\n" ) if ( $state->{ID} );
-
+
+ my $queue = CWMP::Queue->new({
+ id => $self->store->ID_to_uid( $state->{ID}, $state ),
+ debug => $self->debug,
+ });
+ my $job;
$xml = '';
if ( my $dispatch = $state->{_dispatch} ) {
$xml = $self->dispatch( $dispatch );
- } elsif ( $dispatch = shift @{ $self->queue } ) {
- $xml = $self->dispatch( $dispatch );
+ } elsif ( $job = $queue->dequeue ) {
+ $xml = $self->dispatch( $job->dispatch );
} elsif ( $size == 0 ) {
warn ">>> no more queued commands, closing connection\n";
return 0;
warn ">>>> " . $sock->peerhost . " [" . localtime() . "] sent ", length( $xml )," bytes\n";
+ $job->finish if $job;
warn "### request over\n" if $self->debug;
return 1; # next request
my $debug = shift @ARGV;
-use Test::More tests => 129;
+use Test::More tests => 213;
use Data::Dump qw/dump/;
use lib 'lib';
isa_ok( $obj, 'CWMP::Queue' );
for my $i ( 1 .. 42 ) {
- ok( $obj->enqueue(
+ ok( $obj->enqueue({
i => $i,
foo => 'bar',
- ), "enqueue $i" );
+ }), "enqueue $i" );
};
my $i = 1;
-while ( my $data = $obj->dequeue ) {
- ok( $data, "dequeue $i" );
- cmp_ok( $data->{i}, '==', $i, "i == $i" );
+while ( my $job = $obj->dequeue ) {
+ ok( $job, "dequeue $i" );
+ ok( my $dispatch = $job->dispatch, "dispatch $i" );
+ cmp_ok( $dispatch->{i}, '==', $i, "i == $i" );
+ ok( $job->finish, "finish $i" );
$i++;
}
is_deeply( $store->current_store->get_state( 'CP0644JTHJ4' ), $state, 'store->current_store->get_state' );
+diag "shutdown server";
+
ok( kill(9,$pid), 'kill ' . $pid );
ok( waitpid($pid,0), 'waitpid' );