5 # based on net-mqtt-sub example from Net::MQTT
7 # 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
10 use Net::MQTT::Constants;
11 use Net::MQTT::Message;
17 use POSIX qw(strftime);
20 use Data::Dump qw(dump);
28 open(my $log, '>', 'log.'.strftime("%Y-%m-%dT%H:%M:%S",localtime())) or die "Can't open log file: $!\n"; # timestamped log file; fail early rather than continue with an unopened handle
38 my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 });
39 my @columns = split(/\n/, <<__COLUMNS__);
43 0x0c Sensor_temperature
44 0x0d Power_source_voltage
47 0x18 Sensor_operating_mode
51 my $cols = join(',', map { (split(/\t/,$_))[1] } @columns);
52 my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ;
54 my $sql_placeholders = ',?' x ($#columns+1);
55 $sql_placeholders =~ s/^,//;
57 my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)});
62 my $host = 'localhost';
66 my $keep_alive_timer = 120;
67 GetOptions('help|?' => \$help,
69 'verbose+' => \$verbose,
73 'one|1' => sub { $count = 1 },
74 'client_id|client-id|C=s' => \$client_id,
75 'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
76 pod2usage(1) if ($help);
77 pod2usage(-exitstatus => 0, -verbose => 2) if $man;
78 #pod2usage(2) unless (@ARGV); # need a topic
79 push @ARGV, '#' unless (@ARGV);
82 print "Open $host:$port keep_alive_timer: $keep_alive_timer\n"; # announce the broker endpoint actually used for the connection below
84 IO::Socket::INET->new(PeerAddr => $host.':'.$port,
85 Timeout => $keep_alive_timer,
86 ) or die "Socket connect failed: $!\n";
91 my $got_ping_response = 1;
92 my @connect = ( message_type => MQTT_CONNECT,
93 keep_alive_timer => $keep_alive_timer );
94 push @connect, client_id => $client_id if (defined $client_id);
95 send_message($socket, @connect);
96 my $msg = read_message($socket, $buf) or die "No ConnAck\n";
97 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
98 send_message($socket, message_type => MQTT_SUBSCRIBE,
100 topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
101 $msg = read_message($socket, $buf) or die "No SubAck\n";
102 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
105 $msg = read_message($socket, $buf);
108 print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP;
109 if ($msg->message_type == MQTT_PUBLISH) {
111 print $msg->topic, " ", $msg->message, "\n";
113 print $msg->string, "\n";
116 my $topic = $msg->topic;
117 # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up
118 my $dir = $topic; # leave imei in dir
119 $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1};
122 mkdir "$queue" if ( ! -e "$queue" );
123 mkdir "$queue/$dir" if ( ! -e "$queue/$dir" );
124 mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" );
126 my $function_code = unpack('C',substr($msg->message,2,1));
128 write_file "$queue/$dir/$t.$up_down.$function_code", $msg->message;
130 if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat
131 $topic =~ s/up$/down/;
133 my @all_pending = sort glob "$queue/$dir/.pending/*";
134 if ( my $pending = shift @all_pending ) {
135 my $raw = read_file $pending;
136 my $pending_function_code = unpack('C',substr($raw,2,1)); # function code byte of the queued payload being sent (the incoming heartbeat's byte is always 0x07 in this branch)
138 send_message($socket,
139 message_type => MQTT_PUBLISH,
140 retain => 0, #$retain,
143 $pending =~ s{^\Q$queue/$dir/.pending/\E}{}; # strip the directory prefix literally; \Q...\E stops '.' and any metacharacters in the path acting as regex syntax
144 rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code" or warn "rename of pending $pending failed: $!\n"; # report a failed move so the file is not silently left in .pending
147 my $hash = protocol_decode( $up_down, $msg->message );
148 $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids );
153 if (defined $count && --$count == 0) {
156 } elsif ($msg->message_type == MQTT_PINGRESP) {
157 $got_ping_response = 1;
158 print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
160 print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
163 if (Time::HiRes::time > $next_ping) {
164 die "Ping Response timeout. Exiting\n" unless ($got_ping_response);
165 send_message($socket, message_type => MQTT_PINGREQ);
171 my $msg = Net::MQTT::Message->new(@_);
172 print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $msg->message_type != MQTT_PINGREQ); # trace every outgoing message except keep-alive pings; ask the message for its type instead of relying on positional @_ slots
174 syswrite $socket, $msg, length $msg;
175 print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
176 $next_ping = Time::HiRes::time + $keep_alive_timer;
181 my $select = IO::Select->new($socket);
182 my $timeout = $next_ping - Time::HiRes::time;
184 my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
185 return $mqtt if (defined $mqtt);
186 $select->can_read($timeout) || return;
187 $timeout = $next_ping - Time::HiRes::time;
188 my $bytes = sysread $socket, $_[0], 2048, length $_[0];
190 warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
194 print "Receive buffer: ", dump_string($_[0], ' '), "\n\n"
196 } while ($timeout > 0);
208 net-mqtt-sub - Perl script for subscribing to an MQTT topic
216 net-mqtt-sub [options] [topic1] [topic2] [topic3] ...
220 This script subscribes to one or more MQTT topics and prints any
221 messages that it receives to stdout.
229 Print a brief help message.
233 Print the manual page.
237 The host running the MQTT service. The default is C<localhost>.
241 The port of the running MQTT service. The default is 1883.
245 The client id to use in the connect message. The default is
246 'NetMQTTpm' followed by the process id of the process.
250 Include more verbose output. Without this option the script only
251 outputs errors and received messages one per line in the form:
255 With one B<-verbose> option, publish messages are printed in a form
256 of a summary of the header fields and the payload in hex dump and text
259 With two B<-verbose> options, summaries are printed for all messages
262 With three B<-verbose> options, a hex dump of all data transmitted and
265 =item B<-keepalive NNN>
267 The keep alive timer value. Defaults to 120 seconds. For simplicity,
268 it is also currently used as the connection/subscription timeout.
272 Read the specified number of MQTT messages and then exit. Default
275 =item B<-one> or B<-1>
277 Short for B<-count 1>. Read one message and exit.
283 Net::MQTT::Message(3)
287 This is B<not> official IBM code. I work for IBM but I'm writing this
288 in my spare time (with permission) for fun.
292 Mark Hindess <soft-cpan@temporalanomaly.com>
294 =head1 COPYRIGHT AND LICENSE
296 This software is copyright (c) 2014 by Mark Hindess.
298 This is free software; you can redistribute it and/or modify it under
299 the same terms as the Perl 5 programming language system itself.