Skip to content

Commit

Permalink
Merge pull request #193 from nebulous/payload-parsing
Browse files Browse the repository at this point in the history
Payload parsing refactor, associated frontend changes, and new cli bus prodding tool
  • Loading branch information
nebulous authored Sep 27, 2024
2 parents 2148255 + 2b04a9d commit daab4d5
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 227 deletions.
44 changes: 44 additions & 0 deletions climate
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/perl

use lib 'lib';
use strict;
use feature 'say';
use CarBus;
use Getopt::Long;
use IO::File;
use IO::Socket::IP;
use IO::Termios;
use JSON;

my $opt={ src=>'FakeSAM', dst=>'Thermostat', cmd=>'read', reg=>'0104', src_bus=>1, dst_bus=>1};
GetOptions($opt, 'src=s', 'dst=s', 'reg=s', 'cmd=s' );
$opt->{payload_raw} ||= pack("H*","00".$opt->{reg});

my $reqframe = CarBus::Frame->new($opt);
print STDERR $reqframe->frame_log."\n";

my $tcp = CarBus->new(IO::Socket::IP->new(PeerHost=>'192.168.1.23', PeerPort=>23)); #tcp
my $sam = CarBus->new(IO::Termios->open("/dev/cu.usbserial-A7039O5G", "38400,8,n,1")); #serial
#my $fil = CarBus->new(IO::File->new("<ccn.log"));

my $bridge = CarBus::Bridge->new(buslist=>[$tcp,$sam]);
my $lastwrite=0;
my $tries=8;
while(1) {
foreach my $frame ($bridge->drive) {
next unless $frame;
next unless ( $frame->struct->{dst} eq $reqframe->struct->{src}
and $frame->struct->{src} eq $reqframe->struct->{dst}
and ( $frame->struct->{reg_string} eq $reqframe->struct->{reg_string}
or $frame->struct->{cmd} eq 'exception' )
);
print STDERR $frame->frame_log."\n";
print encode_json($frame->frame_hash);
exit;
}
if (time>$lastwrite) {
$bridge->write($reqframe);
$lastwrite=time;
die 'timeout' unless $tries--;
}
}
5 changes: 2 additions & 3 deletions docker-compose-dev.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "2.1"

services:
infinitude:
container_name: infinitude
Expand All @@ -9,7 +7,8 @@ services:
build:
context: .
dockerfile: Dockerfile
network_mode: host
ports:
- "3000:3000"
volumes:
- ./:/infinitude/
environment:
Expand Down
4 changes: 0 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "2.1"

services:
infinitude:
container_name: infinitude
Expand All @@ -9,8 +7,6 @@ services:
context: .
dockerfile: Dockerfile
network_mode: host
ports:
- "3000:3000"
volumes:
- ./state:/infinitude/state
environment:
Expand Down
54 changes: 15 additions & 39 deletions infinitude
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ sub serial_init {
require IO::Termios;
warn "Using $config->{serial_tty} serial interface\n";
$handle ||= IO::Termios->open($config->{serial_tty},"38400,8,n,1");
$handle->blocking(0);
} else {
warn "Can't find serial device: $config->{serial_tty}. Serial monitoring disabled.\n" if $config->{serial_tty};
delete $config->{serial_tty};
Expand All @@ -79,38 +78,11 @@ sub serial_init {
$port //= 'telnet';
warn "Using $host port $port for serial interface\n";
$handle ||= IO::Socket::IP->new( PeerHost=>$host, PeerPort=>$port);
$handle->blocking(0);
}
$carbus = CarBus->new(fh=>$handle) if $handle;
$carbus = CarBus->new($handle) if $handle;
}
serial_init();

my @inbox = ();
if ($use_serial) {
my $last_frame_time = time;
my $frame_inbox_id = Mojo::IOLoop->recurring(0.0625 => sub {
my $loop = shift;
$loop->on(finish => sub {
warn "Frame filler loop finishing";
});
$loop->on(reset => sub {
warn "Frame filler loop resetting";
});
if ((time - $last_frame_time) < 10) {
return unless $carbus;
$carbus->fh_fill() if $carbus->fh;
my $frame = $carbus->get_frame;
if (not $frame->{error}) {
push(@inbox, $frame);
$last_frame_time = time;
}
shift @inbox if scalar(@inbox)>128;
} else {
serial_init();
}
});
}

app->secrets([$config->{app_secret}]);
push (@{app->static->paths}, ('development' eq ($ENV{MOJO_MODE}//'')) ? 'public/app' : 'public/dist');

Expand All @@ -121,7 +93,7 @@ hook before_dispatch => sub {

my $url = $c->req->url;

if ($url->to_abs->host =~ /(bryant|carrier|ioncomfort|infinitude|127.0.0.1)/i) { # request from stat or test harness
if ($url->to_abs->host =~ /(bryant|carrier|ioncomfort|infinitude)/i) { # request from stat or test harness
my $nk = $url->path->to_string;
$nk =~ s/\//-/g;
$nk =~ s/^-//;
Expand Down Expand Up @@ -439,26 +411,28 @@ any '/systems/:system_id/:part' => sub {
my $scantab = 0;
my $scanrow = 0;
my $scansec = 0;
my $lastframe = 0;
websocket '/serial' => sub {
my $c = shift;
unless ($use_serial) {
$c->app->log->info("Websocket opened, but no streaming source is available");
$c->app->log->info("Websocket opened, but no streaming source is configured");
return;
}

my $socketloop_id = Mojo::IOLoop->recurring(0.03125 => sub {
my $socketloop_id = Mojo::IOLoop->recurring(0.0625 => sub {
my $loop = shift;
if (my $frame = shift @inbox) {
serial_init() if (!$carbus or time>($lastframe+10));
if (my $frame = $carbus->get_frame) { return if (!$frame or !$frame->struct->{cmd});
my $fstruc = $frame->frame_hash;
$fstruc->{timestamp} = time;

if ($fstruc->{Function} eq 'reply' and $fstruc->{type}) {
if (my $payf = $fstruc->{payload}) {
$fstruc->{timestamp} = $lastframe = time;
if ($fstruc->{cmd} eq 'reply') {
my $payf = $fstruc->{payload};
if ($payf) {
if ($payf->{rows} and $ENV{SCAN_THERMOSTAT}) {
$scanrow = $payf->{rows}+1;
warn sprintf(">>>>>>>>>>> table %02x has %d rows <<<<<<<<<<<<<<<<<<<", $scantab, $payf->{rows});
}
$fstruc->{field} = { $fstruc->{type}=>$fstruc->{payload} };
$fstruc->{field} = { $fstruc->{reg_name}||'unknown' => $payf };
}
}

Expand All @@ -477,14 +451,16 @@ websocket '/serial' => sub {
$carbus->samreq($scantab, $scanrow);
}
}
#warn $frame->frame_log;
$c->send({json=>$fstruc});
}
});
$c->app->log->info("Websocket $socketloop_id Established");

$c->on('finish' => sub {
my ($c, $code) = @_;
Mojo::IOLoop->remove($socketloop_id);
$c->app->log->info("Websocket Closed: $code");
$c->app->log->info("Websocket $socketloop_id Closed: $code");
});
};

Expand Down
113 changes: 74 additions & 39 deletions lib/CarBus.pm
Original file line number Diff line number Diff line change
@@ -1,44 +1,50 @@
package CarBus;
use Moo;
use CarBus::Frame;
use Scalar::Util qw/blessed/;

has async => (is=>'ro', default=>sub{0});
has fh => (is=>'ro');
has fh => (is=>'ro', isa=>sub{
die 'fh must be an IO::Handle or subclass thereof' unless
defined blessed($_[0]) and $_[0]->isa('IO::Handle');
});
has buffer => (is=>'rw', default=>'');

use constant MAX_BUFFER => 1024;
use constant MIN_FRAME => 10;

sub BUILDARGS {
my ( $class, @args ) = @_;
unshift @args, "fh" if @args % 2 == 1;
return { @args };
my $argref = { @args };
$argref->{fh}->blocking(0);
return $argref;
};

sub get_frame {
my $self = shift;
my $self = shift;

my $max_attempts = $self->async ? $self->buflen : MAX_BUFFER;
my $attempts = 0;
while ($attempts++<$max_attempts) {
my $data_len = $self->buflen>4 ? ord(substr($self->buffer,4,1)) : 0;
if ($data_len>0) {
my $frame_len = 10+$data_len;
if ($self->buflen>=$frame_len) {
my $frame_string = substr($self->buffer,0,$frame_len);
my $cbf = CarBus::Frame->new($frame_string);
if ($cbf->valid) {
$self->shift_stream($frame_len);
$self->handlers($cbf);
return $cbf;
}
$self->shift_stream(1);
}
} else {
$self->shift_stream(1);
}
$self->fh_fill();
}
return { error=>'timed out or EOF' };
my $max_attempts = $self->buflen>MAX_BUFFER ? $self->buflen : MAX_BUFFER;
my $attempts = 0;
while ($attempts++<$max_attempts) {
if ($self->buflen < MIN_FRAME) {
$self->fh_fill();
next;
}
my $data_len = ord(substr($self->buffer,4,1));
my $frame_len = MIN_FRAME+$data_len;
if ($self->buflen >= $frame_len) {
my $frame_string = substr($self->buffer,0,$frame_len);
my $cbf = CarBus::Frame->new($frame_string);
if ($cbf->valid) {
$self->shift_stream($frame_len);
$self->handlers($cbf);
return $cbf;
}
$self->shift_stream(1);
}
$self->fh_fill();
}
return undef;
}

sub fh_fill {
Expand Down Expand Up @@ -71,29 +77,58 @@ sub shift_stream {
$self->buffer(substr($self->buffer,$byte_num));
}

sub write {
my $self = shift;
my $frame = shift;
$self->fh->syswrite($frame->frame);
}

sub samreq {
my $self = shift;
my ($table, $row, $frameopts) = @_;
$frameopts //= {};
my $samframe = CarBus::Frame->new(data=>pack("C*", 0, $table, $row), %$frameopts);
$self->fh->syswrite($samframe->frame);
my $samframe = CarBus::Frame->new(
src=>'FakeSAM', src_bus=>1,
dst=>'Thermostat', dst_bus=>1,
cmd=>'read',
payload_raw=>pack("C*", 0, $table, $row),
%$frameopts
);
$self->write($samframe);
return $samframe;
}

sub handlers {
my $self = shift;
my $frame = shift;
my $f = $frame->frame_hash;
#if (
# $f->{DstClass} eq 'SAM' and $f->{Function} eq 'read' and
# $f->{table} == 1 and $f->{row} == 4 ) {
# my $nf = CarBus::Frame->new( Function=>'reply',
# DstClass=>$f->{SrcClass}, SrcClass=>'SAM',
# DstAddress=>$f->{SrcAddress}, SrcAddress=>1,
# data => pack("C*",0,1,4)."\0SYSTEM ACCESS MODULE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0CESR131379-03 SYSTXCCSAM01\0\0\0\0\0\0\0\01009N182206-1009N182206-------------",
# );
#$self->fh->syswrite($nf->frame);
#}
# mangle frame contents;
}

package CarBus::Bridge;
use Moo;

has buslist => (is=>'ro');
has routes => (is=>'rw', default=>sub{{}});

sub drive {
my $self = shift;
my @frames = ();
foreach my $srcbus (@{$self->buslist}) {
if (my $frame = $srcbus->get_frame()) {
push(@frames,$frame);
foreach my $dstbus (@{$self->buslist}) {
next if $srcbus == $dstbus;
$dstbus->write($frame);
}
}
}
return @frames;
}

sub write {
my $self = shift;
my $frame = shift;
$_->write($frame) for @{$self->buslist};
}

1;
Loading

0 comments on commit daab4d5

Please sign in to comment.