level 177
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18 use File::Slurp;
19 use DBD::Pg;
20 use Data::Dump qw(dump);
21 use autodie;
22
23 BEGIN {
24         my $cwd = $0;
25         $cwd =~ s{/[^/]+$}{};
26         chdir $cwd;
27
28         my $log_dir = 'log';
29         mkdir $log_dir unless -d $log_dir;
30
31         open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
32         select($log);
33         $|=1;
34 }
35
36 use lib '.';
37 use Protocol;
38
39 my $queue = "queue";
40
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
42 my @columns = split(/\n/, <<__COLUMNS__);
43 0x01    PN
44 0x03    X_axis_angle
45 0x04    Y_axis_angle
46 0x0c    Sensor_temperature
47 0x0d    Power_source_voltage
48 0x11    Arming_disarming
49 0x17    Signal_strength
50 0x18    Sensor_operating_mode
51 0x19    Alarm_axis
52 __COLUMNS__
53
54 my $cols = join(',',  map {      (split(/\t/,$_))[1]   } @columns);
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
56
57 my $sql_placeholders = ',?' x ($#columns+1);
58 $sql_placeholders =~ s/^,//;
59
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
61
62 my $help;
63 my $man;
64 my $verbose = 2;
65 my $host = 'localhost';
66 my $port = 1883;
67 my $count;
68 my $client_id;
69 my $keep_alive_timer = 120;
70 GetOptions('help|?' => \$help,
71            'man' => \$man,
72            'verbose+' => \$verbose,
73            'host=s' => \$host,
74            'port=i' => \$port,
75            'count=i' => \$count,
76            'one|1' => sub { $count = 1 },
77            'client_id|client-id|C=s' => \$client_id,
78            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
82 push @ARGV, '#' unless (@ARGV);
83
84 open_again:
85 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
86 my $socket =
87   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88                         Timeout => $keep_alive_timer,
89                        ) or die "Socket connect failed: $!\n";
90
91 my $buf = '';
92 my $mid = 1;
93 my $next_ping;
94 my $got_ping_response = 1;
95 my @connect = ( message_type => MQTT_CONNECT,
96                 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102              message_id => $mid++,
103              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
106
107 while (1) {
108   $msg = read_message($socket, $buf);
109   if ($msg) {
110     my $t = time();
111     my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
112     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113     if ($msg->message_type == MQTT_PUBLISH) {
114       if ($verbose == 0) {
115         print $msg->topic, " ", $msg->message, "\n";
116       } else {
117         print $msg->string, "\n";
118       }
119
120       # skip retained
121       next if $msg->string =~ m{Publish/at-most-once,retain};
122
123         my $topic = $msg->topic;
124         # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
125         my $dir = $topic; # leave imei in dir
126         $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
127         my $up_down = $2;
128         
129         mkdir "$queue" if ( ! -e "$queue" );
130         mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
131         mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
132
133         my $function_code = unpack('C',substr($msg->message,2,1));
134
135         mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
136         write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
137
138         if ( $function_code == 7 || $function_code == 8 ) { #  7 = heartbeat, 8 = alarm
139                 my $hash = protocol_decode( $up_down, $msg->message );
140                 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
141
142
143       }
144
145
146         # send pending messages on any connection
147         my @all_pending = sort glob "$queue/$dir/.pending/*";
148         if ( my $pending = shift @all_pending ) {
149                 my $raw = read_file $pending;
150                 my $pending_function_code = unpack('C',substr($msg->message,2,1));
151
152                 $topic =~ s/up$/down/;
153
154                 send_message($socket,
155                         message_type => MQTT_PUBLISH,
156                         retain => 0, #$retain,
157                         topic => $topic,
158                         message => $raw);
159                 $pending =~ s{$queue/$dir/.pending/}{};
160                 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
161         }
162
163
164       if (defined $count && --$count == 0) {
165         exit;
166       }
167     } elsif ($msg->message_type == MQTT_PINGRESP) {
168       $got_ping_response = 1;
169       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
170     } else {
171       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
172     }
173   }
174   if (Time::HiRes::time > $next_ping) {
175     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
176     send_message($socket, message_type => MQTT_PINGREQ);
177   }
178 }
179
180 sub send_message {
181   my $socket = shift;
182   my $msg = Net::MQTT::Message->new(@_);
183   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
184   $msg = $msg->bytes;
185   syswrite $socket, $msg, length $msg;
186   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
187   $next_ping = Time::HiRes::time + $keep_alive_timer;
188 }
189
190 sub read_message {
191   my $socket = shift;
192   my $select = IO::Select->new($socket);
193   my $timeout = $next_ping - Time::HiRes::time;
194   do {
195     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
196     return $mqtt if (defined $mqtt);
197     $select->can_read($timeout) || return;
198     $timeout = $next_ping - Time::HiRes::time;
199     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
200     unless ($bytes) {
201       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
202       # FIXME reopen
203       goto open_again;
204     }
205     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
206       if ($verbose >= 3);
207   } while ($timeout > 0);
208   return;
209 }
210
211 __END__
212
213 =pod
214
215 =encoding UTF-8
216
217 =head1 NAME
218
219 net-mqtt-sub - Perl script for subscribing to an MQTT topic
220
221 =head1 VERSION
222
223 version 1.143260
224
225 =head1 SYNOPSIS
226
227   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
228
229 =head1 DESCRIPTION
230
231 This script subscribes to one or more MQTT topics and prints any
232 messages that it receives to stdout.
233
234 =head1 OPTIONS
235
236 =over
237
238 =item B<-help>
239
240 Print a brief help message.
241
242 =item B<-man>
243
244 Print the manual page.
245
246 =item B<-host>
247
248 The host running the MQTT service.  The default is C<127.0.0.1>.
249
250 =item B<-port>
251
252 The port of the running MQTT service.  The default is 1883.
253
254 =item B<-client-id>
255
256 The client id to use in the connect message.  The default is
257 'NetMQTTpm' followed by the process id of the process.
258
259 =item B<-verbose>
260
261 Include more verbose output.  Without this option the script only
262 outputs errors and received messages one per line in the form:
263
264   topic message
265
266 With one B<-verbose> options, publish messages are printed in a form
267 of a summary of the header fields and the payload in hex dump and text
268 form.
269
270 With two B<-verbose> options, summaries are printed for all messages
271 sent and received.
272
273 With three B<-verbose> options, a hex dump of all data transmitted and
274 received is printed.
275
276 =item B<-keepalive NNN>
277
278 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
279 it is also currently used as the connection/subscription timeout.
280
281 =item B<-count NNN>
282
283 Read the specificed number of MQTT messages and then exit.  Default
284 is 0 - read forever.
285
286 =item B<-one> or B<-1>
287
288 Short for B<-count 1>.  Read one message and exit.
289
290 =back
291
292 =head1 SEE ALSO
293
294 Net::MQTT::Message(3)
295
296 =head1 DISCLAIMER
297
298 This is B<not> official IBM code.  I work for IBM but I'm writing this
299 in my spare time (with permission) for fun.
300
301 =head1 AUTHOR
302
303 Mark Hindess <soft-cpan@temporalanomaly.com>
304
305 =head1 COPYRIGHT AND LICENSE
306
307 This software is copyright (c) 2014 by Mark Hindess.
308
309 This is free software; you can redistribute it and/or modify it under
310 the same terms as the Perl 5 programming language system itself.
311
312 =cut
313