move 2 sensors from klanjec to tuhelj
[zc] / zc-mqtt
diff --git a/zc-mqtt b/zc-mqtt
index 8cea33c..56d8f03 100755 (executable)
--- a/zc-mqtt
+++ b/zc-mqtt
@@ -15,16 +15,54 @@ use Time::HiRes;
 use Getopt::Long;
 use Pod::Usage;
 use POSIX qw(strftime);
+use File::Slurp;
+use DBD::Pg;
+use Data::Dump qw(dump);
+use autodie;
+
+BEGIN {
+       my $cwd = $0;
+       $cwd =~ s{/[^/]+$}{};
+       chdir $cwd;
+
+       my $log_dir = 'log';
+       mkdir $log_dir unless -d $log_dir;
+
+       open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
+       select($log);
+       $|=1;
+}
 
 use lib '.';
 use Protocol;
 
-$|=1; # flush STDOUT
+my $queue = "queue";
+
+my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
+my @columns = split(/\n/, <<__COLUMNS__);
+0x01   PN
+0x03   X_axis_angle
+0x04   Y_axis_angle
+0x0c   Sensor_temperature
+0x0d   Power_source_voltage
+0x11   Arming_disarming
+0x17   Signal_strength
+0x18   Sensor_operating_mode
+0x19   Alarm_axis
+__COLUMNS__
+
+my $cols = join(',',  map {      (split(/\t/,$_))[1]   } @columns);
+my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
+
+my $sql_placeholders = ',?' x ($#columns+1);
+$sql_placeholders =~ s/^,//;
+
+my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
 
 my $help;
 my $man;
 my $verbose = 2;
-my $host = 'mqtt.zc-sensor.com';
+my $host = 'localhost';
 my $port = 1883;
 my $count;
 my $client_id;
@@ -69,7 +107,9 @@ print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
 while (1) {
   $msg = read_message($socket, $buf);
   if ($msg) {
-    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP;
+    my $t = time();
+    my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
+    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
     if ($msg->message_type == MQTT_PUBLISH) {
       if ($verbose == 0) {
         print $msg->topic, " ", $msg->message, "\n";
@@ -77,18 +117,48 @@ while (1) {
         print $msg->string, "\n";
       }
 
-#      if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
-      if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
-       #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3";
-       my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" );
+      # skip retained
+      next if $msg->string =~ m{Publish/at-most-once,retain};
+
        my $topic = $msg->topic;
-       $topic =~ s/up$/down/;
-       send_message($socket,
-               message_type => MQTT_PUBLISH,
-               retain => 0, #$retain,
-               topic => $topic,
-               message => $raw);
-      }
+       # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
+       my $dir = $topic; # leave imei in dir
+       $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
+       my $up_down = $2;
+       $dir =~ s{/}{_}g; # sanitize
+
+       mkdir "$queue" if ( ! -e "$queue" );
+       mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
+       mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
+
+       my $function_code = unpack('C',substr($msg->message,2,1));
+
+       mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
+       write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
+
+       if ( $function_code == 7 || $function_code == 8 ) { #  7 = heartbeat, 8 = alarm
+               my $hash = protocol_decode( $up_down, $msg->message );
+               $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
+       }
+
+
+       # send pending messages on any connection
+       my @all_pending = sort glob "$queue/$dir/.pending/*";
+       if ( my $pending = shift @all_pending ) {
+               my $raw = read_file $pending;
+               my $pending_function_code = unpack('C',substr($msg->message,2,1));
+
+               $topic =~ s/up$/down/;
+
+               send_message($socket,
+                       message_type => MQTT_PUBLISH,
+                       retain => 0, #$retain,
+                       topic => $topic,
+                       message => $raw);
+               $pending =~ s{$queue/$dir/.pending/}{};
+               rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
+       }
+
 
       if (defined $count && --$count == 0) {
         exit;