5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
20 use Data::Dump qw(dump);
30 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
31 my @columns = split(/\n/, <<__COLUMNS__);
35 0x0c Sensor_temperature
36 0x0d Power_source_voltage
39 0x18 Sensor_operating_mode
# Build the INSERT statement from @columns. Each entry is split on a tab into
# a hex data id (field 0) and a database column name (field 1).
43 my $cols = join(',', map { (split(/\t/,$_))[1] } @columns);
# Numeric data ids, in the same order as the column names in $cols.
44 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
# One "?" placeholder per column: build ",?,?,..." and strip the leading comma.
46 my $sql_placeholders = ',?' x ($#columns+1);
47 $sql_placeholders =~ s/^,//;
# Prepared once; executed later with one value per data id.
49 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
# Connection defaults followed by command-line parsing.
54 my $host = 'localhost';
# Seconds. Used in the CONNECT message, as the socket connect timeout, and to
# schedule the next ping after every send.
58 my $keep_alive_timer = 120;
# Any parse failure prints usage and exits with status 2.
59 GetOptions('help|?' => \$help,
61 'verbose+' => \$verbose,
65 'one|1' => sub { $count = 1 },
66 'client_id|client-id|C=s' => \$client_id,
67 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
68 pod2usage(1) if ($help);
69 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
70 #pod2usage(2) unless (@ARGV); # need a topic
# With no topic arguments, subscribe to '#' (the MQTT multi-level wildcard)
# instead of refusing to run.
71 push @ARGV, '#' unless (@ARGV);
# Announce the broker endpoint (host and port) and the keep-alive interval,
# then open the TCP connection. The keep-alive value doubles as the connect
# timeout; a failed connect is fatal.
74 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n";
76 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
77 Timeout => $keep_alive_timer,
78 ) or die "Socket connect failed: $!\n";
# MQTT session setup: send CONNECT, wait for a reply, then SUBSCRIBE to every
# topic given in @ARGV and wait for a reply again.
# $got_ping_response starts true; it is set again when a PINGRESP arrives and
# is checked before each PINGREQ is sent.
83 my $got_ping_response = 1;
84 my @connect = ( message_type => MQTT_CONNECT,
85 keep_alive_timer => $keep_alive_timer );
# client_id is only passed along when one was given on the command line.
86 push @connect, client_id => $client_id if (defined $client_id);
87 send_message($socket, @connect);
# NOTE(review): any message returned here is accepted as the acknowledgement;
# its type is not checked in the visible lines.
88 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
89 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Every topic is requested at QoS "at most once".
90 send_message($socket, message_type => MQTT_SUBSCRIBE,
92 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
93 $msg = read_message($socket, $buf) or die "No SubAck\n";
94 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Handle one incoming message: timestamp it on stdout, and for PUBLISH
# messages archive the raw payload on disk under a per-device directory.
97 $msg = read_message($socket, $buf);
# Every message except a ping response gets a timestamp prefix on stdout.
100 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
101 if ($msg->message_type == MQTT_PUBLISH) {
103 print $msg->topic, " ", $msg->message, "\n";
105 print $msg->string, "\n";
108 my $topic = $msg->topic;
109 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
110 my $dir = $topic; # leave imei in dir
# Replace the trailing four slash-separated components with the third one
# (the IMEI in the example topic above).
111 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
# Create the queue root, the per-device directory and its .pending
# subdirectory on first use.
114 mkdir "$queue" if ( ! -e "$queue" );
115 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
116 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
# Function code: the third payload byte (offset 2) as an unsigned 8-bit value.
118 my $function_code = unpack('C',substr($msg->message,2,1));
# Archive the raw payload; the file name is built from $t, $up_down and the
# function code.
120 write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message;
122 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
# Derive the reply topic by turning a trailing "up" into "down".
123 $topic =~ s/up$/down/;
# Heartbeat handling: publish one queued message, taking the first file in
# .pending in sorted name order, then move it out of the pending directory.
125 my @all_pending = sort glob "$queue/$dir/.pending/*";
126 if ( my $pending = shift @all_pending ) {
127 my $raw = read_file $pending;
# Function code of the queued payload itself (third byte of $raw). Taking it
# from $msg would only ever yield the function code of the incoming message.
128 my $pending_function_code = unpack('C',substr($raw,2,1));
130 send_message($socket,
131 message_type => MQTT_PUBLISH,
132 retain => 0, #$retain,
# Reduce $pending to its bare file name. \Q...\E keeps regex metacharacters in
# the path literal, and the dot of ".pending" is escaped for the same reason.
135 $pending =~ s{\Q$queue/$dir\E/\.pending/}{};
# Move the file out of .pending so it is not picked up again, tagging it with
# the function code; a failed move is reported instead of being ignored.
136 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code" or warn "rename $queue/$dir/.pending/$pending failed: $!\n";
# Decode the payload and store one database row, then do the per-message
# bookkeeping: message countdown, ping-response tracking and keep-alive pings.
139 my $hash = protocol_decode( $up_down, $msg->message );
# One bind value per data id, in column order; ids absent from the decoded
# hash yield undef.
140 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
# $count is only defined when a message limit was requested; the branch is
# taken once the countdown reaches zero.
145 if (defined $count && --$count == 0) {
148 } elsif ($msg->message_type == MQTT_PINGRESP) {
# The broker answered the last ping.
149 $got_ping_response = 1;
150 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
152 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
# Keep-alive: once the ping deadline has passed, give up if the previous ping
# was never answered, otherwise send a new ping.
155 if (Time::HiRes::time > $next_ping) {
156 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
157 send_message($socket, message_type => MQTT_PINGREQ);
# Build an MQTT message from the key/value arguments, write it to the socket
# and push the next keep-alive ping one full interval into the future.
163 my $msg = Net::MQTT::Message->new(@_);
# At -verbose -verbose, log every outgoing message except keep-alive pings.
# The type is read from the message object rather than from fixed positions
# in @_, which do not line up with the key/value list passed to new() above.
164 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ);
166 syswrite $socket, $msg, length $msg;
167 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
# Any traffic sent counts as activity, so the ping deadline restarts here.
168 $next_ping = Time::HiRes::time + $keep_alive_timer;
# Return the next complete MQTT message, reading more bytes from the socket
# into the caller's buffer as needed, and waiting no longer than the time
# left until the next ping is due.
173 my $select = IO::Select->new($socket);
174 my $timeout = $next_ping - Time::HiRes::time;
# $_[0] aliases the caller's buffer, so bytes appended below are still there
# on the next call.
# NOTE(review): the true second argument presumably makes new_from_bytes
# consume the bytes it parsed -- confirm against Net::MQTT::Message.
176 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
177 return $mqtt if (defined $mqtt);
# Nothing readable before the deadline: return nothing to the caller.
178 $select->can_read($timeout) || return;
179 $timeout = $next_ping - Time::HiRes::time;
# Append up to 2048 bytes at the end of the buffer.
180 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
# An undefined sysread result is reported as 'error', a defined one as 'gracefully'.
182 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
186 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
188 } while ($timeout > 0);
200 net-mqtt-sub - Perl script for subscribing to an MQTT topic
208 net-mqtt-sub [options] topic1 [topic2] [topic3] ...
212 This script subscribes to one or more MQTT topics and prints any
213 messages that it receives to stdout.
221 Print a brief help message.
225 Print the manual page.
229 The host running the MQTT service. The default is C<localhost>.
233 The port of the running MQTT service. The default is 1883.
237 The client id to use in the connect message. The default is
238 'NetMQTTpm' followed by the process id of the process.
242 Include more verbose output. Without this option the script only
243 outputs errors and received messages one per line in the form:
247 With one B<-verbose> option, publish messages are printed in a form
248 of a summary of the header fields and the payload in hex dump and text
251 With two B<-verbose> options, summaries are printed for all messages
254 With three B<-verbose> options, a hex dump of all data transmitted and
257 =item B<-keepalive NNN>
259 The keep alive timer value. Defaults to 120 seconds. For simplicity,
260 it is also currently used as the connection/subscription timeout.
264 Read the specified number of MQTT messages and then exit. Default
267 =item B<-one> or B<-1>
269 Short for B<-count 1>. Read one message and exit.
275 Net::MQTT::Message(3)
279 This is B<not> official IBM code. I work for IBM but I'm writing this
280 in my spare time (with permission) for fun.
284 Mark Hindess <soft-cpan@temporalanomaly.com>
286 =head1 COPYRIGHT AND LICENSE
288 This software is copyright (c) 2014 by Mark Hindess.
290 This is free software; you can redistribute it and/or modify it under
291 the same terms as the Perl 5 programming language system itself.