protocol_decode
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18 use File::Slurp;
19 use autodie;
20
21 use lib '.';
22 use Protocol;
23
24 my $queue = "queue";
25
26 $|=1; # flush STDOUT
27
28 my $help;
29 my $man;
30 my $verbose = 2;
31 my $host = 'localhost';
32 my $port = 1883;
33 my $count;
34 my $client_id;
35 my $keep_alive_timer = 120;
36 GetOptions('help|?' => \$help,
37            'man' => \$man,
38            'verbose+' => \$verbose,
39            'host=s' => \$host,
40            'port=i' => \$port,
41            'count=i' => \$count,
42            'one|1' => sub { $count = 1 },
43            'client_id|client-id|C=s' => \$client_id,
44            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
45 pod2usage(1) if ($help);
46 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
47 #pod2usage(2) unless (@ARGV); # need a topic
48 push @ARGV, '#' unless (@ARGV);
49
50 open_again:
51 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
52 my $socket =
53   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
54                         Timeout => $keep_alive_timer,
55                        ) or die "Socket connect failed: $!\n";
56
57 my $buf = '';
58 my $mid = 1;
59 my $next_ping;
60 my $got_ping_response = 1;
61 my @connect = ( message_type => MQTT_CONNECT,
62                 keep_alive_timer => $keep_alive_timer );
63 push @connect, client_id => $client_id if (defined $client_id);
64 send_message($socket, @connect);
65 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
66 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
67 send_message($socket, message_type => MQTT_SUBSCRIBE,
68              message_id => $mid++,
69              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
70 $msg = read_message($socket, $buf) or die "No SubAck\n";
71 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
72
73 while (1) {
74   $msg = read_message($socket, $buf);
75   if ($msg) {
76     my $t = time();
77     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
78     if ($msg->message_type == MQTT_PUBLISH) {
79       if ($verbose == 0) {
80         print $msg->topic, " ", $msg->message, "\n";
81       } else {
82         print $msg->string, "\n";
83       }
84
85         my $topic = $msg->topic;
86         my $dir = $topic;
87         $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
88         
89         mkdir "$queue" if ( ! -e "$queue" );
90         mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
91         mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
92
93         my $function_code = unpack('C',substr($msg->message,2,1));
94
95         write_file "$queue/$dir/up/$t.$function_code", $msg->message;
96
97         mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
98         mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
99
100 #       if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
101         if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
102                 $topic =~ s/up$/down/;
103
104                 my @all_pending = sort glob "$queue/$dir/down/*";
105                 if ( my $pending = shift @all_pending ) {
106                         my $raw = read_file $pending;
107
108                         send_message($socket,
109                                 message_type => MQTT_PUBLISH,
110                                 retain => 0, #$retain,
111                                 topic => $topic,
112                                 message => $raw);
113                         $pending =~ s{$queue/$dir/down/}{};
114                         rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending";
115                 }
116       }
117
118       if (defined $count && --$count == 0) {
119         exit;
120       }
121     } elsif ($msg->message_type == MQTT_PINGRESP) {
122       $got_ping_response = 1;
123       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
124     } else {
125       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
126     }
127   }
128   if (Time::HiRes::time > $next_ping) {
129     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
130     send_message($socket, message_type => MQTT_PINGREQ);
131   }
132 }
133
134 sub send_message {
135   my $socket = shift;
136   my $msg = Net::MQTT::Message->new(@_);
137   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
138   $msg = $msg->bytes;
139   syswrite $socket, $msg, length $msg;
140   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
141   $next_ping = Time::HiRes::time + $keep_alive_timer;
142 }
143
144 sub read_message {
145   my $socket = shift;
146   my $select = IO::Select->new($socket);
147   my $timeout = $next_ping - Time::HiRes::time;
148   do {
149     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
150     return $mqtt if (defined $mqtt);
151     $select->can_read($timeout) || return;
152     $timeout = $next_ping - Time::HiRes::time;
153     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
154     unless ($bytes) {
155       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
156       # FIXME reopen
157       goto open_again;
158     }
159     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
160       if ($verbose >= 3);
161   } while ($timeout > 0);
162   return;
163 }
164
165 __END__
166
167 =pod
168
169 =encoding UTF-8
170
171 =head1 NAME
172
173 net-mqtt-sub - Perl script for subscribing to an MQTT topic
174
175 =head1 VERSION
176
177 version 1.143260
178
179 =head1 SYNOPSIS
180
181   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
182
183 =head1 DESCRIPTION
184
185 This script subscribes to one or more MQTT topics and prints any
186 messages that it receives to stdout.
187
188 =head1 OPTIONS
189
190 =over
191
192 =item B<-help>
193
194 Print a brief help message.
195
196 =item B<-man>
197
198 Print the manual page.
199
200 =item B<-host>
201
202 The host running the MQTT service.  The default is C<127.0.0.1>.
203
204 =item B<-port>
205
206 The port of the running MQTT service.  The default is 1883.
207
208 =item B<-client-id>
209
210 The client id to use in the connect message.  The default is
211 'NetMQTTpm' followed by the process id of the process.
212
213 =item B<-verbose>
214
215 Include more verbose output.  Without this option the script only
216 outputs errors and received messages one per line in the form:
217
218   topic message
219
220 With one B<-verbose> options, publish messages are printed in a form
221 of a summary of the header fields and the payload in hex dump and text
222 form.
223
224 With two B<-verbose> options, summaries are printed for all messages
225 sent and received.
226
227 With three B<-verbose> options, a hex dump of all data transmitted and
228 received is printed.
229
230 =item B<-keepalive NNN>
231
232 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
233 it is also currently used as the connection/subscription timeout.
234
235 =item B<-count NNN>
236
237 Read the specificed number of MQTT messages and then exit.  Default
238 is 0 - read forever.
239
240 =item B<-one> or B<-1>
241
242 Short for B<-count 1>.  Read one message and exit.
243
244 =back
245
246 =head1 SEE ALSO
247
248 Net::MQTT::Message(3)
249
250 =head1 DISCLAIMER
251
252 This is B<not> official IBM code.  I work for IBM but I'm writing this
253 in my spare time (with permission) for fun.
254
255 =head1 AUTHOR
256
257 Mark Hindess <soft-cpan@temporalanomaly.com>
258
259 =head1 COPYRIGHT AND LICENSE
260
261 This software is copyright (c) 2014 by Mark Hindess.
262
263 This is free software; you can redistribute it and/or modify it under
264 the same terms as the Perl 5 programming language system itself.
265
266 =cut
267