5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
20 use Data::Dump qw(dump);
29 mkdir $log_dir unless -d $log_dir;
31 open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime())) or die "Can't open log file in $log_dir: $!\n"; # fail early instead of writing to an unopened handle later
41 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
42 my @columns = split(/\n/, <<__COLUMNS__);
46 0x0c Sensor_temperature
47 0x0d Power_source_voltage
50 0x18 Sensor_operating_mode
54 my $cols = join(',', map { (split(/\t/,$_))[1] } @columns);
55 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
57 my $sql_placeholders = ',?' x ($#columns+1);
58 $sql_placeholders =~ s/^,//;
60 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
65 my $host = 'localhost';
69 my $keep_alive_timer = 120;
70 GetOptions('help|?' => \$help,
72 'verbose+' => \$verbose,
76 'one|1' => sub { $count = 1 },
77 'client_id|client-id|C=s' => \$client_id,
78 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
79 pod2usage(1) if ($help);
80 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
81 #pod2usage(2) unless (@ARGV); # need a topic
82 push @ARGV, '#' unless (@ARGV);
85 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n"; # interpolate the real port (was the literal text ":port")
87 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
88 Timeout => $keep_alive_timer,
89 ) or die "Socket connect failed: $!\n";
94 my $got_ping_response = 1;
95 my @connect = ( message_type => MQTT_CONNECT,
96 keep_alive_timer => $keep_alive_timer );
97 push @connect, client_id => $client_id if (defined $client_id);
98 send_message($socket, @connect);
99 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
100 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
101 send_message($socket, message_type => MQTT_SUBSCRIBE,
102 message_id => $mid++,
103 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
104 $msg = read_message($socket, $buf) or die "No SubAck\n";
105 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
108 $msg = read_message($socket, $buf);
111 my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t)));
112 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
113 if ($msg->message_type == MQTT_PUBLISH) {
115 print $msg->topic, " ", $msg->message, "\n";
117 print $msg->string, "\n";
120 my $topic = $msg->topic;
121 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
122 my $dir = $topic; # leave imei in dir
123 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
126 mkdir "$queue" if ( ! -e "$queue" );
127 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
128 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
130 my $function_code = unpack('C',substr($msg->message,2,1));
132 mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date";
133 write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message;
135 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
136 my $hash = protocol_decode( $up_down, $msg->message );
137 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
143 # send pending messages on any connection
144 my @all_pending = sort glob "$queue/$dir/.pending/*";
145 if ( my $pending = shift @all_pending ) {
146 my $raw = read_file $pending;
147 my $pending_function_code = unpack('C',substr($raw,2,1)); # function code of the queued payload being sent, not of the message just received
149 $topic =~ s/up$/down/;
151 send_message($socket,
152 message_type => MQTT_PUBLISH,
153 retain => 0, #$retain,
156 $pending =~ s{\Q$queue/$dir\E/\.pending/}{}; # \Q..\E and \. : match the directory prefix literally, not as regex syntax
157 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code" or warn "rename of $queue/$dir/.pending/$pending failed: $!\n"; # report a failed move; otherwise the same file is silently re-sent
161 if (defined $count && --$count == 0) {
164 } elsif ($msg->message_type == MQTT_PINGRESP) {
165 $got_ping_response = 1;
166 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
168 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
171 if (Time::HiRes::time > $next_ping) {
172 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
173 send_message($socket, message_type => MQTT_PINGREQ);
179 my $msg = Net::MQTT::Message->new(@_);
180 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
182 syswrite $socket, $msg, length $msg;
183 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
184 $next_ping = Time::HiRes::time + $keep_alive_timer;
189 my $select = IO::Select->new($socket);
190 my $timeout = $next_ping - Time::HiRes::time;
192 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
193 return $mqtt if (defined $mqtt);
194 $select->can_read($timeout) || return;
195 $timeout = $next_ping - Time::HiRes::time;
196 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
198 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
202 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
204 } while ($timeout > 0);
216 net-mqtt-sub - Perl script for subscribing to an MQTT topic
224 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
228 This script subscribes to one or more MQTT topics and prints any
229 messages that it receives to stdout.
237 Print a brief help message.
241 Print the manual page.
245 The host running the MQTT service. The default is C<localhost>.
249 The port of the running MQTT service. The default is 1883.
253 The client id to use in the connect message. The default is
254 'NetMQTTpm' followed by the process id of the process.
258 Include more verbose output. Without this option the script only
259 outputs errors and received messages one per line in the form:
263 With one B<-verbose> option, publish messages are printed in a form
264 of a summary of the header fields and the payload in hex dump and text
267 With two B<-verbose> options, summaries are printed for all messages
270 With three B<-verbose> options, a hex dump of all data transmitted and
273 =item B<-keepalive NNN>
275 The keep alive timer value. Defaults to 120 seconds. For simplicity,
276 it is also currently used as the connection/subscription timeout.
280 Read the specified number of MQTT messages and then exit. Default
283 =item B<-one> or B<-1>
285 Short for B<-count 1>. Read one message and exit.
291 Net::MQTT::Message(3)
295 This is B<not> official IBM code. I work for IBM but I'm writing this
296 in my spare time (with permission) for fun.
300 Mark Hindess <soft-cpan@temporalanomaly.com>
302 =head1 COPYRIGHT AND LICENSE
304 This software is copyright (c) 2014 by Mark Hindess.
306 This is free software; you can redistribute it and/or modify it under
307 the same terms as the Perl 5 programming language system itself.