Skip to content

Commit

Permalink
upgrade to Python 3.8 + Applications pre-release
Browse files Browse the repository at this point in the history
[agent] support of applications, no_proxy env, pull_policy reimplemented
[SDK] api to upload videos api
[SDK] advanced api to work with object tags (work in progress)
[SDK/annotation] stats methods
[SDK/api] ignore_task_id (internal usage) + print retries
[SDK/api/file_api] api to work with files
[SDK/geometries] transformations (to_polygon, to_bitmap, ...)
[SDK/api/project + dataset] add reference_image_url
[dockerimages] upgrade all images, will be released later
[images] alpha channel support
[sdk/label] fix crop to keep ids
[new images import] not released
[annotation] add id field
[nn] new default batch sizes
and more ...
  • Loading branch information
mkolomeychenko committed Oct 21, 2020
1 parent d1915e3 commit a24e936
Show file tree
Hide file tree
Showing 104 changed files with 2,701 additions and 488 deletions.
11 changes: 10 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ internal/tests/flt_format_insurance/*
*/internal/*
/help/tutorials/internal/07_nn_integration_options_explaned/07_nn_integration_options_explained.md
plugins/python/src/internal
help/tutorials/internal
help/tutorials/internal

ecosystem/sys_apps/*
!ecosystem/sys_apps/init.sh
ecosystem/reports/*
!ecosystem/reports/init.sh
ecosystem/apps/*
!ecosystem/apps/init.sh
ecosystem/projects/*
!ecosystem/projects/init.sh
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<img src="https://img.shields.io/badge/PRs-welcome-brightgreen.svg">
</p>


<p align="center">
<a href="#introduction">Introduction</a> •
<a href="#agent">Agent</a> •
Expand Down
2 changes: 2 additions & 0 deletions agent/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ RUN apt-get update \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

RUN pip install docker --upgrade

############### copy code ###############
ARG MODULE_PATH
COPY $MODULE_PATH /workdir
Expand Down
2 changes: 1 addition & 1 deletion agent/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
agent:6.0.24
agent:6.1.0
25 changes: 20 additions & 5 deletions agent/src/worker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import subprocess
import os
import supervisely_lib as sly
import uuid

from worker import constants
from worker.task_factory import create_task
from worker.logs_to_rpc import add_task_handler
from worker.agent_utils import LogQueue
from worker.system_info import get_hw_info, get_self_docker_image_digest
from worker.image_streamer import ImageStreamer
from worker.app_file_streamer import AppFileStreamer
from worker.telemetry_reporter import TelemetryReporter


Expand Down Expand Up @@ -99,7 +100,6 @@ def agent_connect_initially(self):
docker_img_info = subprocess.Popen([docker_inspect_cmd],
shell=True, executable="/bin/bash",
stdout=subprocess.PIPE).communicate()[0]

self.agent_info = {
'hardware_info': hw_info,
'agent_image': json.loads(docker_img_info)["Config"]["Image"],
Expand All @@ -120,6 +120,7 @@ def get_new_task(self):
task_msg = json.loads(task.data)
task_msg['agent_info'] = self.agent_info
self.logger.info('GET_NEW_TASK', extra={'received_task_id': task_msg['task_id']})
self.logger.debug('FULL_TASK_MESSAGE', extra={'task_msg': task_msg})
self.start_task(task_msg)

def get_stop_task(self):
Expand Down Expand Up @@ -159,12 +160,24 @@ def start_task(self, task):
self.task_pool_lock.acquire()
try:
if task['task_id'] in self.task_pool:
self.logger.warning('TASK_ID_ALREADY_STARTED', extra={'task_id': task['task_id']})
#@TODO: remove - ?? only app will receive its messages (skip app button's messages)
if task['task_type'] != "app":
self.logger.warning('TASK_ID_ALREADY_STARTED', extra={'task_id': task['task_id']})
else:
# request to application is duplicated to agent for debug purposes
pass
else:
task_id = task['task_id']
task["agent_version"] = self.agent_info["agent_version"]
self.task_pool[task_id] = create_task(task, self.docker_api)
self.task_pool[task_id].start()
try:
self.task_pool[task_id] = create_task(task, self.docker_api)
self.task_pool[task_id].start()
except Exception as e:
self.logger.critical('Unexpected exception in task start.', exc_info=True, extra={
'event_type': sly.EventType.TASK_CRASHED,
'exc_str': str(e),
})

finally:
self.task_pool_lock.release()

Expand Down Expand Up @@ -265,6 +278,8 @@ def inf_loop(self):
self.thread_list.append(self.thread_pool.submit(sly.function_wrapper_external_logger, self.send_connect_info, self.logger))
self.thread_list.append(
self.thread_pool.submit(sly.function_wrapper_external_logger, self.follow_daemon, self.logger, TelemetryReporter, 'TELEMETRY_REPORTER'))
# self.thread_list.append(
# self.thread_pool.submit(sly.function_wrapper_external_logger, self.follow_daemon, self.logger, AppFileStreamer, 'APP_FILE_STREAMER'))

def wait_all(self):
def terminate_all_deamons():
Expand Down
14 changes: 14 additions & 0 deletions agent/src/worker/agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,17 @@ def get_single_item_or_die(src_dict, key, dict_name):
'Multiple values ({}) were found for {} in {}. A list with exactly one item is required.'.format(
len(results), key, dict_name))
return results[0]


def add_creds_to_git_url(git_url):
    """Embed the agent's git credentials into *git_url*.

    If both ``GIT_LOGIN`` and ``GIT_PASSWORD`` settings are configured and the
    URL uses an ``http://`` or ``https://`` scheme, return the URL with
    ``login:password@`` inserted right after the scheme. In every other case
    (missing credentials, or a non-HTTP remote such as an ssh URL) the URL is
    returned unchanged.
    """
    scheme = None
    if 'https://' in git_url:
        scheme = 'https://'
    elif 'http://' in git_url:
        scheme = 'http://'
    # BUGFIX: the original called git_url.replace(old_str, ...) with
    # old_str=None (TypeError) whenever credentials were set but the URL had
    # no http(s) scheme; guard on the detected scheme instead.
    if scheme is not None and constants.GIT_LOGIN() is not None and constants.GIT_PASSWORD() is not None:
        return git_url.replace(scheme, '{}{}:{}@'.format(scheme, constants.GIT_LOGIN(), constants.GIT_PASSWORD()))
    return git_url
75 changes: 75 additions & 0 deletions agent/src/worker/app_file_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# coding: utf-8

import concurrent.futures
import json
import base64

import supervisely_lib as sly

from worker import constants
from worker.task_logged import TaskLogged


class AppFileStreamer(TaskLogged):
    """Background daemon task that serves file contents to the server on demand.

    Listens to the 'GetGeneralEventsStream' gRPC stream and, for every event,
    streams the requested file back via 'SendGeneralEventData' in chunks.
    Each request is handled on a small thread pool so one slow transfer does
    not block the event loop.
    """

    def __init__(self):
        # Fixed pseudo task id used for logging; this is a daemon, not a user task.
        super().__init__({'task_id': 'file_streamer'})
        # Created lazily in init_additional().
        self.thread_pool = None

    def init_logger(self):
        """Tag all log records produced by this daemon as worker/file_streamer."""
        super().init_logger()
        sly.change_formatters_default_values(self.logger, 'worker', 'file_streamer')

    def init_additional(self):
        """Allocate the worker pool used to stream files concurrently."""
        super().init_additional()
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def task_main_func(self):
        """Main loop: consume general events forever; report a crash on any error."""
        try:
            self.logger.info('FILE_STREAMER_INITIALIZED')
            # get_endless_stream blocks and yields events as the server sends them.
            for gen_event in self.api.get_endless_stream('GetGeneralEventsStream',
                                                         sly.api_proto.GeneralEvent, sly.api_proto.Empty()):
                event_obj = {
                    'request_id': gen_event.request_id,
                    'data': json.loads(gen_event.data.decode('utf-8')),
                }
                self.logger.debug('GET_STREAM_FILE_CALL', extra=event_obj)
                # function_wrapper_nofail keeps a failed transfer from killing the pool.
                self.thread_pool.submit(sly.function_wrapper_nofail, self.stream_file, event_obj)

        except Exception as e:
            self.logger.critical('FILE_STREAMER_CRASHED', exc_info=True, extra={
                'event_type': sly.EventType.TASK_CRASHED,
                'exc_str': str(e),
            })

    def stream_file(self, event_obj):
        """Stream one file (identified by a base64-encoded path) back to the server.

        ``event_obj['data']['hash']`` is presumably the file path base64-encoded by
        the sender (see the TODO below) — TODO confirm against the server side.
        """
        # @TODO: path to base64: hash = base64.b64encode(path.encode("utf-8")).decode("utf-8")
        data_hash = event_obj['data']['hash']
        suffix = event_obj['data']['ext']
        st_path = base64.b64decode(data_hash).decode("utf-8")

        # NOTE(review): previous implementation resolved the path through the
        # storage manager instead of decoding it directly:
        #st_path = self.data_mgr.storage.images.check_storage_object(data_hash=event_obj['data']['hash'],
        #                                                            suffix=event_obj['data']['ext'])

        # NOTE(review): with the direct b64decode above st_path can never be
        # None, so this not-found branch looks unreachable as written; it only
        # made sense for the commented-out storage lookup — confirm intent.
        if st_path is None:
            def chunk_generator():
                # Single error chunk tells the server the file is missing.
                yield sly.api_proto.Chunk(error='STREAMER_FILE_NOT_FOUND')

            try:
                self.api.put_stream_with_data('SendGeneralEventData', sly.api_proto.Empty, chunk_generator(),
                                              addit_headers={'x-request-id': event_obj['request_id']})
            except:
                # Best effort: the error report itself may fail; nothing more to do.
                pass

            return

        file_size = sly.fs.get_file_size(st_path)

        def chunk_generator():
            # Read and yield the file in NETW_CHUNK_SIZE pieces; total_size lets
            # the receiver track progress.
            with open(st_path, 'rb') as file_:
                for chunk_start, chunk_size in sly.ChunkSplitter(file_size, constants.NETW_CHUNK_SIZE()):
                    bytes_chunk = file_.read(chunk_size)
                    yield sly.api_proto.Chunk(buffer=bytes_chunk, total_size=file_size)

        # x-request-id correlates this upload with the originating event.
        self.api.put_stream_with_data('SendGeneralEventData', sly.api_proto.Empty, chunk_generator(),
                                      addit_headers={'x-request-id': event_obj['request_id']})
        self.logger.debug("FILE_STREAMED", extra=event_obj)
60 changes: 48 additions & 12 deletions agent/src/worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from urllib.parse import urlparse
import supervisely_lib as sly
import hashlib
from enum import Enum
from supervisely_lib.io.docker_utils import PullPolicy


_AGENT_HOST_DIR = 'AGENT_HOST_DIR'
Expand All @@ -24,6 +24,7 @@
_DOCKER_API_CALL_TIMEOUT = 'DOCKER_API_CALL_TIMEOUT'
_HTTP_PROXY = 'HTTP_PROXY'
_HTTPS_PROXY = 'HTTPS_PROXY'
_NO_PROXY = 'NO_PROXY'
_PUBLIC_API_RETRY_LIMIT = 'PUBLIC_API_RETRY_LIMIT'

# container limits
Expand All @@ -33,6 +34,9 @@

_PULL_POLICY = 'PULL_POLICY'

_GIT_LOGIN = 'GIT_LOGIN'
_GIT_PASSWORD = 'GIT_PASSWORD'
_GITHUB_TOKEN = 'GITHUB_TOKEN'

_REQUIRED_SETTINGS = [
_AGENT_HOST_DIR,
Expand All @@ -44,16 +48,6 @@
]


class PullPolicy(Enum):
def __str__(self):
return str(self.value)

ALWAYS = 'Always'.lower()
IF_AVAILABLE = 'IfAvailable'.lower()
IF_NOT_PRESENT = 'IfNotPresent'.lower()
NEVER = 'Never'.lower()


_PULL_POLICY_DICT = {
str(PullPolicy.ALWAYS): PullPolicy.ALWAYS,
str(PullPolicy.IF_AVAILABLE): PullPolicy.IF_AVAILABLE,
Expand All @@ -72,11 +66,15 @@ def __str__(self):
_DOCKER_API_CALL_TIMEOUT: '60',
_HTTP_PROXY: "",
_HTTPS_PROXY: "",
_NO_PROXY: "",
_PUBLIC_API_RETRY_LIMIT: 100,
_CPU_PERIOD: None,
_CPU_QUOTA: None,
_MEM_LIMIT: None,
_PULL_POLICY: str(PullPolicy.IF_AVAILABLE)
_PULL_POLICY: str(PullPolicy.IF_AVAILABLE), #str(PullPolicy.NEVER),
_GIT_LOGIN: None,
_GIT_PASSWORD: None,
_GITHUB_TOKEN: None
}


Expand Down Expand Up @@ -244,6 +242,9 @@ def HTTP_PROXY():
def HTTPS_PROXY():
return read_optional_setting(_HTTPS_PROXY)

def NO_PROXY():
    """Optional NO_PROXY environment setting; defaults to '' (see _DEFAULT_VALUES)."""
    return read_optional_setting(_NO_PROXY)


def PUBLIC_API_RETRY_LIMIT():
return int(read_optional_setting(_PUBLIC_API_RETRY_LIMIT))
Expand Down Expand Up @@ -281,6 +282,38 @@ def PULL_POLICY():
return _PULL_POLICY_DICT[val]


def GIT_LOGIN():
    """Optional git login for cloning private app repos; defaults to None."""
    return read_optional_setting(_GIT_LOGIN)


def GIT_PASSWORD():
    """Optional git password paired with GIT_LOGIN; defaults to None."""
    return read_optional_setting(_GIT_PASSWORD)


# def AGENT_APP_SOURCES_DIR():
# return os.path.join(AGENT_ROOT_DIR(), 'app_sources')
#
#
# def AGENT_APP_SOURCES_DIR_HOST():
# return os.path.join(HOST_DIR(), 'app_sources')


def AGENT_APP_SESSIONS_DIR():
    """Path to the app-sessions directory inside the agent container (created in init_constants)."""
    return os.path.join(AGENT_ROOT_DIR(), 'app_sessions')


def AGENT_APP_SESSIONS_DIR_HOST():
    """Host-side path corresponding to AGENT_APP_SESSIONS_DIR (same 'app_sessions' leaf)."""
    return os.path.join(HOST_DIR(), 'app_sessions')


def GITHUB_TOKEN():
    """Optional GitHub token setting; defaults to None."""
    return read_optional_setting(_GITHUB_TOKEN)


def APPS_STORAGE_DIR():
    """'apps' subdirectory of the agent storage dir (created in init_constants)."""
    return os.path.join(AGENT_STORAGE_DIR(), "apps")


def init_constants():
sly.fs.mkdir(AGENT_LOG_DIR())
sly.fs.mkdir(AGENT_TASKS_DIR())
Expand All @@ -291,3 +324,6 @@ def init_constants():
sly.fs.mkdir(AGENT_IMPORT_DIR())
os.chmod(AGENT_IMPORT_DIR(), 0o777) # octal
PULL_ALWAYS()
#sly.fs.mkdir(AGENT_APP_SOURCES_DIR())
sly.fs.mkdir(AGENT_APP_SESSIONS_DIR())
sly.fs.mkdir(APPS_STORAGE_DIR())
Loading

0 comments on commit a24e936

Please sign in to comment.