store heartbeat in postgresql
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18 use File::Slurp;
19 use DBD::Pg;
20 use Data::Dump qw(dump);
21 use autodie;
22
23 use lib '.';
24 use Protocol;
25
26 my $queue = "queue";
27
28 $|=1; # flush STDOUT
29
30 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
31 my @columns = split(/\n/, <<__COLUMNS__);
32 0x01    PN
33 0x03    X_axis_angle
34 0x04    Y_axis_angle
35 0x0c    Sensor_temperature
36 0x0d    Power_source_voltage
37 0x11    Arming_disarming
38 0x17    Signal_strength
39 0x18    Sensor_operating_mode
40 0x19    Alarm_axis
41 __COLUMNS__
42
43 my $cols = join(',',  map {      (split(/\t/,$_))[1]   } @columns);
44 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
45
46 my $sql_placeholders = ',?' x ($#columns+1);
47 $sql_placeholders =~ s/^,//;
48
49 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
50
51 my $help;
52 my $man;
53 my $verbose = 2;
54 my $host = 'localhost';
55 my $port = 1883;
56 my $count;
57 my $client_id;
58 my $keep_alive_timer = 120;
59 GetOptions('help|?' => \$help,
60            'man' => \$man,
61            'verbose+' => \$verbose,
62            'host=s' => \$host,
63            'port=i' => \$port,
64            'count=i' => \$count,
65            'one|1' => sub { $count = 1 },
66            'client_id|client-id|C=s' => \$client_id,
67            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
68 pod2usage(1) if ($help);
69 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
70 #pod2usage(2) unless (@ARGV); # need a topic
71 push @ARGV, '#' unless (@ARGV);
72
73 open_again:
74 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
75 my $socket =
76   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
77                         Timeout => $keep_alive_timer,
78                        ) or die "Socket connect failed: $!\n";
79
80 my $buf = '';
81 my $mid = 1;
82 my $next_ping;
83 my $got_ping_response = 1;
84 my @connect = ( message_type => MQTT_CONNECT,
85                 keep_alive_timer => $keep_alive_timer );
86 push @connect, client_id => $client_id if (defined $client_id);
87 send_message($socket, @connect);
88 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
89 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
90 send_message($socket, message_type => MQTT_SUBSCRIBE,
91              message_id => $mid++,
92              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
93 $msg = read_message($socket, $buf) or die "No SubAck\n";
94 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
95
96 while (1) {
97   $msg = read_message($socket, $buf);
98   if ($msg) {
99     my $t = time();
100     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
101     if ($msg->message_type == MQTT_PUBLISH) {
102       if ($verbose == 0) {
103         print $msg->topic, " ", $msg->message, "\n";
104       } else {
105         print $msg->string, "\n";
106       }
107
108         my $topic = $msg->topic;
109         # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
110         my $dir = $topic; # leave imei in dir
111         $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
112         my $up_down = $2;
113         
114         mkdir "$queue" if ( ! -e "$queue" );
115         mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
116         mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
117
118         my $function_code = unpack('C',substr($msg->message,2,1));
119
120         write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message;
121
122         if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
123                 $topic =~ s/up$/down/;
124
125                 my @all_pending = sort glob "$queue/$dir/.pending/*";
126                 if ( my $pending = shift @all_pending ) {
127                         my $raw = read_file $pending;
128                         my $pending_function_code = unpack('C',substr($msg->message,2,1));
129
130                         send_message($socket,
131                                 message_type => MQTT_PUBLISH,
132                                 retain => 0, #$retain,
133                                 topic => $topic,
134                                 message => $raw);
135                         $pending =~ s{$queue/$dir/.pending/}{};
136                         rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
137                 }
138
139                 my $hash = protocol_decode( $up_down, $msg->message );
140                 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
141
142
143       }
144
145       if (defined $count && --$count == 0) {
146         exit;
147       }
148     } elsif ($msg->message_type == MQTT_PINGRESP) {
149       $got_ping_response = 1;
150       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
151     } else {
152       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
153     }
154   }
155   if (Time::HiRes::time > $next_ping) {
156     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
157     send_message($socket, message_type => MQTT_PINGREQ);
158   }
159 }
160
161 sub send_message {
162   my $socket = shift;
163   my $msg = Net::MQTT::Message->new(@_);
164   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
165   $msg = $msg->bytes;
166   syswrite $socket, $msg, length $msg;
167   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
168   $next_ping = Time::HiRes::time + $keep_alive_timer;
169 }
170
171 sub read_message {
172   my $socket = shift;
173   my $select = IO::Select->new($socket);
174   my $timeout = $next_ping - Time::HiRes::time;
175   do {
176     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
177     return $mqtt if (defined $mqtt);
178     $select->can_read($timeout) || return;
179     $timeout = $next_ping - Time::HiRes::time;
180     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
181     unless ($bytes) {
182       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
183       # FIXME reopen
184       goto open_again;
185     }
186     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
187       if ($verbose >= 3);
188   } while ($timeout > 0);
189   return;
190 }
191
192 __END__
193
194 =pod
195
196 =encoding UTF-8
197
198 =head1 NAME
199
200 net-mqtt-sub - Perl script for subscribing to an MQTT topic
201
202 =head1 VERSION
203
204 version 1.143260
205
206 =head1 SYNOPSIS
207
208   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
209
210 =head1 DESCRIPTION
211
212 This script subscribes to one or more MQTT topics and prints any
213 messages that it receives to stdout.
214
215 =head1 OPTIONS
216
217 =over
218
219 =item B<-help>
220
221 Print a brief help message.
222
223 =item B<-man>
224
225 Print the manual page.
226
227 =item B<-host>
228
229 The host running the MQTT service.  The default is C<127.0.0.1>.
230
231 =item B<-port>
232
233 The port of the running MQTT service.  The default is 1883.
234
235 =item B<-client-id>
236
237 The client id to use in the connect message.  The default is
238 'NetMQTTpm' followed by the process id of the process.
239
240 =item B<-verbose>
241
242 Include more verbose output.  Without this option the script only
243 outputs errors and received messages one per line in the form:
244
245   topic message
246
247 With one B<-verbose> options, publish messages are printed in a form
248 of a summary of the header fields and the payload in hex dump and text
249 form.
250
251 With two B<-verbose> options, summaries are printed for all messages
252 sent and received.
253
254 With three B<-verbose> options, a hex dump of all data transmitted and
255 received is printed.
256
257 =item B<-keepalive NNN>
258
259 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
260 it is also currently used as the connection/subscription timeout.
261
262 =item B<-count NNN>
263
264 Read the specificed number of MQTT messages and then exit.  Default
265 is 0 - read forever.
266
267 =item B<-one> or B<-1>
268
269 Short for B<-count 1>.  Read one message and exit.
270
271 =back
272
273 =head1 SEE ALSO
274
275 Net::MQTT::Message(3)
276
277 =head1 DISCLAIMER
278
279 This is B<not> official IBM code.  I work for IBM but I'm writing this
280 in my spare time (with permission) for fun.
281
282 =head1 AUTHOR
283
284 Mark Hindess <soft-cpan@temporalanomaly.com>
285
286 =head1 COPYRIGHT AND LICENSE
287
288 This software is copyright (c) 2014 by Mark Hindess.
289
290 This is free software; you can redistribute it and/or modify it under
291 the same terms as the Perl 5 programming language system itself.
292
293 =cut
294