Skip to content

Commit

Permalink
CELE-119 feat: Replace niquests with curl
Browse files Browse the repository at this point in the history
  • Loading branch information
afonsobspinto committed Dec 19, 2024
1 parent 5ca9fa6 commit f3125ca
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 26 deletions.
10 changes: 3 additions & 7 deletions applications/visualizer/backend/api/decorators/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def run_func():
class QueueWriter:
def write(self, data):
if data:
q.put(data) # Push data into the thread-safe queue
q.put(data)

def flush(self):
pass # For compatibility with print
Expand All @@ -46,16 +46,12 @@ def flush(self):
# Async generator to yield lines from the queue
async def line_generator():
while True:
line = await asyncio.to_thread(q.get) # Get item from thread-safe queue
line = await asyncio.to_thread(q.get)
if line is None: # End signal
break
yield line
await asyncio.sleep(0) # Yield control to event loop

# Return a streaming response that sends data asynchronously
response = StreamingHttpResponse(line_generator(), content_type="text/plain")
response['Cache-Control'] = 'no-cache'
response['X-Accel-Buffering'] = 'no' # Disable nginx buffering if using nginx
return response
return StreamingHttpResponse(line_generator(), content_type="text/plain")

return wrapper
33 changes: 15 additions & 18 deletions ingestion/ingestion/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import os
import sys
import tempfile
from argparse import ArgumentParser, Namespace
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
Expand All @@ -14,7 +13,6 @@
from google.api_core.exceptions import PreconditionFailed
from google.cloud import storage
from pydantic import ValidationError
import niquests
from tqdm import tqdm

from ingestion.cli import ask, type_directory, type_file
Expand Down Expand Up @@ -476,7 +474,6 @@ def upload_em_tiles(

pbar.close()


def trigger_populate_db(args):
try:
api_url = args.populate_db_url
Expand All @@ -495,27 +492,27 @@ def trigger_populate_db(args):
)
return

# Make a GET request to the streaming endpoint with basic auth
r = niquests.get(f"{api_url}", auth=(client_id, private_key_id), stream=True, timeout=None)
import subprocess

command = [
"curl",
"-u", f"{client_id}:{private_key_id}",
f"{api_url}"
]

with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
try:
for line in proc.stdout:
print(line, end='') # Print each line as received
except KeyboardInterrupt:
proc.terminate()
print("\nStreaming interrupted by user.", file=sys.stderr)

if r.status_code == 200:
for line in r.iter_lines():
# filter out keep-alive new lines
if line:
decoded_line = line.decode("utf-8")
print(decoded_line)
else:
print(
f"Error triggering DB population: {r.status_code} {r.text}",
file=sys.stderr,
)

except FileNotFoundError as e:
print(f"Error: Credentials file not found. {e}", file=sys.stderr)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in the credentials file. {e}", file=sys.stderr)
except niquests.RequestException as e:
print(f"Error: Failed to make a request to the server. {e}", file=sys.stderr)
except Exception as e:
print(f"Unexpected error: {e}", file=sys.stderr)

Expand Down
1 change: 0 additions & 1 deletion ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies = [
"google-cloud-storage==2.18.2",
"tqdm==4.66.5",
"pillow==10.4.0",
"niquests==3.7.2",

# extraction dependencies
"diplib==3.5.1",
Expand Down

0 comments on commit f3125ca

Please sign in to comment.