-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
executable file
·65 lines (49 loc) · 2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python3
import argparse
import logging
import time
import numpy as np
from waggle.plugin import Plugin
from waggle.data.vision import Camera
def process_frame(frame):
# we assume frame shape is (H, W, 3) for an RGB image
mean = np.mean(frame, (0, 1)).astype(float)
min = np.min(frame, (0, 1)).astype(float)
max = np.max(frame, (0, 1)).astype(float)
return {
"mean": mean,
"min": min,
"max": max,
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--device", default=0, help="camera device to use")
parser.add_argument("--interval", default=10, type=float, help="sampling interval in seconds")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(message)s',
datefmt='%Y/%m/%d %H:%M:%S')
logging.info("starting plugin. will process a frame every %ss", args.interval)
with Plugin() as plugin:
cam = Camera(args.device)
for sample in cam.stream():
logging.info("processing frame")
results = process_frame(sample.data)
logging.info("image summary")
logging.info("mean: %s", results["mean"])
logging.info("min: %s", results["min"])
logging.info("max: %s", results["max"])
plugin.publish("image.mean.red", results["mean"][0])
plugin.publish("image.mean.green", results["mean"][1])
plugin.publish("image.mean.blue", results["mean"][2])
plugin.publish("image.min.red", results["min"][0])
plugin.publish("image.min.green", results["min"][1])
plugin.publish("image.min.blue", results["min"][2])
plugin.publish("image.max.red", results["max"][0])
plugin.publish("image.max.green", results["max"][1])
plugin.publish("image.max.blue", results["max"][2])
logging.info("published summary")
time.sleep(args.interval)
if __name__ == "__main__":
main()