From 442647ba467f319c76d6baf63818fab4e12144ad Mon Sep 17 00:00:00 2001 From: Dobrica Pavlinusic Date: Sat, 3 Oct 2020 12:33:20 +0200 Subject: [PATCH] store data in queue files and consume it --- zc-mqtt | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/zc-mqtt b/zc-mqtt index 8cea33c..579282b 100755 --- a/zc-mqtt +++ b/zc-mqtt @@ -15,16 +15,20 @@ use Time::HiRes; use Getopt::Long; use Pod::Usage; use POSIX qw(strftime); +use File::Slurp; +use autodie; use lib '.'; use Protocol; +my $queue = "queue"; + $|=1; # flush STDOUT my $help; my $man; my $verbose = 2; -my $host = 'mqtt.zc-sensor.com'; +my $host = 'localhost'; my $port = 1883; my $count; my $client_id; @@ -69,7 +73,8 @@ print 'Received: ', $msg->string, "\n" if ($verbose >= 2); while (1) { $msg = read_message($socket, $buf); if ($msg) { - print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP; + my $t = time(); + print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP; if ($msg->message_type == MQTT_PUBLISH) { if ($verbose == 0) { print $msg->topic, " ", $msg->message, "\n"; @@ -77,17 +82,33 @@ while (1) { print $msg->string, "\n"; } -# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) { - if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat - #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3"; - my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" ); my $topic = $msg->topic; - $topic =~ s/up$/down/; - send_message($socket, - message_type => MQTT_PUBLISH, - retain => 0, #$retain, - topic => $topic, - message => $raw); + my $dir = $topic; + $dir =~ s{\w+/\w+/(\w+)/\w+}{$1}; + + mkdir "$queue" if ( ! -e "$queue" ); + mkdir "$queue/$dir" if ( ! -e "$queue/$dir" ); + mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" ); + write_file "$queue/$dir/up/$t", $msg->string; + + mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" ); + mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" ); + +# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) { + if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat + $topic =~ s/up$/down/; + + my @all_pending = sort glob "$queue/$dir/down/*"; + if ( my $pending = shift @all_pending ) { + my $raw = read_file $pending; + + send_message($socket, + message_type => MQTT_PUBLISH, + retain => 0, #$retain, + topic => $topic, + message => $raw); + rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending"; + } } if (defined $count && --$count == 0) { -- 2.20.1