#!/usr/bin/perl
use strict;
use warnings;

# based on net-mqtt-sub example from Net::MQTT
#
# 2020-08-15 Dobrica Pavlinusic
#
# Subscribes to one or more MQTT topics and prints every message received.
# In addition, for every PUBLISH message:
#   * the message's string form is written to $queue/<dir>/up/<epoch seconds>
#   * if the payload is a heartbeat (third byte is 0x07), the first file
#     (in sorted name order) found in $queue/<dir>/down/ is published to the
#     matching ".../down" topic and then moved to $queue/<dir>/down/.done/

use Net::MQTT::Constants;
use Net::MQTT::Message;
use IO::Select;
use IO::Socket::INET;
use Time::HiRes;
use Getopt::Long;
use Pod::Usage;
use POSIX qw(strftime);
use File::Basename qw(basename);
use File::Path qw(make_path);
use File::Slurp;
use autodie;

use lib '.';
use Protocol;

# Root directory of the on-disk up/down message queue.
my $queue = "queue";

$|=1; # flush STDOUT

my $help;
my $man;
my $verbose = 2;
my $host = 'localhost';
my $port = 1883;
my $count;
my $client_id;
my $keep_alive_timer = 120;
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'host=s' => \$host,
           'port=i' => \$port,
           'count=i' => \$count,
           'one|1' => sub { $count = 1 },
           'client_id|client-id|C=s' => \$client_id,
           'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
#pod2usage(2) unless (@ARGV); # need a topic
push @ARGV, '#' unless (@ARGV);

# Per-connection state.  Declared once, above the reconnect label, so that
# jumping back to open_again only re-runs the assignments below and never
# re-executes a "my" declaration.
my $socket;
my $buf;
my $mid;
my $next_ping;            # deadline for the next PINGREQ; set by send_message()
my $got_ping_response;
my $msg;

# read_message() jumps back here (goto) when the broker closes the socket.
open_again:

print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";

$socket =
  IO::Socket::INET->new(PeerAddr => $host.':'.$port,
                        Timeout => $keep_alive_timer,
                       ) or die "Socket connect failed: $!\n";

$buf = '';
$mid = 1;
$got_ping_response = 1;

my @connect = ( message_type => MQTT_CONNECT,
                keep_alive_timer => $keep_alive_timer );
push @connect, client_id => $client_id if (defined $client_id);
send_message($socket, @connect);
$msg = read_message($socket, $buf) or die "No ConnAck\n";
print 'Received: ', $msg->string, "\n" if ($verbose >= 2);

send_message($socket, message_type => MQTT_SUBSCRIBE,
             message_id => $mid++,
             topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
$msg = read_message($socket, $buf) or die "No SubAck\n";
print 'Received: ', $msg->string, "\n" if ($verbose >= 2);

while (1) {
  $msg = read_message($socket, $buf);
  if ($msg) {
    my $t = time();
    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t))
      if $msg->message_type != MQTT_PINGRESP;
    if ($msg->message_type == MQTT_PUBLISH) {
      if ($verbose == 0) {
        print $msg->topic, " ", $msg->message, "\n";
      } else {
        print $msg->string, "\n";
      }

      my $topic = $msg->topic;
      # For a four-level topic the queue directory is the third level;
      # any topic that does not match is used unchanged.
      my $dir = $topic;
      $dir =~ s{\w+/\w+/(\w+)/\w+}{$1};

      store_uplink($dir, $t, $msg);

      # if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) {
      my $payload = $msg->message;
      # Length guard: substr() past the end of a short payload would warn.
      if ( defined $payload
           && length($payload) > 2
           && substr($payload,2,1) eq "\x07" ) { # heartbeat
        $topic =~ s/up$/down/;
        send_pending_downlink($socket, $dir, $topic);
      }

      if (defined $count && --$count == 0) {
        exit;
      }
    } elsif ($msg->message_type == MQTT_PINGRESP) {
      $got_ping_response = 1;
      print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
    } else {
      print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
    }
  }
  if (Time::HiRes::time > $next_ping) {
    # NOTE(review): $got_ping_response is never reset to 0 after a PINGREQ
    # is sent, so this die can never trigger -- confirm whether a real
    # ping timeout is wanted before changing it.
    die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
    send_message($socket, message_type => MQTT_PINGREQ);
  }
}

# store_uplink($dir, $t, $msg)
#
# Makes sure the queue directory tree for $dir exists (up/, down/ and
# down/.done/) and writes the string form of $msg to $queue/$dir/up/$t.
# make_path creates missing parents, so a $dir that still contains slashes
# does not abort the script the way a single-level mkdir would.
sub store_uplink {
  my ($dir, $t, $msg) = @_;
  make_path("$queue/$dir/up", "$queue/$dir/down/.done");
  write_file "$queue/$dir/up/$t", $msg->string;
  return;
}

# send_pending_downlink($socket, $dir, $topic)
#
# Publishes the first file (sorted by name) waiting in $queue/$dir/down/ to
# $topic and moves it into $queue/$dir/down/.done/.  Does nothing when no
# file is waiting.  The "*" pattern does not match the hidden .done
# directory.
sub send_pending_downlink {
  my ($socket, $dir, $topic) = @_;
  my @all_pending = sort glob "$queue/$dir/down/*";
  return unless @all_pending;
  my $pending = $all_pending[0];
  my $raw = read_file $pending;
  send_message($socket,
               message_type => MQTT_PUBLISH,
               retain => 0, #$retain,
               topic => $topic,
               message => $raw);
  # glob already returned the full path; only the file name is appended
  # to the .done directory.
  rename $pending, "$queue/$dir/down/.done/" . basename($pending);
  return;
}

# send_message($socket, %message_args)
#
# Builds a Net::MQTT::Message from %message_args, writes its bytes to
# $socket and pushes the next-ping deadline $keep_alive_timer seconds into
# the future.  With $verbose >= 2 a summary of every message except PINGREQ
# is printed; with $verbose >= 3 the sent bytes are dumped as well.
sub send_message {
  my $socket = shift;
  my $msg = Net::MQTT::Message->new(@_);
  print 'Sending: ', $msg->string, "\n"
    if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ);
  my $bytes = $msg->bytes;
  syswrite $socket, $bytes, length $bytes;
  print dump_string($bytes, 'Sent: '), "\n\n" if ($verbose >= 3);
  $next_ping = Time::HiRes::time + $keep_alive_timer;
}

# read_message($socket, $buffer)
#
# Returns the next complete MQTT message parsed from $buffer, reading more
# data from $socket as needed.  $buffer is modified in place through $_[0]
# so unconsumed bytes survive between calls.  Returns nothing when the
# next-ping deadline is reached before a full message is available.  When
# the socket is closed the script jumps back to the open_again label and
# reconnects.
sub read_message {
  my $socket = shift;
  my $select = IO::Select->new($socket);
  my $timeout = $next_ping - Time::HiRes::time;
  do {
    my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
    return $mqtt if (defined $mqtt);
    $select->can_read($timeout) || return;
    $timeout = $next_ping - Time::HiRes::time;
    my $bytes = sysread $socket, $_[0], 2048, length $_[0];
    unless ($bytes) {
      warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
      # FIXME reopen
      goto open_again;
    }
    print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
      if ($verbose >= 3);
  } while ($timeout > 0);
  return;
}

__END__

=pod

=encoding UTF-8

=head1 NAME

net-mqtt-sub - Perl script for subscribing to an MQTT topic

=head1 VERSION

version 1.143260

=head1 SYNOPSIS

  net-mqtt-sub [options] topic1 [topic2] [topic3] ...

=head1 DESCRIPTION

This script subscribes to one or more MQTT topics and prints any
messages that it receives to stdout.

=head1 OPTIONS

=over

=item B<-help>

Print a brief help message.

=item B<-man>

Print the manual page.

=item B<-host>

The host running the MQTT service.  The default is C<localhost>.

=item B<-port>

The port of the running MQTT service.  The default is 1883.

=item B<-client-id>

The client id to use in the connect message.  The default is
'NetMQTTpm' followed by the process id of the process.

=item B<-verbose>

Include more verbose output.
Without this option the script only outputs errors and received
messages one per line in the form:

  topic message

With one B<-verbose> option, publish messages are printed in a form of
a summary of the header fields and the payload in hex dump and text
form.

With two B<-verbose> options, summaries are printed for all messages
sent and received.

With three B<-verbose> options, a hex dump of all data transmitted
and received is printed.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<-count NNN>

Read the specified number of MQTT messages and then exit.  Default
is 0 - read forever.

=item B<-one> or B<-1>

Short for B<-count 1>.  Read one message and exit.

=back

=head1 SEE ALSO

Net::MQTT::Message(3)

=head1 DISCLAIMER

This is B<not> official IBM code.  I work for IBM but I'm writing this in
my spare time (with permission) for fun.

=head1 AUTHOR

Mark Hindess

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2014 by Mark Hindess.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut