use Data::Dump qw(dump);
use autodie;
+BEGIN {
+ my $cwd = $0;
+ $cwd =~ s{/[^/]+$}{};
+ chdir $cwd;
+
+ my $log_dir = 'log';
+ mkdir $log_dir unless -d $log_dir;
+
+ open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
+ select($log);
+ $|=1;
+}
+
use lib '.';
use Protocol;
my $queue = "queue";
-$|=1; # flush STDOUT
-
my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
my @columns = split(/\n/, <<__COLUMNS__);
0x01 PN
$msg = read_message($socket, $buf);
if ($msg) {
my $t = time();
+ my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
if ($msg->message_type == MQTT_PUBLISH) {
if ($verbose == 0) {
print $msg->string, "\n";
}
+ # skip retained
+ next if $msg->string =~ m{Publish/at-most-once,retain};
+
my $topic = $msg->topic;
# Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
my $dir = $topic; # leave imei in dir
my $function_code = unpack('C',substr($msg->message,2,1));
- write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message;
-
- if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
- $topic =~ s/up$/down/;
-
- my @all_pending = sort glob "$queue/$dir/.pending/*";
- if ( my $pending = shift @all_pending ) {
- my $raw = read_file $pending;
- my $pending_function_code = unpack('C',substr($msg->message,2,1));
-
- send_message($socket,
- message_type => MQTT_PUBLISH,
- retain => 0, #$retain,
- topic => $topic,
- message => $raw);
- $pending =~ s{$queue/$dir/.pending/}{};
- rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
- }
+ mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
+ write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
+ if ( $function_code == 7 || $function_code == 8 ) { # 7 = heartbeat, 8 = alarm
my $hash = protocol_decode( $up_down, $msg->message );
$sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
}
+
+ # send pending messages on any connection
+ my @all_pending = sort glob "$queue/$dir/.pending/*";
+ if ( my $pending = shift @all_pending ) {
+ my $raw = read_file $pending;
+ my $pending_function_code = unpack('C',substr($msg->message,2,1));
+
+ $topic =~ s/up$/down/;
+
+ send_message($socket,
+ message_type => MQTT_PUBLISH,
+ retain => 0, #$retain,
+ topic => $topic,
+ message => $raw);
+ $pending =~ s{$queue/$dir/.pending/}{};
+ rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
+ }
+
+
if (defined $count && --$count == 0) {
exit;
}