5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
31 my $host = 'localhost';
35 my $keep_alive_timer = 120;
36 GetOptions('help|?' => \$help,
38 'verbose+' => \$verbose,
42 'one|1' => sub { $count = 1 },
43 'client_id|client-id|C=s' => \$client_id,
44 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
45 pod2usage(1) if ($help);
46 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
47 #pod2usage(2) unless (@ARGV); # need a topic
48 push @ARGV, '#' unless (@ARGV);
51 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n"; # announce the broker endpoint (host and port) we are about to connect to
53 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
54 Timeout => $keep_alive_timer,
55 ) or die "Socket connect failed: $!\n";
60 my $got_ping_response = 1;
61 my @connect = ( message_type => MQTT_CONNECT,
62 keep_alive_timer => $keep_alive_timer );
63 push @connect, client_id => $client_id if (defined $client_id);
64 send_message($socket, @connect);
65 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
66 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
67 send_message($socket, message_type => MQTT_SUBSCRIBE,
69 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
70 $msg = read_message($socket, $buf) or die "No SubAck\n";
71 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
74 $msg = read_message($socket, $buf);
77 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
78 if ($msg->message_type == MQTT_PUBLISH) {
80 print $msg->topic, " ", $msg->message, "\n";
82 print $msg->string, "\n";
85 my $topic = $msg->topic;
87 $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};
89 mkdir "$queue" if ( ! -e "$queue" );
90 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
91 mkdir "$queue/$dir/up" if ( ! -e "$queue/$dir/up" );
92 write_file "$queue/$dir/up/$t", $msg->string;
94 mkdir "$queue/$dir/down" if ( ! -e "$queue/$dir/down" );
95 mkdir "$queue/$dir/down/.done" if ( ! -e "$queue/$dir/down/.done" );
97 # if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
98 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
99 $topic =~ s/up$/down/;
101 my @all_pending = sort glob "$queue/$dir/down/*";
102 if ( my $pending = shift @all_pending ) {
103 my $raw = read_file $pending;
105 send_message($socket,
106 message_type => MQTT_PUBLISH,
107 retain => 0, #$retain,
110 (my $done_name = $pending) =~ s{^.*/}{}; rename "$queue/$dir/down/$done_name", "$queue/$dir/down/.done/$done_name" or warn "rename $pending: $!\n"; # glob() returns the full path, so strip it to the bare file name before archiving into .done; report failure, otherwise the same file is re-sent on every heartbeat
114 if (defined $count && --$count == 0) {
117 } elsif ($msg->message_type == MQTT_PINGRESP) {
118 $got_ping_response = 1;
119 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
121 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
124 if (Time::HiRes::time > $next_ping) {
125 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
126 send_message($socket, message_type => MQTT_PINGREQ);
132 my $msg = Net::MQTT::Message->new(@_);
133 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ); # trace outgoing messages but stay quiet for keep-alive pings; ask the message object for its type instead of relying on positions in @_
135 syswrite $socket, $msg, length $msg;
136 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
137 $next_ping = Time::HiRes::time + $keep_alive_timer;
142 my $select = IO::Select->new($socket);
143 my $timeout = $next_ping - Time::HiRes::time;
145 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
146 return $mqtt if (defined $mqtt);
147 $select->can_read($timeout) || return;
148 $timeout = $next_ping - Time::HiRes::time;
149 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
151 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
155 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
157 } while ($timeout > 0);
169 net-mqtt-sub - Perl script for subscribing to an MQTT topic
177 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
181 This script subscribes to one or more MQTT topics and prints any
182 messages that it receives to stdout.
190 Print a brief help message.
194 Print the manual page.
198 The host running the MQTT service. The default is C<localhost>.
202 The port of the running MQTT service. The default is 1883.
206 The client id to use in the connect message. The default is
207 'NetMQTTpm' followed by the process id of the process.
211 Include more verbose output. Without this option the script only
212 outputs errors and received messages one per line in the form:
216 With one B<-verbose> option, publish messages are printed in a form
217 of a summary of the header fields and the payload in hex dump and text
220 With two B<-verbose> options, summaries are printed for all messages
223 With three B<-verbose> options, a hex dump of all data transmitted and
226 =item B<-keepalive NNN>
228 The keep alive timer value. Defaults to 120 seconds. For simplicity,
229 it is also currently used as the connection/subscription timeout.
233 Read the specified number of MQTT messages and then exit. Default
236 =item B<-one> or B<-1>
238 Short for B<-count 1>. Read one message and exit.
244 Net::MQTT::Message(3)
248 This is B<not> official IBM code. I work for IBM but I'm writing this
249 in my spare time (with permission) for fun.
253 Mark Hindess <soft-cpan@temporalanomaly.com>
255 =head1 COPYRIGHT AND LICENSE
257 This software is copyright (c) 2014 by Mark Hindess.
259 This is free software; you can redistribute it and/or modify it under
260 the same terms as the Perl 5 programming language system itself.