Skip to content

Commit

Permalink
Threaded BirdWeather submissions with retry support
Browse files Browse the repository at this point in the history
Run BirdWeather submissions in a separate thread to run in the background it doesn't hold up or slow down processing audio and detection's.

Instead of creating a new thread to process each Birdweather submission as they occur, add the data used for the Birdweather submission to a queue.

Then use a single thread running birdweather_submission_processor in a endless loop to process Birdweather submissions as they are added to that queue.

This avoids accidentally sending too many Birdweather submission at once, so we're only ever sending 1 submission at any one time.

Add timeout values to the requests as they will run indefinitely if no timeout is specified

Further, the ability to retry requests was implemented but only for connection timeouts and specific responses codes that can be retried on.

A exponential backoff delay is calculated before retrying again to provide adequate delay between requests and ease server pressure
  • Loading branch information
jaredb7 committed Aug 28, 2023
1 parent 762732c commit ddfcda0
Showing 1 changed file with 265 additions and 0 deletions.
265 changes: 265 additions & 0 deletions scripts/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import queue
import re
import sys
from pathlib import Path
from tzlocal import get_localzone
import datetime
Expand Down Expand Up @@ -42,6 +44,26 @@
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# List of our BirdWeather submission threads
bw_worker_threads = list()
bw_submission_queue = queue.Queue()
# BirdWeather Soundscape ID caching
bw_soundscape_submission_id_cache = list()
bw_soundscape_submission_id_cache_limit = 40
#
# Retry on these response codes
bw_request_retry_on_status = [404, 429, 500, 502, 503, 504]
bw_default_request_timeout = 10
bw_default_post_timeout = 6 * bw_default_request_timeout
# Stop processing once we hit the max number of tries, API might be down so limit the retries, together with the
# bw_default_request_timeout the total retry period is 60 seconds without taking the backoff time between requests into account
bw_request_max_retries = 6
# Used in an exponential calculation to provide the number of seconds to wait before making the request again
bw_request_backoff_factor = 3.5

# DEBUG flag to enable debug output in select functions
debug_birdweather_submissions = False

try:
server.bind(ADDR)
except BaseException:
Expand Down Expand Up @@ -302,6 +324,7 @@ def analyzeAudioData(chunks, lat, lon, week, sensitivity, overlap,):
for x in range(len(p)):
if "Human" in p[x][0]:
HUMAN_DETECTED = True
break

# Save result and timestamp
pred_end = pred_start + 3.0
Expand Down Expand Up @@ -618,10 +641,252 @@ def handle_client(conn, addr):
conn.close()


def birdweather_submit(bw_submission_data):
soundscape_id = None
#
extra_debug_output = ''

# Grab the URL and sound data form the supplied dictionary
# Soundscape POST data
soundscape_url = bw_submission_data.pop('soundscape_url')
wave_sound_data = bw_submission_data.pop('gzip_wav_data')
soundscape_filename = bw_submission_data.pop('soundscape_filename')
# Detection post data
detection_url = bw_submission_data.pop('detection_url')
detection_post_json = bw_submission_data.pop('detection_post_json')

if debug_birdweather_submissions:
print(f'BirdWeather Submission:: DEBUG:: URL: {soundscape_url}', flush=True)

##################################
# SOUNDSCAPE UPLOAD #############
##################################
# Loop for the max number of retries
for ss_p_rt in range(bw_request_max_retries):
extra_debug_output = ''
# Don't calculate or sleep on the first loop as this is the initial attempt
if ss_p_rt > 0:
# Calculate the backoff time before making another request
request_backoff_time = bw_request_backoff_factor * (2 ** (ss_p_rt - 1))
# We're retrying, retry after the calculated backoff time
print(f'BirdWeather Submission Error:: Retrying after {request_backoff_time}s, Retry ({ss_p_rt} of {bw_request_max_retries})', flush=True)
time.sleep(request_backoff_time)

try:
# First see if we've already submitted this wave file and get the soundscape id for it
find_existing_soundscape = birdweather_soundscape_id_cache('search', soundscape_filename)

# Didn't find a soundscape id for the file we're processing, upload it to Birdweather and cache it's soundscape id
if not find_existing_soundscape['soundscape_found']:
if debug_birdweather_submissions:
print(f'BirdWeather Submission:: Did not find soundscape {soundscape_filename} in cache, Posting soundscape to BirdWeather', flush=True)

# Submit the soundscape submission
soundscape_async_response = requests.post(url=soundscape_url, data=wave_sound_data,
headers={'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip'},
timeout=bw_default_post_timeout)
# Raise a error if response is not 2XX
soundscape_async_response.raise_for_status()

# Spit out the whole dict response if debugging
if debug_birdweather_submissions:
print(f'BirdWeather Submission:: DEBUG:: Soundscape POST - RESPONSE: {soundscape_async_response}', flush=True)

# Extract some data
soundscape_response_json = soundscape_async_response.json()
# Get the soundscape id for the soundscape uploaded
soundscape_id = soundscape_response_json['soundscape']['id']

# Cache the soundscape id for corresponding to the filename the wave uploaded
birdweather_soundscape_id_cache('add', soundscape_filename, soundscape_id)

print(f"BirdWeather Submission:: Soundscape Successfully Uploaded - status:{soundscape_async_response.status_code} soundscape_id:{soundscape_id}", flush=True)
else:
# We found the soundscape filename and the Birdweather Soundscape ID for it, soundscape considered uploaded & use the ID for the detection
soundscape_id = find_existing_soundscape['soundscape_id']
if debug_birdweather_submissions:
print(f"BirdWeather Submission:: Found Existing Soundscape in cache for {soundscape_filename} - using soundscape_id:{soundscape_id}", flush=True)

# Break the loop, if we reach here then were no exceptions and the detection posted successfully
break
except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as conn_exec:
print(f"BirdWeather Submission Error:: Soundscape POST - Connection Error! - {conn_exec}", flush=True)
continue
except requests.exceptions.RequestException as request_exc:
# Check if the status code is one that we can try on
if request_exc.response.status_code in bw_request_retry_on_status:

if debug_birdweather_submissions:
extra_debug_output = f' - {request_exc.response.reason} - {request_exc}'

print(f"BirdWeather Submission Error:: Soundscape POST - HTTP Request Exception! {extra_debug_output}", flush=True)
continue
else:
print(f"BirdWeather Submission Error:: Soundscape POST - HTTP Request Exception! Cannot retry - {request_exc}", flush=True)
# break the loop on non-retryable status
break
except (requests.exceptions.JSONDecodeError, requests.exceptions.InvalidJSONError) as json_error_exec:
print(f'BirdWeather Submission Error:: Soundscape POST - Something went wrong decoding JSON data - {json_error_exec}', flush=True)
except BaseException as ss_ex:
print(f'BirdWeather Submission Error:: Soundscape POST - Something went wrong - {ss_ex}', flush=True)

##################################
# DETECTION UPLOAD #############
##################################
# Loop for the max number of retries
for detect_p_rt in range(bw_request_max_retries):
extra_debug_output = ''
# Don't calculate or sleep on the first loop as this is the initial attempt
if detect_p_rt > 0:
# Calculate the backoff time before making another request
request_backoff_time = bw_request_backoff_factor * (2 ** (detect_p_rt - 1))
# We're retrying, retry after the calculated backoff time
print(f'BirdWeather Submission Error:: Retrying after {request_backoff_time}s, Retry ({detect_p_rt} of {bw_request_max_retries})', flush=True)
time.sleep(request_backoff_time)

# We need to substitute in the soundscape_id into the detection_post_json data, since it the ID isn't available until the soundscape is uploaded
# and because we submitted the full json data with a placeholder set for the soundscape_id
detection_post_json = detection_post_json.replace("{{soundscape_id}}", str(soundscape_id))

# Some debugging output if needed
if debug_birdweather_submissions:
print(f'BirdWeather Submission:: DEBUG:: Detection POST - detection_url: {detection_url} - detection_json: {detection_post_json} - Soundscape_ID: {soundscape_id}', flush=True)

# Submit the detection
try:
# POST detection to server
detection_async_response = requests.post(detection_url,
json=json.loads(detection_post_json),
timeout=bw_default_request_timeout)
# Raise a error if response is not 2XX
detection_async_response.raise_for_status()

# Spit out the whole dict response if debugging
if debug_birdweather_submissions:
print(f'BirdWeather Submission:: DEBUG:: Detection POST - RESPONSE: {detection_async_response}', flush=True)

# Extract some data
detection_response_status_json = detection_async_response.json()

# Check the response
# Extract the bird detection info to display in the output
bird_detection_string = "N/A"
if 'detection' in detection_response_status_json:
bird_detection_name = detection_response_status_json['detection']['species']['commonName']
bird_detection_confidence = detection_response_status_json['detection']['confidence']
bird_detection_timestamp = datetime.datetime.fromisoformat(
detection_response_status_json['detection']['timestamp'])
bird_detection_time = bird_detection_timestamp.time()
bird_detection_string = f"- {bird_detection_time}/{bird_detection_name}/{bird_detection_confidence}"

if debug_birdweather_submissions:
# Add in the JSON response if debugging just in case we might want to view it
extra_debug_output = f'- json:{detection_response_status_json}'

print(f"BirdWeather Submission:: Detection Successfully Uploaded - status:{detection_async_response.status_code} {bird_detection_string} {extra_debug_output}", flush=True)

# Break the loop, if we reach here then were no exceptions and the detection posted successfully
break
except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as conn_exec:
print(f"BirdWeather Submission Error:: Detection POST - Connection Error! - {conn_exec}", flush=True)
continue
except requests.exceptions.RequestException as request_exc:
# Check if the status code is one that we can try on
req_status_code = request_exc.response.status_code
if req_status_code in bw_request_retry_on_status:

if debug_birdweather_submissions:
extra_debug_output = f'- {request_exc.response.reason} - {request_exc}'

print(f"BirdWeather Submission Error:: Detection POST - HTTP Request Exception! {extra_debug_output}", flush=True)
continue
else:
print(
f"BirdWeather Submission Error:: Detection POST - HTTP Request Exception! Cannot retry - {request_exc}",
flush=True)
# break the loop on non-retryable status
break
except (requests.exceptions.JSONDecodeError, requests.exceptions.InvalidJSONError) as json_error_exec:
print(
f'BirdWeather Submission Error:: Detection POST - Something went wrong decoding JSON data - {json_error_exec}',
flush=True)
except BaseException as dp_ex:
print(f'BirdWeather Submission Error:: Detection POST - Something went wrong - {dp_ex}', flush=True)


def birdweather_submission_processor():
if debug_birdweather_submissions:
print("Starting: birdweather_submission_processor thread")

# Loop over the queue containing the data used for the BirdWeather submissions
while True:
# Get the BirdWeather submission data, this is a dictionary containing the necessary data
bw_submission_data = bw_submission_queue.get()

if debug_birdweather_submissions:
print(
f"Processing: Soundscape:{bw_submission_data['soundscape_url']}, Detection:{bw_submission_data['detection_url']}, Detection_Data:{bw_submission_data['detection_post_json']}")

# Perform the submission
birdweather_submit(bw_submission_data)
# Processing finished so the task is now done
bw_submission_queue.task_done()


def birdweather_soundscape_id_cache(mode, soundscape_filename, bw_soundscape_id=None):
global bw_soundscape_submission_id_cache
ss_id_was_found = False
ss_id_to_return = 0

if debug_birdweather_submissions:
print(f"birdweather_soundscape_id_cache - mode:{mode} - for {soundscape_filename}")

if mode == 'search':
# Search the list for the filename
for soundscape_submission in bw_soundscape_submission_id_cache:
this_ss_filename = soundscape_submission['soundscape_filename']
this_ss_id = soundscape_submission['soundscape_id']
# If this soundscape filename matches the one we're searching for, return the bw soundscape id
if this_ss_filename == soundscape_filename:
ss_id_was_found = True
ss_id_to_return = this_ss_id
if debug_birdweather_submissions:
print(
f"birdweather_soundscape_id_cache - Found {soundscape_filename} with soundscape_id:{ss_id_to_return}")
break

return {'soundscape_found': ss_id_was_found, 'soundscape_id': ss_id_to_return}
elif mode == 'add':
# If a filename AND soundscape ID has been supplied then we want to store it in the list
if soundscape_filename is not None and bw_soundscape_id is not None:
# Check the length of the list first
if len(bw_soundscape_submission_id_cache) >= bw_soundscape_submission_id_cache_limit:
# Remove the first item in the list before inserting a new item
del bw_soundscape_submission_id_cache[0]

# Create a new dict containing the appropriate data
new_ss_submission = dict()
new_ss_submission['soundscape_filename'] = soundscape_filename
new_ss_submission['soundscape_id'] = bw_soundscape_id
# Append it to the list
bw_soundscape_submission_id_cache.append(new_ss_submission)

if debug_birdweather_submissions:
print(
f"birdweather_soundscape_id_cache - Inserting entry soundscape_filename:{soundscape_filename} - soundscape_id:{bw_soundscape_id}")


def start():
# Load model
global INTERPRETER, INCLUDE_LIST, EXCLUDE_LIST
INTERPRETER = loadModel()

# Run the BirdWeather submission queue processor in a thread
bw_submission = threading.Thread(target=birdweather_submission_processor)
bw_worker_threads.append(bw_submission)
bw_submission.start()

server.listen()
# print(f"[LISTENING] Server is listening on {SERVER}")
while True:
Expand Down

0 comments on commit ddfcda0

Please sign in to comment.