Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull request update/241104 #452

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arcee/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ENV PYTHONPATH /usr/src/app/

COPY arcee/arcee_receiver/requirements.txt arcee/arcee_receiver/requirements.txt
COPY optscale_client/aconfig_cl optscale_client/aconfig_cl
COPY optscale_client/config_client optscale_client/config_client
COPY tools/optscale_time tools/optscale_time

RUN pip install --no-cache-dir -r /usr/src/app/arcee/arcee_receiver/requirements.txt
Expand Down
13 changes: 12 additions & 1 deletion arcee/arcee_receiver/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing_extensions import Annotated


class ArceeState:
class ArceeState(int, Enum):
STARTED = 1
FINISHED = 2
ERROR = 3
Expand Down Expand Up @@ -115,6 +115,17 @@ class TaskPatchIn(BaseClass):
owner_id: Optional[str] = None


class TaskPostIn(TaskPatchIn):
key: str
metrics: List[str] = []


class Task(TaskPostIn):
id: str = id_
token: str
deleted_at: int = 0


class LeaderboardFilter(BaseClass):
id: str = Field(description='metric id to filter by')
min: Optional[float] = None
Expand Down
59 changes: 59 additions & 0 deletions arcee/arcee_receiver/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
import asyncio
from functools import partial
import concurrent.futures

from kombu import Connection as QConnection, Exchange
from kombu.pools import producers


from optscale_client.config_client.client import Client as ConfigClient


class ActivitiesTaskProducer:
EXCHANGE_NAME = 'activities-tasks'
RETRY_POLICY = {'max_retries': 15, 'interval_start': 0,
'interval_step': 1, 'interval_max': 3}
RESCHEDULE_TIMEOUT = 60 * 60 * 12

executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def __init__(self):
self._config_cl = None

@classmethod
async def run_async(cls, func, *args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
if executor is None:
executor = cls.executor
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)

@property
def config_cl(self):
if self._config_cl is None:
config_cl = ConfigClient(
host=os.environ.get('HX_ETCD_HOST'),
port=int(os.environ.get('HX_ETCD_PORT')),
)
self._config_cl = config_cl
return self._config_cl

def create_task(self, task, routing_key):
params = self.config_cl.read_branch('/rabbit')
conn_str = f'amqp://{params["user"]}:{params["pass"]}@' \
f'{params["host"]}:{params["port"]}'
queue_conn = QConnection(conn_str, transport_options=self.RETRY_POLICY)

task_exchange = Exchange(self.EXCHANGE_NAME, type='topic')
with producers[queue_conn].acquire(block=True) as producer:
producer.publish(
task,
serializer='json',
exchange=task_exchange,
declare=[task_exchange],
routing_key=routing_key,
retry=True,
retry_policy=self.RETRY_POLICY
)
2 changes: 2 additions & 0 deletions arcee/arcee_receiver/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ aiohttp==3.10.2
sanic==23.12.1
sanic-ext==23.12.0
motor==3.6.0
kombu==5.3.4
pymongo==4.9.1
python-etcd==0.4.5
mongodb-migrations==1.2.1
pydantic==2.4.2
# OptScale packages
-e optscale_client/config_client
-e optscale_client/aconfig_cl
-e tools/optscale_time
Loading
Loading