update offsets
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18 use File::Slurp;
19 use DBD::Pg;
20 use Data::Dump qw(dump);
21 use autodie;
22
23 BEGIN {
24         my $cwd = $0;
25         $cwd =~ s{/[^/]+$}{};
26         chdir $cwd;
27
28         my $log_dir = 'log';
29         mkdir $log_dir unless -d $log_dir;
30
31         open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime()));
32         select($log);
33         $|=1;
34 }
35
36 use lib '.';
37 use Protocol;
38
39 my $queue = "queue";
40
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
42 my @columns = split(/\n/, <<__COLUMNS__);
43 0x01    PN
44 0x03    X_axis_angle
45 0x04    Y_axis_angle
46 0x0c    Sensor_temperature
47 0x0d    Power_source_voltage
48 0x11    Arming_disarming
49 0x17    Signal_strength
50 0x18    Sensor_operating_mode
51 0x19    Alarm_axis
52 __COLUMNS__
53
54 my $cols = join(',',  map {      (split(/\t/,$_))[1]   } @columns);
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
56
57 my $sql_placeholders = ',?' x ($#columns+1);
58 $sql_placeholders =~ s/^,//;
59
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
61
62 my $help;
63 my $man;
64 my $verbose = 2;
65 my $host = 'localhost';
66 my $port = 1883;
67 my $count;
68 my $client_id;
69 my $keep_alive_timer = 120;
70 GetOptions('help|?' => \$help,
71            'man' => \$man,
72            'verbose+' => \$verbose,
73            'host=s' => \$host,
74            'port=i' => \$port,
75            'count=i' => \$count,
76            'one|1' => sub { $count = 1 },
77            'client_id|client-id|C=s' => \$client_id,
78            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
82 push @ARGV, '#' unless (@ARGV);
83
84 open_again:
85 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
86 my $socket =
87   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88                         Timeout => $keep_alive_timer,
89                        ) or die "Socket connect failed: $!\n";
90
91 my $buf = '';
92 my $mid = 1;
93 my $next_ping;
94 my $got_ping_response = 1;
95 my @connect = ( message_type => MQTT_CONNECT,
96                 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102              message_id => $mid++,
103              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
106
107 while (1) {
108   $msg = read_message($socket, $buf);
109   if ($msg) {
110     my $t = time();
111     my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
112     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113     if ($msg->message_type == MQTT_PUBLISH) {
114       if ($verbose == 0) {
115         print $msg->topic, " ", $msg->message, "\n";
116       } else {
117         print $msg->string, "\n";
118       }
119
120         my $topic = $msg->topic;
121         # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
122         my $dir = $topic; # leave imei in dir
123         $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
124         my $up_down = $2;
125         
126         mkdir "$queue" if ( ! -e "$queue" );
127         mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
128         mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
129
130         my $function_code = unpack('C',substr($msg->message,2,1));
131
132         mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
133         write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
134
135         if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
136                 $topic =~ s/up$/down/;
137
138                 my @all_pending = sort glob "$queue/$dir/.pending/*";
139                 if ( my $pending = shift @all_pending ) {
140                         my $raw = read_file $pending;
141                         my $pending_function_code = unpack('C',substr($msg->message,2,1));
142
143                         send_message($socket,
144                                 message_type => MQTT_PUBLISH,
145                                 retain => 0, #$retain,
146                                 topic => $topic,
147                                 message => $raw);
148                         $pending =~ s{$queue/$dir/.pending/}{};
149                         rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
150                 }
151
152                 my $hash = protocol_decode( $up_down, $msg->message );
153                 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
154
155
156       }
157
158       if (defined $count && --$count == 0) {
159         exit;
160       }
161     } elsif ($msg->message_type == MQTT_PINGRESP) {
162       $got_ping_response = 1;
163       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
164     } else {
165       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
166     }
167   }
168   if (Time::HiRes::time > $next_ping) {
169     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
170     send_message($socket, message_type => MQTT_PINGREQ);
171   }
172 }
173
174 sub send_message {
175   my $socket = shift;
176   my $msg = Net::MQTT::Message->new(@_);
177   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
178   $msg = $msg->bytes;
179   syswrite $socket, $msg, length $msg;
180   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
181   $next_ping = Time::HiRes::time + $keep_alive_timer;
182 }
183
184 sub read_message {
185   my $socket = shift;
186   my $select = IO::Select->new($socket);
187   my $timeout = $next_ping - Time::HiRes::time;
188   do {
189     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
190     return $mqtt if (defined $mqtt);
191     $select->can_read($timeout) || return;
192     $timeout = $next_ping - Time::HiRes::time;
193     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
194     unless ($bytes) {
195       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
196       # FIXME reopen
197       goto open_again;
198     }
199     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
200       if ($verbose >= 3);
201   } while ($timeout > 0);
202   return;
203 }
204
205 __END__
206
207 =pod
208
209 =encoding UTF-8
210
211 =head1 NAME
212
213 net-mqtt-sub - Perl script for subscribing to an MQTT topic
214
215 =head1 VERSION
216
217 version 1.143260
218
219 =head1 SYNOPSIS
220
221   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
222
223 =head1 DESCRIPTION
224
225 This script subscribes to one or more MQTT topics and prints any
226 messages that it receives to stdout.
227
228 =head1 OPTIONS
229
230 =over
231
232 =item B<-help>
233
234 Print a brief help message.
235
236 =item B<-man>
237
238 Print the manual page.
239
240 =item B<-host>
241
242 The host running the MQTT service.  The default is C<127.0.0.1>.
243
244 =item B<-port>
245
246 The port of the running MQTT service.  The default is 1883.
247
248 =item B<-client-id>
249
250 The client id to use in the connect message.  The default is
251 'NetMQTTpm' followed by the process id of the process.
252
253 =item B<-verbose>
254
255 Include more verbose output.  Without this option the script only
256 outputs errors and received messages one per line in the form:
257
258   topic message
259
260 With one B<-verbose> options, publish messages are printed in a form
261 of a summary of the header fields and the payload in hex dump and text
262 form.
263
264 With two B<-verbose> options, summaries are printed for all messages
265 sent and received.
266
267 With three B<-verbose> options, a hex dump of all data transmitted and
268 received is printed.
269
270 =item B<-keepalive NNN>
271
272 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
273 it is also currently used as the connection/subscription timeout.
274
275 =item B<-count NNN>
276
277 Read the specificed number of MQTT messages and then exit.  Default
278 is 0 - read forever.
279
280 =item B<-one> or B<-1>
281
282 Short for B<-count 1>.  Read one message and exit.
283
284 =back
285
286 =head1 SEE ALSO
287
288 Net::MQTT::Message(3)
289
290 =head1 DISCLAIMER
291
292 This is B<not> official IBM code.  I work for IBM but I'm writing this
293 in my spare time (with permission) for fun.
294
295 =head1 AUTHOR
296
297 Mark Hindess <soft-cpan@temporalanomaly.com>
298
299 =head1 COPYRIGHT AND LICENSE
300
301 This software is copyright (c) 2014 by Mark Hindess.
302
303 This is free software; you can redistribute it and/or modify it under
304 the same terms as the Perl 5 programming language system itself.
305
306 =cut
307