X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=zc-mqtt;h=e1593bec86aec997d0d36564c3e120c6353b55fc;hb=0d16afce17ea41345464e581734aa0984262b5dd;hp=365dc94d1c25823378db1f09aaef74211de36c4c;hpb=07c8962520f2fa1e913567f8cb647dc0636fa230;p=zc diff --git a/zc-mqtt b/zc-mqtt index 365dc94..e1593be 100755 --- a/zc-mqtt +++ b/zc-mqtt @@ -25,7 +25,10 @@ BEGIN { $cwd =~ s{/[^/]+$}{}; chdir $cwd; - open(my $log, '>', 'log.'.strftime("%Y-%m-%dT%H:%M:%S",localtime())); + my $log_dir = 'log'; + mkdir $log_dir unless -d $log_dir; + + open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime())); select($log); $|=1; } @@ -105,6 +108,7 @@ while (1) { $msg = read_message($socket, $buf); if ($msg) { my $t = time(); + my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t))); print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP; if ($msg->message_type == MQTT_PUBLISH) { if ($verbose == 0) { @@ -113,6 +117,9 @@ while (1) { print $msg->string, "\n"; } + # skip retained + next if $msg->string =~ m{Publish/at-most-once,retain}; + my $topic = $msg->topic; # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up my $dir = $topic; # leave imei in dir @@ -125,31 +132,35 @@ while (1) { my $function_code = unpack('C',substr($msg->message,2,1)); - write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message; - - if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat - $topic =~ s/up$/down/; - - my @all_pending = sort glob "$queue/$dir/.pending/*"; - if ( my $pending = shift @all_pending ) { - my $raw = read_file $pending; - my $pending_function_code = unpack('C',substr($msg->message,2,1)); - - send_message($socket, - message_type => MQTT_PUBLISH, - retain => 0, #$retain, - topic => $topic, - message => $raw); - $pending =~ s{$queue/$dir/.pending/}{}; - rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code"; - } + mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date"; + write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message; + if ( $function_code == 7 || $function_code == 8 ) { # 7 = heartbeat, 8 = alarm my $hash = protocol_decode( $up_down, $msg->message ); $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids ); } + + # send pending messages on any connection + my @all_pending = sort glob "$queue/$dir/.pending/*"; + if ( my $pending = shift @all_pending ) { + my $raw = read_file $pending; + my $pending_function_code = unpack('C',substr($msg->message,2,1)); + + $topic =~ s/up$/down/; + + send_message($socket, + message_type => MQTT_PUBLISH, + retain => 0, #$retain, + topic => $topic, + message => $raw); + $pending =~ s{$queue/$dir/.pending/}{}; + rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code"; + } + + if (defined $count && --$count == 0) { exit; }