5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
20 use Data::Dump qw(dump);
29 mkdir $log_dir unless -d $log_dir;
# Open a fresh per-run log file named after the start timestamp.  Abort right
# away if it cannot be created instead of failing later on the first write.
open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()))
	or die "Can't open log file in $log_dir: $!\n";
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
42 my @columns = split(/\n/, <<__COLUMNS__);
46 0x0c Sensor_temperature
47 0x0d Power_source_voltage
50 0x18 Sensor_operating_mode
54 my $cols = join(',', map { (split(/\t/,$_))[1] } @columns);
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
# One '?' bind placeholder per configured column, comma separated.
my $sql_placeholders = join(',', ('?') x scalar(@columns));
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
65 my $host = 'localhost';
69 my $keep_alive_timer = 120;
70 GetOptions('help|?' => \$help,
72 'verbose+' => \$verbose,
76 'one|1' => sub { $count = 1 },
77 'client_id|client-id|C=s' => \$client_id,
78 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
82 push @ARGV, '#' unless (@ARGV);
# Announce the broker endpoint and keep-alive interval before connecting.
print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";
87 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88 Timeout => $keep_alive_timer,
89 ) or die "Socket connect failed: $!\n";
94 my $got_ping_response = 1;
95 my @connect = ( message_type => MQTT_CONNECT,
96 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102 message_id => $mid++,
103 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
108 $msg = read_message($socket, $buf);
111 my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
112 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113 if ($msg->message_type == MQTT_PUBLISH) {
115 print $msg->topic, " ", $msg->message, "\n";
117 print $msg->string, "\n";
121 next if $msg->string =~ m{Publish/at-most-once,retain};
123 my $topic = $msg->topic;
124 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
125 my $dir = $topic; # leave imei in dir
126 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
129 mkdir "$queue" if ( ! -e "$queue" );
130 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
131 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
133 my $function_code = unpack('C',substr($msg->message,2,1));
135 mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
136 write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
138 if ( $function_code == 7 || $function_code == 8 ) { # 7 = heartbeat, 8 = alarm
139 my $hash = protocol_decode( $up_down, $msg->message );
140 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
146 # send pending messages on any connection
147 my @all_pending = sort glob "$queue/$dir/.pending/*";
148 if ( my $pending = shift @all_pending ) {
149 my $raw = read_file $pending;
# Function code (byte at offset 2) of the queued payload that is about to be
# sent, not of the message that was just received.
my $pending_function_code = unpack('C',substr($raw,2,1));
152 $topic =~ s/up$/down/;
154 send_message($socket,
155 message_type => MQTT_PUBLISH,
156 retain => 0, #$retain,
# Reduce the glob result to its bare file name.  \Q...\E keeps regex
# metacharacters in the interpolated path (including the '.' of ".pending")
# literal, and ^ pins the match to the leading directory prefix.
$pending =~ s{^\Q$queue/$dir/.pending/\E}{};
160 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
164 if (defined $count && --$count == 0) {
167 } elsif ($msg->message_type == MQTT_PINGRESP) {
168 $got_ping_response = 1;
169 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
171 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
174 if (Time::HiRes::time > $next_ping) {
175 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
176 send_message($socket, message_type => MQTT_PINGREQ);
182 my $msg = Net::MQTT::Message->new(@_);
# Trace outgoing messages at verbosity 2 and above, except ping requests.
# The type is taken from the message object itself rather than from
# positional arguments, so it does not depend on the caller's argument order.
print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ);
185 syswrite $socket, $msg, length $msg;
186 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
187 $next_ping = Time::HiRes::time + $keep_alive_timer;
192 my $select = IO::Select->new($socket);
193 my $timeout = $next_ping - Time::HiRes::time;
195 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
196 return $mqtt if (defined $mqtt);
197 $select->can_read($timeout) || return;
198 $timeout = $next_ping - Time::HiRes::time;
199 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
201 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
205 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
207 } while ($timeout > 0);
219 net-mqtt-sub - Perl script for subscribing to an MQTT topic
227 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
231 This script subscribes to one or more MQTT topics and prints any
232 messages that it receives to stdout.
240 Print a brief help message.
244 Print the manual page.
The host running the MQTT service.  The default is C<localhost>.
252 The port of the running MQTT service. The default is 1883.
256 The client id to use in the connect message. The default is
257 'NetMQTTpm' followed by the process id of the process.
261 Include more verbose output. Without this option the script only
262 outputs errors and received messages one per line in the form:
With one B<-verbose> option, publish messages are printed in a form
267 of a summary of the header fields and the payload in hex dump and text
270 With two B<-verbose> options, summaries are printed for all messages
273 With three B<-verbose> options, a hex dump of all data transmitted and
276 =item B<-keepalive NNN>
278 The keep alive timer value. Defaults to 120 seconds. For simplicity,
279 it is also currently used as the connection/subscription timeout.
Read the specified number of MQTT messages and then exit.  Default
286 =item B<-one> or B<-1>
288 Short for B<-count 1>. Read one message and exit.
294 Net::MQTT::Message(3)
298 This is B<not> official IBM code. I work for IBM but I'm writing this
299 in my spare time (with permission) for fun.
303 Mark Hindess <soft-cpan@temporalanomaly.com>
305 =head1 COPYRIGHT AND LICENSE
307 This software is copyright (c) 2014 by Mark Hindess.
309 This is free software; you can redistribute it and/or modify it under
310 the same terms as the Perl 5 programming language system itself.