-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
179 lines (147 loc) · 6.12 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import argparse
import http.client
import io
import os
import time
import urllib
import boto3
import cv2
import libcamera
from dotenv import load_dotenv
from loguru import logger
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder, Quality
from picamera2.outputs import FfmpegOutput
# * Pull settings from the .env file into the process environment
load_dotenv()
# NOTE(review): this is assigned after `libcamera`/`picamera2` were imported above;
# NOTE(review): confirm libcamera still honours the variable when it is set this late.
os.environ["LIBCAMERA_LOG_LEVELS"] = "4"

# * Cloudflare R2 (S3-compatible) storage settings
endpointUrl = os.environ.get("R2_ENDPOINT_URL")
accessKey = os.environ.get("R2_ACCESS_KEY")
secretAccessKey = os.environ.get("R2_SECRET_ACCESS_KEY")
publicBucket = os.environ.get("R2_PUBLIC_URL")

# * Pushover credentials used by send_notification()
appToken = os.environ.get("PUSHOVER_APP_TOKEN")
userKey = os.environ.get("PUSHOVER_USER_KEY")

# * Constants
# ? Seconds without a detection after which main() stops an ongoing recording
recordingInterval = 7
cascadePath = "/usr/share/opencv4/haarcascades/haarcascade_upperbody.xml"

# * S3 client pointed at the configured endpoint with the credentials read above
s3 = boto3.client(
    "s3",
    region_name="auto",
    endpoint_url=endpointUrl,
    aws_access_key_id=accessKey,
    aws_secret_access_key=secretAccessKey,
)

# * Camera: video configuration flipped on both axes, then started so frames can be captured
picam2 = Picamera2()
videoConfig = picam2.create_video_configuration(main={"format": "XRGB8888"})
videoConfig["transform"] = libcamera.Transform(hflip=1, vflip=1)
picam2.configure(videoConfig)
picam2.start()

# * H264 encoder instance shared by every recording
encoder = H264Encoder()

# * Upper-body detector built from the Haar cascade file at cascadePath
bodyDetector = cv2.CascadeClassifier(cascadePath)
def detectBodies():
    # * Grabs one frame from the camera and runs the upper-body cascade on its greyscale copy.
    # * @return True when the detector reports at least one hit, otherwise False.
    frame = picam2.capture_array()  # ? Latest frame from the running camera
    greyFrame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # ? The cascade is fed a greyscale image
    # ? 1.1 and 5 are passed positionally right after the image (scale factor, min neighbours)
    detections = bodyDetector.detectMultiScale(greyFrame, 1.1, 5)
    return len(detections) != 0
def send_notification(video_url, timeout=10):
    # * Sends a Pushover notification containing a link to the recorded video.
    # * Best-effort: failures are logged and reported through the return value, never raised.
    # * @param video_url The URL of the video to send in the notification.
    # * @param timeout Socket timeout in seconds, so a stalled request cannot block the caller forever.
    # * @return The HTTPResponse from the Pushover API (body left unread for the caller),
    # *         or None if the request could not be completed.
    # ? Imported explicitly: a bare `import urllib` does not guarantee the `parse` submodule is loaded
    from urllib.parse import urlencode

    logger.info("Sending notification...")
    conn = None
    try:
        conn = http.client.HTTPSConnection("api.pushover.net:443", timeout=timeout)
        conn.request(
            "POST",
            "/1/messages.json",
            urlencode(
                {
                    "token": appToken,
                    "user": userKey,
                    "title": "Person detected!",
                    "message": "A person has been detected in the frame! Here is the link to the video:",
                    "url": video_url,
                }
            ),
            {"Content-type": "application/x-www-form-urlencoded"},
        )
        response = conn.getresponse()
    except Exception as e:
        logger.error(f"Error sending notification: {str(e)}")
        # ? Release the socket on failure; on success the returned response still needs it
        if conn is not None:
            conn.close()
        return None

    # ? Only claim success when the API actually accepted the message
    if 200 <= response.status < 300:
        logger.info("Notification sent")
    else:
        logger.error(
            f"Notification rejected: HTTP {response.status} {response.reason}"
        )
    return response
def _publishRecording(outputFilename, videoUrl):
    # * Uploads a finished recording to the "detection" bucket, notifies, then deletes the local copy.
    # ? Stream straight from disk instead of loading the whole video into memory first
    with open(outputFilename, "rb") as file:
        s3.upload_fileobj(file, "detection", outputFilename)
    logger.info(f"Uploaded {outputFilename} to S3 successfully")
    # * Send notification
    send_notification(videoUrl)
    # * Delete the local file after uploading
    os.remove(outputFilename)
    logger.info(f"Deleted local file {outputFilename} after upload")


def _restartCamera():
    # * Puts the camera back into the detection configuration after a recording.
    picam2.stop()
    picam2.configure(videoConfig)
    picam2.start()
    logger.debug("Camera reinitialized for detection")


def main():
    # * Runs the detection loop: starts recording when a body is detected, stops after
    # * `recordingInterval` seconds without a detection, then uploads the video and sends a
    # * notification (or keeps the file on disk when --local is given).
    # * Loops forever; errors inside one iteration are logged and the loop carries on.
    parser = argparse.ArgumentParser(
        description="Control video recording and storage options."
    )
    parser.add_argument(
        "--local", action="store_true", help="If set, store videos locally only."
    )
    args = parser.parse_args()

    logger.info("Initializing...")
    time.sleep(2)  # * Short delay to stabilize camera
    logger.info("Initialization complete")

    recording = False
    startTime = None
    outputFilename = None
    videoUrl = None

    while True:
        try:
            bodyDetected = detectBodies()
            # ? Monotonic clock: the inactivity timeout must not be affected by wall-clock jumps
            currentTime = time.monotonic()

            if bodyDetected and not recording:
                # * Generate a unique filename for each recording session
                timestamp = time.strftime("%Y%m%d_%H%M%S")
                outputFilename = f"output_{timestamp}.mp4"
                output = FfmpegOutput(outputFilename)
                videoUrl = f"{publicBucket}/{outputFilename}"
                # * Start recording if a body is detected and not already recording
                picam2.start_recording(encoder, output, quality=Quality.LOW)
                startTime = currentTime
                recording = True
                logger.info(f"Body detected, started recording to {outputFilename}")

            if recording:
                if bodyDetected:
                    # * Update the timer if body is still detected
                    startTime = currentTime
                elif currentTime - startTime >= recordingInterval:
                    # * Stop recording if no body is detected for x seconds
                    picam2.stop_recording()
                    logger.info("Recording stopped")
                    recording = False
                    try:
                        if not args.local:
                            _publishRecording(outputFilename, videoUrl)
                        else:
                            logger.info(f"Stored {outputFilename} locally")
                    finally:
                        # ? Must run even when upload/notify/delete fails, otherwise the
                        # ? next detectBodies() call would use a camera that was never restarted
                        _restartCamera()
        except Exception as e:
            logger.error(f"An error occurred: {e}")

        # * Sleep for a short duration before checking again
        # ? Kept outside the try so a persistent error also backs off instead of spinning
        time.sleep(1)


if __name__ == "__main__":
    main()