5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
20 use Data::Dump qw(dump);
# Make sure the log directory exists; stop with the OS error if it cannot
# be created instead of failing later on the open below.
29 mkdir $log_dir or die "Can't create log directory $log_dir: $!\n" unless -d $log_dir;
# Start a new log file named after the startup timestamp. The open is
# checked because an unnoticed failure would leave $log unusable.
31 open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime())) or die "Can't open log file in $log_dir: $!\n";
# Connect to the PostgreSQL database "zc"; RaiseError makes DBI calls die on failure.
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
# Column map, one heredoc line per entry: a hex data id followed by the
# name of the table column that stores it (fields are split on a tab below).
42 my @columns = split(/\n/, <<__COLUMNS__);
46 0x0c Sensor_temperature
47 0x0d Power_source_voltage
50 0x18 Sensor_operating_mode
54 my $cols = join(',', map { (split(/\t/,$_))[1] } @columns); # comma-separated column names (second field of each entry)
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ; # numeric data ids (first field, parsed as hex), same order as $cols
57 my $sql_placeholders = ',?' x ($#columns+1); # one ",?" per column
58 $sql_placeholders =~ s/^,//; # drop the leading comma
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)}); # insert statement reused for every decoded message
# Defaults that the command-line options parsed below can override.
65 my $host = 'localhost';
69 my $keep_alive_timer = 120;
# Parse the command line: -verbose may be repeated to raise $verbose,
# -one/-1 sets $count to 1. A parse failure prints usage via pod2usage(2).
70 GetOptions('help|?' => \$help,
72 'verbose+' => \$verbose,
76 'one|1' => sub { $count = 1 },
77 'client_id|client-id|C=s' => \$client_id,
78 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
# $help requests the usage message, $man the full manual.
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
# With no topic argument, subscribe to '#' (the MQTT multi-level wildcard)
# instead of exiting with a usage error.
82 push @ARGV, '#' unless (@ARGV);
# Announce the broker endpoint and keep-alive interval before connecting.
# $port must be interpolated here; a bare "port" would print the literal word.
85 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";
# TCP connection to the broker at $host:$port. The keep-alive value is also
# used as the socket timeout; a failed connect is fatal.
87 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88 Timeout => $keep_alive_timer,
89 ) or die "Socket connect failed: $!\n";
# Starts true so that the first keep-alive check (which dies when this flag
# is false) does not abort before any PINGREQ has been sent.
94 my $got_ping_response = 1;
# Build the CONNECT message; client_id is only included when one was given.
95 my @connect = ( message_type => MQTT_CONNECT,
96 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
# Wait for the broker's reply; getting no message at all is fatal. The
# visible code does not inspect the type of the message it receives here.
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Subscribe to every topic given in @ARGV with QoS "at most once";
# $mid is post-incremented so each request uses a new message id.
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102 message_id => $mid++,
103 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
# As above: no reply to the subscription is fatal.
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Read the next message from the broker.
108 $msg = read_message($socket, $buf);
# Split the formatted timestamp into a date part (used as a directory name)
# and a time part (used in the file name).
# NOTE(review): $t is assigned on a line not visible here - presumably the receive time; confirm.
111 my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
# Prefix the output with a timestamp for everything except ping responses.
112 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113 if ($msg->message_type == MQTT_PUBLISH) {
# Either the short "topic payload" form or the full message summary is printed.
# NOTE(review): the condition choosing between them is not visible - presumably $verbose; confirm.
115 print $msg->topic, " ", $msg->message, "\n";
117 print $msg->string, "\n";
120 my $topic = $msg->topic;
121 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
# Replace the trailing four path segments of the topic with the third of
# them, so the example topic above yields "869858031634109" as $dir.
122 my $dir = $topic; # leave imei in dir
123 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
# Create the queue root, the per-device directory and its .pending
# sub-directory on first use.
126 mkdir "$queue" if ( ! -e "$queue" );
127 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
128 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
# Function code: the payload byte at offset 2, as an unsigned number.
130 my $function_code = unpack('C',substr($msg->message,2,1));
# Archive the raw payload under a per-date directory; the file name is
# built from the time, $t, $up_down and the function code.
132 mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
133 write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
# A payload whose byte at offset 2 is 0x07 is treated as a heartbeat; it
# triggers delivery of one queued message for this device.
135 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
# The reply goes to the matching "down" topic.
136 $topic =~ s/up$/down/;
# Pending files are taken in sorted name order; only the first is handled per heartbeat.
138 my @all_pending = sort glob "$queue/$dir/.pending/*";
139 if ( my $pending = shift @all_pending ) {
140 my $raw = read_file $pending;
# NOTE(review): this reads the byte from the incoming $msg (the same byte
# tested for 0x07 above), not from $raw just loaded - confirm which one
# the ".sent.<code>" suffix is meant to carry.
141 my $pending_function_code = unpack('C',substr($msg->message,2,1));
# Publish without the retain flag.
143 send_message($socket,
144 message_type => MQTT_PUBLISH,
145 retain => 0, #$retain,
# Strip the directory prefix so only the pending file's base name remains.
# \Q...\E quotes the interpolated path, so the "." in ".pending" and any
# regex metacharacters in $queue or $dir are matched literally.
148 $pending =~ s{\Q$queue/$dir/.pending/\E}{};
# Move the delivered file out of .pending, tagging it as sent. A failed
# rename is reported, since the file would otherwise be sent again.
149 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code" or warn "rename $queue/$dir/.pending/$pending failed: $!\n";
# Decode the payload with protocol_decode (defined outside the visible lines).
152 my $hash = protocol_decode( $up_down, $msg->message );
# Insert one row: values are looked up in $hash->{data_id} in the order of
# @insert_data_ids; an id missing from the hash yields undef for its column.
153 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
# When a message count was requested, count down and act when it reaches
# zero (the body of this branch is not visible here).
158 if (defined $count && --$count == 0) {
161 } elsif ($msg->message_type == MQTT_PINGRESP) {
# Record that the broker answered the last PINGREQ.
162 $got_ping_response = 1;
163 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
165 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Keep-alive: once the ping deadline has passed, give up if the flag is
# false, otherwise send a new PINGREQ.
# NOTE(review): the flag is presumably cleared on a line not visible here; confirm.
168 if (Time::HiRes::time > $next_ping) {
169 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
170 send_message($socket, message_type => MQTT_PINGREQ);
# Build a message object from the key/value arguments left in @_.
176 my $msg = Net::MQTT::Message->new(@_);
# Log outgoing messages at verbosity 2 and above, skipping PINGREQ.
# NOTE(review): the test indexes $_[1] and $_[2]; whether 'message_type' sits at
# that position depends on how @_ is unpacked on a line not visible here - confirm.
177 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
# NOTE(review): presumably $msg was converted to its wire bytes on a line not
# visible here before being written; confirm.
179 syswrite $socket, $msg, length $msg;
180 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
# Every send pushes the next ping deadline $keep_alive_timer seconds ahead.
181 $next_ping = Time::HiRes::time + $keep_alive_timer;
# Wait on the socket for at most the time left until the next ping is due.
186 my $select = IO::Select->new($socket);
187 my $timeout = $next_ping - Time::HiRes::time;
# Try to parse one message from the buffer. $_[0] is the caller's buffer
# variable, used through @_ so that changes to it persist between calls.
189 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
190 return $mqtt if (defined $mqtt);
# No complete message yet: block until the socket is readable, or return
# nothing when the timeout expires first.
191 $select->can_read($timeout) || return;
192 $timeout = $next_ping - Time::HiRes::time;
# Append up to 2048 newly received bytes to the end of the buffer.
193 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
# A defined $bytes is reported as a graceful close, an undefined one as an error.
195 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
199 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
# Keep trying while there is time left before the ping deadline.
201 } while ($timeout > 0);
213 net-mqtt-sub - Perl script for subscribing to an MQTT topic
221 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
225 This script subscribes to one or more MQTT topics and prints any
226 messages that it receives to stdout.
234 Print a brief help message.
238 Print the manual page.
242 The host running the MQTT service. The default is C<localhost>.
246 The port of the running MQTT service. The default is 1883.
250 The client id to use in the connect message. The default is
251 'NetMQTTpm' followed by the process id of the process.
255 Include more verbose output. Without this option the script only
256 outputs errors and received messages one per line in the form:
260 With one B<-verbose> option, publish messages are printed in a form
261 of a summary of the header fields and the payload in hex dump and text
264 With two B<-verbose> options, summaries are printed for all messages
267 With three B<-verbose> options, a hex dump of all data transmitted and
270 =item B<-keepalive NNN>
272 The keep alive timer value. Defaults to 120 seconds. For simplicity,
273 it is also currently used as the connection/subscription timeout.
277 Read the specified number of MQTT messages and then exit. Default
280 =item B<-one> or B<-1>
282 Short for B<-count 1>. Read one message and exit.
288 Net::MQTT::Message(3)
292 This is B<not> official IBM code. I work for IBM but I'm writing this
293 in my spare time (with permission) for fun.
297 Mark Hindess <soft-cpan@temporalanomaly.com>
299 =head1 COPYRIGHT AND LICENSE
301 This software is copyright (c) 2014 by Mark Hindess.
303 This is free software; you can redistribute it and/or modify it under
304 the same terms as the Perl 5 programming language system itself.