}
my $packet_type = {
+ TEXT => -1, # fake type for text protocol
+
CAN_DO => 1,
PRE_SLEEP => 4,
my $response;
my $cb = sub {
my ( $self, $data ) = @_;
+
+ if ( substr($data,0,1) ne "\x00" ) {
+ $self->res( $data );
+ $self->stop;
+ return;
+ }
+
my ( $type, @data ) = $self->parse_packet($data);
warn "# <<<< ", $nr2type->{$type}, " ",dump(@data);
# $data .= "\0" if $data;
my $len = length($data);
- my $message = pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data;
+ my $message = $type ne 'TEXT'
+ ? pack("a4NN", "\0REQ", $packet_type->{$type}, length $data ) . $data
+ : "$data\r\n"
+ ;
warn "# >>>> $type ",dump($message);
my $mqueue = $self->{_message_queue} ||= [];
$self->{_function}->{$data} = $callback;
$self->res( $callback );
warn "# installed $data callback $callback";
- } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB)$/ ) { # sync commands
+ } elsif ( $type =~ m/^(ECHO_REQ|SUBMIT_JOB|GRAB_JOB|TEXT)$/ ) { # sync commands
$self->start;
}
$data = $self->{message}->{$id} .= $data;
my ($magic, $type, $len) = unpack( "a4NN", $data );
- if ( length $data < $len ) {
+ if ( substr($data,0,1) ne "\x00" ) {
+ return if $data !~ s/\.[\n\r]$//s;
+ } elsif ( length $data < $len ) {
warn "# _on_read incomplete message";
return;
}
ok( my $echo = $g->req( 'ECHO_REQ', "foobar" ), 'ECHO' );
cmp_ok $echo, 'eq', "foobar";
-ok( my $ping = $g->req( 'SUBMIT_JOB', 'ping', '', 'bla' ), 'SUBMIT_JOB' );
-like $ping, qr/pong/, 'got pong';
-diag dump $ping;
+ok( my $workers = $g->req( 'TEXT', 'workers' ), 'workers' );
+diag "workers $workers";
+
+ok( my $status = $g->req( 'TEXT', 'status' ), 'status' );
+diag dump $status;
ok( $g->req( 'ECHO_REQ', "alive" ), 'ECHO - still alive - still alive?' );