cleanup fancy utf-8 dash
[zc] / zc-mqtt
1 #!/usr/bin/perl
2 use strict;
3 use warnings;
4
5 # based on net-mqtt-sub example from Net::MQTT
6 #
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
8
9 use strict;
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
12 use IO::Select;
13 use IO::Socket::INET;
14 use Time::HiRes;
15 use Getopt::Long;
16 use Pod::Usage;
17 use POSIX qw(strftime);
18
19 use lib '.';
20 use Protocol;
21
22 $|=1; # flush STDOUT
23
24 my $help;
25 my $man;
26 my $verbose = 2;
27 my $host = 'mqtt.zc-sensor.com';
28 my $port = 1883;
29 my $count;
30 my $client_id;
31 my $keep_alive_timer = 120;
32 GetOptions('help|?' => \$help,
33            'man' => \$man,
34            'verbose+' => \$verbose,
35            'host=s' => \$host,
36            'port=i' => \$port,
37            'count=i' => \$count,
38            'one|1' => sub { $count = 1 },
39            'client_id|client-id|C=s' => \$client_id,
40            'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
41 pod2usage(1) if ($help);
42 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
43 #pod2usage(2) unless (@ARGV); # need a topic
44 push @ARGV, '#' unless (@ARGV);
45
46 open_again:
47 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
48 my $socket =
49   IO::Socket::INET->new(PeerAddr => $host.':'.$port,
50                         Timeout => $keep_alive_timer,
51                        ) or die "Socket connect failed: $!\n";
52
53 my $buf = '';
54 my $mid = 1;
55 my $next_ping;
56 my $got_ping_response = 1;
57 my @connect = ( message_type => MQTT_CONNECT,
58                 keep_alive_timer => $keep_alive_timer );
59 push @connect, client_id => $client_id if (defined $client_id);
60 send_message($socket, @connect);
61 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
62 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
63 send_message($socket, message_type => MQTT_SUBSCRIBE,
64              message_id => $mid++,
65              topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
66 $msg = read_message($socket, $buf) or die "No SubAck\n";
67 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
68
69 while (1) {
70   $msg = read_message($socket, $buf);
71   if ($msg) {
72     print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP;
73     if ($msg->message_type == MQTT_PUBLISH) {
74       if ($verbose == 0) {
75         print $msg->topic, " ", $msg->message, "\n";
76       } else {
77         print $msg->string, "\n";
78       }
79
80 #      if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
81       if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
82         #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3";
83         my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" );
84         my $topic = $msg->topic;
85         $topic =~ s/up$/down/;
86         send_message($socket,
87                 message_type => MQTT_PUBLISH,
88                 retain => 0, #$retain,
89                 topic => $topic,
90                 message => $raw);
91       }
92
93       if (defined $count && --$count == 0) {
94         exit;
95       }
96     } elsif ($msg->message_type == MQTT_PINGRESP) {
97       $got_ping_response = 1;
98       print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
99     } else {
100       print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101     }
102   }
103   if (Time::HiRes::time > $next_ping) {
104     die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
105     send_message($socket, message_type => MQTT_PINGREQ);
106   }
107 }
108
109 sub send_message {
110   my $socket = shift;
111   my $msg = Net::MQTT::Message->new(@_);
112   print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
113   $msg = $msg->bytes;
114   syswrite $socket, $msg, length $msg;
115   print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
116   $next_ping = Time::HiRes::time + $keep_alive_timer;
117 }
118
119 sub read_message {
120   my $socket = shift;
121   my $select = IO::Select->new($socket);
122   my $timeout = $next_ping - Time::HiRes::time;
123   do {
124     my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
125     return $mqtt if (defined $mqtt);
126     $select->can_read($timeout) || return;
127     $timeout = $next_ping - Time::HiRes::time;
128     my $bytes = sysread $socket, $_[0], 2048, length $_[0];
129     unless ($bytes) {
130       warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
131       # FIXME reopen
132       goto open_again;
133     }
134     print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
135       if ($verbose >= 3);
136   } while ($timeout > 0);
137   return;
138 }
139
140 __END__
141
142 =pod
143
144 =encoding UTF-8
145
146 =head1 NAME
147
148 net-mqtt-sub - Perl script for subscribing to an MQTT topic
149
150 =head1 VERSION
151
152 version 1.143260
153
154 =head1 SYNOPSIS
155
156   net-mqtt-sub [options] topic1 [topic2] [topic3] ...
157
158 =head1 DESCRIPTION
159
160 This script subscribes to one or more MQTT topics and prints any
161 messages that it receives to stdout.
162
163 =head1 OPTIONS
164
165 =over
166
167 =item B<-help>
168
169 Print a brief help message.
170
171 =item B<-man>
172
173 Print the manual page.
174
175 =item B<-host>
176
177 The host running the MQTT service.  The default is C<127.0.0.1>.
178
179 =item B<-port>
180
181 The port of the running MQTT service.  The default is 1883.
182
183 =item B<-client-id>
184
185 The client id to use in the connect message.  The default is
186 'NetMQTTpm' followed by the process id of the process.
187
188 =item B<-verbose>
189
190 Include more verbose output.  Without this option the script only
191 outputs errors and received messages one per line in the form:
192
193   topic message
194
195 With one B<-verbose> options, publish messages are printed in a form
196 of a summary of the header fields and the payload in hex dump and text
197 form.
198
199 With two B<-verbose> options, summaries are printed for all messages
200 sent and received.
201
202 With three B<-verbose> options, a hex dump of all data transmitted and
203 received is printed.
204
205 =item B<-keepalive NNN>
206
207 The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
208 it is also currently used as the connection/subscription timeout.
209
210 =item B<-count NNN>
211
212 Read the specificed number of MQTT messages and then exit.  Default
213 is 0 - read forever.
214
215 =item B<-one> or B<-1>
216
217 Short for B<-count 1>.  Read one message and exit.
218
219 =back
220
221 =head1 SEE ALSO
222
223 Net::MQTT::Message(3)
224
225 =head1 DISCLAIMER
226
227 This is B<not> official IBM code.  I work for IBM but I'm writing this
228 in my spare time (with permission) for fun.
229
230 =head1 AUTHOR
231
232 Mark Hindess <soft-cpan@temporalanomaly.com>
233
234 =head1 COPYRIGHT AND LICENSE
235
236 This software is copyright (c) 2014 by Mark Hindess.
237
238 This is free software; you can redistribute it and/or modify it under
239 the same terms as the Perl 5 programming language system itself.
240
241 =cut
242