store data in queue files and consume it
authorDobrica Pavlinusic <dpavlin@rot13.org>
Sat, 3 Oct 2020 10:33:20 +0000 (12:33 +0200)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Sat, 3 Oct 2020 10:33:20 +0000 (12:33 +0200)
zc-mqtt

diff --git a/zc-mqtt b/zc-mqtt
index 8cea33c..579282b 100755 (executable)
--- a/zc-mqtt
+++ b/zc-mqtt
@@ -15,16 +15,20 @@ use Time::HiRes;
 use Getopt::Long;
 use Pod::Usage;
 use POSIX qw(strftime);
+use File::Slurp;
+use autodie;
 
 use lib '.';
 use Protocol;
 
+my $queue = "queue";
+
 $|=1; # flush STDOUT
 
 my $help;
 my $man;
 my $verbose = 2;
-my $host = 'mqtt.zc-sensor.com';
+my $host = 'localhost';
 my $port = 1883;
 my $count;
 my $client_id;
@@ -69,7 +73,8 @@ print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
 while (1) {
   $msg = read_message($socket, $buf);
   if ($msg) {
-    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP;
+    my $t = time();
+    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
     if ($msg->message_type == MQTT_PUBLISH) {
       if ($verbose == 0) {
         print $msg->topic, " ", $msg->message, "\n";
@@ -77,17 +82,33 @@ while (1) {
         print $msg->string, "\n";
       }
 
-#      if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
-      if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
-       #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3";
-       my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" );
        my $topic = $msg->topic;
-       $topic =~ s/up$/down/;
-       send_message($socket,
-               message_type => MQTT_PUBLISH,
-               retain => 0, #$retain,
-               topic => $topic,
-               message => $raw);
+       my $dir = $topic;
+       $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
+       
+       mkdir "$queue" if ( ! -e "$queue" );
+       mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
+       mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
+       write_file "$queue/$dir/up/$t", $msg->string;
+
+       mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
+       mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
+
+#      if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
+       if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
+               $topic =~ s/up$/down/;
+
+               my @all_pending = sort glob "$queue/$dir/down/*";
+               if ( my $pending = shift @all_pending ) {
+                       my $raw = read_file $pending;
+
+                       send_message($socket,
+                               message_type => MQTT_PUBLISH,
+                               retain => 0, #$retain,
+                               topic => $topic,
+                               message => $raw);
+                       rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending";
+               }
       }
 
       if (defined $count && --$count == 0) {