simple mqtt client to listen to sensors
authorDobrica Pavlinusic <dpavlin@rot13.org>
Fri, 18 Sep 2020 09:15:48 +0000 (11:15 +0200)
committerDobrica Pavlinusic <dpavlin@rot13.org>
Fri, 18 Sep 2020 09:16:17 +0000 (11:16 +0200)
zc-mqtt [new file with mode: 0755]

diff --git a/zc-mqtt b/zc-mqtt
new file mode 100755 (executable)
index 0000000..6aeafb7
--- /dev/null
+++ b/zc-mqtt
@@ -0,0 +1,256 @@
+#!/usr/bin/perl
+use strict;
+use warnings;
+
+# based on net-mqtt-sub example from Net::MQTT
+#
+# 2020-08-15 Dobrica Pavlinusic <dpavlin@rot13.org>
+
+use strict;
+use Net::MQTT::Constants;
+use Net::MQTT::Message;
+use IO::Select;
+use IO::Socket::INET;
+use Time::HiRes;
+use Getopt::Long;
+use Pod::Usage;
+use POSIX qw(strftime);
+
+$|=1; # flush STDOUT
+
+my $help;
+my $man;
+my $verbose = 2;
+my $host = 'mqtt.zc-sensor.com';
+my $port = 1883;
+my $count;
+my $client_id;
+my $keep_alive_timer = 120;
+GetOptions('help|?' => \$help,
+           'man' => \$man,
+           'verbose+' => \$verbose,
+           'host=s' => \$host,
+           'port=i' => \$port,
+           'count=i' => \$count,
+           'one|1' => sub { $count = 1 },
+           'client_id|client-id|C=s' => \$client_id,
+           'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
+pod2usage(1) if ($help);
+pod2usage(-exitstatus => 0, -verbose => 2) if $man;
+#pod2usage(2) unless (@ARGV); # need a topic
+push @ARGV, '#' unless (@ARGV);
+
+open_again:
+print "Open $host:port keep_alive_timer: $keep_alive_timer\n";
+my $socket =
+  IO::Socket::INET->new(PeerAddr => $host.':'.$port,
+                        Timeout => $keep_alive_timer,
+                       ) or die "Socket connect failed: $!\n";
+
+my $buf = '';
+my $mid = 1;
+my $next_ping;
+my $got_ping_response = 1;
+my @connect = ( message_type => MQTT_CONNECT,
+                keep_alive_timer => $keep_alive_timer );
+push @connect, client_id => $client_id if (defined $client_id);
+send_message($socket, @connect);
+my $msg = read_message($socket, $buf) or die "No ConnAck\n";
+print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
+send_message($socket, message_type => MQTT_SUBSCRIBE,
+             message_id => $mid++,
+             topics => [ map { [ $_ => MQTT_QOS_AT_MOST_ONCE ] } @ARGV ]);
+$msg = read_message($socket, $buf) or die "No SubAck\n";
+print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
+
+while (1) {
+  $msg = read_message($socket, $buf);
+  if ($msg) {
+    print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP;
+    if ($msg->message_type == MQTT_PUBLISH) {
+      if ($verbose == 0) {
+        print $msg->topic, " ", $msg->message, "\n";
+      } else {
+        print $msg->string, "\n";
+      }
+      if (defined $count && --$count == 0) {
+        exit;
+      }
+    } elsif ($msg->message_type == MQTT_PINGRESP) {
+      $got_ping_response = 1;
+      print 'Received: ', $msg->string, "\n" if ($verbose >= 3);
+    } else {
+      print 'Received: ', $msg->string, "\n" if ($verbose >= 2);
+    }
+  }
+  if (Time::HiRes::time > $next_ping) {
+    die "Ping Response timeout.  Exiting\n" unless ($got_ping_response);
+    send_message($socket, message_type => MQTT_PINGREQ);
+  }
+}
+
+sub send_message {
+  my $socket = shift;
+  my $msg = Net::MQTT::Message->new(@_);
+  print 'Sending: ', $msg->string, "\n" if ($verbose >= 2 && $_[1] eq 'message_type' && $_[2] != MQTT_PINGREQ);
+  $msg = $msg->bytes;
+  syswrite $socket, $msg, length $msg;
+  print dump_string($msg, 'Sent: '), "\n\n" if ($verbose >= 3);
+  $next_ping = Time::HiRes::time + $keep_alive_timer;
+}
+
+sub read_message {
+  my $socket = shift;
+  my $select = IO::Select->new($socket);
+  my $timeout = $next_ping - Time::HiRes::time;
+  do {
+    my $mqtt = Net::MQTT::Message->new_from_bytes($_[0], 1);
+    return $mqtt if (defined $mqtt);
+    $select->can_read($timeout) || return;
+    $timeout = $next_ping - Time::HiRes::time;
+    my $bytes = sysread $socket, $_[0], 2048, length $_[0];
+    unless ($bytes) {
+      warn "Socket closed ", (defined $bytes ? 'gracefully' : 'error'), "\n";
+      # FIXME reopen
+      goto open_again;
+    }
+    print "Receive buffer: ", dump_string($_[0], '   '), "\n\n"
+      if ($verbose >= 3);
+  } while ($timeout > 0);
+  return;
+}
+
+__END__
+
+=pod
+
+=encoding UTF-8
+
+=head1 NAME
+
+net-mqtt-sub - Perl script for subscribing to an MQTT topic
+
+=head1 VERSION
+
+version 1.143260
+
+=head1 SYNOPSIS
+
+  net-mqtt-sub [options] topic1 [topic2] [topic3] ...
+
+=head1 DESCRIPTION
+
+This script subscribes to one or more MQTT topics and prints any
+messages that it receives to stdout.
+
+=head1 OPTIONS
+
+=over
+
+=item B<-help>
+
+Print a brief help message.
+
+=item B<-man>
+
+Print the manual page.
+
+=item B<-host>
+
+The host running the MQTT service.  The default is C<127.0.0.1>.
+
+=item B<-port>
+
+The port of the running MQTT service.  The default is 1883.
+
+=item B<-client-id>
+
+The client id to use in the connect message.  The default is
+'NetMQTTpm' followed by the process id of the process.
+
+=item B<-verbose>
+
+Include more verbose output.  Without this option the script only
+outputs errors and received messages one per line in the form:
+
+  topic message
+
+With one B<-verbose> options, publish messages are printed in a form
+of a summary of the header fields and the payload in hex dump and text
+form.
+
+With two B<-verbose> options, summaries are printed for all messages
+sent and received.
+
+With three B<-verbose> options, a hex dump of all data transmitted and
+received is printed.
+
+=item B<-keepalive NNN>
+
+The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
+it is also currently used as the connection/subscription timeout.
+
+=item B<-count NNN>
+
+Read the specificed number of MQTT messages and then exit.  Default
+is 0 - read forever.
+
+=item B<-one> or B<-1>
+
+Short for B<-count 1>.  Read one message and exit.
+
+=back
+
+=head1 SEE ALSO
+
+Net::MQTT::Message(3)
+
+=head1 DISCLAIMER
+
+This is B<not> official IBM code.  I work for IBM but I'm writing this
+in my spare time (with permission) for fun.
+
+=head1 AUTHOR
+
+Mark Hindess <soft-cpan@temporalanomaly.com>
+
+=head1 COPYRIGHT AND LICENSE
+
+This software is copyright (c) 2014 by Mark Hindess.
+
+This is free software; you can redistribute it and/or modify it under
+the same terms as the Perl 5 programming language system itself.
+
+=cut
+
+# 6. Protocol Format 
+# Valid data ID and parameter range supported by the product 
+# DATA ID      ID Description  Data Type( Data Length)       R/W     Range   Default         Remark 
+
+__DATA__
+0x00   Seq #                   DWord(4)        R       /               0       The platform can carry the ID when the platform downstream reads and sets the device parameters, and the device returns the same data. Please refer to the example for use.Each seq # refer to one command and its response. 
+0x01   PN                      DWord(4)        R       /                /      / 
+0x02   Model                   Byte(1)         R       32              32      Inner number:32 
+0x03   X axis angle            Float(4)        R        ‐90°~90°   /       X axis angle 
+0x04   Y axis angle            Float(4)        R       ‐90°~90°    /       Y axis angle 
+0x09   X axis relative angle   Float(4)        R       ‐90°~90°    0       Return the X angle value according to the set relative zero 
+0x0A   Y axis relative angle   Float(4)        R       ‐90°~90°    0       Return the Y angle value according to the set relative zero 
+0x0C   Sensor temperature      Word(2)         R       ‐32768~32767  /       signed,sensor temperature =Data/100, unit ℃ 
+0x0D   Power source voltage    Word(2)         R       0~65535         /       Voltage= Data/100, unit V 
+0x11    Arming/disarming       Byte(1)         R/W     0~255           1       0 means disarming, non‐zero means arming 
+0x12   Alarm delay time        Byte(1)         R/W     3~255           20       The unit is 0.1 second, which means that the product responds to the alarm only after the alarm has exceeded the alarm angle for a certain period of time. 
+
+0x13   Restore factory setting Byte(1)         R/W     0~255           0       0: Do nothing Non‐zero: Restore the non‐network‐related parameters of the sensor. 
+0x14   Server IP&port          4*Byte(1)+Word(2)       R/W     /       CTIOT:117.60.157.137,5683 MQTT:0.0.0.0,0        The server address should be IP, not the domain name; using the domain name may cause the connection server to be unstable and cause data loss.
+0x17   Signal strength         Byte(1)         R       10~34           /       A larger value indicates a stronger signal 
+0x18   Sensor operating mode   Byte(1)         R/W     0               0       The sensor can only work in absolute measurement mode 
+0x19    Alarm axis             Byte(1)         R       0~      3       /       0: no alarm; 1: X‐axis alarm 2: Y axis alarm; 3: X/Y axis alarm at the same time 
+0x1A   SIM card ID             QWord(8)        R       0~18446744073709551615  /       Take the first 19 digits, the last digit is discarded 
+0x21    Heartbeat interval     DWord(4)        R/W     60~131071        86400  Interval at which the device periodically uploads data to the server 
+0x22   IMEI number of the device       QWord(8)         R      0~18446744073709551615  /       Refers to the IMEI of the NB‐IOT network module in the product. 
+0x23   Backup server IP&port   4*Byte(1)+Word(2)       R/W     /       CTIOT:117.60.157.137,5683 MQTT:0.0.0.0,0        / 
+0x24   Backup server enable    Byte(1)         R/W     0~255   0       0 means off, non‐zero means on 
+0x33   DNS IP address          4*Byte(1)       R/W     /       208.67.222.222  / 
+0x34   Domain name and port    64*Byte(1)      R/W     /       mqtt.zc‐sensor.com,1883       Supports CTIOT and MQTT protocols. Priority IP in the case of IP (ID number 0x14); domain name and port should be separated with comma, length <=64 
+0x35    MQTT‐ClientID                32*Byte(1)      R/W      /      IMEI number of the device       Length <=32, subject to MQTT related specifications 
+0x36   MQTT‐Username                 32*Byte(1)      R/W     /       empty           Length <=32, subject to MQTT related specifications