5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
# Broker host used when opening the socket further down.
31 my $host = 'localhost';
# Keep-alive interval in seconds; the -keepalive switch below overrides it.
35 my $keep_alive_timer = 120;
# Parse the command line. 'verbose+' counts repeated -verbose switches and
# -one/-1 sets $count to 1; a parse failure is handed to pod2usage(2).
36 GetOptions('help|?' => \$help,
38 'verbose+' => \$verbose,
42 'one|1' => sub { $count = 1 },
43 'client_id|client-id|C=s' => \$client_id,
44 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
45 pod2usage(1) if ($help);
46 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
47 #pod2usage(2) unless (@ARGV); # need a topic
# A missing topic is no longer an error (see the disabled line above): with no
# arguments the single topic filter '#' is used for the subscription.
48 push @ARGV, '#' unless (@ARGV);
# Announce the broker endpoint, then open the TCP connection to it. The banner
# interpolates $port (the same variable used for PeerAddr) rather than printing
# the literal text ":port". The keep-alive interval doubles as the connect
# timeout; a failed connect is fatal.
51 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";
53 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
54 Timeout => $keep_alive_timer,
55 ) or die "Socket connect failed: $!\n";
# Initialised to true so that the keep-alive check further down, which dies
# when this flag is false, passes the first time it runs.
60 my $got_ping_response = 1;
# CONNECT arguments; client_id is appended only when one was supplied.
61 my @connect = ( message_type => MQTT_CONNECT,
62 keep_alive_timer => $keep_alive_timer );
63 push @connect, client_id => $client_id if (defined $client_id);
64 send_message($socket, @connect);
# Only the arrival of some reply is checked here, not its message type.
65 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
66 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Subscribe to every topic named in @ARGV, each paired with
# MQTT_QOS_AT_MOST_ONCE.
67 send_message($socket, message_type => MQTT_SUBSCRIBE,
69 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
# As above: any reply is accepted as the acknowledgement.
70 $msg = read_message($socket, $buf) or die "No SubAck\n";
71 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Fetch the next message. read_message can return nothing when its wait times
# out. NOTE(review): $msg is dereferenced below - confirm an empty result is
# handled before these lines are reached.
74 $msg = read_message($socket, $buf);
# Prefix everything except ping replies with a local-time stamp built from $t
# (presumably the receive time - TODO confirm where $t is set).
77 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
78 if ($msg->message_type == MQTT_PUBLISH) {
# Two output forms for a PUBLISH: "topic payload" on one line, or the
# message's own string summary. NOTE(review): presumably selected by the
# verbosity level - the selecting condition is not visible here.
80 print $msg->topic, " ", $msg->message, "\n";
82 print $msg->string, "\n";
# Keep a private copy of the topic; the heartbeat handling below rewrites it.
85 my $topic = $msg->topic;
# Collapse a four-segment "a/b/c/d" path of word characters to its third
# segment. The pattern is unanchored and a value that does not match is left
# unchanged. NOTE(review): $dir is used in directory names below; if it is
# derived from the topic, consider validating it before touching the disk.
87 $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
# Create the queue tree on demand (mkdir results are not checked).
89 mkdir "$queue" if ( ! -e "$queue" );
90 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
91 mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
# Third payload byte as an unsigned integer; it becomes the file-name suffix.
93 my $function_code = unpack('C',substr($msg->message,2,1));
# Store the raw payload under up/ as "<$t>.<function code>".
95 write_file "$queue/$dir/up/$t.$function_code", $msg->message;
# Directories for outgoing files: down/ and its .done subdirectory.
97 mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
98 mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
# When the third payload byte marks a heartbeat, publish at most one queued
# file from down/ and then move it to down/.done.
100 # if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
101 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
# Reply on the sibling topic: a trailing "up" becomes "down".
102 $topic =~ s/up$/down/;
# Only the lexically first entry of the sorted listing is sent per heartbeat.
104 my @all_pending = sort glob "$queue/$dir/down/*";
105 if ( my $pending = shift @all_pending ) {
106 my $raw = read_file $pending;
108 send_message($socket,
109 message_type => MQTT_PUBLISH,
110 retain => 0, #$retain,
# Strip the directory prefix literally (\Q..\E) and only at the start of the
# path, so regex metacharacters in $queue or $dir cannot corrupt the file name.
113 $pending =~ s{\A\Q$queue/$dir/down/\E}{};
# Report a failed move: a file left in down/ would be sent again on every
# following heartbeat.
114 rename "$queue/$dir/down/$pending", "$queue/$dir/down/.done/$pending" or warn "rename $queue/$dir/down/$pending failed: $!\n";
# Message limit: $count is decremented only when it is defined, and this
# branch is entered when it reaches zero.
118 if (defined $count && --$count == 0) {
121 } elsif ($msg->message_type == MQTT_PINGRESP) {
# A ping reply arrived; record it for the keep-alive check below.
122 $got_ping_response = 1;
123 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
# Summary line for a received message at -verbose 2 and above.
125 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Keep-alive: once the deadline maintained by send_message has passed, insist
# that the previous ping was answered, then send the next ping request.
128 if (Time::HiRes::time > $next_ping) {
129 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
130 send_message($socket, message_type => MQTT_PINGREQ);
# send_message body: build a Net::MQTT::Message from the argument list, write
# it to $socket with syswrite, and push the keep-alive deadline forward.
136 my $msg = Net::MQTT::Message->new(@_);
# Log outgoing messages at -verbose 2 and above, except ping requests. The
# type is taken from the message object itself rather than from fixed @_
# positions, so the filter does not depend on whether the socket argument is
# still at the front of @_.
137 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ);
139 syswrite $socket, $msg, length $msg;
# Hex dump of what was sent, at -verbose 3 and above.
140 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
# Any send counts as activity: the next ping is due $keep_alive_timer seconds
# from now.
141 $next_ping = Time::HiRes::time + $keep_alive_timer;
# read_message body: return the next message that can be parsed from the
# buffer in $_[0], reading more data from $socket while time remains before
# the next ping is due.
146 my $select = IO::Select->new($socket);
# Wait no longer than the time left until the keep-alive deadline.
147 my $timeout = $next_ping - Time::HiRes::time;
# Try to parse a message from what is already buffered and return it if one
# is there. NOTE(review): the second argument presumably tells new_from_bytes
# to remove the parsed bytes from the buffer - confirm against the
# Net::MQTT::Message documentation.
149 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
150 return $mqtt if (defined $mqtt);
# Nothing complete yet: wait until the socket is readable, or return empty
# when the wait times out.
151 $select->can_read($timeout) || return;
152 $timeout = $next_ping - Time::HiRes::time;
# Append up to 2048 bytes at the end of the buffer; $_[0] is an alias of the
# caller's argument, so the caller's variable grows.
153 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
# A defined $bytes is reported as a graceful close, an undefined one as an
# error.
155 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
# Dump of the receive buffer.
159 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
161 } while ($timeout > 0);
173 net-mqtt-sub - Perl script for subscribing to an MQTT topic
181 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
185 This script subscribes to one or more MQTT topics and prints any
186 messages that it receives to stdout.
194 Print a brief help message.
198 Print the manual page.
202 The host running the MQTT service. The default is C<localhost>.
206 The port of the running MQTT service. The default is 1883.
210 The client id to use in the connect message. The default is
211 'NetMQTTpm' followed by the process id of the process.
215 Include more verbose output. Without this option the script only
216 outputs errors and received messages one per line in the form:
220 With one B<-verbose> option, publish messages are printed in a form
221 of a summary of the header fields and the payload in hex dump and text
224 With two B<-verbose> options, summaries are printed for all messages
227 With three B<-verbose> options, a hex dump of all data transmitted and
230 =item B<-keepalive NNN>
232 The keep alive timer value. Defaults to 120 seconds. For simplicity,
233 it is also currently used as the connection/subscription timeout.
237 Read the specified number of MQTT messages and then exit. Default
240 =item B<-one> or B<-1>
242 Short for B<-count 1>. Read one message and exit.
248 Net::MQTT::Message(3)
252 This is B<not> official IBM code. I work for IBM but I'm writing this
253 in my spare time (with permission) for fun.
257 Mark Hindess <soft-cpan@temporalanomaly.com>
259 =head1 COPYRIGHT AND LICENSE
261 This software is copyright (c) 2014 by Mark Hindess.
263 This is free software; you can redistribute it and/or modify it under
264 the same terms as the Perl 5 programming language system itself.