From a24e936a69ce168164ce063d2802cfe2ba27db69 Mon Sep 17 00:00:00 2001
From: Maxim Kolomeychenko
diff --git a/agent/Dockerfile b/agent/Dockerfile
index 89097965f..d93cdefd3 100644
--- a/agent/Dockerfile
+++ b/agent/Dockerfile
@@ -35,6 +35,8 @@ RUN apt-get update \
     && apt-get autoclean \
     && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log
 
+RUN pip install docker --upgrade
+
 ############### copy code ###############
 ARG MODULE_PATH
 COPY $MODULE_PATH /workdir
diff --git a/agent/VERSION b/agent/VERSION
index 943734487..86567d210 100644
--- a/agent/VERSION
+++ b/agent/VERSION
@@ -1 +1 @@
-agent:6.0.24
+agent:6.1.0
diff --git a/agent/src/worker/agent.py b/agent/src/worker/agent.py
index 1c38e4837..52ffc6737 100644
--- a/agent/src/worker/agent.py
+++ b/agent/src/worker/agent.py
@@ -8,13 +8,14 @@ import subprocess
 import os
 
 import supervisely_lib as sly
+import uuid
 
 from worker import constants
 from worker.task_factory import create_task
 from worker.logs_to_rpc import add_task_handler
 from worker.agent_utils import LogQueue
 from worker.system_info import get_hw_info, get_self_docker_image_digest
-from worker.image_streamer import ImageStreamer
+from worker.app_file_streamer import AppFileStreamer
 from worker.telemetry_reporter import TelemetryReporter
@@ -99,7 +100,6 @@ def agent_connect_initially(self):
         docker_img_info = subprocess.Popen([docker_inspect_cmd],
                                            shell=True, executable="/bin/bash",
                                            stdout=subprocess.PIPE).communicate()[0]
-
         self.agent_info = {
             'hardware_info': hw_info,
             'agent_image': json.loads(docker_img_info)["Config"]["Image"],
@@ -120,6 +120,7 @@ def get_new_task(self):
             task_msg = json.loads(task.data)
             task_msg['agent_info'] = self.agent_info
             self.logger.info('GET_NEW_TASK', extra={'received_task_id': task_msg['task_id']})
+            self.logger.debug('FULL_TASK_MESSAGE', extra={'task_msg': task_msg})
             self.start_task(task_msg)
 
     def get_stop_task(self):
@@ -159,12 +160,24 @@ def start_task(self, task):
         self.task_pool_lock.acquire()
         try:
             if task['task_id'] in self.task_pool:
-                self.logger.warning('TASK_ID_ALREADY_STARTED', extra={'task_id': task['task_id']})
+                #@TODO: remove - ?? only app will receive its messages (skip app button's messages)
+                if task['task_type'] != "app":
+                    self.logger.warning('TASK_ID_ALREADY_STARTED', extra={'task_id': task['task_id']})
+                else:
+                    # request to application is duplicated to agent for debug purposes
+                    pass
             else:
                 task_id = task['task_id']
                 task["agent_version"] = self.agent_info["agent_version"]
-                self.task_pool[task_id] = create_task(task, self.docker_api)
-                self.task_pool[task_id].start()
+                try:
+                    self.task_pool[task_id] = create_task(task, self.docker_api)
+                    self.task_pool[task_id].start()
+                except Exception as e:
+                    self.logger.critical('Unexpected exception in task start.', exc_info=True, extra={
+                        'event_type': sly.EventType.TASK_CRASHED,
+                        'exc_str': str(e),
+                    })
         finally:
             self.task_pool_lock.release()
@@ -265,6 +278,8 @@ def inf_loop(self):
         self.thread_list.append(self.thread_pool.submit(sly.function_wrapper_external_logger, self.send_connect_info, self.logger))
         self.thread_list.append(
             self.thread_pool.submit(sly.function_wrapper_external_logger, self.follow_daemon, self.logger, TelemetryReporter, 'TELEMETRY_REPORTER'))
+        # self.thread_list.append(
+        #     self.thread_pool.submit(sly.function_wrapper_external_logger, self.follow_daemon, self.logger, AppFileStreamer, 'APP_FILE_STREAMER'))
 
     def wait_all(self):
         def terminate_all_deamons():
diff --git a/agent/src/worker/agent_utils.py b/agent/src/worker/agent_utils.py
index eb5c86958..9a36c9770 100644
--- a/agent/src/worker/agent_utils.py
+++ b/agent/src/worker/agent_utils.py
@@ -91,3 +91,17 @@ def get_single_item_or_die(src_dict, key, dict_name):
             'Multiple values ({}) were found for {} in {}. A list with exactly one item is required.'.format(
                 len(results), key, dict_name))
     return results[0]
+
+
+def add_creds_to_git_url(git_url):
+    old_str = None
+    if 'https://' in git_url:
+        old_str = 'https://'
+    elif 'http://' in git_url:
+        old_str = 'http://'
+    res = git_url
+    if constants.GIT_LOGIN() is not None and constants.GIT_PASSWORD() is not None:
+        res = git_url.replace(old_str, '{}{}:{}@'.format(old_str, constants.GIT_LOGIN(), constants.GIT_PASSWORD()))
+        return res
+    else:
+        return git_url
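add_creds_to_git_url only rewrites the scheme prefix when both GIT_LOGIN and GIT_PASSWORD are configured. A standalone sketch of the same rewrite, with explicit hypothetical credentials in place of the constants.* getters:

    # Minimal sketch; 'login'/'password' stand in for constants.GIT_LOGIN()/GIT_PASSWORD()
    def add_creds_to_url(git_url, login, password):
        for scheme in ('https://', 'http://'):
            if git_url.startswith(scheme):
                if login is not None and password is not None:
                    return git_url.replace(scheme, '{}{}:{}@'.format(scheme, login, password))
                break
        return git_url

    print(add_creds_to_url('https://github.com/supervisely/supervisely.git', 'bot', 's3cret'))
    # https://bot:s3cret@github.com/supervisely/supervisely.git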
diff --git a/agent/src/worker/app_file_streamer.py b/agent/src/worker/app_file_streamer.py
new file mode 100644
index 000000000..c9a1a3095
--- /dev/null
+++ b/agent/src/worker/app_file_streamer.py
@@ -0,0 +1,75 @@
+# coding: utf-8
+
+import concurrent.futures
+import json
+import base64
+
+import supervisely_lib as sly
+
+from worker import constants
+from worker.task_logged import TaskLogged
+
+
+class AppFileStreamer(TaskLogged):
+    def __init__(self):
+        super().__init__({'task_id': 'file_streamer'})
+        self.thread_pool = None
+
+    def init_logger(self):
+        super().init_logger()
+        sly.change_formatters_default_values(self.logger, 'worker', 'file_streamer')
+
+    def init_additional(self):
+        super().init_additional()
+        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
+
+    def task_main_func(self):
+        try:
+            self.logger.info('FILE_STREAMER_INITIALIZED')
+            for gen_event in self.api.get_endless_stream('GetGeneralEventsStream',
+                                                         sly.api_proto.GeneralEvent, sly.api_proto.Empty()):
+                event_obj = {
+                    'request_id': gen_event.request_id,
+                    'data': json.loads(gen_event.data.decode('utf-8')),
+                }
+                self.logger.debug('GET_STREAM_FILE_CALL', extra=event_obj)
+                self.thread_pool.submit(sly.function_wrapper_nofail, self.stream_file, event_obj)
+
+        except Exception as e:
+            self.logger.critical('FILE_STREAMER_CRASHED', exc_info=True, extra={
+                'event_type': sly.EventType.TASK_CRASHED,
+                'exc_str': str(e),
+            })
+
+    def stream_file(self, event_obj):
+        # @TODO: path to base64: hash = base64.b64encode(path.encode("utf-8")).decode("utf-8")
+        data_hash = event_obj['data']['hash']
+        suffix = event_obj['data']['ext']
+        st_path = base64.b64decode(data_hash).decode("utf-8")
+
+        #st_path = self.data_mgr.storage.images.check_storage_object(data_hash=event_obj['data']['hash'],
+        #                                                            suffix=event_obj['data']['ext'])
+
+        if st_path is None:
+            def chunk_generator():
+                yield sly.api_proto.Chunk(error='STREAMER_FILE_NOT_FOUND')
+
+            try:
+                self.api.put_stream_with_data('SendGeneralEventData', sly.api_proto.Empty, chunk_generator(),
+                                              addit_headers={'x-request-id': event_obj['request_id']})
+            except:
+                pass
+
+            return
+
+        file_size = sly.fs.get_file_size(st_path)
+
+        def chunk_generator():
+            with open(st_path, 'rb') as file_:
+                for chunk_start, chunk_size in sly.ChunkSplitter(file_size, constants.NETW_CHUNK_SIZE()):
+                    bytes_chunk = file_.read(chunk_size)
+                    yield sly.api_proto.Chunk(buffer=bytes_chunk, total_size=file_size)
+
+        self.api.put_stream_with_data('SendGeneralEventData', sly.api_proto.Empty, chunk_generator(),
+                                      addit_headers={'x-request-id': event_obj['request_id']})
+        self.logger.debug("FILE_STREAMED", extra=event_obj)
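Per the @TODO note above, the streamer treats the incoming 'hash' as a base64-encoded file path rather than a content hash; the round trip looks like this (hypothetical path):

    import base64

    path = '/sly_task_data/results/archive.tar'  # hypothetical
    data_hash = base64.b64encode(path.encode('utf-8')).decode('utf-8')
    assert base64.b64decode(data_hash).decode('utf-8') == path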
diff --git a/agent/src/worker/constants.py b/agent/src/worker/constants.py
index 00836ab8d..1a651b7a1 100644
--- a/agent/src/worker/constants.py
+++ b/agent/src/worker/constants.py
@@ -4,7 +4,7 @@ from urllib.parse import urlparse
 
 import supervisely_lib as sly
 import hashlib
-from enum import Enum
+from supervisely_lib.io.docker_utils import PullPolicy
 
 _AGENT_HOST_DIR = 'AGENT_HOST_DIR'
@@ -24,6 +24,7 @@
 _DOCKER_API_CALL_TIMEOUT = 'DOCKER_API_CALL_TIMEOUT'
 _HTTP_PROXY = 'HTTP_PROXY'
 _HTTPS_PROXY = 'HTTPS_PROXY'
+_NO_PROXY = 'NO_PROXY'
 _PUBLIC_API_RETRY_LIMIT = 'PUBLIC_API_RETRY_LIMIT'
 
 # container limits
@@ -33,6 +34,9 @@
 _PULL_POLICY = 'PULL_POLICY'
 
+_GIT_LOGIN = 'GIT_LOGIN'
+_GIT_PASSWORD = 'GIT_PASSWORD'
+_GITHUB_TOKEN = 'GITHUB_TOKEN'
 
 _REQUIRED_SETTINGS = [
     _AGENT_HOST_DIR,
@@ -44,16 +48,6 @@
 ]
 
 
-class PullPolicy(Enum):
-    def __str__(self):
-        return str(self.value)
-
-    ALWAYS = 'Always'.lower()
-    IF_AVAILABLE = 'IfAvailable'.lower()
-    IF_NOT_PRESENT = 'IfNotPresent'.lower()
-    NEVER = 'Never'.lower()
-
-
 _PULL_POLICY_DICT = {
     str(PullPolicy.ALWAYS): PullPolicy.ALWAYS,
     str(PullPolicy.IF_AVAILABLE): PullPolicy.IF_AVAILABLE,
@@ -72,11 +66,15 @@
     _DOCKER_API_CALL_TIMEOUT: '60',
     _HTTP_PROXY: "",
     _HTTPS_PROXY: "",
+    _NO_PROXY: "",
     _PUBLIC_API_RETRY_LIMIT: 100,
     _CPU_PERIOD: None,
     _CPU_QUOTA: None,
     _MEM_LIMIT: None,
-    _PULL_POLICY: str(PullPolicy.IF_AVAILABLE)
+    _PULL_POLICY: str(PullPolicy.IF_AVAILABLE),  #str(PullPolicy.NEVER),
+    _GIT_LOGIN: None,
+    _GIT_PASSWORD: None,
+    _GITHUB_TOKEN: None
 }
@@ -244,6 +242,9 @@ def HTTP_PROXY():
 def HTTPS_PROXY():
     return read_optional_setting(_HTTPS_PROXY)
 
+def NO_PROXY():
+    return read_optional_setting(_NO_PROXY)
+
 def PUBLIC_API_RETRY_LIMIT():
     return int(read_optional_setting(_PUBLIC_API_RETRY_LIMIT))
@@ -281,6 +282,38 @@ def PULL_POLICY():
     return _PULL_POLICY_DICT[val]
 
+def GIT_LOGIN():
+    return read_optional_setting(_GIT_LOGIN)
+
+
+def GIT_PASSWORD():
+    return read_optional_setting(_GIT_PASSWORD)
+
+
+# def AGENT_APP_SOURCES_DIR():
+#     return os.path.join(AGENT_ROOT_DIR(), 'app_sources')
+#
+#
+# def AGENT_APP_SOURCES_DIR_HOST():
+#     return os.path.join(HOST_DIR(), 'app_sources')
+
+
+def AGENT_APP_SESSIONS_DIR():
+    return os.path.join(AGENT_ROOT_DIR(), 'app_sessions')
+
+
+def AGENT_APP_SESSIONS_DIR_HOST():
+    return os.path.join(HOST_DIR(), 'app_sessions')
+
+
+def GITHUB_TOKEN():
+    return read_optional_setting(_GITHUB_TOKEN)
+
+
+def APPS_STORAGE_DIR():
+    return os.path.join(AGENT_STORAGE_DIR(), "apps")
+
+
 def init_constants():
     sly.fs.mkdir(AGENT_LOG_DIR())
     sly.fs.mkdir(AGENT_TASKS_DIR())
@@ -291,3 +324,6 @@
     sly.fs.mkdir(AGENT_IMPORT_DIR())
     os.chmod(AGENT_IMPORT_DIR(), 0o777)  # octal
     PULL_ALWAYS()
+    #sly.fs.mkdir(AGENT_APP_SOURCES_DIR())
+    sly.fs.mkdir(AGENT_APP_SESSIONS_DIR())
+    sly.fs.mkdir(APPS_STORAGE_DIR())
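All of the new getters above go through read_optional_setting. Assuming it follows the usual pattern in this module (environment variable first, then the defaults table), its behavior is roughly:

    import os

    # Sketch only: _DEFAULTS stands in for the module's defaults dict shown above.
    _DEFAULTS = {'NO_PROXY': '', 'GIT_LOGIN': None, 'PULL_POLICY': 'ifavailable'}

    def read_optional_setting(name):
        return os.environ.get(name, _DEFAULTS.get(name))

    os.environ['GIT_LOGIN'] = 'bot'
    assert read_optional_setting('GIT_LOGIN') == 'bot'   # env var wins
    assert read_optional_setting('NO_PROXY') == ''       # falls back to default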
exists") + shutil.copytree(path_cache, self.dir_task_src) + + def init_docker_image(self): + self.download_or_get_repo() + self.app_config = sly.io.json.load_json_file(os.path.join(self.dir_task_src, 'config.json')) + self.read_dockerimage_from_config() + super().init_docker_image() + + def read_dockerimage_from_config(self): + self.info['app_info'] = self.app_config + try: + self.info['docker_image'] = self.app_config['docker_image'] + except KeyError as e: + self.logger.critical('File \"config.json\" does not contain \"docker_image\" field') + + def is_isolate(self): + if self.app_config is None: + raise RuntimeError("App config is not initialized") + return self.app_config.get(_ISOLATE, True) + + def _get_task_volumes(self): + res = {} + if self.is_isolate(): + res[self.dir_task_host] = {'bind': self.dir_task_container, 'mode': 'rw'} + else: + res[constants.AGENT_APP_SESSIONS_DIR_HOST()] = {'bind': sly.app.SHARED_DATA, 'mode': 'rw'} + + #@TODO: cache ecoystem item in agent (remove copypaste download-extract logic) + # state = self.info.get('state', {}) + # if "slyEcosystemItemGitUrl" in state and "slyEcosystemItemId" in state and "slyEcosystemItemVersion" in state: + # path_cache = None + # if state["slyEcosystemItemVersion"] != "master": + # path_cache = os.path.join(constants.APPS_STORAGE_DIR(), + # *Path(state["slyEcosystemItemGitUrl"].replace(".git", "")).parts[1:], + # state["slyEcosystemItemVersion"]) + # already_downloaded = sly.fs.dir_exists(path_cache) + # if already_downloaded: + # constants.APPS_STORAGE_DIR + # res["/sly_ecosystem_item"] = {'bind': self.dir_task_container, 'mode': 'rw'} + # pass + + return res + + def download_step(self): + pass + + def find_or_run_container(self): + add_labels = {"sly_app": "1", "app_session_id": str(self.info['task_id'])} + if self.is_isolate(): + # spawn app session in new container + add_labels["isolate"] = "1" + sly.docker_utils.docker_pull_if_needed(self._docker_api, self.docker_image_name, constants.PULL_POLICY(), self.logger) + self.spawn_container(add_labels=add_labels) + else: + # spawn app session in existing container + add_labels["isolate"] = "0" + filter = {"ancestor": self.info['docker_image'], "label": ["sly_app=1", "isolate=0"]} + containers = self.docker_api.containers.list(all=True, filters=filter) + if len(containers) == 0: + sly.docker_utils.docker_pull_if_needed(self._docker_api, self.docker_image_name, constants.PULL_POLICY(), self.logger) + self.spawn_container(add_labels=add_labels) + else: + self._container = containers[0] + + def get_spawn_entrypoint(self): + return ["sh", "-c", "while true; do sleep 30; done;"] + + def exec_command(self, add_envs=None): + add_envs = sly.take_with_default(add_envs, {}) + main_script_path = os.path.join(self.dir_task_src_container, self.app_config.get('main_script', 'src/main.py')) + command = "python {}".format(main_script_path) + + self._exec_id = self._docker_api.api.exec_create(self._container.id, + cmd=command, + environment={ + 'LOG_LEVEL': 'DEBUG', + 'LANG': 'C.UTF-8', + 'PYTHONUNBUFFERED': '1', + constants._HTTP_PROXY: constants.HTTP_PROXY(), + constants._HTTPS_PROXY: constants.HTTPS_PROXY(), + 'HOST_TASK_DIR': self.dir_task_host, + 'TASK_ID': self.info['task_id'], + 'SERVER_ADDRESS': self.info['server_address'], + 'API_TOKEN': self.info['api_token'], + 'AGENT_TOKEN': constants.TOKEN(), + **add_envs + }) + self._logs_output = self._docker_api.api.exec_start(self._exec_id, stream=True, demux=False) + + #change pulling progress to app progress + progress_dummy = 
diff --git a/agent/src/worker/task_dockerized.py b/agent/src/worker/task_dockerized.py
index 8c5566470..8ed7083aa 100644
--- a/agent/src/worker/task_dockerized.py
+++ b/agent/src/worker/task_dockerized.py
@@ -4,8 +4,6 @@ from threading import Lock
 import json
 import os
 
-from docker.errors import DockerException, ImageNotFound as DockerImageNotFound
-from packaging import version
 
 import supervisely_lib as sly
@@ -43,10 +41,16 @@ def __init__(self, *args, **kwargs):
         self._container = None
         self._container_lock = Lock()  # to drop container from different threads
 
+        self.docker_image_name = None
+        self.init_docker_image()
+
+        self.docker_pulled = False  # in task
+
+    def init_docker_image(self):
         self.docker_image_name = self.info.get('docker_image', None)
         if self.docker_image_name is not None and ':' not in self.docker_image_name:
             self.docker_image_name += ':latest'
-        self.docker_pulled = False  # in task
 
     @property
     def docker_api(self):
@@ -64,8 +68,6 @@ def report_step_done(self, curr_step):
     def task_main_func(self):
         self.task_dir_cleaner.forbid_dir_cleaning()
 
-        self.docker_pull_if_needed()
-
         last_step_str = self.info.get('last_complete_step')
         self.logger.info('LAST_COMPLETE_STEP', extra={'step': last_step_str})
         self.completed_step = self.step_name_mapping.get(last_step_str, TaskStep.NOTHING)
@@ -120,105 +122,44 @@ def run(self):
     def clean_task_dir(self):
         self.task_dir_cleaner.clean()
 
-    def _docker_pull(self, raise_exception=True):
-        self.logger.info('Docker image will be pulled', extra={'image_name': self.docker_image_name})
-        progress_dummy = sly.Progress('Pulling image...', 1, ext_logger=self.logger)
-        progress_dummy.iter_done_report()
-        try:
-            pulled_img = self._docker_api.images.pull(self.docker_image_name)
-            self.logger.info('Docker image has been pulled', extra={'pulled': {'tags': pulled_img.tags, 'id': pulled_img.id}})
-        except DockerException as e:
-            if raise_exception is True:
-                raise DockerException('Unable to pull image: see actual error above. '
-                                      'Please, run the task again or contact support team.')
-            else:
-                self.logger.warn("Pulling step is skipped. Unable to pull image: {!r}.".format(str(e)))
-
-    def _validate_version(self, agent_image, plugin_image):
-        self.logger.info('Check if agent and plugin versions are compatible')
-
-        def get_version(docker_image):
-            if docker_image is None:
-                return None
-            image_parts = docker_image.strip().split(":")
-            if len(image_parts) != 2:
-                return None
-            return image_parts[1]
-
-        agent_version = get_version(agent_image)
-        plugin_version = get_version(plugin_image)
-
-        if agent_version is None:
-            self.logger.info('Unknown agent version')
-            return
-
-        if plugin_version is None:
-            self.logger.info('Unknown plugin version')
-            return
-
-        av = version.parse(agent_version)
-        pv = version.parse(plugin_version)
-
-        if type(av) is version.LegacyVersion or type(pv) is version.LegacyVersion:
-            self.logger.info('Invalid semantic version, can not compare')
-            return
-
-        if av.release[0] < pv.release[0]:
-            self.logger.critical('Agent version is lower than plugin version. Please, upgrade agent.')
-
-    def _docker_image_exists(self):
-        try:
-            docker_img = self._docker_api.images.get(self.docker_image_name)
-        except DockerImageNotFound:
-            return False
-        if constants.CHECK_VERSION_COMPATIBILITY():
-            self._validate_version(self.info["agent_version"], docker_img.labels.get("VERSION", None))
-        return True
-
-    def docker_pull_if_needed(self):
-        if self.docker_pulled:
-            return
-        policy = constants.PULL_POLICY()
-        if policy == constants.PullPolicy.ALWAYS:
-            self._docker_pull()
-        elif policy == constants.PullPolicy.NEVER:
-            pass
-        elif policy == constants.PullPolicy.IF_NOT_PRESENT:
-            if not self._docker_image_exists():
-                self._docker_pull()
-        elif policy == constants.PullPolicy.IF_AVAILABLE:
-            self._docker_pull(raise_exception=False)
-
-        if not self._docker_image_exists():
-            raise RuntimeError("Docker image not found. Agent's PULL_POLICY is {!r}".format(str(policy)))
-
-        self.docker_pulled = True
-
     def _get_task_volumes(self):
         return {self.dir_task_host: {'bind': '/sly_task_data', 'mode': 'rw'}}
 
-    def spawn_container(self, add_envs=None):
-        if add_envs is None:
-            add_envs = {}
+    def get_spawn_entrypoint(self):
+        return ["sh", "-c", "python -u {}".format(self.entrypoint)]
+
+    def spawn_container(self, add_envs=None, add_labels=None, entrypoint_func=None):
+        add_envs = sly.take_with_default(add_envs, {})
+        add_labels = sly.take_with_default(add_labels, {})
+        if entrypoint_func is None:
+            entrypoint_func = self.get_spawn_entrypoint
+
         self._container_lock.acquire()
         volumes = self._get_task_volumes()
         try:
             self._container = self._docker_api.containers.run(
                 self.docker_image_name,
                 runtime=self.docker_runtime,
-                entrypoint=["sh", "-c", "python -u {}".format(self.entrypoint)],
+                entrypoint=entrypoint_func(),
                 detach=True,
                 name='sly_task_{}_{}'.format(self.info['task_id'], constants.TASKS_DOCKER_LABEL()),
                 remove=False,
                 volumes=volumes,
-                environment={'LOG_LEVEL': 'DEBUG',
-                             'LANG': 'C.UTF-8',
+                environment={'LOG_LEVEL': 'DEBUG',
+                             'LANG': 'C.UTF-8',
+                             'PYTHONUNBUFFERED': '1',
                              constants._HTTP_PROXY: constants.HTTP_PROXY(),
                              constants._HTTPS_PROXY: constants.HTTPS_PROXY(),
+                             'HOST_TASK_DIR': self.dir_task_host,
+                             constants._NO_PROXY: constants.NO_PROXY(),
+                             constants._HTTP_PROXY.lower(): constants.HTTP_PROXY(),
+                             constants._HTTPS_PROXY.lower(): constants.HTTPS_PROXY(),
+                             constants._NO_PROXY.lower(): constants.NO_PROXY(),
                              **add_envs},
                 labels={'ecosystem': 'supervisely',
-                        'ecosystem_token': constants.TASKS_DOCKER_LABEL(),
-                        'task_id': str(self.info['task_id'])},
+                        'ecosystem_token': constants.TASKS_DOCKER_LABEL(),
+                        'task_id': str(self.info['task_id']),
+                        **add_labels},
                 shm_size="1G",
                 stdin_open=False,
                 tty=False,
@@ -229,7 +170,7 @@
             )
             self._container.reload()
             self.logger.debug('After spawning. Container status: {}'.format(str(self._container.status)))
-            self.logger.info('Docker container is spawned',extra={'container_id': self._container.id, 'container_name': self._container.name})
+            self.logger.info('Docker container is spawned', extra={'container_id': self._container.id, 'container_name': self._container.name})
         finally:
             self._container_lock.release()
@@ -252,7 +193,7 @@ def _drop_container(self):
 
     def drop_container_and_check_status(self):
         status = self._stop_wait_container()
-        if (len(status) > 0) and (status['StatusCode'] != 0):  # StatusCode must exist
+        if (len(status) > 0) and (status['StatusCode'] not in [0]):  # StatusCode must exist
             raise RuntimeError('Task container finished with non-zero status: {}'.format(str(status)))
         self.logger.debug('Task container finished with status: {}'.format(str(status)))
         self._drop_container()
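spawn_container boils down to a docker-py containers.run call with the task's entrypoint, volumes, environment and labels; a stripped-down sketch (image, name and paths are hypothetical):

    import docker

    client = docker.from_env()
    container = client.containers.run(
        'supervisely/base-py:6.0.18',                  # hypothetical image
        entrypoint=['sh', '-c', 'python -u /workdir/src/main.py'],
        detach=True,
        name='sly_task_1234_example',
        volumes={'/host/tasks/1234': {'bind': '/sly_task_data', 'mode': 'rw'}},
        environment={'LOG_LEVEL': 'DEBUG', 'PYTHONUNBUFFERED': '1'},
        labels={'ecosystem': 'supervisely', 'task_id': '1234'},
        shm_size='1G',
    )
    container.reload()          # refresh .status after the daemon starts it
    print(container.status)
    container.remove(force=True)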
diff --git a/agent/src/worker/task_dtl.py b/agent/src/worker/task_dtl.py
index 1661d319a..c7f5a45d7 100644
--- a/agent/src/worker/task_dtl.py
+++ b/agent/src/worker/task_dtl.py
@@ -56,7 +56,6 @@ def download_data_sources(self, only_meta=False):
 
     def verify(self):
         self.download_data_sources(only_meta=True)
-        self.docker_pull_if_needed()
        self.spawn_container(add_envs={'VERIFICATION': '1'})
         self.process_logs()
         self.drop_container_and_check_status()
diff --git a/agent/src/worker/task_factory.py b/agent/src/worker/task_factory.py
index c0b9f5019..304c23094 100644
--- a/agent/src/worker/task_factory.py
+++ b/agent/src/worker/task_factory.py
@@ -2,10 +2,11 @@
 
 import base64
 import json
-from docker.errors import DockerException, ImageNotFound
+import time
 
 import supervisely_lib as sly
 
+from worker import constants
 from worker.task_dockerized import TaskDockerized
 from worker.task_dtl import TaskDTL
 from worker.task_import import TaskImport
@@ -21,6 +22,8 @@
 from worker.task_python import TaskPython
 from worker.task_plugin import TaskPlugin
 from worker.task_plugin_import_local import TaskPluginImportLocal
+from worker.task_pull_docker_image import TaskPullDockerImage
+from worker.task_app import TaskApp
 
 
 _task_class_mapping = {
@@ -38,11 +41,13 @@
     'update_agent': TaskUpdate,
     'python': TaskPython,
     'general_plugin': TaskPlugin,
-    'general_plugin_import_agent': TaskPluginImportLocal
+    'general_plugin_import_agent': TaskPluginImportLocal,
+    'app': TaskApp
 }
 
 
 def create_task(task_msg, docker_api):
+    task_id = task_msg.get('task_id', None)
     task_type = get_run_mode(docker_api, task_msg)
     task_cls = _task_class_mapping.get(task_type, None)
     if task_cls is None:
@@ -58,13 +63,14 @@ def get_run_mode(docker_api, task_msg):
     if "docker_image" not in task_msg:
         return task_msg['task_type']
 
-    try:
-        image_info = docker_api.images.get(task_msg["docker_image"])
-    except ImageNotFound:
-        image_info = {}
-    except DockerException:
-        image_info = docker_api.images.pull(task_msg["docker_image"])
+    temp_msg = {**task_msg, 'pull_policy': constants.PULL_POLICY()}
+    task_pull = TaskPullDockerImage(temp_msg)
+    task_pull.docker_api = docker_api
+    task_pull.start()
+    while task_pull.is_alive():
+        time.sleep(1)
 
+    image_info = docker_api.images.get(task_msg["docker_image"])
     try:
         plugin_info = json.loads(base64.b64decode(image_info.labels["INFO"]).decode("utf-8"))
     except Exception as e:
@@ -75,4 +81,4 @@
     if result == 'general_plugin' and task_msg['task_type'] == "import_agent":
         return 'general_plugin_import_agent'
 
-    return result
+    return result
\ No newline at end of file
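get_run_mode now front-loads the image pull by starting a TaskPullDockerImage and polling it. TaskPullDockerImage runs as an agent task with its own process and task-scoped logging; a plain thread is used below only to show the start/poll shape:

    import threading
    import time

    def pull_image():
        time.sleep(3)  # stand-in for docker_pull_if_needed + version check

    task_pull = threading.Thread(target=pull_image)
    task_pull.start()
    while task_pull.is_alive():  # mirrors the factory's poll loop
        time.sleep(1)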
diff --git a/agent/src/worker/task_logged.py b/agent/src/worker/task_logged.py
index 749616243..6084666f2 100644
--- a/agent/src/worker/task_logged.py
+++ b/agent/src/worker/task_logged.py
@@ -38,26 +38,30 @@ def __init__(self, task_info):
         # logs.
         self._user_api_key = self.info.pop('user_api_key', None)
 
-        self.dir_task = osp.join(constants.AGENT_TASKS_DIR(), str(self.info['task_id']))
-        self.dir_logs = osp.join(self.dir_task, 'logs')
+        self.init_task_dir()
+        self.dir_logs = os.path.join(self.dir_task, 'logs')
         sly.fs.mkdir(self.dir_task)
         sly.fs.mkdir(self.dir_logs)
-        self.dir_task_host = osp.join(constants.AGENT_TASKS_DIR_HOST(), str(self.info['task_id']))
-
-        self._stop_log_event = threading.Event()
-        self._stop_event = multiprocessing.Event()
-
-        # pre-init for static analysis
         self.logger = None
         self.log_queue = None
         self.executor_log = None
         self.future_log = None
+        self.init_logger()
+
+        self._stop_log_event = threading.Event()
+        self._stop_event = multiprocessing.Event()
+
+        # pre-init for static analysis
         self.api = None
         self.data_mgr = None
         self.public_api = None
         self.public_api_context = None
 
+    def init_task_dir(self):
+        self.dir_task = osp.join(constants.AGENT_TASKS_DIR(), str(self.info['task_id']))
+        self.dir_task_host = osp.join(constants.AGENT_TASKS_DIR_HOST(), str(self.info['task_id']))
+
     def init_logger(self):
         self.logger = sly.get_task_logger(self.info['task_id'])
         sly.change_formatters_default_values(self.logger, 'service_type', sly.ServiceType.AGENT)
@@ -113,13 +117,12 @@ def end_log_finish(self):
     # in new process
     def run(self):
         try:
-            self.init_logger()
             self.init_api()
             self.future_log = self.executor_log.submit(self.submit_log)  # run log submitting
         except Exception as e:
             # unable to do something another if crashed
             print(e)
-            dump_json_file(e, os.path.join(constants.AGENT_ROOT_DIR(), 'logger_fail.json'))
+            dump_json_file(str(e), os.path.join(constants.AGENT_ROOT_DIR(), 'logger_fail.json'))
             os._exit(1)  # ok, documented
 
         try:
diff --git a/agent/src/worker/task_pull_docker_image.py b/agent/src/worker/task_pull_docker_image.py
new file mode 100644
index 000000000..8537516eb
--- /dev/null
+++ b/agent/src/worker/task_pull_docker_image.py
@@ -0,0 +1,77 @@
+# coding: utf-8
+from packaging import version
+import supervisely_lib as sly
+
+from worker import constants
+from worker.task_sly import TaskSly
+
+
+class TaskPullDockerImage(TaskSly):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.docker_runtime = 'runc'  # or 'nvidia'
+        self._docker_api = None  # must be set by someone
+
+        self.docker_image_name = self.info.get('docker_image', None)
+        if self.docker_image_name is not None and ':' not in self.docker_image_name:
+            self.docker_image_name += ':latest'
+        self.docker_pulled = False  # in task
+
+    @property
+    def docker_api(self):
+        return self._docker_api
+
+    @docker_api.setter
+    def docker_api(self, val):
+        self._docker_api = val
+
+    def task_main_func(self):
+        sly.docker_utils.docker_pull_if_needed(self._docker_api, self.docker_image_name, self.info['pull_policy'], self.logger)
+        docker_img = self._docker_api.images.get(self.docker_image_name)
+        if constants.CHECK_VERSION_COMPATIBILITY():
+            self._validate_version(self.info["agent_version"], docker_img.labels.get("VERSION", None))
+
+    def _validate_version(self, agent_image, plugin_image):
+        self.logger.info('Check if agent and plugin versions are compatible')
+
+        def get_version(docker_image):
+            if docker_image is None:
+                return None
+            image_parts = docker_image.strip().split(":")
+            if len(image_parts) != 2:
+                return None
+            return image_parts[1]
+
+        agent_version = get_version(agent_image)
+        plugin_version = get_version(plugin_image)
+
+        if agent_version is None:
+            self.logger.info('Unknown agent version')
+            return
+
+        if plugin_version is None:
+            self.logger.info('Unknown plugin version')
+            return
+
+        av = version.parse(agent_version)
+        pv = version.parse(plugin_version)
+
+        if type(av) is version.LegacyVersion or type(pv) is version.LegacyVersion:
+            self.logger.info('Invalid semantic version, can not compare')
+            return
+
+        if av.release[0] < pv.release[0]:
+            self.logger.critical('Agent version is lower than plugin version. Please, upgrade agent.')
+
+    def end_log_stop(self):
+        return sly.EventType.TASK_STOPPED
+
+    def end_log_crash(self, e):
+        return sly.EventType.TASK_CRASHED
+
+    def end_log_finish(self):
+        return sly.EventType.TASK_FINISHED
+
+    def report_start(self):
+        pass
\ No newline at end of file
get_version(agent_image) + plugin_version = get_version(plugin_image) + + if agent_version is None: + self.logger.info('Unknown agent version') + return + + if plugin_version is None: + self.logger.info('Unknown plugin version') + return + + av = version.parse(agent_version) + pv = version.parse(plugin_version) + + if type(av) is version.LegacyVersion or type(pv) is version.LegacyVersion: + self.logger.info('Invalid semantic version, can not compare') + return + + if av.release[0] < pv.release[0]: + self.logger.critical('Agent version is lower than plugin version. Please, upgrade agent.') + + def end_log_stop(self): + return sly.EventType.TASK_STOPPED + + def end_log_crash(self, e): + return sly.EventType.TASK_CRASHED + + def end_log_finish(self): + return sly.EventType.TASK_FINISHED + + def report_start(self): + pass \ No newline at end of file diff --git a/base_images/jupyterlab/VERSION b/base_images/jupyterlab/VERSION index 4caf37afb..5165fb090 100644 --- a/base_images/jupyterlab/VERSION +++ b/base_images/jupyterlab/VERSION @@ -1 +1 @@ -base-jupyterlab:6.0.15 +base-jupyterlab:6.0.16 diff --git a/base_images/py/Dockerfile b/base_images/py/Dockerfile index cb1ca87c1..45f4c849c 100644 --- a/base_images/py/Dockerfile +++ b/base_images/py/Dockerfile @@ -1,6 +1,5 @@ -FROM nvidia/cuda:9.0-cudnn7-runtime-ubuntu16.04 - - +#FROM supervisely/base-py:6.0.17 +FROM nvidia/cuda:9.2-cudnn7-runtime-ubuntu16.04 ############################################################################## # common ############################################################################## @@ -24,12 +23,12 @@ RUN apt-get update \ ############################################################################## -# Miniconda & python 3.6 +# Miniconda & python 3.9 ############################################################################## -RUN curl -sSL https://repo.continuum.io/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh -o /tmp/miniconda.sh \ +RUN curl -sSL https://repo.anaconda.com/miniconda/Miniconda3-py38_4.8.3-Linux-x86_64.sh -o /tmp/miniconda.sh \ && bash /tmp/miniconda.sh -bfp /usr/local \ && rm -rf /tmp/miniconda.sh \ - && conda install -y python=3.6.5 \ + && conda install -y python=3.8 \ && conda clean --all --yes ENV PATH /opt/conda/bin:$PATH @@ -49,29 +48,21 @@ RUN apt-get update \ && apt-get autoclean \ && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log -# opencv; other packages are deps & mentioned to fix versions -RUN conda install -y -c menpo \ - opencv=3.4.1 \ - numpy=1.14.3 \ - zlib=1.2.11 \ - requests=2.18.4 \ - && conda install -y -c conda-forge hdbscan \ - && conda clean --all --yes - RUN pip install --no-cache-dir \ - python-json-logger==0.1.8 \ - pybase64==0.4.0 \ - shapely==1.5.13 \ - imgaug==0.2.5 \ - opencv-python==3.4.1.15 \ - scipy==1.1.0 \ - scikit-image==0.13.0 \ - matplotlib==2.2.2 \ - pillow==5.1.0 \ - networkx==2.1 \ + python-json-logger==0.1.11 \ + pybase64==1.0.2 \ + shapely==1.7.1 \ + imgaug==0.4.0 \ + numpy==1.19 \ + opencv-python==3.4.11.43 \ + scipy==1.5.2 \ + scikit-image==0.17.1 \ + matplotlib==3.3.2 \ + pillow==5.4.1 \ + requests==2.24.0 \ + networkx==2.5 \ jsonschema==2.6.0 - ############################################################################## # Java to run Pycharm ############################################################################## @@ -82,6 +73,7 @@ RUN apt-get update \ && apt-get -qq -y autoremove \ && apt-get autoclean \ && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \ + && rm -rf /jre \ && ln -s /usr/lib/jvm/java-7-openjdk-amd64 /jre ENV 
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 @@ -91,9 +83,9 @@ ENV JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 # Additional project libraries ############################################################################## RUN pip install --no-cache-dir \ - pandas==1.0.3 \ - grpcio==1.12.1 \ - grpcio-tools==1.12.1 + pandas==1.1.3 \ + grpcio==1.32.0 \ + grpcio-tools==1.32.0 RUN pip install --upgrade pip RUN apt-get update && \ @@ -102,9 +94,9 @@ RUN apt-get update && \ libexiv2-dev \ libboost-all-dev -RUN pip install py3exiv2==0.4.0 -RUN pip install simplejson==3.16.0 -RUN pip install requests-toolbelt +#RUN pip install py3exiv2==0.8.0 +#RUN pip install simplejson==3.17.2 +RUN pip install requests-toolbelt==0.9.1 RUN pip install PTable RUN pip install flask-restful RUN apt-get install -y fonts-noto=20160116-1 @@ -115,15 +107,20 @@ RUN pip install bidict RUN pip install --no-cache-dir scikit-video==1.1.11 RUN apt-get update && apt-get install -y openssh-server -RUN mkdir /var/run/sshd +RUN mkdir -p /var/run/sshd -RUN pip install plotly==4.5.4 -RUN pip install --upgrade pandas -RUN pip install "pillow<7" +RUN pip install plotly==4.11.0 +RUN pip install docker +RUN pip install fuzzywuzzy -RUN ln -sf /usr/local/bin/python3.6 /usr/bin/python +# to support pandas.read_excel +RUN pip install xlrd +RUN pip install google-cloud-storage +RUN pip install python-slugify +RUN pip install psutil==5.4.5 +RUN pip install cython ############################################################################## # Encoding for python SDK ############################################################################## -ENV LANG C.UTF-8 \ No newline at end of file +ENV LANG C.UTF-8 diff --git a/base_images/py/VERSION b/base_images/py/VERSION index 64498570a..9389a33a3 100644 --- a/base_images/py/VERSION +++ b/base_images/py/VERSION @@ -1 +1 @@ -base-py:6.0.17 +base-py:6.0.18 diff --git a/base_images/py_cuda_10_1/Dockerfile b/base_images/py_cuda_10_1/Dockerfile index 0c3c683a1..7bdf369f1 100644 --- a/base_images/py_cuda_10_1/Dockerfile +++ b/base_images/py_cuda_10_1/Dockerfile @@ -23,12 +23,12 @@ RUN apt-get update \ ############################################################################## -# Miniconda & python 3.6 +# Miniconda & python 3.9 ############################################################################## -RUN curl -sSL https://repo.continuum.io/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh -o /tmp/miniconda.sh \ +RUN curl -sSL https://repo.anaconda.com/miniconda/Miniconda3-py38_4.8.3-Linux-x86_64.sh -o /tmp/miniconda.sh \ && bash /tmp/miniconda.sh -bfp /usr/local \ && rm -rf /tmp/miniconda.sh \ - && conda install -y python=3.6.5 \ + && conda install -y python=3.8 \ && conda clean --all --yes ENV PATH /opt/conda/bin:$PATH @@ -49,21 +49,20 @@ RUN apt-get update \ && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log RUN pip install --no-cache-dir \ - python-json-logger==0.1.8 \ - pybase64==0.4.0 \ - shapely==1.5.13 \ - imgaug==0.2.5 \ - numpy==1.14.3 \ - opencv-python==3.4.1.15 \ - scipy==1.1.0 \ - scikit-image==0.13.0 \ - matplotlib==2.2.2 \ - pillow==5.1.0 \ - requests==2.18.4 \ - networkx==2.1 \ + python-json-logger==0.1.11 \ + pybase64==1.0.2 \ + shapely==1.7.1 \ + imgaug==0.4.0 \ + numpy==1.19 \ + opencv-python==3.4.11.43 \ + scipy==1.5.2 \ + scikit-image==0.17.1 \ + matplotlib==3.3.2 \ + pillow==5.4.1 \ + requests==2.24.0 \ + networkx==2.5 \ jsonschema==2.6.0 - ############################################################################## # Java to run Pycharm 
diff --git a/base_images/py_sdk/Dockerfile b/base_images/py_sdk/Dockerfile
new file mode 100644
index 000000000..25b3b753b
--- /dev/null
+++ b/base_images/py_sdk/Dockerfile
@@ -0,0 +1,6 @@
+ARG REGISTRY
+ARG TAG
+FROM ${REGISTRY}/base-py:${TAG}
+
+RUN pip install attrs
+RUN python -m pip install git+https://github.com/supervisely/supervisely.git
diff --git a/base_images/py_sdk/VERSION b/base_images/py_sdk/VERSION
new file mode 100644
index 000000000..82cbe1223
--- /dev/null
+++ b/base_images/py_sdk/VERSION
@@ -0,0 +1 @@
+base-py-sdk:6.0.16
\ No newline at end of file
diff --git a/base_images/pytorch/Dockerfile b/base_images/pytorch/Dockerfile
index 64c2a1c5d..0abf81029 100644
--- a/base_images/pytorch/Dockerfile
+++ b/base_images/pytorch/Dockerfile
@@ -5,12 +5,7 @@ FROM ${REGISTRY}/base-py:${TAG}
 ##############################################################################
 # pytorch
 ##############################################################################
-RUN conda install -y -c soumith \
-    magma-cuda90=2.3.0 \
-    && conda install -y -c pytorch \
-    pytorch=0.3.1 \
-    torchvision=0.2.0 \
-    cuda90=1.0 \
+RUN conda install pytorch==1.5.1 torchvision==0.6.1 cudatoolkit=9.2 -c pytorch \
     && conda clean --all --yes
 
 RUN pip install "pillow<7"
diff --git a/base_images/pytorch/VERSION b/base_images/pytorch/VERSION
index 930522d98..66c2b1679 100644
--- a/base_images/pytorch/VERSION
+++ b/base_images/pytorch/VERSION
@@ -1 +1 @@
-base-pytorch:6.0.17
+base-pytorch:6.0.18
diff --git a/base_images/pytorch_1_3_cuda_10_1/Dockerfile b/base_images/pytorch_1_3_cuda_10_1/Dockerfile
index a31572805..bded19681 100644
--- a/base_images/pytorch_1_3_cuda_10_1/Dockerfile
+++ b/base_images/pytorch_1_3_cuda_10_1/Dockerfile
@@ -5,9 +5,7 @@ FROM ${REGISTRY}/base-py-cuda-10-1:${TAG}
 ##############################################################################
 # pytorch
 ##############################################################################
-RUN pip install --no-cache-dir \
-    torch==1.3.0 \
-    torchvision==0.4.1 \
-    -f https://download.pytorch.org/whl/torch_stable.html
+RUN conda install pytorch==1.5.1 torchvision==0.6.1 cudatoolkit=10.1 -c pytorch \
+    && conda clean --all --yes
 
 RUN pip install "pillow<7"
\ No newline at end of file
diff --git a/base_images/pytorch_1_3_cuda_10_1/VERSION b/base_images/pytorch_1_3_cuda_10_1/VERSION
index 319f21156..b25547078 100644
--- a/base_images/pytorch_1_3_cuda_10_1/VERSION
+++ b/base_images/pytorch_1_3_cuda_10_1/VERSION
@@ -1 +1 @@
-base-pytorch-1-3-cuda-10-1:6.0.17
+base-pytorch-1-3-cuda-10-1:6.0.18
diff --git a/base_images/pytorch_v04/Dockerfile b/base_images/pytorch_v04/Dockerfile
index 097b55801..52ed00099 100644
--- a/base_images/pytorch_v04/Dockerfile
+++ b/base_images/pytorch_v04/Dockerfile
@@ -5,12 +5,7 @@ FROM ${REGISTRY}/base-py:${TAG}
 ##############################################################################
 # pytorch
 ##############################################################################
-RUN conda install -y -c soumith \
-    magma-cuda90=2.3.0 \
-    && conda install -y -c pytorch \
-    pytorch=0.4.0 \
-    torchvision=0.2.1 \
-    cuda90=1.0 \
+RUN conda install pytorch==1.5.1 torchvision==0.6.1 cudatoolkit=9.2 -c pytorch \
     && conda clean --all --yes
 
 RUN pip install "pillow<7"
diff --git a/base_images/pytorch_v04/VERSION b/base_images/pytorch_v04/VERSION
index 6dd2e80fb..64bc34a20 100644
--- a/base_images/pytorch_v04/VERSION
+++ b/base_images/pytorch_v04/VERSION
@@ -1 +1 @@
-base-pytorch-v04:6.0.17
+base-pytorch-v04:6.0.18
diff --git a/base_images/tensorflow/Dockerfile b/base_images/tensorflow/Dockerfile
index 8c3458311..845c561e7 100644
--- a/base_images/tensorflow/Dockerfile
+++ b/base_images/tensorflow/Dockerfile
@@ -14,4 +14,4 @@ RUN apt-get update \
     && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log
 
 RUN pip install --no-cache-dir \
-    tensorflow_gpu==1.5.0
\ No newline at end of file
+    tensorflow_gpu==2.3.1
\ No newline at end of file
diff --git a/base_images/tensorflow/VERSION b/base_images/tensorflow/VERSION
index a3261c2de..0900edbfb 100644
--- a/base_images/tensorflow/VERSION
+++ b/base_images/tensorflow/VERSION
@@ -1 +1 @@
-base-tensorflow:6.0.15
+base-tensorflow:6.0.16
diff --git a/plugins/dtl/legacy_supervisely_lib/sly_logger.py b/plugins/dtl/legacy_supervisely_lib/sly_logger.py
index a2cec7875..a7b284cab 100644
--- a/plugins/dtl/legacy_supervisely_lib/sly_logger.py
+++ b/plugins/dtl/legacy_supervisely_lib/sly_logger.py
@@ -6,7 +6,7 @@ import os
 from collections import namedtuple
 from enum import Enum
 
-import simplejson
+#import simplejson
 from pythonjsonlogger import jsonlogger
@@ -105,15 +105,15 @@ def _get_default_logging_fields():
     return ' '.join(['%({0:s})'.format(k) for k in supported_keys])
 
 
-def dumps_ignore_nan(obj, *args, **kwargs):
-    return simplejson.dumps(obj, ignore_nan=True, *args, **kwargs)
+#def dumps_ignore_nan(obj, *args, **kwargs):
+#    return simplejson.dumps(obj, ignore_nan=True, *args, **kwargs)
 
 
 class CustomJsonFormatter(jsonlogger.JsonFormatter):
     additional_fields = {}
 
     def __init__(self, format_string):
-        super().__init__(format_string, json_serializer=dumps_ignore_nan)
+        super().__init__(format_string)  #, json_serializer=dumps_ignore_nan)
 
     def process_log_record(self, log_record):
         log_record['timestamp'] = log_record.pop('asctime', None)
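dumps_ignore_nan relied on simplejson's ignore_nan=True, which serializes NaN as null; with the dependency commented out, NaN values now reach the default serializer. If that behavior were ever needed again without simplejson, a stdlib-only stand-in could look like:

    import json
    import math

    def dumps_ignore_nan(obj, *args, **kwargs):
        # pre-walk the object and replace NaN with None, then use stdlib json
        def clean(x):
            if isinstance(x, float) and math.isnan(x):
                return None
            if isinstance(x, dict):
                return {k: clean(v) for k, v in x.items()}
            if isinstance(x, (list, tuple)):
                return [clean(v) for v in x]
            return x
        return json.dumps(clean(obj), *args, **kwargs)

    print(dumps_ignore_nan({'loss': float('nan')}))  # {"loss": null}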
diff --git a/plugins/import/dicom/Dockerfile b/plugins/import/dicom/Dockerfile
index dcbf3c36e..62dc550fd 100644
--- a/plugins/import/dicom/Dockerfile
+++ b/plugins/import/dicom/Dockerfile
@@ -3,7 +3,7 @@ ARG TAG
 FROM ${REGISTRY}/base-py:${TAG}
 
 RUN pip install -U git+https://github.com/pydicom/pydicom.git@c5852160e5942f51ba18d5522b15f656b00b54fc
-RUN conda install -c conda-forge gdcm=2.6.6
+#RUN conda install -c conda-forge gdcm=2.8.8
 
 ############### copy code ###############
 ARG MODULE_PATH
diff --git a/plugins/import/images/plugin_info.json b/plugins/import/images/plugin_info.json
index 72926f419..4e6e4d93d 100644
--- a/plugins/import/images/plugin_info.json
+++ b/plugins/import/images/plugin_info.json
@@ -1,5 +1,6 @@
 {
     "title": "Images",
     "description": "Supported formats: images, directory with images",
-    "type": "import"
+    "type": "import",
+    "run_mode": "general_plugin"
 }
\ No newline at end of file
diff --git a/plugins/import/images/predefined_run_configs.json b/plugins/import/images/predefined_run_configs.json
index 238b059fd..16b5a8d01 100644
--- a/plugins/import/images/predefined_run_configs.json
+++ b/plugins/import/images/predefined_run_configs.json
@@ -3,7 +3,8 @@
     "title": "settings",
     "type": "import",
     "config": {
-      "normalize_exif": true
+      "normalize_exif": true,
+      "remove_alpha_channel": true
     }
   }
 ]
\ No newline at end of file
diff --git a/plugins/import/images/src/main.py b/plugins/import/images/src/main.py
index f50d09fae..ac2aa5095 100644
--- a/plugins/import/images/src/main.py
+++ b/plugins/import/images/src/main.py
@@ -1,78 +1,100 @@
 # coding: utf-8
-
 import os
 from collections import defaultdict
 
-import cv2
-
-import supervisely_lib as sly
-from supervisely_lib import fs
-from supervisely_lib import TaskPaths
 from supervisely_lib.io.json import load_json_file
-
-
-DEFAULT_DS_NAME = 'ds'
-
-
-def find_input_datasets():
-    root_files_paths = set(fs.list_files(TaskPaths.DATA_DIR, filter_fn=sly.image.has_valid_ext))
-    files_paths = set(fs.list_files_recursively(TaskPaths.DATA_DIR, filter_fn=sly.image.has_valid_ext))
-    files_paths = files_paths - root_files_paths
-
-    if len(root_files_paths) + len(files_paths) == 0:
-        raise RuntimeError(f'Input directory is empty! Supported formats list: {sly.image.SUPPORTED_IMG_EXTS}.')
-
-    datasets = defaultdict(list)
-    for path in files_paths:
-        ds_name = os.path.relpath(os.path.dirname(path), TaskPaths.DATA_DIR).replace(os.sep, '__')
-        datasets[ds_name].append(path)
-
-    default_ds_name = (DEFAULT_DS_NAME + '_' + sly.rand_str(8)) if DEFAULT_DS_NAME in datasets else DEFAULT_DS_NAME
-    for path in root_files_paths:
-        datasets[default_ds_name].append(path)
-
-    return datasets
-
-
-def convert():
-    task_settings = load_json_file(sly.TaskPaths.TASK_CONFIG_PATH)
-    in_datasets = find_input_datasets()
-
-    convert_options = task_settings['options']
-    normalize_exif = convert_options.get('normalize_exif')
-
-    pr = sly.Project(os.path.join(sly.TaskPaths.RESULTS_DIR, task_settings['res_names']['project']),
-                     sly.OpenMode.CREATE)
-
-    for ds_name, img_paths in in_datasets.items():
-        sly.logger.info(
-            'Found {} files with supported image extensions in Dataset {!r}.'.format(len(img_paths), ds_name))
-
-        ds = pr.create_dataset(ds_name)
-        progress = sly.Progress('Dataset: {!r}'.format(ds_name), len(img_paths))
-        for img_path in img_paths:
-            try:
-                item_name = os.path.basename(img_path)
-
-                if normalize_exif:
-                    img = sly.image.read(img_path)
-                    sly.image.write(img_path, img)
-
-                ds.add_item_file(item_name, img_path, _use_hardlink=True)
-            except Exception as e:
-                exc_str = str(e)
-                sly.logger.warn('Input sample skipped due to error: {}'.format(exc_str), exc_info=True, extra={
-                    'exc_str': exc_str,
-                    'dataset_name': ds_name,
-                    'image_name': img_path,
-                })
-            progress.iter_done_report()
-
-    if pr.total_items == 0:
-        raise RuntimeError('Result project is empty! All input images have unsupported format!')
+from supervisely_lib import TaskPaths
+import supervisely_lib as sly
+from supervisely_lib.video.import_utils import get_dataset_name
+
+
+DEFAULT_DATASET_NAME = 'ds0'
+root_ds_name = DEFAULT_DATASET_NAME
+
+
+def add_images_to_project():
+    sly.fs.ensure_base_path(sly.TaskPaths.RESULTS_DIR)
+
+    task_config = load_json_file(TaskPaths.TASK_CONFIG_PATH)
+
+    task_id = task_config['task_id']
+    append_to_existing_project = task_config['append_to_existing_project']
+    server_address = task_config['server_address']
+    token = task_config['api_token']
+
+    convert_options = task_config.get('options', {})
+    normalize_exif = convert_options.get('normalize_exif', True)
+    remove_alpha_channel = convert_options.get('remove_alpha_channel', True)
+    need_download = normalize_exif or remove_alpha_channel
+
+    api = sly.Api(server_address, token, retry_count=5)
+
+    task_info = api.task.get_info_by_id(task_id)
+    api.add_additional_field('taskId', task_id)
+    api.add_header('x-task-id', str(task_id))
+
+    workspace_id = task_info["workspaceId"]
+    project_name = task_config.get('project_name', None)
+    if project_name is None:
+        project_name = task_config["res_names"]["project"]
+
+    project_info = None
+    if append_to_existing_project is True:
+        project_info = api.project.get_info_by_name(workspace_id, project_name, expected_type=sly.ProjectType.IMAGES, raise_error=True)
+    else:
+        project_info = api.project.create(workspace_id, project_name, type=sly.ProjectType.IMAGES, change_name_if_conflict=True)
+
+    files_list = api.task.get_import_files_list(task_id)
+    dataset_to_item = defaultdict(dict)
+    for file_info in files_list:
+        original_path = file_info["filename"]
+        try:
+            sly.image.validate_ext(original_path)
+            item_hash = file_info["hash"]
+            ds_name = get_dataset_name(original_path)
+            item_name = sly.fs.get_file_name_with_ext(original_path)
+            if item_name in dataset_to_item[ds_name]:
+                temp_name = sly.fs.get_file_name(original_path)
+                temp_ext = sly.fs.get_file_ext(original_path)
+                new_item_name = "{}_{}{}".format(temp_name, sly.rand_str(5), temp_ext)
+                sly.logger.warning("Name {!r} already exists in dataset {!r}: renamed to {!r}"
+                                   .format(item_name, ds_name, new_item_name))
+                item_name = new_item_name
+            dataset_to_item[ds_name][item_name] = item_hash
+        except Exception as e:
+            sly.logger.warning("File skipped {!r}: error occurred during processing {!r}".format(original_path, str(e)))
+
+    for ds_name, ds_items in dataset_to_item.items():
+        ds_info = api.dataset.get_or_create(project_info.id, ds_name)
+
+        names = list(ds_items.keys())
+        hashes = list(ds_items.values())
+        paths = [os.path.join(sly.TaskPaths.RESULTS_DIR, h.replace("/", "a") + sly.image.DEFAULT_IMG_EXT) for h in hashes]
+        progress = sly.Progress('Dataset: {!r}'.format(ds_name), len(ds_items))
+
+        for batch_names, batch_hashes, batch_paths in zip(sly.batched(names, 10), sly.batched(hashes, 10), sly.batched(paths, 10)):
+            if need_download is True:
+                api.image.download_paths_by_hashes(batch_hashes, batch_paths)
+                for path in batch_paths:
+                    img = sly.image.read(path, remove_alpha_channel)
+                    sly.image.write(path, img, remove_alpha_channel)
+                api.image.upload_paths(ds_info.id, batch_names, batch_paths)
+                sly.fs.clean_dir(sly.TaskPaths.RESULTS_DIR)
+            else:
+                api.image.upload_hashes(ds_info.id, batch_names, batch_hashes, progress_cb=progress.iters_done_report)
+            progress.iters_done_report(len(batch_names))
+
+    if project_info is not None:
+        sly.logger.info('PROJECT_CREATED', extra={'event_type': sly.EventType.PROJECT_CREATED, 'project_id': project_info.id})
+    else:
+        temp_str = "Project"
+        if append_to_existing_project is True:
+            temp_str = "Dataset"
+        raise RuntimeError("{} wasn't created: 0 files were added".format(temp_str))
+    pass
 
 
 def main():
-    convert()
+    add_images_to_project()
     sly.report_import_finished()
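The import loop deduplicates item names per dataset and uploads in batches of 10; the core pattern, with small stand-ins for sly.rand_str and sly.batched:

    import os
    import random
    import string

    def rand_str(n):
        return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))

    def batched(seq, batch_size=10):
        for i in range(0, len(seq), batch_size):
            yield seq[i:i + batch_size]

    items = {}
    for original_path in ['ds1/img.png', 'ds2/img.png', 'ds1/copy/img.png']:
        name, ext = os.path.splitext(os.path.basename(original_path))
        item_name = name + ext
        if item_name in items:  # same rename rule as above
            item_name = '{}_{}{}'.format(name, rand_str(5), ext)
        items[item_name] = original_path

    for batch in batched(list(items.items())):
        print(batch)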
diff --git a/plugins/import/pointcloud_ply_raw/Dockerfile b/plugins/import/pointcloud_ply_raw/Dockerfile
index 2d1daead4..0667f7251 100644
--- a/plugins/import/pointcloud_ply_raw/Dockerfile
+++ b/plugins/import/pointcloud_ply_raw/Dockerfile
@@ -6,10 +6,10 @@ FROM ${REGISTRY}/base-py:${TAG}
 ##############################################################################
 
 RUN apt-get update
-RUN apt-get install libpcl-dev -y
-RUN pip install python-pcl
+#RUN apt-get install libpcl-dev -y
+#RUN pip install python-pcl
 RUN conda install -c conda-forge scikit-image
-RUN python -m pip install open3d==0.9
+RUN python -m pip install open3d==0.11.0
 
 ############### copy code ###############
 ARG MODULE_PATH
diff --git a/plugins/import/pointcloud_ply_raw/src/main.py b/plugins/import/pointcloud_ply_raw/src/main.py
index 63700f1c0..fc7f50ec7 100644
--- a/plugins/import/pointcloud_ply_raw/src/main.py
+++ b/plugins/import/pointcloud_ply_raw/src/main.py
@@ -1,7 +1,7 @@
 # coding: utf-8
 
 import os
-import pcl
+#import pcl
 import open3d as o3d
 
 from supervisely_lib.io.json import load_json_file
diff --git a/plugins/nn/deeplab_v3plus/predefined_run_configs.json b/plugins/nn/deeplab_v3plus/predefined_run_configs.json
index 550b03587..520e65444 100644
--- a/plugins/nn/deeplab_v3plus/predefined_run_configs.json
+++ b/plugins/nn/deeplab_v3plus/predefined_run_configs.json
@@ -7,8 +7,8 @@
       "epochs": 5,
       "val_every": 0.5,
       "batch_size": {
-        "val": 1,
-        "train": 1
+        "val": 4,
+        "train": 4
       },
       "input_size": {
         "width": 513,
diff --git a/plugins/nn/icnet/predefined_run_configs.json b/plugins/nn/icnet/predefined_run_configs.json
index dac38efc2..525f8b462 100644
--- a/plugins/nn/icnet/predefined_run_configs.json
+++ b/plugins/nn/icnet/predefined_run_configs.json
@@ -7,8 +7,8 @@
       "epochs": 5,
       "val_every": 0.5,
       "batch_size": {
-        "val": 1,
-        "train": 1
+        "val": 8,
+        "train": 8
       },
       "input_size": {
         "width": 2049,
diff --git a/plugins/nn/mask_rcnn_matterport/predefined_run_configs.json b/plugins/nn/mask_rcnn_matterport/predefined_run_configs.json
index e852fe779..626c8b5be 100644
--- a/plugins/nn/mask_rcnn_matterport/predefined_run_configs.json
+++ b/plugins/nn/mask_rcnn_matterport/predefined_run_configs.json
@@ -6,8 +6,8 @@
       "lr": 0.001,
       "epochs": 5,
       "batch_size": {
-        "val": 1,
-        "train": 1
+        "val": 4,
+        "train": 4
       },
       "input_size": {
         "max_dim": 256,
diff --git a/plugins/nn/unet_v2/predefined_run_configs.json b/plugins/nn/unet_v2/predefined_run_configs.json
index eb6fa7dde..06b3a6f6d 100644
--- a/plugins/nn/unet_v2/predefined_run_configs.json
+++ b/plugins/nn/unet_v2/predefined_run_configs.json
@@ -8,8 +8,8 @@
         "height": 256
       },
       "batch_size": {
-        "train": 1,
-        "val": 1
+        "train": 8,
+        "val": 8
       },
       "dataset_tags": {
         "train": "train",
diff --git a/plugins/nn/yolo_v3/Dockerfile b/plugins/nn/yolo_v3/Dockerfile
index 6e53430f8..d345fe251 100644
--- a/plugins/nn/yolo_v3/Dockerfile
+++ b/plugins/nn/yolo_v3/Dockerfile
@@ -5,15 +5,18 @@ FROM ${REGISTRY}/base-py:${TAG}
 ##############################################################################
 # devel components
 ##############################################################################
+#ENV NCCL_VERSION 2.7.8
+# TODO: https://gitlab.com/nvidia/container-images/cuda/-/commit/8df3d50221d673066e95d57decc683670457dd28
+RUN apt-get update
 RUN apt-get update && apt-get install -y --no-install-recommends \
     cuda-libraries-dev-$CUDA_PKG_VERSION \
     cuda-nvml-dev-$CUDA_PKG_VERSION \
     cuda-minimal-build-$CUDA_PKG_VERSION \
     cuda-command-line-tools-$CUDA_PKG_VERSION \
     cuda-core-9-0=9.0.176.3-1 \
-    cuda-cublas-dev-9-0=9.0.176.4-1 \
-    libnccl-dev=$NCCL_VERSION-1+cuda9.0 && \
+    cuda-cublas-dev-9-0=9.0.176.4-1 && \
     rm -rf /var/lib/apt/lists/*
+#libnccl-dev=$NCCL_VERSION-1+cuda9.0 && \
 
 ENV LIBRARY_PATH /usr/local/cuda/lib64/stubs
 ENV CUDNN_VERSION 7.4.2.24
@@ -24,17 +27,14 @@ RUN apt-get update && apt-get install -y --no-install-recommends --allow-change-held-packages \
 
 ENV LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH
 
-RUN apt-get update && \
-    apt-get install -y \
-    libopencv-dev=2.4.9.1+dfsg-1.5ubuntu1.1 && \
-    ldconfig
+RUN apt-get update && apt-get install -y libopencv-dev=2.4.9.1+dfsg-1.5ubuntu1.1
 
 RUN pip install --no-cache-dir \
     pyclipper \
     cython
 
 # OpenCV with version 3.4.1 (in base) has bug for C headers.
-RUN conda install --no-update-deps -y opencv=3.4.2 +#RUN conda install --no-update-deps -y opencv=3.4.11 ############### copy code ############### ARG MODULE_PATH diff --git a/plugins/nn/yolo_v3/predefined_run_configs.json b/plugins/nn/yolo_v3/predefined_run_configs.json index 723c971f5..38a5a61aa 100644 --- a/plugins/nn/yolo_v3/predefined_run_configs.json +++ b/plugins/nn/yolo_v3/predefined_run_configs.json @@ -7,7 +7,7 @@ "train": "train" }, "batch_size": { - "train": 2 + "train": 8 }, "subdivisions": { "train": 1, diff --git a/plugins/python/src/legacy/download_project.py b/plugins/python/src/legacy/download_project.py index b0ebbf7b1..270c7d185 100644 --- a/plugins/python/src/legacy/download_project.py +++ b/plugins/python/src/legacy/download_project.py @@ -3,7 +3,7 @@ WORKSPACE_ID = int('%%WORKSPACE_ID%%') src_project_name = '%%IN_PROJECT_NAME%%' -src_dataset_ids = %%DATASET_IDS:None%% +src_dataset_ids = '%%DATASET_IDS:None%%' api = sly.Api(server_address=os.environ['SERVER_ADDRESS'], token=os.environ['API_TOKEN']) diff --git a/setup.py b/setup.py index e447e3e60..ee8813878 100644 --- a/setup.py +++ b/setup.py @@ -14,19 +14,19 @@ def read(fname): # already have PyTorch installed. setup( name="supervisely", - version="6.0.33", + version="6.1.0", packages=find_packages(include=['supervisely_lib', 'supervisely_lib.*']), description="Supervisely Python SDK.", long_description=read("README.md"), url="https://github.com/supervisely/supervisely", install_requires=[ "flask-restful>=0.3.7", - "grpcio>=1.12.1", + "grpcio>=1.32.0", "jsonschema>=2.6.0,<3.0.0", "matplotlib>=3.0.0", - "numpy>=1.14.3", - "opencv-python>=3.4.1,<4.0.0", - "pandas>=1.0.3", + "numpy>=1.19.1", + "opencv-python>=3.4.11.43,<4.0.0", + "pandas>=1.1.3", "pascal-voc-writer>=0.1.4", "PTable>=0.9.2", "pillow>=6.2.1", @@ -34,16 +34,18 @@ def read(fname): # Higher python-json-logger versions are incompatible with # simplejson somehow, so for now prevent higher versions from # being installed. 
- "python-json-logger==0.1.8", - "requests>=2.18.4", + "python-json-logger==0.1.11", + "requests>=2.24.0", "requests-toolbelt>=0.9.1", - "scikit-image>=0.13.0", - "scipy>=1.1.0", - "Shapely>=1.5.13", - "simplejson>=3.16.0", + "scikit-image>=0.17.1", + "scipy>=1.5.2", + "Shapely>=1.7.1", + #"simplejson>=3.17.2", "Werkzeug>=0.15.1", "bidict", "sk-video", - "plotly==4.5.4" + "plotly==4.11.0", + "docker", + "psutil>=5.4.5" ], ) diff --git a/supervisely_lib/__init__.py b/supervisely_lib/__init__.py index 103b78109..910cc6096 100644 --- a/supervisely_lib/__init__.py +++ b/supervisely_lib/__init__.py @@ -8,6 +8,7 @@ from supervisely_lib.io import fs from supervisely_lib.io import env +from supervisely_lib.io import json from supervisely_lib.io import network_exceptions from supervisely_lib.imaging import image @@ -21,7 +22,7 @@ report_dtl_verification_finished, \ report_metrics_training, report_metrics_validation, report_inference_finished -from supervisely_lib.project.project import Project, OpenMode, download_project, read_single_project +from supervisely_lib.project.project import Project, OpenMode, download_project, read_single_project, upload_project from supervisely_lib.project.project_meta import ProjectMeta from supervisely_lib.annotation.annotation import ANN_EXT, Annotation @@ -79,7 +80,7 @@ from supervisely_lib.video_annotation.video_figure import VideoFigure from supervisely_lib.video_annotation.frame import Frame from supervisely_lib.video_annotation.frame_collection import FrameCollection -from supervisely_lib.project.video_project import VideoDataset, VideoProject, download_video_project +from supervisely_lib.project.video_project import VideoDataset, VideoProject, download_video_project, upload_video_project from supervisely_lib.video import video import supervisely_lib.labeling_jobs.utils as lj @@ -90,5 +91,13 @@ from supervisely_lib.pointcloud_annotation.pointcloud_figure import PointcloudFigure from supervisely_lib.project.pointcloud_project import PointcloudDataset, PointcloudProject, download_pointcloud_project +from supervisely_lib.pyscripts_utils import utils as ps +from supervisely_lib.io import docker_utils +import supervisely_lib.app_widget as app_widget +import supervisely_lib.app as app +from supervisely_lib.app.app_service import AppService -from supervisely_lib.pyscripts_utils import utils as ps \ No newline at end of file +from supervisely_lib.decorators.profile import timeit +import supervisely_lib.script as script +from supervisely_lib.user.user import UserRoleName +from supervisely_lib.io import github_utils as git \ No newline at end of file diff --git a/supervisely_lib/_utils.py b/supervisely_lib/_utils.py index 1c00e1a9a..ed6cc0257 100644 --- a/supervisely_lib/_utils.py +++ b/supervisely_lib/_utils.py @@ -85,3 +85,18 @@ def default(self, obj): COMMUNITY = "community" ENTERPRISE = "enterprise" + + +def validate_percent(value): + if 0 <= value <= 100: + pass + else: + raise ValueError('Percent has to be in range [0; 100]') + + +def sizeof_fmt(num, suffix='B'): + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(num) < 1024.0: + return "%3.1f %s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f %s%s" % (num, 'Yi', suffix) \ No newline at end of file diff --git a/supervisely_lib/annotation/annotation.py b/supervisely_lib/annotation/annotation.py index f06fae66c..2f9795a7d 100644 --- a/supervisely_lib/annotation/annotation.py +++ b/supervisely_lib/annotation/annotation.py @@ -4,6 +4,7 @@ import json import itertools import numpy as np +from 
typing import List from copy import deepcopy @@ -71,7 +72,7 @@ def img_size(self): return deepcopy(self._img_size) @property - def labels(self): + def labels(self) -> List[Label]: return self._labels.copy() @property @@ -450,16 +451,6 @@ def from_img_path(cls, img_path): @classmethod def stat_area(cls, render, names, colors): - ''' - The function stat_area computes class distribution statistics in annotations (in pixels), space free of classes, - total area of render, it height, width and number of channels - :param render: mask with classes - :param names: class names for which statistics will be calculated - :param colors: colors of classes on render mask - :return: dictionary with statistics of space representation - ''' - #@TODO: check similar colors - if len(names) != len(colors): raise RuntimeError("len(names) != len(colors) [{} != {}]".format(len(names), len(colors))) @@ -474,68 +465,62 @@ def stat_area(cls, render, names, colors): elif len(render.shape) == 3: channels = render.shape[2] + unlabeled_done = False + covered_pixels = 0 for name, color in zip(names, colors): - col_name = name #"{} [area]".format(name) + col_name = name + if name == "unlabeled": + unlabeled_done = True class_mask = np.all(render == color, axis=-1).astype('uint8') cnt_pixels = class_mask.sum() covered_pixels += cnt_pixels - - result[col_name] = cnt_pixels + result[col_name] = cnt_pixels / total_pixels * 100.0 if covered_pixels > total_pixels: raise RuntimeError("Class colors mistake: covered_pixels > total_pixels") - result['unlabeled area'] = total_pixels - covered_pixels - result['total area'] = total_pixels + if unlabeled_done is False: + result['unlabeled'] = (total_pixels - covered_pixels) / total_pixels * 100.0 + result['height'] = height result['width'] = width result['channels'] = channels return result def stat_class_count(self, class_names): - ''' - The function stat_class_count counts how many times each class from given list occurs in annotation and total number of classes in annotation - :param class_names: list of classes names - :return: dictionary with a number of different classes in annotation and it total count - ''' - def _name_to_key(name): - return name#"{} [count]".format(name) total = 0 - stat = {_name_to_key(name): 0 for name in class_names} + stat = {name: 0 for name in class_names} for label in self._labels: cur_name = label.obj_class.name - if _name_to_key(cur_name) not in stat: + if cur_name not in stat: raise KeyError("Class {!r} not found in {}".format(cur_name, class_names)) - stat[_name_to_key(cur_name)] += 1 + stat[cur_name] += 1 total += 1 - stat['total count'] = total + stat['total'] = total return stat - def stat_img_tags(self, tag_names): - ''' - The function stat_img_tags counts how many times each tag from given list occurs in annotation - :param tag_names: list of tags names - :return: dictionary with a number of different tags in annotation - ''' - stat = {name: 0 for name in tag_names} - for tag in self._img_tags: - cur_name = tag.meta.name - if cur_name not in stat: - raise KeyError("Tag {!r} not found in {}".format(cur_name, tag_names)) - stat[cur_name] += 1 - return stat + # def stat_img_tags(self, tag_names): + # ''' + # The function stat_img_tags counts how many times each tag from given list occurs in annotation + # :param tag_names: list of tags names + # :return: dictionary with a number of different tags in annotation + # ''' + # stat = {name: 0 for name in tag_names} + # stat['any tag'] = 0 + # for tag in self._img_tags: + # cur_name = tag.meta.name + # 
if cur_name not in stat: + # raise KeyError("Tag {!r} not found in {}".format(cur_name, tag_names)) + # stat[cur_name] += 1 + # stat['any tag'] += 1 + # return stat def draw_class_idx_rgb(self, render, name_to_index): - ''' - The function draw_class_idx_rgb draws rectangle on render mask corresponding to each label with a specific color corresponding to the class name - :param render: mask to draw rectangle, corresponding to each label in annotation - :param name_to_index: a dictionary with an index value for each class name - ''' for label in self._labels: class_idx = name_to_index[label.obj_class.name] color = [class_idx, class_idx, class_idx] - label.draw(render, color=color, thickness=1, draw_tags=False, tags_font=self._get_font()) + label.draw(render, color=color, thickness=1) @property def custom_data(self): diff --git a/supervisely_lib/annotation/label.py b/supervisely_lib/annotation/label.py index d8fbd01c1..24b7bab2f 100644 --- a/supervisely_lib/annotation/label.py +++ b/supervisely_lib/annotation/label.py @@ -150,8 +150,17 @@ def crop(self, rect): :param rect: Rectangle class object :return: Label class object with new geometry ''' - return [self] if rect.contains(self.geometry.to_bbox()) else [ - self.clone(geometry=g) for g in self.geometry.crop(rect)] + if rect.contains(self.geometry.to_bbox()): + return [self] + else: + # for compatibility of old slightly invalid annotations, some of them may be out of image bounds. + # will correct it automatically + result_geometries = self.geometry.crop(rect) + if len(result_geometries) == 1: + result_geometries[0]._copy_creation_info_inplace(self.geometry) + return [self.clone(geometry=result_geometries[0])] + else: + return [self.clone(geometry=g) for g in self.geometry.crop(rect)] def relative_crop(self, rect): ''' @@ -256,6 +265,13 @@ def area(self): ''' return self.geometry.area + def convert(self, new_obj_class: ObjClass): + labels = [] + geometries = self.geometry.convert(new_obj_class.geometry_type) + for g in geometries: + labels.append(self.clone(geometry=g, obj_class=new_obj_class)) + return labels + class Label(LabelBase): def _validate_geometry_type(self): diff --git a/supervisely_lib/annotation/obj_class.py b/supervisely_lib/annotation/obj_class.py index c0ebea596..46894e89c 100644 --- a/supervisely_lib/annotation/obj_class.py +++ b/supervisely_lib/annotation/obj_class.py @@ -12,6 +12,7 @@ class ObjClassJsonFields: + ID = 'id' NAME = 'title' GEOMETRY_TYPE = 'shape' COLOR = 'color' @@ -19,7 +20,7 @@ class ObjClassJsonFields: class ObjClass(KeyObject, JsonSerializable): - def __init__(self, name: str, geometry_type: type, color: List[int]=None, geometry_config: dict=None): + def __init__(self, name: str, geometry_type: type, color: List[int]=None, geometry_config: dict=None, sly_id=None): """ Class of objects (person, car, etc) with necessary properties: name, type of geometry (Polygon, Rectangle, ...) and RGB color. Only one class can be associated with Label. @@ -38,6 +39,7 @@ def __init__(self, name: str, geometry_type: type, color: List[int]=None, geomet self._geometry_type = geometry_type self._color = random_rgb() if color is None else deepcopy(color) self._geometry_config = deepcopy(take_with_default(geometry_config, {})) + self._sly_id = sly_id _validate_color(self._color) @property @@ -80,6 +82,10 @@ def color(self): """ return deepcopy(self._color) + @property + def sly_id(self): + return self._sly_id + def to_json(self) -> dict: """ Converts object to json serializable dictionary. 
See Supervisely Json format explanation here: @@ -88,12 +94,15 @@ def to_json(self) -> dict: Returns: json serializable dictionary """ - return { + res = { ObjClassJsonFields.NAME: self.name, ObjClassJsonFields.GEOMETRY_TYPE: self.geometry_type.geometry_name(), ObjClassJsonFields.COLOR: rgb2hex(self.color), ObjClassJsonFields.GEOMETRY_CONFIG: self.geometry_type.config_to_json(self._geometry_config) } + if self.sly_id is not None: + res[ObjClassJsonFields.ID] = self.sly_id + return res @classmethod def from_json(cls, data: dict) -> 'ObjClass': @@ -108,7 +117,8 @@ def from_json(cls, data: dict) -> 'ObjClass': geometry_type = GET_GEOMETRY_FROM_STR(data[ObjClassJsonFields.GEOMETRY_TYPE]) color = hex2rgb(data[ObjClassJsonFields.COLOR]) geometry_config = geometry_type.config_from_json(data.get(ObjClassJsonFields.GEOMETRY_CONFIG)) - return cls(name=name, geometry_type=geometry_type, color=color, geometry_config=geometry_config) + sly_id = data.get(ObjClassJsonFields.ID, None) + return cls(name=name, geometry_type=geometry_type, color=color, geometry_config=geometry_config, sly_id=sly_id) def __eq__(self, other: 'ObjClass'): return isinstance(other, ObjClass) and \ @@ -134,7 +144,7 @@ def get_row_ptable(self): return [self.name, self.geometry_type.__name__, self.color] def clone(self, name: str = None, geometry_type: Geometry = None, color: List[int] = None, - geometry_config: dict = None) -> 'ObjClass': + geometry_config: dict = None, sly_id=None) -> 'ObjClass': """ Creates object duplicate. Defined arguments replace corresponding original values. @@ -146,4 +156,5 @@ def clone(self, name: str = None, geometry_type: Geometry = None, color: List[in return ObjClass(name=take_with_default(name, self.name), geometry_type=take_with_default(geometry_type, self.geometry_type), color=take_with_default(color, self.color), - geometry_config=take_with_default(geometry_config, self.geometry_config)) + geometry_config=take_with_default(geometry_config, self.geometry_config), + sly_id=take_with_default(sly_id, self.sly_id)) diff --git a/supervisely_lib/annotation/obj_class_collection.py b/supervisely_lib/annotation/obj_class_collection.py index e9a474105..c7679bfe1 100644 --- a/supervisely_lib/annotation/obj_class_collection.py +++ b/supervisely_lib/annotation/obj_class_collection.py @@ -1,9 +1,11 @@ # coding: utf-8 from typing import List +from collections import defaultdict from supervisely_lib.collection.key_indexed_collection import KeyIndexedCollection from supervisely_lib.io.json import JsonSerializable from supervisely_lib.annotation.obj_class import ObjClass +from supervisely_lib.imaging.color import rgb2hex, hex2rgb class ObjClassCollection(KeyIndexedCollection, JsonSerializable): @@ -36,6 +38,27 @@ def from_json(cls, data: List[dict]) -> 'ObjClassCollection': obj_classes = [ObjClass.from_json(obj_class_json) for obj_class_json in data] return cls(obj_classes) + def validate_classes_colors(self, logger=None): + # check colors uniq + color_names = defaultdict(list) + for obj_class in self: + hex = rgb2hex(obj_class.color) + color_names[hex].append(obj_class.name) + + class_colors_notify = None + for hex_color, class_names in color_names.items(): + if len(class_names) > 1: + warn_str = "Classes {!r} have the same RGB color = {!r}".format(class_names, hex2rgb(hex_color)) + if logger is not None: + logger.warn(warn_str) + if class_colors_notify is None: + class_colors_notify = "" + class_colors_notify += warn_str + '\n\n' + #if class_colors_notify != "": + # pass + # api.report.create_notification("Classes 
colors", class_colors_notify, sly.NotificationType.WARNING)) + return class_colors_notify + def make_renamed_classes(src_obj_classes: ObjClassCollection, renamer, skip_missing=False) -> ObjClassCollection: ''' diff --git a/supervisely_lib/annotation/tag_meta.py b/supervisely_lib/annotation/tag_meta.py index 8929fce90..ca6dde632 100644 --- a/supervisely_lib/annotation/tag_meta.py +++ b/supervisely_lib/annotation/tag_meta.py @@ -16,6 +16,7 @@ class TagValueType: class TagMetaJsonFields: + ID = 'id' NAME = 'name' VALUE_TYPE = 'value_type' VALUES = 'values' @@ -30,7 +31,7 @@ class TagMeta(KeyObject, JsonSerializable): This is a class for creating and using TagMeta objects. It include tag name, value type, and possible values for tags with enum values. ''' - def __init__(self, name: str, value_type: str, possible_values: List[str] = None, color: List[int]=None): + def __init__(self, name: str, value_type: str, possible_values: List[str] = None, color: List[int]=None, sly_id=None): """ :param name: str :param value_type: str (one of TagValueType fields) @@ -45,6 +46,7 @@ def __init__(self, name: str, value_type: str, possible_values: List[str] = None self._value_type = value_type self._possible_values = possible_values self._color = random_rgb() if color is None else deepcopy(color) + self._sly_id = sly_id if self._value_type == TagValueType.ONEOF_STRING: if self._possible_values is None: @@ -75,6 +77,10 @@ def possible_values(self): def color(self): return self._color.copy() + @property + def sly_id(self): + return self._sly_id + def to_json(self): ''' The function to_json convert TagMeta object to json format @@ -87,6 +93,9 @@ def to_json(self): } if self.value_type == TagValueType.ONEOF_STRING: jdict[TagMetaJsonFields.VALUES] = self.possible_values + + if self.sly_id is not None: + jdict[TagMetaJsonFields.ID] = self.sly_id return jdict @classmethod @@ -105,7 +114,8 @@ def from_json(cls, data): color = data.get(TagMetaJsonFields.COLOR) if color is not None: color = hex2rgb(color) - return cls(name=name, value_type=value_type, possible_values=values, color=color) + sly_id = data.get(TagMetaJsonFields.ID, None) + return cls(name=name, value_type=value_type, possible_values=values, color=color, sly_id=sly_id) else: raise ValueError('Tags must be dict or str types.') @@ -161,7 +171,7 @@ def is_compatible(self, other): self.value_type == other.value_type and self.possible_values == other.possible_values) - def clone(self, name=None, value_type=None, possible_values=None, color=None): + def clone(self, name=None, value_type=None, possible_values=None, color=None, sly_id=None): ''' The function clone make copy of the TagMeta class object :return: TagMeta class object @@ -169,7 +179,8 @@ def clone(self, name=None, value_type=None, possible_values=None, color=None): return TagMeta(name=take_with_default(name, self.name), value_type=take_with_default(value_type, self.value_type), possible_values=take_with_default(possible_values, self.possible_values), - color=take_with_default(color, self.color)) + color=take_with_default(color, self.color), + sly_id=take_with_default(sly_id, self.sly_id)) def __str__(self): return '{:<7s}{:<24} {:<7s}{:<13} {:<13s}{:<10}'.format('Name:', self.name, diff --git a/supervisely_lib/api/advanced_api.py b/supervisely_lib/api/advanced_api.py new file mode 100644 index 000000000..1b7a48954 --- /dev/null +++ b/supervisely_lib/api/advanced_api.py @@ -0,0 +1,22 @@ +# coding: utf-8 + +from supervisely_lib.api.module_api import ApiField, ModuleApiBase + + +class 
AdvancedApi(ModuleApiBase): + def add_tag_to_object(self, tag_meta_id, figure_id, value=None): + data = {ApiField.TAG_ID: tag_meta_id, ApiField.FIGURE_ID: figure_id} + if value is not None: + data[ApiField.VALUE] = value + resp = self._api.post('/object-tags.add-to-object', data) + return resp.json() + + def remove_tag_from_object(self, tag_meta_id, figure_id, tag_id): + data = {ApiField.TAG_ID: tag_meta_id, ApiField.FIGURE_ID: figure_id, ApiField.ID: tag_id} + resp = self._api.post('/object-tags.remove-from-figure', data) + return resp.json() + + def get_object_tags(self, figure_id): + data = {ApiField.ID: figure_id} + resp = self._api.post('/figures.tags.list', data) + return resp.json() diff --git a/supervisely_lib/api/api.py b/supervisely_lib/api/api.py index 77104b115..24b638ac7 100644 --- a/supervisely_lib/api/api.py +++ b/supervisely_lib/api/api.py @@ -23,6 +23,11 @@ import supervisely_lib.api.pointcloud.pointcloud_api as pointcloud_api import supervisely_lib.api.object_class_api as object_class_api import supervisely_lib.api.report_api as report_api +import supervisely_lib.api.app_api as app_api +import supervisely_lib.api.file_api as file_api +import supervisely_lib.api.image_annotation_tool_api as image_annotation_tool_api +import supervisely_lib.api.advanced_api as advanced_api +import supervisely_lib.api.import_storage_api as import_storage_api from supervisely_lib.sly_logger import logger @@ -37,7 +42,7 @@ class Api: - def __init__(self, server_address, token, retry_count=None, retry_sleep_sec=None, external_logger=None): + def __init__(self, server_address, token, retry_count=None, retry_sleep_sec=None, external_logger=None, ignore_task_id=False): ''' :param server_address: str (example: http://192.168.1.69:5555) :param token: str @@ -57,9 +62,9 @@ def __init__(self, server_address, token, retry_count=None, retry_sleep_sec=None retry_sleep_sec = int(os.getenv(SUPERVISELY_PUBLIC_API_RETRY_SLEEP_SEC, '1')) self.headers = {'x-api-key': token} - task_id = os.getenv(SUPERVISELY_TASK_ID) - if task_id is not None: - self.headers['x-task-id'] = task_id + self.task_id = os.getenv(SUPERVISELY_TASK_ID) + if self.task_id is not None and ignore_task_id is False: + self.headers['x-task-id'] = self.task_id self.context = {} self.additional_fields = {} @@ -81,6 +86,11 @@ def __init__(self, server_address, token, retry_count=None, retry_sleep_sec=None self.object_class = object_class_api.ObjectClassApi(self) self.report = report_api.ReportApi(self) self.pointcloud = pointcloud_api.PointcloudApi(self) + self.app = app_api.AppApi(self) + self.file = file_api.FileApi(self) + self.img_ann_tool = image_annotation_tool_api.ImageAnnotationToolApi(self) + self.advanced = advanced_api.AdvancedApi(self) + self.import_storage = import_storage_api.ImportStorageApi(self) self.retry_count = retry_count self.retry_sleep_sec = retry_sleep_sec @@ -88,11 +98,11 @@ def __init__(self, server_address, token, retry_count=None, retry_sleep_sec=None self.logger = external_logger or logger @classmethod - def from_env(cls): + def from_env(cls, retry_count=5, ignore_task_id=False): ''' :return: Api class object with server address and token obtained from environment variables ''' - return cls(os.environ[SERVER_ADDRESS], os.environ[API_TOKEN]) + return cls(os.environ[SERVER_ADDRESS], os.environ[API_TOKEN], retry_count=retry_count, ignore_task_id=ignore_task_id) def add_header(self, key, value): ''' @@ -151,7 +161,7 @@ def post(self, method, data, retries=None, stream=False): "retry_limit": retries}) except Exception as
exc: process_unhandled_request(self.logger, exc) - raise requests.exceptions.RetryError("Retry limit exceeded") + raise requests.exceptions.RetryError("Retry limit exceeded ({!r})".format(url)) def get(self, method, params, retries=None, stream=False, use_public_api=True): ''' diff --git a/supervisely_lib/api/app_api.py b/supervisely_lib/api/app_api.py new file mode 100644 index 000000000..39b7bfb82 --- /dev/null +++ b/supervisely_lib/api/app_api.py @@ -0,0 +1,88 @@ +# coding: utf-8 + +import os +from supervisely_lib.api.module_api import ApiField +from supervisely_lib.api.task_api import TaskApi +from supervisely_lib._utils import take_with_default +from supervisely_lib.app.constants import DATA, STATE, CONTEXT, TEMPLATE +from supervisely_lib.io.fs import ensure_base_path +from supervisely_lib.task.progress import Progress +from supervisely_lib._utils import sizeof_fmt +from supervisely_lib import logger + + +class AppApi(TaskApi): + def run_dtl(self, workspace_id, dtl_graph, agent_id=None): + raise RuntimeError("Method is unavailable") + + def _run_plugin_task(self, task_type, agent_id, plugin_id, version, config, input_projects, input_models, + result_name): + raise RuntimeError("Method is unavailable") + + def run_train(self, agent_id, input_project_id, input_model_id, result_nn_name, train_config=None): + raise RuntimeError("Method is unavailable") + + def run_inference(self, agent_id, input_project_id, input_model_id, result_project_name, inference_config=None): + raise RuntimeError("Method is unavailable") + + def get_training_metrics(self, task_id): + raise RuntimeError("Method is unavailable") + + def deploy_model(self, agent_id, model_id): + raise RuntimeError("Method is unavailable") + + def get_import_files_list(self, id): + raise RuntimeError("Method is unavailable") + + def download_import_file(self, id, file_path, save_path): + raise RuntimeError("Method is unavailable") + + def create_task_detached(self, workspace_id, task_type: str=None): + raise RuntimeError("Method is unavailable") + + def upload_files(self, task_id, abs_paths, names, progress_cb=None): + raise RuntimeError("Method is unavailable") + + def initialize(self, task_id, template, data=None, state=None): + d = take_with_default(data, {}) + s = take_with_default(state, {}) + fields = [{"field": TEMPLATE, "payload": template}, {"field": DATA, "payload": d}, {"field": STATE, "payload": s}] + resp = self._api.task.set_fields(task_id, fields) + return resp + + def get_url(self, task_id): + return os.path.join(self._api.server_address, "apps/sessions", str(task_id)) + + def download_git_file(self, app_id, version, file_path, save_path): + raise NotImplementedError() + + def download_git_archive(self, ecosystem_item_id, app_id, version, save_path, log_progress=True, ext_logger=None): + payload = { + ApiField.ECOSYSTEM_ITEM_ID: ecosystem_item_id, + ApiField.VERSION: version, + "isArchive": True + } + if app_id is not None: + payload[ApiField.APP_ID] = app_id + + response = self._api.post('ecosystem.file.download', payload, stream=True) + if log_progress: + if ext_logger is None: + ext_logger = logger + + length = -1 + # Content-Length + if "Content-Length" in response.headers: + length = int(response.headers['Content-Length']) + progress = Progress("Downloading: ", length, ext_logger=ext_logger, is_size=True) + + mb1 = 1024 * 1024 + ensure_base_path(save_path) + with open(save_path, 'wb') as fd: + log_size = 0 + for chunk in response.iter_content(chunk_size=mb1): + fd.write(chunk) + log_size += len(chunk) + if 
log_progress and log_size > mb1: + progress.iters_done_report(log_size) + log_size = 0 diff --git a/supervisely_lib/api/dataset_api.py b/supervisely_lib/api/dataset_api.py index 2c06d7ce6..f9f9339de 100644 --- a/supervisely_lib/api/dataset_api.py +++ b/supervisely_lib/api/dataset_api.py @@ -1,5 +1,6 @@ # coding: utf-8 +import urllib from supervisely_lib.api.module_api import ApiField, ModuleApi, UpdateableModule, RemoveableModuleApi @@ -18,7 +19,8 @@ def info_sequence(): ApiField.PROJECT_ID, ApiField.IMAGES_COUNT, ApiField.CREATED_AT, - ApiField.UPDATED_AT] + ApiField.UPDATED_AT, + ApiField.REFERENCE_IMAGE_URL] @staticmethod def info_tuple_name(): @@ -148,3 +150,9 @@ def move(self, dst_project_id, id, new_name, change_name_if_conflict=False, with self.remove(id) return new_dataset + def _convert_json_info(self, info: dict, skip_missing=True): + res = super()._convert_json_info(info, skip_missing=skip_missing) + if res.reference_image_url is not None: + res = res._replace(reference_image_url=urllib.parse.urljoin(self._api.server_address, res.reference_image_url)) + return res + diff --git a/supervisely_lib/api/file_api.py b/supervisely_lib/api/file_api.py new file mode 100644 index 000000000..d51f07316 --- /dev/null +++ b/supervisely_lib/api/file_api.py @@ -0,0 +1,89 @@ +# coding: utf-8 +import os +from pathlib import Path +import urllib +from supervisely_lib.api.module_api import ModuleApiBase, ApiField +from supervisely_lib.io.fs import ensure_base_path, get_file_name_with_ext +from requests_toolbelt import MultipartEncoder +import mimetypes +from supervisely_lib.io.fs import get_file_ext, get_file_name + + +class FileApi(ModuleApiBase): + def list(self, team_id, path): + response = self._api.post('file-storage.list', {ApiField.TEAM_ID: team_id, ApiField.PATH: path}) + return response.json() + + def download(self, team_id, remote_path, local_save_path): + response = self._api.post('file-storage.download', {ApiField.TEAM_ID: team_id, ApiField.PATH: remote_path}, stream=True) + ensure_base_path(local_save_path) + with open(local_save_path, 'wb') as fd: + for chunk in response.iter_content(chunk_size=1024 * 1024): + fd.write(chunk) + + def upload(self, team_id, src, dst): + def path_to_bytes_stream(path): + return open(path, 'rb') + item = get_file_name_with_ext(dst) + content_dict = {} + content_dict[ApiField.NAME] = item + + dst_dir = os.path.dirname(dst) + if not dst_dir.endswith(os.path.sep): + dst_dir += os.path.sep + content_dict[ApiField.PATH] = dst_dir # os.path.basedir ... 
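+        # NOTE: the destination is sent as a directory path (hence the trailing os.path.sep appended above); the actual payload goes into the multipart "file" field below together with its guessed mime type.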
+ content_dict["file"] = (item, path_to_bytes_stream(src), mimetypes.MimeTypes().guess_type(src)[0]) + encoder = MultipartEncoder(fields=content_dict) + resp = self._api.post("file-storage.upload?teamId={}".format(team_id), encoder) + return resp.json() + # Example + # { + # "id": 38292, + # "storagePath": "teams_storage/1/X/S/rB/DrtCQEniZRAnj7oxAyJCrF80ViCC6swBcG6hYlUwkjlc0dE58lmIhRvGW00JSrQKO1s5NRuqaIAUZUUU50vK3vp09E62vCCErUF6owvkauzncYMtHssgXqoi9rGY.txt", + # "path": "/reports/classes_stats/max/2020-09-23-12:26:33_pascal voc 2012_(id_355).lnk", + # "userId": 1, + # "meta": { + # "size": 44, + # "mime": "text/plain", + # "ext": "lnk" + # }, + # "name": "2020-09-23-12:26:33_pascal voc 2012_(id_355).lnk", + # "teamId": 1 + # } + + def rename(self, old_name, new_name): + pass + + def remove(self, team_id, path): + resp = self._api.post("file-storage.remove",{ApiField.TEAM_ID: team_id, ApiField.PATH: path}) + + def exists(self, team_id, remote_path): + path_infos = self.list(team_id, remote_path) + for info in path_infos: + if info["path"] == remote_path: + return True + return False + + def get_free_name(self, team_id, path): + directory = Path(path).parent + name = get_file_name(path) + ext = get_file_ext(path) + res_name = name + suffix = 0 + + def _combine(suffix:int=None): + res = "{}/{}".format(directory, res_name) + if suffix is not None: + res += "_{:03d}".format(suffix) + if ext: + res += "{}".format(ext) + return res + + res_path = _combine() + while self.exists(team_id, res_path): + res_path = _combine(suffix) + suffix += 1 + return res_path + + def get_url(self, file_id): + return urllib.parse.urljoin(self._api.server_address, "files/{}".format(file_id)) \ No newline at end of file diff --git a/supervisely_lib/api/image_annotation_tool_api.py b/supervisely_lib/api/image_annotation_tool_api.py new file mode 100644 index 000000000..9f1551316 --- /dev/null +++ b/supervisely_lib/api/image_annotation_tool_api.py @@ -0,0 +1,66 @@ +# coding: utf-8 + +import os +import time +from collections import defaultdict, OrderedDict +import json + +from supervisely_lib.api.module_api import ApiField, ModuleApiBase, ModuleWithStatus, WaitingTimeExceeded +from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor +from supervisely_lib.io.fs import get_file_name, ensure_base_path, get_file_hash +from supervisely_lib.collection.str_enum import StrEnum +from supervisely_lib._utils import batched + + +class ImageAnnotationToolAction(StrEnum): + SET_FIGURE = 'figures/setFigure' + NEXT_IMAGE = 'images/nextImage' + PREV_IMAGE = 'images/prevImage' + SET_IMAGE = 'images/setImage' + ZOOM_TO_FIGURE = 'scene/zoomToObject' + + +class ImageAnnotationToolApi(ModuleApiBase): + def set_figure(self, session_id, figure_id): + return self._act(session_id, ImageAnnotationToolAction.SET_FIGURE, {ApiField.FIGURE_ID: figure_id}) + + def next_image(self, session_id, image_id): + return self._act(session_id, ImageAnnotationToolAction.NEXT_IMAGE, {ApiField.IMAGE_ID: image_id}) + + def prev_image(self, session_id, image_id): + return self._act(session_id, ImageAnnotationToolAction.PREV_IMAGE, {ApiField.IMAGE_ID: image_id}) + + def set_image(self, session_id, image_id): + return self._act(session_id, ImageAnnotationToolAction.SET_IMAGE, {ApiField.IMAGE_ID: image_id}) + + def zoom_to_figure(self, session_id, figure_id, zoom_factor=1): + return self._act(session_id, ImageAnnotationToolAction.ZOOM_TO_FIGURE, + {ApiField.FIGURE_ID: figure_id, ApiField.ZOOM_FACTOR: zoom_factor}) + + def _act(self, session_id: int, 
action: ImageAnnotationToolAction, payload: dict): + data = {ApiField.SESSION_ID: session_id, ApiField.ACTION: str(action), ApiField.PAYLOAD: payload} + resp = self._api.post('/annotation-tool.run-action', data) + return resp.json() + + + + # { + # "sessionId": "940c4ec7-3818-420b-9277-ab3c820babe5", + # "action": "scene/setViewport", + # "payload": { + # "viewport": { + # "offsetX": -461, # width + # "offsetY": -1228, # height + # "zoom": 1.7424000000000024 + # } + # } + # } + + # { + # "sessionId": "940c4ec7-3818-420b-9277-ab3c820babe5", + # "action": "scene/zoomToObject", + # "payload": { + # "figureId": 22129, + # "zoomFactor": 1.5 + # } + # } \ No newline at end of file diff --git a/supervisely_lib/api/image_api.py b/supervisely_lib/api/image_api.py index 6e00553f5..f4bd74504 100644 --- a/supervisely_lib/api/image_api.py +++ b/supervisely_lib/api/image_api.py @@ -30,7 +30,9 @@ def info_sequence(): ApiField.DATASET_ID, ApiField.CREATED_AT, ApiField.UPDATED_AT, - ApiField.META] + ApiField.META, + ApiField.PATH_ORIGINAL, + ApiField.FULL_STORAGE_URL] @staticmethod def info_tuple_name(): @@ -577,6 +579,38 @@ def url(self, team_id, workspace_id, project_id, dataset_id, image_id): return result + def _download_batch_by_hashes(self, hashes): + for batch_hashes in batched(hashes): + response = self._api.post( + 'images.bulk.download-by-hash', {ApiField.HASHES: batch_hashes}) + decoder = MultipartDecoder.from_response(response) + for part in decoder.parts: + content_utf8 = part.headers[b'Content-Disposition'].decode('utf-8') + # Extract the image hash from the Content-Disposition header (form-data; name="<hash>") + # by stripping the prefix and the trailing quote. + h = content_utf8.replace("form-data; name=\"", "")[:-1] + yield h, part + + def download_paths_by_hashes(self, hashes, paths, progress_cb=None): + if len(hashes) == 0: + return + if len(hashes) != len(paths): + raise RuntimeError("Can not match \"hashes\" and \"paths\" lists, len(hashes) != len(paths)") + + h_to_path = {h: path for h, path in zip(hashes, paths)} + for h, resp_part in self._download_batch_by_hashes(list(set(hashes))): + ensure_base_path(h_to_path[h]) + with open(h_to_path[h], 'wb') as w: + w.write(resp_part.content) + if progress_cb is not None: + progress_cb(1) + + def get_project_id(self, image_id): + dataset_id = self.get_info_by_id(image_id).dataset_id + project_id = self._api.dataset.get_info_by_id(dataset_id).project_id + return project_id + + @staticmethod def _get_free_name(exist_check_fn, name): res_title = name @@ -589,3 +623,16 @@ def _get_free_name(exist_check_fn, name): res_title = '{}_{:03d}{}'.format(name_without_ext, suffix, ext) suffix += 1 return res_title + + def storage_url(self, path_original): + result = urllib.parse.urljoin(self._api.server_address, '{}'.format(path_original)) + return result + + def preview_url(self, url, width=None, height=None, quality=70): + #@TODO: if both width and height are defined, and they are not proportioned to original image resolution, + # then images will be cropped from center + if width is None: + width = "" + if height is None: + height = "" + return url.replace("/image-converter", "/previews/{}x{},jpeg,q{}/image-converter".format(width, height, quality)) \ No newline at end of file diff --git a/supervisely_lib/api/import_storage_api.py b/supervisely_lib/api/import_storage_api.py new file mode 100644 index 000000000..351646741 --- /dev/null +++ b/supervisely_lib/api/import_storage_api.py @@ -0,0 +1,12 @@ +# coding: utf-8 +import os
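A quick usage sketch of the new hash-based image helpers (hypothetical ids and paths; `ImageInfo` is assumed to expose `hash` alongside the new `full_storage_url` field):

```python
import supervisely_lib as sly

api = sly.Api.from_env()

image_ids = [111, 112]  # hypothetical ids
infos = [api.image.get_info_by_id(i) for i in image_ids]

# bulk-download originals by content hash into local files
hashes = [info.hash for info in infos]
paths = ["/tmp/{}.png".format(i) for i in image_ids]
api.image.download_paths_by_hashes(hashes, paths)

# build a downscaled jpeg preview link from the full storage url
preview = api.image.preview_url(infos[0].full_storage_url, width=300)
```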
+from supervisely_lib.api.module_api import ModuleApiBase, ApiField +from supervisely_lib.io.fs import ensure_base_path, get_file_name_with_ext +from requests_toolbelt import MultipartEncoder +import mimetypes + + +class ImportStorageApi(ModuleApiBase): + def get_meta_by_hashes(self, hashes): + response = self._api.post('import-storage.internal.meta.list', {ApiField.HASHES: hashes}) + return response.json() diff --git a/supervisely_lib/api/module_api.py b/supervisely_lib/api/module_api.py index 5678bf359..81bd2b5a8 100644 --- a/supervisely_lib/api/module_api.py +++ b/supervisely_lib/api/module_api.py @@ -144,9 +144,37 @@ class ApiField: SCRIPT = 'script' LOGS = 'logs' FILES = 'files' + HASHES = 'hashes' + SUBTITLE = 'subtitle' + COMMAND = 'command' + DEFAULT_VALUE = 'defaultValue' + TITLE = 'title' + AREA = 'area' + OPTIONS = 'options' + REPORT_ID = 'reportId' + WIDGET = 'widget' + LAYOUT = 'layout' + PAYLOAD = 'payload' + FIELD = 'field' + FIELDS = 'fields' + APPEND = 'append' WITH_CUSTOM_DATA = 'withCustomBigData' + PATH = 'path' + SESSION_ID = 'sessionId' + ACTION = 'action' + FIGURE_ID = 'figureId' + VALUE = 'value' + ZOOM_FACTOR = 'zoomFactor' + FULL_STORAGE_URL = 'fullStorageUrl' REVIEWER_ID = 'reviewerId' REVIEWER_LOGIN = 'reviewerLogin' + RECURSIVE = 'recursive' + ECOSYSTEM_ITEM_ID = 'moduleId' + APP_ID = 'appId' + PROJECT = 'project' + OUTPUT = 'output' + REFERENCE_IMAGE_URL = 'referenceImageUrl' + GENERAL = 'general' def _get_single_item(items): diff --git a/supervisely_lib/api/project_api.py b/supervisely_lib/api/project_api.py index 25df02d13..c6625a6a0 100644 --- a/supervisely_lib/api/project_api.py +++ b/supervisely_lib/api/project_api.py @@ -2,6 +2,7 @@ from enum import Enum import pandas as pd +import urllib from supervisely_lib.api.module_api import ApiField, CloneableModuleApi, UpdateableModule, RemoveableModuleApi from supervisely_lib.project.project_meta import ProjectMeta @@ -27,7 +28,8 @@ def info_sequence(): ApiField.WORKSPACE_ID, ApiField.CREATED_AT, ApiField.UPDATED_AT, - ApiField.TYPE] + ApiField.TYPE, + ApiField.REFERENCE_IMAGE_URL] @staticmethod def info_tuple_name(): @@ -180,7 +182,10 @@ def get_activity(self, id): return df def _convert_json_info(self, info: dict, skip_missing=True): - return super(ProjectApi, self)._convert_json_info(info, skip_missing=skip_missing) + res = super()._convert_json_info(info, skip_missing=skip_missing) + if res.reference_image_url is not None: + res = res._replace(reference_image_url=urllib.parse.urljoin(self._api.server_address, res.reference_image_url)) + return res def get_stats(self, id): response = self._api.post('projects.stats', {ApiField.ID: id}) diff --git a/supervisely_lib/api/report_api.py b/supervisely_lib/api/report_api.py index 77d895ef3..fae09cc8f 100644 --- a/supervisely_lib/api/report_api.py +++ b/supervisely_lib/api/report_api.py @@ -3,18 +3,12 @@ import os import json import urllib.parse -from supervisely_lib.api.module_api import ApiField, ModuleApiBase +import uuid +from supervisely_lib.api.module_api import ApiField, ModuleApiBase from supervisely_lib.collection.str_enum import StrEnum -class WidgetType(StrEnum): - TABLE = 'table' - PLOTLY = "plotly" - MARKDOWN = "markdown" - NOTIFICATION = "notification" - - class NotificationType(StrEnum): INFO = 'info' NOTE = "note" @@ -22,54 +16,101 @@ class NotificationType(StrEnum): ERROR = "error" +#@TODO: standardize title/description/name and so on, so they are consistent everywhere class ReportApi(ModuleApiBase): def __init__(self, api): ModuleApiBase.__init__(self, api) +
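Both ProjectApi and DatasetApi now absolutize `reference_image_url` the same way; a minimal illustration of the `urljoin` behavior, with hypothetical addresses:

```python
import urllib.parse

server_address = "https://app.supervise.ly"          # hypothetical instance
reference_image_url = "/previews/300x150/abcd.jpg"   # relative url as returned by the server

print(urllib.parse.urljoin(server_address, reference_image_url))
# -> https://app.supervise.ly/previews/300x150/abcd.jpg
```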
#https://developer.mozilla.org/en-US/docs/Web/CSS/grid-template + #grid-template: "a a a" 40px "b c c" 40px "b c c" 40px / 1fr 1fr 1fr; + #area -a or b or c def create(self, team_id, name, widgets, layout=""): - response = self._api.post('reports.create', {ApiField.TEAM_ID: team_id, - ApiField.NAME: name, - ApiField.WIDGETS: widgets}) - return response.json()[ApiField.ID] - - def create_table(self, df, name, subtitle, per_page=20, pageSizes=[10, 20, 50, 100, 500], fix_columns=None): - res = { - "name": name, - "subtitle": subtitle, - "type": str(WidgetType.TABLE), - "content": json.loads(df.to_json(orient='split')), - "options": { - "perPage": per_page, - "pageSizes": pageSizes, - } - } - if fix_columns is not None: - res["options"]["fixColumns"] = fix_columns - return res - - def create_notification(self, name, content, notification_type=NotificationType.INFO): - return { - "type": str(WidgetType.NOTIFICATION), - "title": name, - "content": content, - "options": { - "type": str(notification_type) - } + data = { + ApiField.TEAM_ID: team_id, + ApiField.NAME: name, + ApiField.WIDGETS: widgets, + ApiField.LAYOUT: layout } + response = self._api.post('reports.create', data) + return response.json()[ApiField.ID] - def create_plotly(self, data_json, name, subtitle): - data = data_json - if type(data) is str: - data = json.loads(data_json) - elif type(data) is not dict: - raise RuntimeError("type(data_json) is not dict") - return { - "name": name, - "subtitle": subtitle, - "type": str(WidgetType.PLOTLY), - "content": data - } + # def create_table(self, df, name, subtitle, per_page=20, pageSizes=[10, 20, 50, 100, 500], fix_columns=None): + # res = { + # "name": name, + # "subtitle": subtitle, + # "type": str(WidgetType.TABLE), + # "content": json.loads(df.to_json(orient='split')), + # "options": { + # "perPage": per_page, + # "pageSizes": pageSizes, + # } + # } + # if fix_columns is not None: + # res["options"]["fixColumns"] = fix_columns + # return res + # + # def create_notification(self, name, content, notification_type=NotificationType.INFO): + # return { + # "type": str(WidgetType.NOTIFICATION), + # "title": name, + # "content": content, + # "options": { + # "type": str(notification_type) + # } + # } + # + # def create_plotly(self, data_json, name, subtitle): + # data = data_json + # if type(data) is str: + # data = json.loads(data_json) + # elif type(data) is not dict: + # raise RuntimeError("type(data_json) is not dict") + # return { + # "name": name, + # "subtitle": subtitle, + # "type": str(WidgetType.PLOTLY), + # "content": data + # } + # + # + # def create_linechart(self, name, description, id=None): + # res = { + # "type": str(WidgetType.LINECHART), + # "name": "linechart block title", + # "subtitle": "linechart block description", + # "content": [], + # "options": {} + # } + # res["id"] = uuid.uuid4().hex if id is None else id + # return res def url(self, id): return urllib.parse.urljoin(self._api.server_address, 'reports/{}'.format(id)) + def get_widget(self, report_id, widget_id): + response = self._api.post('reports.widgets.get', {"reportId": report_id, "widgetId": widget_id}) + return response.json() + + def _change_widget(self, method, report_id, widget_id, widget_type=None, name=None, description=None, area=None, content=None, options=None): + data = dict() + data[ApiField.ID] = widget_id + if name is not None: + data[ApiField.NAME] = name + if widget_type is not None: + data[ApiField.TYPE] = widget_type + if description is not None: + data[ApiField.SUBTITLE] = description + if 
area is not None: + data[ApiField.AREA] = area + if content is not None: + data[ApiField.CONTENT] = content + if options is not None: + data[ApiField.OPTIONS] = options + response = self._api.post(method, {ApiField.REPORT_ID: report_id, ApiField.WIDGET: data}) + return response.json() + + def update_widget(self, report_id, widget_id, name=None, description=None, area=None, content=None, options=None): + return self._change_widget('reports.widgets.update', report_id, widget_id, name=name, description=description, area=area, content=content, options=options) + + def rewrite_widget(self, report_id, widget_id, widget_type, name=None, description=None, area=None, content=None, options=None): + return self._change_widget('reports.widgets.rewrite', report_id, widget_id, widget_type, name, description, area, content, options) diff --git a/supervisely_lib/api/task_api.py b/supervisely_lib/api/task_api.py index ba3c0f520..9ed8e2213 100644 --- a/supervisely_lib/api/task_api.py +++ b/supervisely_lib/api/task_api.py @@ -5,7 +5,6 @@ from collections import defaultdict, OrderedDict import json - from supervisely_lib.api.module_api import ApiField, ModuleApiBase, ModuleWithStatus, WaitingTimeExceeded from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor from supervisely_lib.io.fs import get_file_name, ensure_base_path, get_file_hash @@ -253,6 +252,60 @@ def upload_files(self, task_id, abs_paths, names, progress_cb=None): if progress_cb is not None: progress_cb(len(content_dict)) + # { + # data: {my_val: 1} + # obj: {val: 1, res: 2} + # } + # { + # obj: {new_val: 1} + # } + # // append: true, recursive: false + # { + # data: {my_val: 1} + # obj: {new_val: 1} + # } + # // append: false, recursive: false + # { + # obj: {new_val: 1} + # } + # + # // append: true, recursive: true + # { + # data: {my_val: 1} + # obj: {val: 1, res: 2, new_val: 1} + # } + + def set_fields(self, task_id, fields): + for idx, obj in enumerate(fields): + for key in [ApiField.FIELD, ApiField.PAYLOAD]: + if key not in obj: + raise KeyError("Object #{} does not have field {!r}".format(idx, key)) + data = { + ApiField.TASK_ID: task_id, + ApiField.FIELDS: fields + } + resp = self._api.post('tasks.data.set', data) + return resp.json() + + def set_field(self, task_id, field, payload, append=False, recursive=False): + fields = [ + { + ApiField.FIELD: field, + ApiField.PAYLOAD: payload, + ApiField.APPEND: append, + ApiField.RECURSIVE: recursive, + } + ] + return self.set_fields(task_id, fields) + # + # def get_field(self, task_id, field): + # data = {ApiField.TASK_ID: task_id} + # if field is not None and type(field) == str: + # data[ApiField.FIELD] = field + # resp = self._api.post('tasks.data.get', data) + # return resp.json()["result"] + def _validate_checkpoints_support(self, task_id): info = self.get_info_by_id(task_id) if info["type"] != str(TaskApi.PluginTaskType.TRAIN): @@ -268,3 +321,36 @@ def delete_unused_checkpoints(self, task_id): self._validate_checkpoints_support(task_id) resp = self._api.post("tasks.checkpoints.clear", {ApiField.ID: task_id}) return resp.json() + + def _set_output(self): + pass + + def set_output_project(self, task_id, project_id, project_name=None): + if project_name is None: + project = self._api.project.get_info_by_id(project_id) + project_name = project.name + + output = { + ApiField.PROJECT: { + ApiField.ID: project_id, + ApiField.TITLE: project_name + } + } + resp = self._api.post("tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}) + return resp.json() + + def
set_output_report(self, task_id, file_id, file_name): + output = { + ApiField.GENERAL: { + "icon": { + "className": "zmdi zmdi-receipt", + "color": "#33c94c", + "backgroundColor": "#d9f7e4" + }, + "title": file_name, + "titleUrl": self._api.file.get_url(file_id), + "description": "Report" + } + } + resp = self._api.post("tasks.output.set", {ApiField.TASK_ID: task_id, ApiField.OUTPUT: output}) + return resp.json() \ No newline at end of file diff --git a/supervisely_lib/api/user_api.py b/supervisely_lib/api/user_api.py index a65eb3fde..464988311 100644 --- a/supervisely_lib/api/user_api.py +++ b/supervisely_lib/api/user_api.py @@ -12,7 +12,8 @@ class UserApi(ModuleApiBase): def info_sequence(): return [ApiField.ID, ApiField.LOGIN, - #ApiField.ROLE_ID, + ApiField.ROLE, + ApiField.ROLE_ID, ApiField.NAME, ApiField.EMAIL, ApiField.LOGINS, @@ -25,6 +26,9 @@ def info_sequence(): def info_tuple_name(): return 'UserInfo' + def _convert_json_info(self, info: dict, skip_missing=True): + return super(UserApi, self)._convert_json_info(info, skip_missing=skip_missing) + def get_info_by_id(self, id): ''' :param id: int diff --git a/supervisely_lib/api/video/video_annotation_api.py b/supervisely_lib/api/video/video_annotation_api.py index a9461ebae..4ec26bdf3 100644 --- a/supervisely_lib/api/video/video_annotation_api.py +++ b/supervisely_lib/api/video/video_annotation_api.py @@ -1,10 +1,12 @@ # coding: utf-8 +import json from supervisely_lib.api.module_api import ApiField from supervisely_lib.video_annotation.key_id_map import KeyIdMap from supervisely_lib.video_annotation.video_annotation import VideoAnnotation from supervisely_lib.api.entity_annotation.entity_annotation_api import EntityAnnotationAPI +from supervisely_lib.io.json import load_json_file class VideoAnnotationAPI(EntityAnnotationAPI): @@ -30,3 +32,13 @@ def append(self, video_id, ann: VideoAnnotation, key_id_map: KeyIdMap = None): self._append(self._api.video.tag, self._api.video.object, self._api.video.figure, info.project_id, info.dataset_id, video_id, ann.tags, ann.objects, ann.figures, key_id_map) + + def upload_paths(self, video_ids, ann_paths, project_meta, progress_cb=None): + # video_ids from the same dataset + + for video_id, ann_path in zip(video_ids, ann_paths): + ann_json = load_json_file(ann_path) + ann = VideoAnnotation.from_json(ann_json, project_meta) + + # ignore existing key_id_map because the new objects will be created + self.append(video_id, ann) \ No newline at end of file diff --git a/supervisely_lib/api/video/video_api.py b/supervisely_lib/api/video/video_api.py index 06e3470c4..1478772b4 100644 --- a/supervisely_lib/api/video/video_api.py +++ b/supervisely_lib/api/video/video_api.py @@ -1,4 +1,6 @@ # coding: utf-8 +import json +from requests_toolbelt import MultipartDecoder, MultipartEncoder from supervisely_lib.api.module_api import ApiField, RemoveableBulkModuleApi from supervisely_lib.api.video.video_annotation_api import VideoAnnotationAPI @@ -6,9 +8,12 @@ from supervisely_lib.api.video.video_figure_api import VideoFigureApi from supervisely_lib.api.video.video_frame_api import VideoFrameAPI from supervisely_lib.api.video.video_tag_api import VideoTagApi +from supervisely_lib.sly_logger import logger +from supervisely_lib.io.fs import get_file_hash from supervisely_lib.io.fs import ensure_base_path from supervisely_lib._utils import batched +from supervisely_lib.video.video import get_video_streams, gen_video_stream_name class VideoApi(RemoveableBulkModuleApi): @@ -205,4 +210,108 @@ def 
notify_tracking_error(self, track_id, error, message): ApiField.MESSAGE: "{}: {}".format(error, message) } } - }) \ No newline at end of file + }) + # def upload(self): + # #"/videos.bulk.upload" + # pass + # + # def upload_path(self, dataset_id, name, path, meta=None): + # metas = None if meta is None else [meta] + # return self.upload_paths(dataset_id, [name], [path], metas=metas)[0] + + #@TODO: copypaste from image_api + def check_existing_hashes(self, hashes): + results = [] + if len(hashes) == 0: + return results + for hashes_batch in batched(hashes, batch_size=900): + response = self._api.post('images.internal.hashes.list', hashes_batch) + results.extend(response.json()) + return results + + def upload_paths(self, dataset_id, names, paths, progress_cb=None, metas=None): + def path_to_bytes_stream(path): + return open(path, 'rb') + + video_info_results = [] + + hashes = [get_file_hash(x) for x in paths] + + self._upload_data_bulk(path_to_bytes_stream, zip(paths, hashes), progress_cb=progress_cb) + metas = self._api.import_storage.get_meta_by_hashes(hashes) + metas2 = [meta["meta"] for meta in metas] + + for name, hash, meta in zip(names, hashes, metas2): + try: + all_streams = meta["streams"] + video_streams = get_video_streams(all_streams) + for stream_info in video_streams: + stream_index = stream_info["index"] + + #TODO: check is community + # if instance_type == sly.COMMUNITY: + # if _check_video_requires_processing(file_info, stream_info) is True: + # warn_video_requires_processing(file_name) + # continue + + item_name = name + info = self._api.video.get_info_by_name(dataset_id, item_name) + if info is not None: + item_name = gen_video_stream_name(name, stream_index) + res = self.upload_hash(dataset_id, item_name, hash, stream_index) + video_info_results.append(res) + except Exception as e: + logger.warning("File skipped {!r}: error occurred during processing {!r}".format(name, str(e))) + + return video_info_results + + #TODO: copypaste from images_api + def _upload_uniq_videos_single_req(self, func_item_to_byte_stream, hashes_items_to_upload): + content_dict = {} + for idx, (_, item) in enumerate(hashes_items_to_upload): + content_dict["{}-file".format(idx)] = (str(idx), func_item_to_byte_stream(item), 'video/*') + encoder = MultipartEncoder(fields=content_dict) + resp = self._api.post('videos.bulk.upload', encoder) + + resp_list = json.loads(resp.text) + remote_hashes = [d['hash'] for d in resp_list if 'hash' in d] + if len(remote_hashes) != len(hashes_items_to_upload): + problem_items = [(hsh, item, resp['errors']) + for (hsh, item), resp in zip(hashes_items_to_upload, resp_list) if resp.get('errors')] + logger.warn('Not all images were uploaded within request.', extra={ + 'total_cnt': len(hashes_items_to_upload), 'ok_cnt': len(remote_hashes), 'items': problem_items}) + return remote_hashes + + def _upload_data_bulk(self, func_item_to_byte_stream, items_hashes, retry_cnt=3, progress_cb=None): + hash_to_items = {i_hash: item for item, i_hash in items_hashes} + + unique_hashes = set(hash_to_items.keys()) + remote_hashes = set(self.check_existing_hashes(list(unique_hashes))) # existing -- from server + if progress_cb: + progress_cb(len(remote_hashes)) + pending_hashes = unique_hashes - remote_hashes + + for retry_idx in range(retry_cnt): + # single attempt to upload all data which is not uploaded yet + + for hashes in batched(list(pending_hashes)): + pending_hashes_items = [(h, hash_to_items[h]) for h in hashes] + hashes_rcv = 
self._upload_uniq_videos_single_req(func_item_to_byte_stream, pending_hashes_items) + pending_hashes -= set(hashes_rcv) + if set(hashes_rcv) - set(hashes): + logger.warn('Hash inconsistency in images bulk upload.', + extra={'sent': hashes, 'received': hashes_rcv}) + if progress_cb: + progress_cb(len(hashes_rcv)) + + if not pending_hashes: + return + + logger.warn('Unable to upload images (data).', extra={ + 'retry_idx': retry_idx, + 'items': [(h, hash_to_items[h]) for h in pending_hashes] + }) + # now retry it for the case if it is a shadow server/connection error + + raise RuntimeError("Unable to upload images (data). " + "Please check if images are in supported format and if ones aren't corrupted.") diff --git a/supervisely_lib/app/__init__.py b/supervisely_lib/app/__init__.py new file mode 100644 index 000000000..faedd63f9 --- /dev/null +++ b/supervisely_lib/app/__init__.py @@ -0,0 +1,16 @@ +from supervisely_lib.app.constants import * + +#server-address/apps/designer +# +# { +# "a": "...", +# "context_menu": { +# "target": ["images_project", "images_dataset", "labeling_job", "team", "workspace"] +# "DELME show_description (false by default)": true, +# "context_root (optional)": "Download as / Run App (default) / Report", +# "context_category (optional)": "Objects" +# } +# } + +# context_root: Download as, Run App (default), Report +#[![Views](https://dev.supervise.ly/public/api/v3/ecosystem.counters?repo=supervisely-ecosystem/roads-test&counter=views&label=custom)](https://supervise.ly) \ No newline at end of file diff --git a/supervisely_lib/app/app_config.md b/supervisely_lib/app/app_config.md new file mode 100644 index 000000000..0275edea5 --- /dev/null +++ b/supervisely_lib/app/app_config.md @@ -0,0 +1,162 @@ +## config.json format + + +**name** (`required`) - String + +> Item name + + + +**description** - String + +> Item description + + + +**type** (`required`) - String + +> Item type +> +> *available values:* +> ``` +> project | model | app | notebook | plugin +> ``` + + + +**categories** - Array[String] +> List of item categories +> +> *example:* +> +> ``` +> ["Demo"] +> ``` + + + +**icon** - String +> Link to Item icon + + + +**icon_background** - String +> Icon background color in css format +> +> *example:* +> ``` +> "rgb(32, 29, 102)" +> ``` + + + +**icon_cover** - Boolean +> Fit icon to available size + + + + + +**main_script** (`required`) - String +> > Only for items with type "app" +> +> *example:* +> ``` +> "src/add_images_project.py", +> ``` + + + +**gui_template** - String +> > Only for items with type "app" +> +> Path to application UI template +> +> *example:* +> ``` +> "src/gui.html", +> ``` + + + +**modal_template** - String +> > Only for items with type "app" +> +> Path to template that will be shown in "Run Application" dialog +> +> *example:* +> ``` +> "src/modal.html", +> ``` + + + +**modal_template_state** - Object +> > Only for items with type "app" +> +> > Required if modal_template is specified +> +> Properties that will be available in "Run Application" dialog +> +> *example:* +> ``` json +> { +> "teamId": null, +> "workspaceId": null, +> "projectName": "" +> } +> ``` + + + +**task_location** (`required`) - String +> > Only for items with type "app" +> +> Specify where application session will be displayed +> +> *available values:* +> +> workspace_tasks | application_sessions + + + +**context_menu** - Object +> > Only for items with type "app" +> +> Display application in context menu of specified entities +> +> - **target** (`required`) -
+>
+> - **target** (`required`) - Array[String]
+>
+>   List of entities where the application will be shown
+>
+>   *available values:*
+>
+>   images_project | videos_project | point_cloud_project | images_dataset | videos_dataset | point_cloud_dataset
+>
+> - **context_root** - String
+>
+>   Root element in the context menu
+>
+>   *available values:*
+>
+>   Download as | Run App (`default`) | Report
+>
+> - **context_category** - String
+>
+>   Subcategory in the context menu
+>
+>   *example:*
+>   ``` json
+>   {
+>       "target": ["images_project", "images_dataset"],
+>       "context_root": "Report",
+>       "context_category": "Objects"
+>   }
+>   ```
+
+**headless** - Boolean
+> > Only for items with type "app"
+>
+> Run the application without a GUI
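+
+## Example
+
+Putting the fields together, a minimal illustrative `config.json` for an app might look
+like this (the name, icon URL and paths are invented for demonstration and do not refer
+to a real application):
+
+``` json
+{
+    "name": "My Demo App",
+    "description": "Example application configuration",
+    "type": "app",
+    "categories": ["Demo"],
+    "icon": "https://example.com/icon.png",
+    "icon_background": "rgb(32, 29, 102)",
+    "main_script": "src/main.py",
+    "gui_template": "src/gui.html",
+    "task_location": "workspace_tasks",
+    "context_menu": {
+        "target": ["images_project", "images_dataset"],
+        "context_category": "Demo"
+    }
+}
+```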
diff --git a/supervisely_lib/app/app_service.py b/supervisely_lib/app/app_service.py
new file mode 100644
index 000000000..60891d995
--- /dev/null
+++ b/supervisely_lib/app/app_service.py
@@ -0,0 +1,273 @@
+import json
+import os
+import traceback
+import functools
+import sys
+import asyncio
+import signal
+import random
+import concurrent.futures
+import queue
+
+from supervisely_lib.worker_api.agent_api import AgentAPI
+from supervisely_lib.worker_proto import worker_api_pb2 as api_proto
+from supervisely_lib.function_wrapper import function_wrapper
+from supervisely_lib._utils import take_with_default
+from supervisely_lib.sly_logger import logger as default_logger
+from supervisely_lib.sly_logger import EventType
+from supervisely_lib.app.constants import STATE, CONTEXT, STOP_COMMAND, IMAGE_ANNOTATION_EVENTS
+from supervisely_lib.api.api import Api
+from supervisely_lib.io.fs import file_exists
+
+# graceful shutdown pattern adapted from https://www.roguelynn.com/words/asyncio-we-did-it-wrong/
+
+
+class ConnectionClosedByServerException(Exception):
+    pass
+
+
+REQUEST_ID = 'request_id'
+SERVER_ADDRESS = 'SERVER_ADDRESS'
+API_TOKEN = 'API_TOKEN'
+REQUEST_DATA = "request_data"
+AGENT_TOKEN = "AGENT_TOKEN"
+
+
+def _default_stop(api: Api, task_id, context, state, app_logger):
+    app_logger.info('Stop app', extra={'event_type': EventType.APP_FINISHED})
+
+
+class AppService:
+    NETW_CHUNK_SIZE = 1048576
+    QUEUE_MAX_SIZE = 2000  # Maximum number of in-flight requests to avoid exhausting server memory.
+    DEFAULT_EVENTS = [STOP_COMMAND, *IMAGE_ANNOTATION_EVENTS]
+
+    def __init__(self, logger=None, task_id=None, server_address=None, agent_token=None, ignore_errors=False):
+        self.logger = take_with_default(logger, default_logger)
+        self._ignore_errors = ignore_errors
+        self.task_id = take_with_default(task_id, os.environ["TASK_ID"])
+        self.server_address = take_with_default(server_address, os.environ[SERVER_ADDRESS])
+        self.agent_token = take_with_default(agent_token, os.environ[AGENT_TOKEN])
+        self.public_api = Api.from_env()
+        self._app_url = self.public_api.app.get_url(self.task_id)
+        self._session_dir = "/sessions/{}".format(self.task_id)
+
+        self.api = AgentAPI(token=self.agent_token, server_address=self.server_address, ext_logger=self.logger)
+        self.api.add_to_metadata('x-task-id', str(self.task_id))
+
+        self.callbacks = {}
+        self.processing_queue = queue.Queue()  # (maxsize=self.QUEUE_MAX_SIZE)
+        self.logger.debug('App is created', extra={"task_id": self.task_id, "server_address": self.server_address})
+
+        self._ignore_stop_for_debug = False
+        self._error = None
+        self.stop_event = asyncio.Event()
+
+        self.executor = concurrent.futures.ThreadPoolExecutor()
+        self.loop = asyncio.get_event_loop()
+        # May want to catch other signals too
+        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT)
+        for s in signals:
+            self.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(self._shutdown(signal=s)))
+        # comment out the line below to see how unhandled exceptions behave
+        self.loop.set_exception_handler(self.handle_exception)
+
+    def handle_exception(self, loop, context):
+        # context["message"] is always present; context["exception"] and context["future"] may not be
+        msg = context.get("exception", context["message"])
+        if isinstance(msg, Exception):
+            self.logger.error(msg, exc_info=True, extra={'future_info': context.get("future")})
+        else:
+            self.logger.error("Caught exception: {}".format(msg))
+
+        self.logger.info("Shutting down...")
+        asyncio.create_task(self._shutdown())
+
+    @property
+    def session_dir(self):
+        return self._session_dir
+
+    @property
+    def repo_dir(self):
+        return os.path.join(self._session_dir, "repo")
+
+    @property
+    def data_dir(self):
+        return os.path.join(self._session_dir, "data")
+
+    @property
+    def app_url(self):
+        return self._app_url
+
+    def _add_callback(self, callback_name, func):
+        self.callbacks[callback_name] = func
+
+    def callback(self, callback_name):
+        """A decorator that is used to register a callback function for a
+        given application command. This does the same thing as :meth:`_add_callback`
+        but is intended for decorator usage::
+
+            @app.callback('calc')
+            def calc_func():
+                return 'Hello World'
+
+        :param callback_name: the command name as string
+        """
+        def decorator(f):
+            self._add_callback(callback_name, f)
+
+            @functools.wraps(f)
+            def wrapper(*args, **kwargs):
+                return f(*args, **kwargs)
+            return wrapper
+        return decorator
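+
+    # Illustrative usage of the decorator above (the command name "greeting" and the
+    # handler body are invented for demonstration). Registered callbacks receive the
+    # keyword arguments passed in handle_message_sync below:
+    #
+    #   app = AppService()
+    #
+    #   @app.callback("greeting")
+    #   def greeting(api, task_id, context, state, app_logger):
+    #       app_logger.info("Hello from the app", extra={"state": state})
+    #
+    #   app.run()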
+
+    def handle_message_sync(self, request_msg):
+        try:
+            state = request_msg.get(STATE, None)
+            context = request_msg.get(CONTEXT, None)
+            command = request_msg["command"]
+            user_api_token = request_msg["api_token"]
+            user_public_api = Api(self.server_address, user_api_token, retry_count=5, external_logger=self.logger)
+
+            if command == STOP_COMMAND:
+                self.logger.info("APP receives stop signal from user")
+                self.stop_event.set()
+                # run the user-defined stop handler if one is registered, the default otherwise
+                stop_handler = self.callbacks.get(STOP_COMMAND, _default_stop)
+                stop_handler(user_public_api, self.task_id, context, state, self.logger)
+                if self._ignore_stop_for_debug is False:
+                    asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
+                    return
+                else:
+                    self.logger.info("STOP event is ignored ...")
+            elif command in AppService.DEFAULT_EVENTS and command not in self.callbacks:
+                raise KeyError("App received default command {!r}. Use decorator \"callback\" to handle it."
+                               .format(command))
+            elif command not in self.callbacks:
+                raise KeyError("App received unhandled command {!r}. Use decorator \"callback\" to handle it."
+                               .format(command))
+            else:
+                self.callbacks[command](api=user_public_api,
+                                        task_id=self.task_id,
+                                        context=context,
+                                        state=state,
+                                        app_logger=self.logger)
+        except KeyError as e:
+            self.logger.error(e, exc_info=False)
+        except Exception as e:
+            self.logger.error(traceback.format_exc(), exc_info=True, extra={'exc_str': str(e)})
+            if self._ignore_errors is False:
+                self.logger.info("App will be stopped due to error")
+                asyncio.run_coroutine_threadsafe(self._shutdown(error=e), self.loop)
+
+    def consume_sync(self):
+        while True:
+            request_msg = self.processing_queue.get()
+            self.logger.debug('FULL_TASK_MESSAGE', extra={'task_msg': request_msg})
+            asyncio.ensure_future(
+                self.loop.run_in_executor(self.executor, self.handle_message_sync, request_msg), loop=self.loop
+            )
+
+    async def consume(self):
+        self.logger.info("Starting consumer")
+        asyncio.ensure_future(
+            self.loop.run_in_executor(self.executor, self.consume_sync), loop=self.loop
+        )
+
+    def publish_sync(self, initial_events=None):
+        if initial_events is not None:
+            for event_obj in initial_events:
+                event_obj["api_token"] = os.environ[API_TOKEN]
+                self.processing_queue.put(event_obj)
+
+        for gen_event in self.api.get_endless_stream('GetGeneralEventsStream', api_proto.GeneralEvent, api_proto.Empty()):
+            try:
+                data = {}
+                if gen_event.data is not None and gen_event.data != b'':
+                    data = json.loads(gen_event.data.decode('utf-8'))
+
+                event_obj = {REQUEST_ID: gen_event.request_id, **data}
+                self.processing_queue.put(event_obj)
+            except Exception as error:
+                self.logger.warning('App exception', extra={"error_message": str(error)})
+
+        raise ConnectionClosedByServerException('Events stream to the application was closed by the server.')
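+
+    # Illustrative shape of a message taken from the processing queue, reconstructed
+    # from publish_sync above and handle_message_sync; the field values are invented:
+    #
+    #   {
+    #       "request_id": "...",           # added from the gRPC general event
+    #       "command": "greeting",         # dispatched via self.callbacks
+    #       "api_token": "<user token>",   # used to build the per-user public Api
+    #       "state": {...},                # optional
+    #       "context": {...}               # optional
+    #   }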
+
+    async def publish(self, initial_events=None):
+        self.logger.info("Starting publisher")
+        asyncio.ensure_future(
+            self.loop.run_in_executor(self.executor, self.publish_sync, initial_events), loop=self.loop
+        )
+
+    def run(self, template_path=None, data=None, state=None, initial_events=None):
+        if template_path is None:
+            template_path = os.path.join(os.path.dirname(sys.argv[0]), 'gui.html')
+
+        if not file_exists(template_path):
+            self.logger.info("App will be running without GUI", extra={"app_url": self.app_url})
+            template = ""
+        else:
+            with open(template_path, 'r') as file:
+                template = file.read()
+
+        self.public_api.app.initialize(self.task_id, template, data, state)
+        self.logger.info("Application session is initialized", extra={"app_url": self.app_url})
+
+        try:
+            self.loop.create_task(self.publish(initial_events), name="Publisher")
+            self.loop.create_task(self.consume(), name="Consumer")
+            self.loop.run_forever()
+        finally:
+            self.loop.close()
+            self.logger.info("Successfully shutdown the APP service.")
+
+        if self._error is not None:
+            raise self._error
+
+    def stop(self, wait=True):
+        #@TODO: add timeout
+        if wait is True:
+            event_obj = {"command": STOP_COMMAND, "api_token": os.environ[API_TOKEN]}
+            self.processing_queue.put(event_obj)
+        else:
+            self.logger.info('Stop app (force, no wait)', extra={'event_type': EventType.APP_FINISHED})
+            asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
+
+    async def _shutdown(self, signal=None, error=None):
+        """Cleanup tasks tied to the service's shutdown."""
+        if signal:
+            self.logger.info(f"Received exit signal {signal.name}...")
+
+        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
+        self.logger.info(f"Cancelling {len(tasks)} outstanding tasks")
+        for task in tasks:
+            task.cancel()
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+        self.logger.info("Shutting down ThreadPoolExecutor")
+        self.executor.shutdown(wait=False)
+
+        self.logger.info(f"Releasing {len(self.executor._threads)} threads from executor")
+        for thread in self.executor._threads:
+            try:
+                # workaround: unblock executor threads stuck on blocking calls
+                thread._tstate_lock.release()
+            except Exception:
+                pass
+
+        self.logger.info("Stopping event loop")
+        self.loop.stop()
+
+        if error is not None:
+            self._error = error
diff --git a/supervisely_lib/app/constants.py b/supervisely_lib/app/constants.py
new file mode 100644
index 000000000..6a325ace3
--- /dev/null
+++ b/supervisely_lib/app/constants.py
@@ -0,0 +1,11 @@
+STATE = "state"
+DATA = "data"
+CONTEXT = "context"
+TEMPLATE = "template"
+
+
+SHARED_DATA = '/sessions'
+
+STOP_COMMAND = "stop"
+
+IMAGE_ANNOTATION_EVENTS = ["manual_selected_figure_changed"]
\ No newline at end of file
diff --git a/supervisely_lib/app/sly-icon-example.html b/supervisely_lib/app/sly-icon-example.html
new file mode 100644
index 000000000..5d64e9ec9
--- /dev/null
+++ b/supervisely_lib/app/sly-icon-example.html
@@ -0,0 +1,66 @@
+