mkdir "$queue" if ( ! -e "$queue" );
mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
- mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
+ mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
my $function_code = unpack('C',substr($msg->message,2,1));
- write_file "$queue/$dir/up/$t.$function_code", $msg->message;
-
- mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
- mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
+ write_file "$queue/$dir/$t.up.$function_code", $msg->message;
# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
$topic =~ s/up$/down/;
- my @all_pending = sort glob "$queue/$dir/down/*";
+ my @all_pending = sort glob "$queue/$dir/.pending/*";
if ( my $pending = shift @all_pending ) {
my $raw = read_file $pending;
+ my $pending_function_code = unpack('C',substr($msg->message,2,1));
send_message($socket,
message_type => MQTT_PUBLISH,
retain => 0, #$retain,
topic => $topic,
message => $raw);
- $pending =~ s{$queue/$dir/down/}{};
- rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending";
+ $pending =~ s{$queue/$dir/.pending/}{};
+ rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.down.$function_code";
}
}
# temporary store to done
my $t = time();
-write_file "$queue/$imei/down/.done/$t", $raw;
+write_file "$queue/$imei/.todo.$t", $raw;
# atomic rename to ensure that file is complete
-rename "$queue/$imei/down/.done/$t", "$queue/$imei/down/$t";
+rename "$queue/$imei/.todo.$t", "$queue/$imei/.pending/$t";