5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
27 my $host = 'mqtt.zc-sensor.com';
31 my $keep_alive_timer = 120;
32 GetOptions('help|?' => \$help,
34 'verbose+' => \$verbose,
38 'one|1' => sub { $count = 1 },
39 'client_id|client-id|C=s' => \$client_id,
40 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
41 pod2usage(1) if ($help);
42 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
43 #pod2usage(2) unless (@ARGV); # need a topic
44 push @ARGV, '#' unless (@ARGV);
47 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n"; # show the host:port actually passed to IO::Socket::INET below
49 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
50 Timeout => $keep_alive_timer,
51 ) or die "Socket connect failed: $!\n";
# MQTT session setup: send CONNECT, wait for a reply, then SUBSCRIBE to
# every topic given in @ARGV and wait for a reply to that as well.
56 my $got_ping_response = 1; # starts true; the keep-alive check dies when this is false
57 my @connect = ( message_type => MQTT_CONNECT,
58 keep_alive_timer => $keep_alive_timer );
# client_id is only added to the CONNECT arguments when one was supplied.
59 push @connect, client_id => $client_id if (defined $client_id);
60 send_message($socket, @connect);
# NOTE(review): only the presence of a reply is checked here; its message
# type and return code are not inspected before continuing.
61 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
62 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# One [topic => QoS] pair per command-line argument, all at QoS "at most once".
63 send_message($socket, message_type => MQTT_SUBSCRIBE,
65 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
# As above: any reply is accepted, otherwise the script dies.
66 $msg = read_message($socket, $buf) or die "No SubAck\n";
67 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
70 $msg = read_message($socket, $buf);
72 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP;
73 if ($msg->message_type == MQTT_PUBLISH) {
75 print $msg->topic, " ", $msg->message, "\n";
77 print $msg->string, "\n";
80 # if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
81 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
82 #my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3";
83 my $raw = read_parameter_frame( "\x00" => "\x04\xe8\x03\x00\x00", "\x21", "\x44", "\x12", "\x14", "\x1a\x21\x22\x23\x24\x33\x34\x35\x3b" );
84 my $topic = $msg->topic;
85 $topic =~ s/up$/down/;
87 message_type => MQTT_PUBLISH,
88 retain => 0, #$retain,
93 if (defined $count && --$count == 0) {
96 } elsif ($msg->message_type == MQTT_PINGRESP) {
97 $got_ping_response = 1;
98 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
103 if (Time::HiRes::time > $next_ping) {
104 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
105 send_message($socket, message_type => MQTT_PINGREQ);
111 my $msg = Net::MQTT::Message->new(@_);
112 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
114 syswrite $socket, $msg, length $msg;
115 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
116 $next_ping = Time::HiRes::time + $keep_alive_timer;
121 my $select = IO::Select->new($socket);
122 my $timeout = $next_ping - Time::HiRes::time;
124 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
125 return $mqtt if (defined $mqtt);
126 $select->can_read($timeout) || return;
127 $timeout = $next_ping - Time::HiRes::time;
128 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
130 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
134 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
136 } while ($timeout > 0);
148 net-mqtt-sub - Perl script for subscribing to an MQTT topic
156 net-mqtt-sub [options] [topic1] [topic2] [topic3] ...
160 This script subscribes to one or more MQTT topics and prints any
161 messages that it receives to stdout.
169 Print a brief help message.
173 Print the manual page.
177 The host running the MQTT service. The default is C<mqtt.zc-sensor.com>.
181 The port of the running MQTT service. The default is 1883.
185 The client id to use in the connect message. The default is
186 'NetMQTTpm' followed by the process id of the process.
190 Include more verbose output. Without this option the script only
191 outputs errors and received messages one per line in the form:
195 With one B<-verbose> option, publish messages are printed in a form
196 of a summary of the header fields and the payload in hex dump and text
199 With two B<-verbose> options, summaries are printed for all messages
202 With three B<-verbose> options, a hex dump of all data transmitted and
205 =item B<-keepalive NNN>
207 The keep alive timer value. Defaults to 120 seconds. For simplicity,
208 it is also currently used as the connection/subscription timeout.
212 Read the specified number of MQTT messages and then exit. Default
215 =item B<-one> or B<-1>
217 Short for B<-count 1>. Read one message and exit.
223 Net::MQTT::Message(3)
227 This is B<not> official IBM code. I work for IBM but I'm writing this
228 in my spare time (with permission) for fun.
232 Mark Hindess <soft-cpan@temporalanomaly.com>
234 =head1 COPYRIGHT AND LICENSE
236 This software is copyright (c) 2014 by Mark Hindess.
238 This is free software; you can redistribute it and/or modify it under
239 the same terms as the Perl 5 programming language system itself.