use Pod::Usage;
use POSIX qw(strftime);
use File::Slurp;
+use DBD::Pg;
+use Data::Dump qw(dump);
use autodie;
+BEGIN {
+ my $cwd = $0;
+ $cwd =~ s{/[^/]+$}{};
+ chdir $cwd;
+
+ my $log_dir = 'log';
+ mkdir $log_dir unless -d $log_dir;
+
+ open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
+ select($log);
+ $|=1;
+}
+
use lib '.';
use Protocol;
my $queue = "queue";
-$|=1; # flush STDOUT
+my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
+my @columns = split(/\n/, <<__COLUMNS__);
+0x01 PN
+0x03 X_axis_angle
+0x04 Y_axis_angle
+0x0c Sensor_temperature
+0x0d Power_source_voltage
+0x11 Arming_disarming
+0x17 Signal_strength
+0x18 Sensor_operating_mode
+0x19 Alarm_axis
+__COLUMNS__
+
+my $cols = join(',', map { (split(/\t/,$_))[1] } @columns);
+my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
+
+my $sql_placeholders = ',?' x ($#columns+1);
+$sql_placeholders =~ s/^,//;
+
+my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
my $help;
my $man;
$msg = read_message($socket, $buf);
if ($msg) {
my $t = time();
+ my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
if ($msg->message_type == MQTT_PUBLISH) {
if ($verbose == 0) {
print $msg->string, "\n";
}
+ # skip retained
+ next if $msg->string =~ m{Publish/at-most-once,retain};
+
my $topic = $msg->topic;
- my $dir = $topic;
- $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
+ # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
+ my $dir = $topic; # leave imei in dir
+ $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
+ my $up_down = $2;
mkdir "$queue" if ( ! -e "$queue" );
mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
- mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
- write_file "$queue/$dir/up/$t", $msg->string;
+ mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
- mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
- mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
+ my $function_code = unpack('C',substr($msg->message,2,1));
+
+ mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
+ write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
+
+ if ( $function_code == 7 || $function_code == 8 ) { # 7 = heartbeat, 8 = alarm
+ my $hash = protocol_decode( $up_down, $msg->message );
+ $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
-# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
- if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
- $topic =~ s/up$/down/;
- my @all_pending = sort glob "$queue/$dir/down/*";
- if ( my $pending = shift @all_pending ) {
- my $raw = read_file $pending;
-
- send_message($socket,
- message_type => MQTT_PUBLISH,
- retain => 0, #$retain,
- topic => $topic,
- message => $raw);
- rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending";
- }
}
+
+ # send pending messages on any connection
+ my @all_pending = sort glob "$queue/$dir/.pending/*";
+ if ( my $pending = shift @all_pending ) {
+ my $raw = read_file $pending;
+ my $pending_function_code = unpack('C',substr($msg->message,2,1));
+
+ $topic =~ s/up$/down/;
+
+ send_message($socket,
+ message_type => MQTT_PUBLISH,
+ retain => 0, #$retain,
+ topic => $topic,
+ message => $raw);
+ $pending =~ s{$queue/$dir/.pending/}{};
+ rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
+ }
+
+
if (defined $count && --$count == 0) {
exit;
}