#!/usr/bin/perl
use strict;
use warnings;

# based on net-mqtt-sub example from Net::MQTT
#
# 2020-08-15 Dobrica Pavlinusic
#
# Subscribes to MQTT topics, logs every received message, archives the raw
# payloads under queue/<imei>/<date>/, inserts decoded heartbeat and alarm
# frames into PostgreSQL and publishes queued ".pending" payloads back on
# the matching "down" topic.

use Net::MQTT::Constants;
use Net::MQTT::Message;
use IO::Select;
use IO::Socket::INET;
use Time::HiRes;
use Getopt::Long;
use Pod::Usage;
use POSIX qw(strftime);
use File::Slurp;
use DBD::Pg;
use Data::Dump qw(dump);
use autodie;

BEGIN {
    # Work from the directory that holds the script so the relative paths
    # used below (log/, queue/ and "use lib '.'") resolve next to it.  When
    # $0 carries no directory part the current directory is kept; an
    # unconditional chdir would die under autodie in that case.
    my ($script_dir) = $0 =~ m{\A(.*)/[^/]+\z}s;
    if (defined $script_dir) {
        chdir(length($script_dir) ? $script_dir : '/');
    }

    # Everything print()ed without an explicit handle goes to a fresh,
    # unbuffered, timestamped log file.
    my $log_dir = 'log';
    mkdir $log_dir unless -d $log_dir;
    open(my $log, '>',
        "$log_dir/" . strftime("%Y-%m-%dT%H:%M:%S", localtime()));
    select($log);
    $| = 1;
}

use lib '.';
use Protocol;

# Directory under which raw payloads are archived and pending ones queued.
my $queue = "queue";

my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });

# Protocol data id => column of table "zc" it is stored in.
my @columns = (
    [ 0x01, 'PN' ],
    [ 0x03, 'X_axis_angle' ],
    [ 0x04, 'Y_axis_angle' ],
    [ 0x0c, 'Sensor_temperature' ],
    [ 0x0d, 'Power_source_voltage' ],
    [ 0x11, 'Arming_disarming' ],
    [ 0x17, 'Signal_strength' ],
    [ 0x18, 'Sensor_operating_mode' ],
    [ 0x19, 'Alarm_axis' ],
);

my $cols             = join(',', map { $_->[1] } @columns);
my @insert_data_ids  = map { $_->[0] } @columns;
my $sql_placeholders = join(',', ('?') x scalar @columns);

my $sth = $dbh->prepare(
    qq{insert into zc ($cols) values ($sql_placeholders)});

my $help;
my $man;
my $verbose = 2;
my $host    = 'localhost';
my $port    = 1883;
my $count;
my $client_id;
my $keep_alive_timer = 120;

GetOptions(
    'help|?'                   => \$help,
    'man'                      => \$man,
    'verbose+'                 => \$verbose,
    'host=s'                   => \$host,
    'port=i'                   => \$port,
    'count=i'                  => \$count,
    'one|1'                    => sub { $count = 1 },
    'client_id|client-id|C=s'  => \$client_id,
    'keepalive=i'              => \$keep_alive_timer,
) or pod2usage(2);
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
#pod2usage(2) unless (@ARGV); # need a topic
push @ARGV, '#' unless (@ARGV);

# State shared between the session loop and the helpers below.
my $next_ping;                 # time at which the next PINGREQ is due
my $got_ping_response = 1;     # false while a PINGREQ is unanswered

# Exception thrown by read_message() when the broker connection is gone.
my $SOCKET_CLOSED = "Socket closed\n";

# Keep one session running; open a new one whenever the socket is lost.
# Every other error (ping timeout, DBI, autodie, ...) is re-thrown as is.
while (1) {
    my $finished = eval { run_session(); 1 };
    my $error    = $@;
    last if $finished;
    die $error if ref $error || $error ne $SOCKET_CLOSED;
    sleep 1;    # do not hammer a broker that keeps dropping the connection
}

# run_session()
#
# Connects to the broker, subscribes to the topics in @ARGV and processes
# incoming messages until the process exits or an exception is thrown.
# Dies with $SOCKET_CLOSED (via read_message) when the connection is lost.
sub run_session {
    print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";
    my $socket = IO::Socket::INET->new(
        PeerAddr => $host . ':' . $port,
        Timeout  => $keep_alive_timer,
    ) or die "Socket connect failed: $!\n";

    my $buf = '';
    my $mid = 1;
    $got_ping_response = 1;

    my @connect = (
        message_type     => MQTT_CONNECT,
        keep_alive_timer => $keep_alive_timer,
    );
    push @connect, client_id => $client_id if (defined $client_id);
    send_message($socket, @connect);
    my $msg = read_message($socket, $buf) or die "No ConnAck\n";
    print 'Received: ', $msg->string, "\n" if ($verbose >= 2);

    send_message(
        $socket,
        message_type => MQTT_SUBSCRIBE,
        message_id   => $mid++,
        topics       => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ],
    );
    $msg = read_message($socket, $buf) or die "No SubAck\n";
    print 'Received: ', $msg->string, "\n" if ($verbose >= 2);

    while (1) {
        $msg = read_message($socket, $buf);
        if ($msg) {
            my $type = $msg->message_type;
            my $t    = time();
            print "\n", strftime("%Y-%m-%d %H:%M:%S ", localtime($t))
                if $type != MQTT_PINGRESP;

            if ($type == MQTT_PUBLISH) {
                handle_publish($socket, $msg, $t);
            }
            elsif ($type == MQTT_PINGRESP) {
                $got_ping_response = 1;
                print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
            }
            else {
                print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
            }
        }

        if (Time::HiRes::time() > $next_ping) {
            die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
            send_message($socket, message_type => MQTT_PINGREQ);
            # Expect a PINGRESP before the next ping is due; without this
            # reset the timeout check above could never fire.
            $got_ping_response = 0;
        }
    }
}

# function_code($payload)
#
# Returns the third byte of $payload as an unsigned integer, or undef when
# the payload is too short to contain one.
sub function_code {
    my ($payload) = @_;
    return unless defined $payload && length($payload) > 2;
    return unpack('C', substr($payload, 2, 1));
}

# handle_publish($socket, $msg, $t)
#
# Processes one PUBLISH message received at epoch time $t: logs it, stores
# the raw payload in the queue directory, inserts heartbeat/alarm frames
# into the database and sends the oldest pending payload (if any) for the
# same device.  Retained messages and topics that do not end in
# <word>/<word>/<imei>/<direction> are logged but otherwise ignored.
# Exits the process once the requested number of messages was handled.
sub handle_publish {
    my ($socket, $msg, $t) = @_;

    if ($verbose == 0) {
        print $msg->topic, " ", $msg->message, "\n";
    }
    else {
        print $msg->string, "\n";
    }

    # skip retained
    return if $msg->string =~ m{Publish/at-most-once,retain};

    # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
    my $topic = $msg->topic;

    # Only the imei is used as directory name.  Taking it from a capture
    # that is limited to word characters keeps a hostile topic from
    # steering the paths below outside of the queue directory.
    my ($dir, $up_down) = $topic =~ m{\w+/\w+/(\w+)/(\w+)\z};
    unless (defined $dir) {
        warn "Ignoring message with unexpected topic: $topic\n";
        return;
    }

    my ($date, $time) =
        split(/ /, strftime("%Y-%m-%d %H:%M:%S", localtime($t)));

    for my $path ($queue, "$queue/$dir", "$queue/$dir/.pending",
        "$queue/$dir/$date")
    {
        mkdir $path unless -d $path;
    }

    my $payload       = $msg->message;
    my $function_code = function_code($payload);
    my $code_label    = defined $function_code ? $function_code : '';

    write_file "$queue/$dir/$date/$time.$t.$up_down.$code_label", $payload;

    # 7 = heartbeat, 8 = alarm
    if (defined $function_code
        && ($function_code == 7 || $function_code == 8))
    {
        my $hash = protocol_decode($up_down, $payload);
        $sth->execute(map { $hash->{data_id}->{$_} } @insert_data_ids);
    }

    # send pending messages on any connection
    my @all_pending = sort { $a cmp $b } glob("$queue/$dir/.pending/*");
    if (my $pending = shift @all_pending) {
        my $raw = read_file($pending);

        # The suffix records the function code of the payload that was
        # sent, so it is decoded from $raw and not from the incoming message.
        my $pending_function_code = function_code($raw);
        $pending_function_code = ''
            unless defined $pending_function_code;

        (my $down_topic = $topic) =~ s/up$/down/;
        send_message(
            $socket,
            message_type => MQTT_PUBLISH,
            retain       => 0,    #$retain,
            topic        => $down_topic,
            message      => $raw,
        );

        # Strip the directory without using the path as a regex.
        (my $name = $pending) =~ s{\A.*/}{}s;
        rename $pending, "$queue/$dir/$name.sent.$pending_function_code";
    }

    if (defined $count && --$count == 0) {
        exit;
    }
    return;
}

# send_message($socket, %fields)
#
# Builds a Net::MQTT::Message from %fields, writes it to $socket and
# re-arms the keep-alive timer.  Message summaries are logged at verbosity
# 2 and above (except for PINGREQ), raw bytes at verbosity 3 and above.
sub send_message {
    my $socket = shift;
    my $msg    = Net::MQTT::Message->new(@_);
    print 'Sending: ', $msg->string, "\n"
        if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ);
    $msg = $msg->bytes;
    syswrite $socket, $msg, length $msg;
    print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
    $next_ping = Time::HiRes::time() + $keep_alive_timer;
}

# read_message($socket, $buffer)
#
# Returns the next complete message found in $buffer, reading more data
# from $socket as needed; $buffer is modified in place.  Returns nothing
# when no complete message arrived before the next ping is due.  Dies with
# $SOCKET_CLOSED when the peer closed the connection or reading failed.
sub read_message {
    my $socket  = shift;
    my $select  = IO::Select->new($socket);
    my $timeout = $next_ping - Time::HiRes::time();
    do {
        my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
        return $mqtt if (defined $mqtt);
        $select->can_read($timeout) || return;
        $timeout = $next_ping - Time::HiRes::time();

        # autodie turns a failing sysread into an exception; catch it so
        # that read errors take the same reconnect path as a clean close.
        my $bytes = eval { sysread $socket, $_[0], 2048, length $_[0] };
        unless ($bytes) {
            warn "Socket closed ",
                (defined $bytes ? 'gracefully' : 'error'), "\n";
            die $SOCKET_CLOSED;
        }
        print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
            if ($verbose >= 3);
    } while ($timeout > 0);
    return;
}

__END__

=pod

=encoding UTF-8

=head1 NAME

net-mqtt-sub - Perl script for subscribing to an MQTT topic

=head1 VERSION

version 1.143260

=head1 SYNOPSIS

  net-mqtt-sub [options] topic1 [topic2] [topic3] ...

=head1 DESCRIPTION

This script subscribes to one or more MQTT topics (all topics, C<#>,
when none is given) and writes the messages that it receives to a
timestamped file in the F<log> directory next to the script.  Raw
payloads are stored below the F<queue> directory, heartbeat and alarm
frames are inserted into the C<zc> database table, and payloads found
in a device's F<.pending> directory are published back to the device.

=head1 OPTIONS

=over

=item B<-help>

Print a brief help message.

=item B<-man>

Print the manual page.

=item B<-host>

The host running the MQTT service.  The default is C<localhost>.

=item B<-port>

The port of the running MQTT service.  The default is 1883.

=item B<-client-id>

The client id to use in the connect message.  The default is
'NetMQTTpm' followed by the process id of the process.

=item B<-verbose>

Include more verbose output.
Without this option the script only outputs errors and received
messages one per line in the form:

  topic message

With one B<-verbose> option, publish messages are printed in a form
of a summary of the header fields and the payload in hex dump and text
form.

With two B<-verbose> options, summaries are printed for all messages
sent and received.

With three B<-verbose> options, a hex dump of all data transmitted and
received is printed.

Note that this script starts with a verbosity of 2, so each
B<-verbose> given on the command line raises the level from there.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<-count NNN>

Read the specified number of MQTT messages and then exit.  Default
is 0 - read forever.

=item B<-one> or B<-1>

Short for B<-count 1>.  Read one message and exit.

=back

=head1 SEE ALSO

Net::MQTT::Message(3)

=head1 DISCLAIMER

This is B<not> official IBM code.  I work for IBM but I'm writing this
in my spare time (with permission) for fun.

=head1 AUTHOR

Mark Hindess

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2014 by Mark Hindess.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut