-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
87 additions
and
72 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,4 @@ | ||
bin_PROGRAMS = net_strength can_msg_rate heartbeat | ||
bin_PROGRAMS = can_msg_rate heartbeat | ||
|
||
net_strength_SOURCES = net_strength.c | ||
can_msg_rate_SOURCES = can_msg_rate.c | ||
heartbeat_SOURCES = heartbeat.c |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
/* | ||
* Heartbeat Message Kafka Producer | ||
* Heartbeat Message Kafka Producer | ||
* | ||
* Author: Yang Wang <[email protected]> | ||
* | ||
* Copyright (C) 2017 Purdue University | ||
* Copyright (C) 2018 Purdue University | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to | ||
|
@@ -39,14 +39,17 @@ | |
|
||
static char key[100]; | ||
static char *brokers = "localhost:9092"; | ||
int offline_min = 0; | ||
static char *command = "qmicli -p -d /dev/cdc-wdm0 --nas-get-signal-strength | \ | ||
head -n 3 | tail -n 1 | sed \"s/^.*'\\([-0-9]*\\) dBm'[^']*$/\\1/\""; | ||
|
||
const char D_HB_SCHEMA[] = | ||
"{\"type\":\"record\",\ | ||
\"name\":\"dhb\",\ | ||
\"fields\":[\ | ||
{\"name\": \"timestamp\", \"type\": \"double\"},\ | ||
{\"name\": \"heartbeat\", \"type\": \"boolean\"}]}"; | ||
{\"name\": \"ns\", \"type\": \"int\"},\ | ||
{\"name\": \"netled\", \"type\": \"boolean\"},\ | ||
{\"name\": \"statled\", \"type\": \"boolean\"}]}"; | ||
|
||
/* Timer handler */ | ||
void timer_handler(int signum) { | ||
|
@@ -67,9 +70,12 @@ void timer_handler(int signum) { | |
struct timeval tp; | ||
double timestamp; | ||
|
||
/* Heartbeat */ | ||
static bool hb; | ||
static int ret; | ||
/* Heartbeat message variables */ | ||
static int ns; | ||
static int ret; | ||
static bool netled = true; | ||
static bool statled = true; | ||
static FILE *fn; | ||
|
||
/* Broker conf */ | ||
if (conf == NULL) { | ||
|
@@ -98,7 +104,7 @@ void timer_handler(int signum) { | |
fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); | ||
} | ||
} | ||
|
||
if (rd_kafka_brokers_add(rk, brokers) == 0) { | ||
fprintf(stderr, "%% No valid brokers specified\n"); | ||
exit(EXIT_FAILURE); | ||
|
@@ -123,7 +129,7 @@ void timer_handler(int signum) { | |
} | ||
|
||
/* Create avro record based on the schema */ | ||
if (d_hb == NULL) { | ||
if (d_hb == NULL) { | ||
d_hb = avro_record(d_hb_schema); | ||
} | ||
|
||
|
@@ -137,10 +143,9 @@ void timer_handler(int signum) { | |
perror("system"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
|
||
/* If network checking returns error */ | ||
if (WEXITSTATUS(ret) == 0) { | ||
hb = true; | ||
offline_min = 0; | ||
/* Indicate online */ | ||
system("echo 0 > /sys/class/leds/LED_4_RED/brightness"); | ||
system("echo 255 > /sys/class/leds/LED_4_GREEN/brightness"); | ||
|
@@ -149,59 +154,82 @@ void timer_handler(int signum) { | |
fflush(stdout); | ||
#endif | ||
} else { | ||
hb = false; | ||
offline_min++; | ||
/* Indicate offline */ | ||
system("echo 0 > /sys/class/leds/LED_4_GREEN/brightness"); | ||
system("echo 255 > /sys/class/leds/LED_4_RED/brightness"); | ||
#if DEBUG | ||
printf("%f: dead\n", timestamp); | ||
fflush(stdout); | ||
#endif | ||
if (offline_min > 2) { | ||
/* Force udev trigger on the modem */ | ||
#if DEBUG | ||
printf("Try force udev triggering ...\n"); | ||
fflush(stdout); | ||
printf("Try force udev triggering ...\n"); | ||
fflush(stdout); | ||
#endif | ||
system("udevadm trigger /sys/class/net/wwan0"); | ||
} | ||
system("udevadm trigger /sys/class/net/wwan0"); | ||
} | ||
|
||
/* Only producing if network is good */ | ||
if (hb) { | ||
avro_datum_t ts_datum = avro_double(timestamp); | ||
avro_datum_t hb_datum = avro_boolean(hb); | ||
|
||
if (avro_record_set(d_hb, "timestamp", ts_datum) | ||
|| avro_record_set(d_hb, "heartbeat", hb_datum)) { | ||
fprintf(stderr, "Unable to set record to d_msg_rate\n"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
if (avro_write_data(writer, d_hb_schema, d_hb)) { | ||
fprintf(stderr, "unable to write d_msg_rate datum to memory\n"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, | ||
buf, avro_writer_tell(writer), key, strlen(key), NULL) == -1) { | ||
fprintf(stderr, "%% Failed to produce to topic %s " | ||
"partition %i: %s\n", | ||
rd_kafka_topic_name(rkt), RD_KAFKA_PARTITION_UA, | ||
rd_kafka_err2str(rd_kafka_last_error())); | ||
rd_kafka_poll(rk, 0); | ||
} | ||
/* Get the network strength from command */ | ||
fn = popen(command, "r"); | ||
if (fn != NULL) { | ||
fscanf(fn, "%d", &ns); | ||
} else { | ||
perror("popen"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
rd_kafka_poll(rk, 0); | ||
/* Close the subprocess */ | ||
if (pclose(fn) < 0) { | ||
perror("pclose"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
/* Decrement all our references to prevent memory from leaking */ | ||
avro_datum_decref(ts_datum); | ||
avro_datum_decref(hb_datum); | ||
#if DEBUG | ||
printf("%f: network strength is %d\n", timestamp, ns); | ||
fflush(stdout); | ||
#endif | ||
|
||
/* Reset the writer */ | ||
avro_writer_reset(writer); | ||
if (ns < -100) { | ||
printf("%f: Network strength %d dBm doesn't make sense! Something WRONG!\n", | ||
timestamp, ns); | ||
} | ||
|
||
avro_datum_t ts_datum = avro_double(timestamp); | ||
avro_datum_t ns_datum = avro_int32(ns); | ||
avro_datum_t netled_datum = avro_boolean(netled); | ||
avro_datum_t statled_datum = avro_boolean(statled); | ||
|
||
if (avro_record_set(d_hb, "timestamp", ts_datum) | ||
|| avro_record_set(d_hb, "ns", ns_datum) | ||
|| avro_record_set(d_hb, "netled", netled_datum) | ||
|| avro_record_set(d_hb, "statled", statled_datum)) { | ||
fprintf(stderr, "Unable to set record to d_hb\n"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
if (avro_write_data(writer, d_hb_schema, d_hb)) { | ||
fprintf(stderr, "unable to write d_hb datum to memory\n"); | ||
exit(EXIT_FAILURE); | ||
} | ||
|
||
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, | ||
buf, avro_writer_tell(writer), key, strlen(key), NULL) == -1) { | ||
fprintf(stderr, "%% Failed to produce to topic %s " | ||
"partition %i: %s\n", | ||
rd_kafka_topic_name(rkt), RD_KAFKA_PARTITION_UA, | ||
rd_kafka_err2str(rd_kafka_last_error())); | ||
rd_kafka_poll(rk, 0); | ||
} | ||
|
||
rd_kafka_poll(rk, 0); | ||
|
||
/* Decrement all our references to prevent memory from leaking */ | ||
avro_datum_decref(ts_datum); | ||
avro_datum_decref(ns_datum); | ||
avro_datum_decref(netled_datum); | ||
avro_datum_decref(statled_datum); | ||
|
||
/* Reset the writer */ | ||
avro_writer_reset(writer); | ||
} | ||
|
||
int main(int argc, char *argv[]) { | ||
|
@@ -248,17 +276,17 @@ int main(int argc, char *argv[]) { | |
sa.sa_handler = &timer_handler; | ||
sigaction(SIGALRM, &sa, NULL); | ||
|
||
/* Configure the timer to expire after 1 min... */ | ||
timer.it_value.tv_sec = 60; | ||
/* Configure the timer to expire after 10 secs... */ | ||
timer.it_value.tv_sec = 5; | ||
timer.it_value.tv_usec = 0; | ||
/* ... and every 1 min after that. */ | ||
timer.it_interval.tv_sec = 60; | ||
/* ... and every 10 secs after that. */ | ||
timer.it_interval.tv_sec = 5; | ||
timer.it_interval.tv_usec = 0; | ||
|
||
/* Start a real timer. It counts down whenever this process is | ||
* executing. */ | ||
setitimer(ITIMER_REAL, &timer, NULL); | ||
|
||
system("echo 255 > /sys/class/leds/LED_4_RED/brightness"); | ||
|
||
while (1) { | ||
|