X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=zc-mqtt;h=cb75957f0430ea024180ada3c2f8efe3f4e68ba4;hb=8c770581c58ce831a0b73b6e8a6b2b1410025981;hp=8cea33cd636d93d2e8113f18e6e0b1678e928b22;hpb=b5b459f5de4309ee47d9d72ee7ef3e44874768d4;p=zc diff --git a/zc-mqtt b/zc-mqtt index 8cea33c..cb75957 100755 --- a/zc-mqtt +++ b/zc-mqtt @@ -15,16 +15,43 @@ use Time::HiRes; use Getopt::Long; use Pod::Usage; use POSIX qw(strftime); +use File::Slurp; +use DBD::Pg; +use Data::Dump qw(dump); +use autodie; use lib '.'; use Protocol; +my $queue = "queue"; + $|=1; # flush STDOUT +my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 }); +my @columns = split(/\n/, <<__COLUMNS__); +0x01 PN +0x03 X_axis_angle +0x04 Y_axis_angle +0x0c Sensor_temperature +0x0d Power_source_voltage +0x11 Arming_disarming +0x17 Signal_strength +0x18 Sensor_operating_mode +0x19 Alarm_axis +__COLUMNS__ + +my $cols = join(',', map { (split(/\t/,$_))[1] } @columns); +my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ; + +my $sql_placeholders = ',?' x ($#columns+1); +$sql_placeholders =~ s/^,//; + +my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)}); + my $help; my $man; my $verbose = 2; -my $host = 'mqtt.zc-sensor.com'; +my $host = 'localhost'; my $port = 1883; my $count; my $client_id; @@ -69,7 +96,8 @@ print 'Received: ', $msg->string, "\n" if ($verbose >= 2); while (1) { $msg = read_message($socket, $buf); if ($msg) { - print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP; + my $t = time(); + print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP; if ($msg->message_type == MQTT_PUBLISH) { if ($verbose == 0) { print $msg->topic, " ", $msg->message, "\n"; @@ -77,17 +105,41 @@ while (1) { print $msg->string, "\n"; } -# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) { - if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat - #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3"; - my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" ); my $topic = $msg->topic; - $topic =~ s/up$/down/; - send_message($socket, - message_type => MQTT_PUBLISH, - retain => 0, #$retain, - topic => $topic, - message => $raw); + # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up + my $dir = $topic; # leave imei in dir + $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1}; + my $up_down = $2; + + mkdir "$queue" if ( ! -e "$queue" ); + mkdir "$queue/$dir" if ( ! -e "$queue/$dir" ); + mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" ); + + my $function_code = unpack('C',substr($msg->message,2,1)); + + write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message; + + if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat + $topic =~ s/up$/down/; + + my @all_pending = sort glob "$queue/$dir/.pending/*"; + if ( my $pending = shift @all_pending ) { + my $raw = read_file $pending; + my $pending_function_code = unpack('C',substr($msg->message,2,1)); + + send_message($socket, + message_type => MQTT_PUBLISH, + retain => 0, #$retain, + topic => $topic, + message => $raw); + $pending =~ s{$queue/$dir/.pending/}{}; + rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code"; + } + + my $hash = protocol_decode( $up_down, $msg->message ); + $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids ); + + } if (defined $count && --$count == 0) {