X-Git-Url: http://git.rot13.org/?p=zc;a=blobdiff_plain;f=zc-mqtt;h=5a6cd86204a79a9f38b375e3ade3634161f191aa;hp=f7bd2b03837196732906d959875c606a0be5c8b7;hb=5eb661024eed4e5f624b31a2f80111c69f234cd0;hpb=dec910cff29456db90cb78858b8f37a36e98b2bf diff --git a/zc-mqtt b/zc-mqtt index f7bd2b0..5a6cd86 100755 --- a/zc-mqtt +++ b/zc-mqtt @@ -16,14 +16,48 @@ use Getopt::Long; use Pod::Usage; use POSIX qw(strftime); use File::Slurp; +use DBD::Pg; +use Data::Dump qw(dump); use autodie; +BEGIN { + my $cwd = $0; + $cwd =~ s{/[^/]+$}{}; + chdir $cwd; + + my $log_dir = 'log'; + mkdir $log_dir unless -d $log_dir; + + open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime())); + select($log); + $|=1; +} + use lib '.'; use Protocol; my $queue = "queue"; -$|=1; # flush STDOUT +my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 }); +my @columns = split(/\n/, <<__COLUMNS__); +0x01 PN +0x03 X_axis_angle +0x04 Y_axis_angle +0x0c Sensor_temperature +0x0d Power_source_voltage +0x11 Arming_disarming +0x17 Signal_strength +0x18 Sensor_operating_mode +0x19 Alarm_axis +__COLUMNS__ + +my $cols = join(',', map { (split(/\t/,$_))[1] } @columns); +my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ; + +my $sql_placeholders = ',?' x ($#columns+1); +$sql_placeholders =~ s/^,//; + +my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)}); my $help; my $man; @@ -74,6 +108,7 @@ while (1) { $msg = read_message($socket, $buf); if ($msg) { my $t = time(); + my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t))); print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP; if ($msg->message_type == MQTT_PUBLISH) { if ($verbose == 0) { @@ -83,8 +118,10 @@ while (1) { } my $topic = $msg->topic; - my $dir = $topic; - $dir =~ s{\w+/\w+/(\w+)/\w+}{$1}; + # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up + my $dir = $topic; # leave imei in dir + $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1}; + my $up_down = $2; mkdir "$queue" if ( ! -e "$queue" ); mkdir "$queue/$dir" if ( ! -e "$queue/$dir" ); @@ -92,9 +129,9 @@ while (1) { my $function_code = unpack('C',substr($msg->message,2,1)); - write_file "$queue/$dir/$t.up.$function_code", $msg->message; + mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date"; + write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message; -# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) { if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat $topic =~ s/up$/down/; @@ -109,8 +146,13 @@ while (1) { topic => $topic, message => $raw); $pending =~ s{$queue/$dir/.pending/}{}; - rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.down.$function_code"; + rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code"; } + + my $hash = protocol_decode( $up_down, $msg->message ); + $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids ); + + } if (defined $count && --$count == 0) {