5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
31 my $host = 'localhost';
35 my $keep_alive_timer = 120;
36 GetOptions('help|?' => \$help,
38 'verbose+' => \$verbose,
42 'one|1' => sub { $count = 1 },
43 'client_id|client-id|C=s' => \$client_id,
44 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
45 pod2usage(1) if ($help);
46 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
47 #pod2usage(2) unless (@ARGV); # need a topic
48 push @ARGV, '#' unless (@ARGV);
51 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n"; # show the actual host:port pair that the socket below connects to
53 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
54 Timeout => $keep_alive_timer,
55 ) or die "Socket connect failed: $!\n";
60 my $got_ping_response = 1;
61 my @connect = ( message_type => MQTT_CONNECT,
62 keep_alive_timer => $keep_alive_timer );
63 push @connect, client_id => $client_id if (defined $client_id);
64 send_message($socket, @connect);
65 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
66 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
67 send_message($socket, message_type => MQTT_SUBSCRIBE,
69 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
70 $msg = read_message($socket, $buf) or die "No SubAck\n";
71 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
74 $msg = read_message($socket, $buf);
77 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
78 if ($msg->message_type == MQTT_PUBLISH) {
80 print $msg->topic, " ", $msg->message, "\n";
82 print $msg->string, "\n";
85 my $topic = $msg->topic;
87 $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
89 mkdir "$queue" if ( ! -e "$queue" );
90 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
91 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
93 my $function_code = unpack('C',substr($msg->message,2,1));
95 write_file "$queue/$dir/$t.up.$function_code", $msg->message;
97 # if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
98 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
99 $topic =~ s/up$/down/;
101 my @all_pending = sort glob "$queue/$dir/.pending/*";
102 if ( my $pending = shift @all_pending ) {
103 my $raw = read_file $pending;
104 my $pending_function_code = unpack('C',substr($raw,2,1)); # function code byte of the payload just read from the pending file (not of the received message)
106 send_message($socket,
107 message_type => MQTT_PUBLISH,
108 retain => 0, #$retain,
111 $pending =~ s{\Q$queue/$dir\E/\.pending/}{}; # strip the directory prefix; \Q..\E keeps path characters from being read as regex syntax
112 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.down.$function_code" or warn "rename $queue/$dir/.pending/$pending failed: $!\n"; # a failed rename leaves the file in .pending, where the glob above will find it again
116 if (defined $count && --$count == 0) {
119 } elsif ($msg->message_type == MQTT_PINGRESP) {
120 $got_ping_response = 1;
121 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
123 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
126 if (Time::HiRes::time > $next_ping) {
127 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
128 send_message($socket, message_type => MQTT_PINGREQ);
134 my $msg = Net::MQTT::Message->new(@_);
135 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ); # log outgoing messages except keep-alive pings; the type is read from the message object rather than from positions in @_
137 syswrite $socket, $msg, length $msg;
138 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
139 $next_ping = Time::HiRes::time + $keep_alive_timer;
144 my $select = IO::Select->new($socket);
145 my $timeout = $next_ping - Time::HiRes::time;
147 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
148 return $mqtt if (defined $mqtt);
149 $select->can_read($timeout) || return;
150 $timeout = $next_ping - Time::HiRes::time;
151 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
153 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
157 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
159 } while ($timeout > 0);
171 net-mqtt-sub - Perl script for subscribing to an MQTT topic
179 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
183 This script subscribes to one or more MQTT topics and prints any
184 messages that it receives to stdout.
192 Print a brief help message.
196 Print the manual page.
200 The host running the MQTT service. The default is C<localhost>.
204 The port of the running MQTT service. The default is 1883.
208 The client id to use in the connect message. The default is
209 'NetMQTTpm' followed by the process id of the process.
213 Include more verbose output. Without this option the script only
214 outputs errors and received messages one per line in the form:
218 With one B<-verbose> option, publish messages are printed in a form
219 of a summary of the header fields and the payload in hex dump and text
222 With two B<-verbose> options, summaries are printed for all messages
225 With three B<-verbose> options, a hex dump of all data transmitted and
228 =item B<-keepalive NNN>
230 The keep alive timer value. Defaults to 120 seconds. For simplicity,
231 it is also currently used as the connection/subscription timeout.
235 Read the specified number of MQTT messages and then exit. Default
238 =item B<-one> or B<-1>
240 Short for B<-count 1>. Read one message and exit.
246 Net::MQTT::Message(3)
250 This is B<not> official IBM code. I work for IBM but I'm writing this
251 in my spare time (with permission) for fun.
255 Mark Hindess <soft-cpan@temporalanomaly.com>
257 =head1 COPYRIGHT AND LICENSE
259 This software is copyright (c) 2014 by Mark Hindess.
261 This is free software; you can redistribute it and/or modify it under
262 the same terms as the Perl 5 programming language system itself.