diff --git a/appliance/manager.py b/appliance/manager.py index 2c4b8eb..4c6949b 100644 --- a/appliance/manager.py +++ b/appliance/manager.py @@ -140,12 +140,13 @@ async def delete_appliance(self, app_id, purge_data=False): if status != 200: self.logger.error(err) return 207, None, "Failed to deprovision persistent volume '%s'"%local_vols[i].id - _, global_vols, _ = await vol_mgr.get_global_volumes_by_appliance(app_id) - for gpv in global_vols: - gpv.unsubscribe(app_id) - for status, _, err in (await multi([vol_mgr.update_volume(gpv) for gpv in global_vols])): - if status != 200: - self.logger.error(err) + if purge_data: + _, global_vols, _ = await vol_mgr.get_global_volumes_by_appliance(app_id) + for gpv in global_vols: + gpv.unsubscribe(app_id) + for status, _, err in (await multi([vol_mgr.update_volume(gpv) for gpv in global_vols])): + if status != 200: + self.logger.error(err) # deprovision appliance status, msg, err = await self.__app_api.deprovision_appliance(app_id) diff --git a/schedule/local.py b/schedule/local.py index 9c678c9..31eb3b6 100644 --- a/schedule/local.py +++ b/schedule/local.py @@ -1,11 +1,14 @@ -import appliance.manager +import appliance +import volume from abc import ABCMeta +from itertools import groupby from schedule import SchedulePlan from schedule.universal import GlobalScheduleExecutor from commons import AutonomousMonitor, Loggable from config import get_global_scheduler +from locality import Placement class ApplianceScheduleExecutor(AutonomousMonitor): @@ -63,6 +66,10 @@ async def schedule(self, app, agents): class DefaultApplianceScheduler(ApplianceScheduler): + """ + Greedy bin-packing heuristic + + """ def __init__(self, *args, **kwargs): super(DefaultApplianceScheduler, self).__init__(*args, **kwargs) @@ -84,12 +91,12 @@ async def schedule(self, app, agents): contrs_to_create = [c for c in free_contrs if c.state in (ContainerState.SUBMITTED, ContainerState.FAILED)] for c in contrs_to_create: - c.sys_schedule_hints = 
c.user_schedule_hints + c.sys_schedule_hints = c.user_schedule_hints vols_declared = {v.id: v for v in app.volumes} - vols_to_create = set([v.src for c in free_contrs for v in c.persistent_volumes + vols_to_create = set([v.src for c in contrs_to_create for v in c.persistent_volumes if v.type == ContainerVolumeType.PERSISTENT and v.src in vols_declared - and not vols_declared[v.src].is_instantiated]) + and not vols_declared[v.src].is_active]) for vid in vols_to_create: vols_declared[vid].sys_schedule_hints = vols_declared[vid].user_schedule_hints sched.add_containers(contrs_to_create) @@ -104,3 +111,29 @@ def resolve_dependencies(self, app): for c in contrs.values(): parents.setdefault(c.id, set()).update([d for d in c.dependencies if d in contrs]) return [contrs[k] for k, v in parents.items() if not v] + + def find_placement(self, contrs, agents): + import container + placement = Placement() + cpus_demanded = sum([(c.resources.cpus * c.instances + if isinstance(c, container.service.Service) else c.resources.cpus) + for c in contrs]) + for locality in ('host', 'zone', 'region', 'cloud', ): + agents = list(sorted(agents, key=lambda a: a.attributes[locality])) + key, cpus_avail = max([(k, sum([a.resources.cpus for a in group])) + for k, group in groupby(agents, lambda a: a.attributes[locality])], + key=lambda x: x[1]) + self.logger.info('locality: %s, value: %s, ' + 'CPU available: %.1f, CPU demanded: %.1f'%(locality, key, cpus_avail, cpus_demanded)) + if cpus_avail >= cpus_demanded: + if locality == 'host': + placement.host = key + elif locality == 'zone': + placement.zone = key + elif locality == 'region': + placement.region = key + elif locality == 'cloud': + placement.cloud = key + break + return placement + diff --git a/schedule/plugin/local/cost_aware.py b/schedule/plugin/local/cost_aware.py new file mode 100644 index 0000000..1b6ba71 --- /dev/null +++ b/schedule/plugin/local/cost_aware.py @@ -0,0 +1 @@ +## \ No newline at end of file diff --git a/server.py 
b/server.py index 635c5ce..0185fe1 100644 --- a/server.py +++ b/server.py @@ -3,7 +3,6 @@ from tornado.web import Application, StaticFileHandler from tornado.httpserver import HTTPServer - from appliance import Appliance from container import Container from volume import PersistentVolume @@ -11,7 +10,7 @@ from appliance.handler import AppliancesHandler, ApplianceHandler from appliance.ui.handler import ApplianceUIHandler from container.handler import ContainersHandler, ContainerHandler, ServicesHandler, JobsHandler -from volume.handler import VolumesHandler, VolumeHandler +from volume.handler import ApplianceVolumesHandler, ApplianceVolumeHandler, GlobalVolumeHandler from cluster.manager import ClusterManager from index.handler import IndexHandler from ping.handler import PingHandler @@ -38,14 +37,15 @@ def start_server(): (r'/appliance\/*', AppliancesHandler), (r'/appliance/(%s)\/*'%Appliance.ID_PATTERN, ApplianceHandler), (r'/appliance/(%s)/container\/*'%Appliance.ID_PATTERN, ContainersHandler), - (r'/appliance/(%s)/volume\/*' % Appliance.ID_PATTERN, VolumesHandler), + (r'/appliance/(%s)/volume\/*' % Appliance.ID_PATTERN, ApplianceVolumesHandler), (r'/appliance/(%s)/service\/*'%Appliance.ID_PATTERN, ServicesHandler), (r'/appliance/(%s)/job\/*'%Container.ID_PATTERN, JobsHandler), (r'/appliance/(%s)/ui\/*'%Appliance.ID_PATTERN, ApplianceUIHandler), (r'/appliance/(%s)/container/(%s)\/*'%(Appliance.ID_PATTERN, Container.ID_PATTERN), ContainerHandler), - (r'/appliance/(%s)/volume/(%s)\/*' % (Appliance.ID_PATTERN, - PersistentVolume.ID_PATTERN), VolumeHandler), + (r'/appliance/(%s)/volume/(%s)\/*'%(Appliance.ID_PATTERN, + PersistentVolume.ID_PATTERN), ApplianceVolumeHandler), + (r'/volume/(%s)\/*'%PersistentVolume.ID_PATTERN, GlobalVolumeHandler), (r'/static/(.*)', StaticFileHandler, dict(path='%s/static'%dirname(__file__))), (r'/api', SwaggerAPIHandler), (r'/api/ui', SwaggerUIHandler), diff --git a/volume/__init__.py b/volume/__init__.py index 90da458..28d3308 100644 
--- a/volume/__init__.py +++ b/volume/__init__.py @@ -21,6 +21,16 @@ def __new__(cls, value, driver): return obj +@swagger.enum +class PersistentVolumeState(Enum): + """ + Persistent volume state + """ + CREATED = 'created' + INACTIVE = 'inactive' + ACTIVE = 'active' + + @swagger.model class DataPersistence: """ @@ -163,7 +173,7 @@ def parse(cls, data, from_user=True): return 400, None, "Invalid volume scope: %s"%data.get('scope') if from_user: for f in ('deployment', ): - data.pop('deployment', None) + data.pop(f, None) sched_hints = data.pop('schedule_hints', None) if sched_hints: status, sched_hints, err = VolumeScheduleHints.parse(sched_hints, from_user) @@ -186,13 +196,13 @@ def parse(cls, data, from_user=True): return 400, None, "Unrecognized volume scope: %s"%vol.value return 200, vol, None - def __init__(self, id, type, is_instantiated=False, + def __init__(self, id, type, state=PersistentVolumeState.CREATED, scope=VolumeScope.LOCAL, user_schedule_hints=None, sys_schedule_hints=None, deployment=None, *args, **kwargs): - self.__id = id - self.__scope = scope if isinstance(scope, VolumeScope) else VolumeScope(scope.upper()) - self.__type = type if isinstance(type, PersistentVolumeType) else PersistentVolumeType(type) - self.__is_instantiated = is_instantiated + self.__id = str(id) + self.__scope = VolumeScope(scope.upper()) if isinstance(scope, str) else scope + self.__type = PersistentVolumeType(type.lower()) if isinstance(type, str) else type + self.__state = PersistentVolumeState(state.lower()) if isinstance(state, str) else state if isinstance(user_schedule_hints, dict): self.__user_schedule_hints = VolumeScheduleHints(**user_schedule_hints) @@ -230,38 +240,36 @@ def id(self): @property @swagger.property - def is_instantiated(self): + def type(self): """ - Whether the volume is instantiated + Volume type --- - type: bool - read_only: true + type: PersistentVolumeType + example: cephfs """ - return self.__is_instantiated + return self.__type @property - 
@swagger.property - def scope(self): + def state(self): """ - Persistent volume scope + Volume state --- - type: PersistentVolumeScope + type: PersistentVolumeState """ - return self.__scope + return self.__state @property @swagger.property - def type(self): + def scope(self): """ - Volume type + Persistent volume scope --- - type: Volume - example: cephfs + type: PersistentVolumeScope """ - return self.__type + return self.__scope @property @swagger.property @@ -298,6 +306,10 @@ def deployment(self): """ return self.__deployment + @property + def is_active(self): + return self.__state == PersistentVolumeState.ACTIVE + @type.setter def type(self, type): self.__type = type @@ -311,16 +323,16 @@ def sys_schedule_hints(self, hints): def deployment(self, deployment): self.__deployment = deployment - def set_instantiated(self): - self.__is_instantiated = True + def set_active(self): + self.__state = PersistentVolumeState.ACTIVE - def unset_instantiated(self): - self.__is_instantiated = False + def set_inactive(self): + self.__state = PersistentVolumeState.INACTIVE def to_render(self): return dict(id=self.id, type=self.type.value, - is_instantiated=self.is_instantiated, + state=self.state.value, scope=self.scope.value, user_schedule_hints=self.user_schedule_hints.to_render(), sys_schedule_hints=self.sys_schedule_hints.to_render(), @@ -329,7 +341,7 @@ def to_render(self): def to_save(self): return dict(id=self.id, type=self.type.value, - is_instantiated=self.is_instantiated, + state=self.state.value, scope=self.scope.value, user_schedule_hints=self.user_schedule_hints.to_save(), sys_schedule_hints=self.sys_schedule_hints.to_save(), @@ -363,7 +375,7 @@ def __eq__(self, other): and self.appliance == other.appliance)) def __str__(self): - return '%s-%s'%(self.appliance, self.id) + return '%s-%s'%(self.appliance, self.id) if self.scope == VolumeScope.LOCAL else str(self.id) @swagger.model diff --git a/volume/handler.py b/volume/handler.py index 19cb504..c825b2a 100644 --- 
a/volume/handler.py +++ b/volume/handler.py @@ -1,5 +1,6 @@ import swagger +from tornado.gen import multi from tornado.web import RequestHandler from tornado.escape import json_encode @@ -8,7 +9,7 @@ from util import message, error -class VolumesHandler(RequestHandler, Loggable): +class ApplianceVolumesHandler(RequestHandler, Loggable): """ --- - name: app_id @@ -33,7 +34,7 @@ async def get(self, app_id): application/json: schema: type: list - items: Volume + items: PersistentVolume 404: description: the requested appliance does not exist content: @@ -41,12 +42,20 @@ async def get(self, app_id): schema: Error """ - status, vols, err = await self.__vol_mgr.get_local_volumes(appliance=app_id) - self.set_status(status) - self.write(json_encode([v.to_render() for v in vols] if status == 200 else error(err))) - - -class VolumeHandler(RequestHandler, Loggable): + vol_mgr = self.__vol_mgr + resps = await multi([vol_mgr.get_local_volumes(appliance=app_id), + vol_mgr.get_global_volumes_by_appliance(app_id)]) + volumes = [] + for status, vols, err in resps: + if status != 200: + self.set_status(status) + self.write(error(err)) + return + volumes += vols + self.write(json_encode([v.to_render() for v in volumes])) + + +class ApplianceVolumeHandler(RequestHandler, Loggable): """ --- - name: app_id @@ -56,7 +65,7 @@ class VolumeHandler(RequestHandler, Loggable): - name: vol_id required: true - description: persistent volume ID + description: local persistent volume ID type: str """ @@ -67,7 +76,7 @@ def initialize(self): @swagger.operation async def get(self, app_id, vol_id): """ - Get persistent volume + Get local persistent volume --- responses: 200: @@ -89,14 +98,14 @@ async def get(self, app_id, vol_id): @swagger.operation async def delete(self, app_id, vol_id): """ - Delete persistent volume + Delete local persistent volume --- responses: 200: - description: the requested volume is found and returned + description: the requested local volume is purged content: 
application/json: - schema: PersistentVolume + schema: Message 400: description: > the requested volume is in use by container(s) in the appliance and cannot be deleted yet @@ -115,4 +124,67 @@ self.write(json_encode(message(msg) if status == 200 else error(err))) +class GlobalVolumeHandler(RequestHandler, Loggable): + """ + --- + - name: vol_id + required: true + description: global persistent volume ID + type: str + + """ + + def initialize(self): + self.__vol_mgr = VolumeManager() + + @swagger.operation + async def get(self, vol_id): + """ + Get global persistent volume + --- + responses: + 200: + description: the requested global volume is found and returned + content: + application/json: + schema: PersistentVolume + 404: + description: the requested global volume is not found + content: + application/json: + schema: Error + + """ + status, vol, err = await self.__vol_mgr.get_global_volume(vol_id) + self.set_status(status) + self.write(vol.to_render() if status == 200 else error(err)) + + @swagger.operation + async def delete(self, vol_id): + """ + Delete global persistent volume + --- + responses: + 200: + description: the requested volume is purged + content: + application/json: + schema: Message + 400: + description: > + the requested global volume is in use by container(s) in the appliance and cannot be + purged yet + content: + application/json: + schema: Error + 404: + description: the requested volume is not found + content: + application/json: + schema: Error + + """ + status, msg, err = await self.__vol_mgr.purge_global_volume(vol_id) + self.set_status(status) + self.write(json_encode(message(msg) if status == 200 else error(err))) diff --git a/volume/manager.py b/volume/manager.py index 3a13eaa..d47afc2 100644 --- a/volume/manager.py +++ b/volume/manager.py @@ -60,7 +60,7 @@ async def provision_volume(self, vol): if status != 200: self.logger.error(err) return status, None, err - vol.set_instantiated() + vol.set_active() 
await self.__vol_db.save_volume(vol) return status, vol, None @@ -79,7 +79,7 @@ async def deprovision_volume(self, vol): if status != 200: self.logger.error(err) return status, _, err - vol.unset_instantiated() + vol.set_inactive() await self.__vol_db.save_volume(vol) return status, "Persistent volume '%s' has been deprovisioned"%vol.id, None @@ -88,13 +88,16 @@ async def purge_global_volume(self, vol_id): if status != 200: return status, None, err if len(vol.used_by) > 0: - return 400, None, "Global persistent volume '%s' is in use by appliance(s): " \ - "%s"%(vol_id, vol.used_by) + return 400, None, "Failed to delete the global persistent volume '%s': " \ + "being used by appliance(s): %s"%(vol_id, vol.used_by) + status, msg, err = await self.deprovision_volume(vol) + if status != 200: + return status, None, err status, msg, err = await self.__vol_api.delete_global_volume(vol_id, purge=True) if status != 200: return status, None, err await self.__vol_db.delete_volume(vol) - return status, "Persistent volume '%s' has been purged" % vol, None + return status, "Global persistent volume '%s' has been purged" % vol, None async def purge_local_volume(self, app_id, vol_id): status, vol, err = await self.get_local_volume(app_id, vol_id, full_blown=True) @@ -103,13 +106,13 @@ async def purge_local_volume(self, app_id, vol_id): in_use = set([c.id for c in vol.appliance.containers for v in c.persistent_volumes if v.src == vol_id]) if len(in_use) > 0: - return 400, None, "Local persistent volume '%s' is in use by container(s): " \ - "%s"%(vol_id, list(in_use)) + return 400, None, "Failed to delete the local persistent volume '%s': " \ + "being used by container(s): %s"%(vol_id, list(in_use)) status, msg, err = await self.__vol_api.delete_local_volume(app_id, vol_id, purge=True) if status != 200: return status, None, err await self.__vol_db.delete_volume(vol) - return status, "Persistent volume '%s' has been purged"%vol, None + return status, "Local persistent volume '%s' has 
been purged"%vol, None async def get_global_volume(self, vol_id): status, vol, err = await self._get_volume(self.__vol_db.get_global_volume, @@ -149,7 +152,6 @@ async def _get_volume(self, db_get_vol_func, api_get_vol_func, *args): if i == 0: vol = output elif i == 1: - self.logger.info(str(output)) vol.deployment = VolumeDeployment(placement=Placement(**output['placement'])) return status, vol, None