5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
31 my $host = 'localhost';
35 my $keep_alive_timer = 120;
36 GetOptions('help|?' => \$help,
38 'verbose+' => \$verbose,
42 'one|1' => sub { $count = 1 },
43 'client_id|client-id|C=s' => \$client_id,
44 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
45 pod2usage(1) if ($help);
46 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
47 #pod2usage(2) unless (@ARGV); # need a topic
48 push @ARGV, '#' unless (@ARGV);
51 print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
53 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
54 Timeout => $keep_alive_timer,
55 ) or die "Socket connect failed: $!\n";
60 my $got_ping_response = 1;
61 my @connect = ( message_type => MQTT_CONNECT,
62 keep_alive_timer => $keep_alive_timer );
63 push @connect, client_id => $client_id if (defined $client_id);
64 send_message($socket, @connect);
65 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
66 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
67 send_message($socket, message_type => MQTT_SUBSCRIBE,
69 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
70 $msg = read_message($socket, $buf) or die "No SubAck\n";
71 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
74 $msg = read_message($socket, $buf);
77 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
78 if ($msg->message_type == MQTT_PUBLISH) {
80 print $msg->topic, " ", $msg->message, "\n";
82 print $msg->string, "\n";
85 my $topic = $msg->topic;
86 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
87 my $dir = $topic; # leave imei in dir
88 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
91 mkdir "$queue" if ( ! -e "$queue" );
92 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
93 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
95 my $function_code = unpack('C',substr($msg->message,2,1));
97 write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message;
99 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
100 $topic =~ s/up$/down/;
102 my @all_pending = sort glob "$queue/$dir/.pending/*";
103 if ( my $pending = shift @all_pending ) {
104 my $raw = read_file $pending;
105 my $pending_function_code = unpack('C',substr($msg->message,2,1));
107 send_message($socket,
108 message_type => MQTT_PUBLISH,
109 retain => 0, #$retain,
112 $pending =~ s{$queue/$dir/.pending/}{};
113 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code";
117 if (defined $count && --$count == 0) {
120 } elsif ($msg->message_type == MQTT_PINGRESP) {
121 $got_ping_response = 1;
122 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
124 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
127 if (Time::HiRes::time > $next_ping) {
128 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
129 send_message($socket, message_type => MQTT_PINGREQ);
135 my $msg = Net::MQTT::Message->new(@_);
136 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
138 syswrite $socket, $msg, length $msg;
139 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
140 $next_ping = Time::HiRes::time + $keep_alive_timer;
145 my $select = IO::Select->new($socket);
146 my $timeout = $next_ping - Time::HiRes::time;
148 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
149 return $mqtt if (defined $mqtt);
150 $select->can_read($timeout) || return;
151 $timeout = $next_ping - Time::HiRes::time;
152 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
154 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
158 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
160 } while ($timeout > 0);
172 net-mqtt-sub - Perl script for subscribing to an MQTT topic
180 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
184 This script subscribes to one or more MQTT topics and prints any
185 messages that it receives to stdout.
193 Print a brief help message.
197 Print the manual page.
201 The host running the MQTT service. The default is C<127.0.0.1>.
205 The port of the running MQTT service. The default is 1883.
209 The client id to use in the connect message. The default is
210 'NetMQTTpm' followed by the process id of the process.
214 Include more verbose output. Without this option the script only
215 outputs errors and received messages one per line in the form:
219 With one B<-verbose> options, publish messages are printed in a form
220 of a summary of the header fields and the payload in hex dump and text
223 With two B<-verbose> options, summaries are printed for all messages
226 With three B<-verbose> options, a hex dump of all data transmitted and
229 =item B<-keepalive NNN>
231 The keep alive timer value. Defaults to 120 seconds. For simplicity,
232 it is also currently used as the connection/subscription timeout.
236 Read the specificed number of MQTT messages and then exit. Default
239 =item B<-one> or B<-1>
241 Short for B<-count 1>. Read one message and exit.
247 Net::MQTT::Message(3)
251 This is B<not> official IBM code. I work for IBM but I'm writing this
252 in my spare time (with permission) for fun.
256 Mark Hindess <soft-cpan@temporalanomaly.com>
258 =head1 COPYRIGHT AND LICENSE
260 This software is copyright (c) 2014 by Mark Hindess.
262 This is free software; you can redistribute it and/or modify it under
263 the same terms as the Perl 5 programming language system itself.