X-Git-Url: http://git.rot13.org/?a=blobdiff_plain;f=zc-mqtt;h=e1593bec86aec997d0d36564c3e120c6353b55fc;hb=93c26262fb1edaa3d67c01d24c3602a583eaad82;hp=f95495664ff53b63275e94a658317e8d6f07dfe1;hpb=9efb310110d8ec5e20ae9eb11f7ab36ed7b6b112;p=zc diff --git a/zc-mqtt b/zc-mqtt index f954956..e1593be 100755 --- a/zc-mqtt +++ b/zc-mqtt @@ -15,13 +15,54 @@ use Time::HiRes; use Getopt::Long; use Pod::Usage; use POSIX qw(strftime); +use File::Slurp; +use DBD::Pg; +use Data::Dump qw(dump); +use autodie; + +BEGIN { + my $cwd = $0; + $cwd =~ s{/[^/]+$}{}; + chdir $cwd; + + my $log_dir = 'log'; + mkdir $log_dir unless -d $log_dir; + + open(my $log, '>', "$log_dir/".strftime("%Y-%m-%dT%H:%M:%S",localtime())); + select($log); + $|=1; +} + +use lib '.'; +use Protocol; + +my $queue = "queue"; + +my $dbh = DBI->connect("dbi:Pg:dbname=zc", "dpavlin", "", { RaiseError => 1 }); +my @columns = split(/\n/, <<__COLUMNS__); +0x01 PN +0x03 X_axis_angle +0x04 Y_axis_angle +0x0c Sensor_temperature +0x0d Power_source_voltage +0x11 Arming_disarming +0x17 Signal_strength +0x18 Sensor_operating_mode +0x19 Alarm_axis +__COLUMNS__ + +my $cols = join(',', map { (split(/\t/,$_))[1] } @columns); +my @insert_data_ids = map { hex( (split(/\t/,$_))[0] ) } @columns ; -$|=1; # flush STDOUT +my $sql_placeholders = ',?' x ($#columns+1); +$sql_placeholders =~ s/^,//; + +my $sth = $dbh->prepare(qq{insert into zc ($cols) values ($sql_placeholders)}); my $help; my $man; my $verbose = 2; -my $host = 'mqtt.zc-sensor.com'; +my $host = 'localhost'; my $port = 1883; my $count; my $client_id; @@ -66,7 +107,9 @@ print 'Received: ', $msg->string, "\n" if ($verbose >= 2); while (1) { $msg = read_message($socket, $buf); if ($msg) { - print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime()) if $msg->message_type != MQTT_PINGRESP; + my $t = time(); + my ($date,$time) = split(/ /,strftime("%Y-%m-%d %H:%M:%S", localtime($t))); + print "\n",strftime("%Y-%m-%d %H:%M:%S ", localtime($t)) if $msg->message_type != MQTT_PINGRESP; if ($msg->message_type == MQTT_PUBLISH) { if ($verbose == 0) { print $msg->topic, " ", $msg->message, "\n"; @@ -74,18 +117,50 @@ while (1) { print $msg->string, "\n"; } -# if ( $msg->topic =~ m{Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up} ) { - if ( substr($msg->message,2,1) eq "\x07" ) { # heartbeat - my $raw = "\x5a\x0b\x03\x09\x00\x00\x04\xe8\x03\x00\x00\x21\x44\xaa\x4B\xF3"; + # skip retained + next if $msg->string =~ m{Publish/at-most-once,retain}; + my $topic = $msg->topic; - $topic =~ s/up$/down/; - send_message($socket, - message_type => MQTT_PUBLISH, - retain => 0, #$retain, - topic => $topic, - message => $raw); + # Inclinometer/ZCT330Ex_SWP_N_YK/869858031634109/up + my $dir = $topic; # leave imei in dir + $dir =~ s{\w+/\w+/(\w+)/(\w+)$}{$1}; + my $up_down = $2; + + mkdir "$queue" if ( ! -e "$queue" ); + mkdir "$queue/$dir" if ( ! -e "$queue/$dir" ); + mkdir "$queue/$dir/.pending" if ( ! -e "$queue/$dir/.pending" ); + + my $function_code = unpack('C',substr($msg->message,2,1)); + + mkdir "$queue/$dir/$date" if ! -d "$queue/$dir/$date"; + write_file "$queue/$dir/$date/$time.$t.$up_down.$function_code", $msg->message; + + if ( $function_code == 7 || $function_code == 8 ) { # 7 = heartbeat, 8 = alarm + my $hash = protocol_decode( $up_down, $msg->message ); + $sth->execute( map { $hash->{data_id}->{$_} } @insert_data_ids ); + + } + + # send pending messages on any connection + my @all_pending = sort glob "$queue/$dir/.pending/*"; + if ( my $pending = shift @all_pending ) { + my $raw = read_file $pending; + my $pending_function_code = unpack('C',substr($msg->message,2,1)); + + $topic =~ s/up$/down/; + + send_message($socket, + message_type => MQTT_PUBLISH, + retain => 0, #$retain, + topic => $topic, + message => $raw); + $pending =~ s{$queue/$dir/.pending/}{}; + rename "$queue/$dir/.pending/$pending", "$queue/$dir/$pending.sent.$pending_function_code"; + } + + if (defined $count && --$count == 0) { exit; } @@ -236,34 +311,3 @@ the same terms as the Perl 5 programming language system itself. =cut -# 6. Protocol Format -# Valid data ID and parameter range supported by the product -# DATA ID ID Description Data Type( Data Length) R/W Range Default Remark - -__DATA__ -0x00 Seq # DWord(4) R / 0 The platform can carry the ID when the platform downstream reads and sets the device parameters, and the device returns the same data. Please refer to the example for use.Each seq # refer to one command and its response. -0x01 PN DWord(4) R / / / -0x02 Model Byte(1) R 32 32 Inner number:32 -0x03 X axis angle Float(4) R ‐90°~90° / X axis angle -0x04 Y axis angle Float(4) R ‐90°~90° / Y axis angle -0x09 X axis relative angle Float(4) R ‐90°~90° 0 Return the X angle value according to the set relative zero -0x0A Y axis relative angle Float(4) R ‐90°~90° 0 Return the Y angle value according to the set relative zero -0x0C Sensor temperature Word(2) R ‐32768~32767 / signed,sensor temperature =Data/100, unit ℃ -0x0D Power source voltage Word(2) R 0~65535 / Voltage= Data/100, unit V -0x11 Arming/disarming Byte(1) R/W 0~255 1 0 means disarming, non‐zero means arming -0x12 Alarm delay time Byte(1) R/W 3~255 20 The unit is 0.1 second, which means that the product responds to the alarm only after the alarm has exceeded the alarm angle for a certain period of time. - -0x13 Restore factory setting Byte(1) R/W 0~255 0 0: Do nothing Non‐zero: Restore the non‐network‐related parameters of the sensor. -0x14 Server IP&port 4*Byte(1)+Word(2) R/W / CTIOT:117.60.157.137,5683 MQTT:0.0.0.0,0 The server address should be IP, not the domain name; using the domain name may cause the connection server to be unstable and cause data loss. -0x17 Signal strength Byte(1) R 10~34 / A larger value indicates a stronger signal -0x18 Sensor operating mode Byte(1) R/W 0 0 The sensor can only work in absolute measurement mode -0x19 Alarm axis Byte(1) R 0~ 3 / 0: no alarm; 1: X‐axis alarm 2: Y axis alarm; 3: X/Y axis alarm at the same time -0x1A SIM card ID QWord(8) R 0~18446744073709551615 / Take the first 19 digits, the last digit is discarded -0x21 Heartbeat interval DWord(4) R/W 60~131071 86400 Interval at which the device periodically uploads data to the server -0x22 IMEI number of the device QWord(8) R 0~18446744073709551615 / Refers to the IMEI of the NB‐IOT network module in the product. -0x23 Backup server IP&port 4*Byte(1)+Word(2) R/W / CTIOT:117.60.157.137,5683 MQTT:0.0.0.0,0 / -0x24 Backup server enable Byte(1) R/W 0~255 0 0 means off, non‐zero means on -0x33 DNS IP address 4*Byte(1) R/W / 208.67.222.222 / -0x34 Domain name and port 64*Byte(1) R/W / mqtt.zc‐sensor.com,1883 Supports CTIOT and MQTT protocols. Priority IP in the case of IP (ID number 0x14); domain name and port should be separated with comma, length <=64 -0x35 MQTT‐ClientID 32*Byte(1) R/W / IMEI number of the device Length <=32, subject to MQTT related specifications -0x36 MQTT‐Username 32*Byte(1) R/W / empty Length <=32, subject to MQTT related specifications