send to sensors on any message, not just heartbeat
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18 use File::Slurp;
19 use DBD::Pg;
20 use Data::Dump qw(dump);
21 use autodie;
22
23 BEGIN {
24         my $cwd = $0;
25         $cwd =~ s{/[^/]+$}{};
26         chdir $cwd;
27
28         my $log_dir = 'log';
29         mkdir $log_dir unless -d $log_dir;
30
31         open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
32         select($log);
33         $|=1;
34 }
35
36 use lib '.';
37 use Protocol;
38
39 my $queue = "queue";
40
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
42 my @columns = split(/\n/, <<__COLUMNS__);
43 0x01    PN
44 0x03    X_axis_angle
45 0x04    Y_axis_angle
46 0x0c    Sensor_temperature
47 0x0d    Power_source_voltage
48 0x11    Arming_disarming
49 0x17    Signal_strength
50 0x18    Sensor_operating_mode
51 0x19    Alarm_axis
52 __COLUMNS__
53
54 my $cols = join(',',  map {      (split(/\t/,$_))[1]   } @columns);
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
56
57 my $sql_placeholders = ',?' x ($#columns+1);
58 $sql_placeholders =~ s/^,//;
59
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
61
62 my $help;
63 my $man;
64 my $verbose = 2;
65 my $host = 'localhost';
66 my $port = 1883;
67 my $count;
68 my $client_id;
69 my $keep_alive_timer = 120;
70 GetOptions('help|?' => \$help,
71            'man' => \$man,
72            'verbose+' => \$verbose,
73            'host=s' => \$host,
74            'port=i' => \$port,
75            'count=i' => \$count,
76            'one|1' => sub { $count = 1 },
77            'client_id|client-id|C=s' => \$client_id,
78            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
82 push @ARGV, '#' unless (@ARGV);
83
84 open_again:
85 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
86 my $socket =
87   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88                         Timeout => $keep_alive_timer,
89                        ) or die "Socket connect failed: $!\n";
90
91 my $buf = '';
92 my $mid = 1;
93 my $next_ping;
94 my $got_ping_response = 1;
95 my @connect = ( message_type => MQTT_CONNECT,
96                 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102              message_id => $mid++,
103              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
106
107 while (1) {
108   $msg = read_message($socket, $buf);
109   if ($msg) {
110     my $t = time();
111     my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
112     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113     if ($msg->message_type == MQTT_PUBLISH) {
114       if ($verbose == 0) {
115         print $msg->topic, " ", $msg->message, "\n";
116       } else {
117         print $msg->string, "\n";
118       }
119
120         my $topic = $msg->topic;
121         # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
122         my $dir = $topic; # leave imei in dir
123         $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
124         my $up_down = $2;
125         
126         mkdir "$queue" if ( ! -e "$queue" );
127         mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
128         mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
129
130         my $function_code = unpack('C',substr($msg->message,2,1));
131
132         mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
133         write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
134
135         if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
136                 my $hash = protocol_decode( $up_down, $msg->message );
137                 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
138
139
140       }
141
142
143         # send pending messages on any connection
144         my @all_pending = sort glob "$queue/$dir/.pending/*";
145         if ( my $pending = shift @all_pending ) {
146                 my $raw = read_file $pending;
147                 my $pending_function_code = unpack('C',substr($msg->message,2,1));
148
149                 $topic =~ s/up$/down/;
150
151                 send_message($socket,
152                         message_type => MQTT_PUBLISH,
153                         retain => 0, #$retain,
154                         topic => $topic,
155                         message => $raw);
156                 $pending =~ s{$queue/$dir/.pending/}{};
157                 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
158         }
159
160
161       if (defined $count && --$count == 0) {
162         exit;
163       }
164     } elsif ($msg->message_type == MQTT_PINGRESP) {
165       $got_ping_response = 1;
166       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
167     } else {
168       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
169     }
170   }
171   if (Time::HiRes::time > $next_ping) {
172     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
173     send_message($socket, message_type => MQTT_PINGREQ);
174   }
175 }
176
177 sub send_message {
178   my $socket = shift;
179   my $msg = Net::MQTT::Message->new(@_);
180   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
181   $msg = $msg->bytes;
182   syswrite $socket, $msg, length $msg;
183   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
184   $next_ping = Time::HiRes::time + $keep_alive_timer;
185 }
186
187 sub read_message {
188   my $socket = shift;
189   my $select = IO::Select->new($socket);
190   my $timeout = $next_ping - Time::HiRes::time;
191   do {
192     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
193     return $mqtt if (defined $mqtt);
194     $select->can_read($timeout) || return;
195     $timeout = $next_ping - Time::HiRes::time;
196     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
197     unless ($bytes) {
198       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
199       # FIXME reopen
200       goto open_again;
201     }
202     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
203       if ($verbose >= 3);
204   } while ($timeout > 0);
205   return;
206 }
207
208 __END__
209
210 =pod
211
212 =encoding UTF-8
213
214 =head1 NAME
215
216 net-mqtt-sub - Perl script for subscribing to an MQTT topic
217
218 =head1 VERSION
219
220 version 1.143260
221
222 =head1 SYNOPSIS
223
224   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
225
226 =head1 DESCRIPTION
227
228 This script subscribes to one or more MQTT topics and prints any
229 messages that it receives to stdout.
230
231 =head1 OPTIONS
232
233 =over
234
235 =item B<-help>
236
237 Print a brief help message.
238
239 =item B<-man>
240
241 Print the manual page.
242
243 =item B<-host>
244
245 The host running the MQTT service.  The default is C<127.0.0.1>.
246
247 =item B<-port>
248
249 The port of the running MQTT service.  The default is 1883.
250
251 =item B<-client-id>
252
253 The client id to use in the connect message.  The default is
254 'NetMQTTpm' followed by the process id of the process.
255
256 =item B<-verbose>
257
258 Include more verbose output.  Without this option the script only
259 outputs errors and received messages one per line in the form:
260
261   topic message
262
263 With one B<-verbose> options, publish messages are printed in a form
264 of a summary of the header fields and the payload in hex dump and text
265 form.
266
267 With two B<-verbose> options, summaries are printed for all messages
268 sent and received.
269
270 With three B<-verbose> options, a hex dump of all data transmitted and
271 received is printed.
272
273 =item B<-keepalive NNN>
274
275 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
276 it is also currently used as the connection/subscription timeout.
277
278 =item B<-count NNN>
279
280 Read the specificed number of MQTT messages and then exit.  Default
281 is 0 - read forever.
282
283 =item B<-one> or B<-1>
284
285 Short for B<-count 1>.  Read one message and exit.
286
287 =back
288
289 =head1 SEE ALSO
290
291 Net::MQTT::Message(3)
292
293 =head1 DISCLAIMER
294
295 This is B<not> official IBM code.  I work for IBM but I'm writing this
296 in my spare time (with permission) for fun.
297
298 =head1 AUTHOR
299
300 Mark Hindess <soft-cpan@temporalanomaly.com>
301
302 =head1 COPYRIGHT AND LICENSE
303
304 This software is copyright (c) 2014 by Mark Hindess.
305
306 This is free software; you can redistribute it and/or modify it under
307 the same terms as the Perl 5 programming language system itself.
308
309 =cut
310