Merge pull request #40 from dcslab-snu/refactor/swapper
swapper, online profiling, core isolator
isac322 authored Oct 30, 2018
2 parents 5b71270 + 7285015 commit 94ca2f1
Showing 55 changed files with 1,956 additions and 814 deletions.
176 changes: 58 additions & 118 deletions controller.py
@@ -2,140 +2,42 @@
 # coding: UTF-8
 
 import argparse
-import functools
-import json
+import datetime
 import logging
 import os
-import subprocess
 import sys
 import time
-from threading import Thread
-from typing import Dict
+from typing import Dict, Optional
 
-import pika
-import psutil
-from pika import BasicProperties
-from pika.adapters.blocking_connection import BlockingChannel
-from pika.spec import Basic
 
 import isolating_controller
-from isolating_controller.isolation import NextStep
-from isolating_controller.isolation.isolators import Isolator
-from isolating_controller.isolation.policies import DiffPolicy, IsolationPolicy
-from isolating_controller.metric_container.basic_metric import BasicMetric
-from isolating_controller.workload import Workload
+from isolating_controller.isolation.policies import AggressiveWViolationPolicy, IsolationPolicy
+from isolating_controller.isolation.swapper import SwapIsolator
 from pending_queue import PendingQueue
+from polling_thread import PollingThread
 
 MIN_PYTHON = (3, 6)
 
 
-class Singleton(type):
-    _instances = {}
-
-    def __call__(cls, *args, **kwargs):
-        if cls not in cls._instances:
-            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
-        return cls._instances[cls]
-
-
-class MainController(metaclass=Singleton):
+class Controller:
     def __init__(self, metric_buf_size: int) -> None:
-        self._metric_buf_size = metric_buf_size
-
-        self._rmq_host = 'localhost'
-        self._rmq_creation_queue = 'workload_creation'
-
-        self._pending_wl = PendingQueue(DiffPolicy)
-        self._control_thread = ControlThread(self._pending_wl)
-
-    def _cbk_wl_creation(self, ch: BlockingChannel, method: Basic.Deliver, _: BasicProperties, body: bytes) -> None:
-        ch.basic_ack(method.delivery_tag)
-
-        arr = body.decode().strip().split(',')
-
-        logger = logging.getLogger('monitoring.workload_creation')
-        logger.debug(f'{arr} is received from workload_creation queue')
-
-        if len(arr) != 4:
-            return
-
-        wl_name, pid, perf_pid, perf_interval = arr
-        pid = int(pid)
-        perf_pid = int(perf_pid)
-        perf_interval = int(perf_interval)
-
-        if not psutil.pid_exists(pid):
-            return
+        self._pending_queue: PendingQueue = PendingQueue(AggressiveWViolationPolicy)
 
-        workload = Workload(wl_name, pid, perf_pid, perf_interval)
+        self._interval: float = 0.2  # scheduling interval (sec)
+        self._profile_interval: float = 1.0  # check interval for phase change (sec)
+        self._solorun_interval: float = 2.0  # the FG's solorun profiling interval (sec)
+        self._solorun_count: Dict[IsolationPolicy, Optional[int]] = dict()
 
-        # FIXME: hard coded
-        if wl_name == 'SP':
-            self._pending_wl.add_bg(workload)
-        else:
-            self._pending_wl.add_fg(workload)
-
-        logger.info(f'{workload} is created')
-
-        wl_queue_name = '{}({})'.format(wl_name, pid)
-        ch.queue_declare(wl_queue_name)
-        ch.basic_consume(functools.partial(self._cbk_wl_monitor, workload), wl_queue_name)
-
-    def _cbk_wl_monitor(self, workload: Workload,
-                        ch: BlockingChannel, method: Basic.Deliver, _: BasicProperties, body: bytes) -> None:
-        metric = json.loads(body.decode())
-        ch.basic_ack(method.delivery_tag)
-
-        item = BasicMetric(metric['l2miss'],
-                           metric['l3miss'],
-                           metric['instructions'],
-                           metric['cycles'],
-                           metric['stall_cycles'],
-                           metric['wall_cycles'],
-                           metric['intra_coh'],
-                           metric['inter_coh'],
-                           metric['llc_size'],
-                           metric['local_mem'],
-                           metric['remote_mem'],
-                           workload.perf_interval)
-
-        logger = logging.getLogger(f'monitoring.metric.{workload}')
-        logger.debug(f'{metric} is given from ')
-
-        metric_que = workload.metrics
-
-        if len(metric_que) == self._metric_buf_size:
-            metric_que.pop()
-
-        metric_que.appendleft(item)
-
-    def run(self) -> None:
-        logger = logging.getLogger(__name__)
-
-        self._control_thread.start()
-
-        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._rmq_host))
-        channel = connection.channel()
-
-        channel.queue_declare(self._rmq_creation_queue)
-        channel.basic_consume(self._cbk_wl_creation, self._rmq_creation_queue)
-
-        try:
-            logger.debug('starting consuming thread')
-            channel.start_consuming()
-
-        except KeyboardInterrupt:
-            channel.close()
-            connection.close()
-
-
-class ControlThread(Thread):
-    def __init__(self, pending_queue: PendingQueue) -> None:
-        super().__init__(daemon=True)
+        self._isolation_groups: Dict[IsolationPolicy, int] = dict()
 
-        self._pending_queue: PendingQueue = pending_queue
+        self._polling_thread = PollingThread(metric_buf_size, self._pending_queue)
 
-        self._interval: int = 2  # Scheduling interval
-        self._isolation_groups: Dict[IsolationPolicy, int] = dict()
+        # Swapper init
+        self._swapper: SwapIsolator = SwapIsolator(self._isolation_groups)
 
     def _isolate_workloads(self) -> None:
         logger = logging.getLogger(__name__)
@@ -145,6 +47,28 @@ def _isolate_workloads(self) -> None:
             logger.info(f'***************isolation of {group.name} #{iteration_num}***************')
 
             try:
+                if group.in_solorun_profiling:
+                    if iteration_num - self._solorun_count[group] >= int(self._solorun_interval / self._interval):
+                        logger.info('Stopping solorun profiling...')
+
+                        group.stop_solorun_profiling()
+                        del self._solorun_count[group]
+
+                        logger.info('skipping isolation... because corun data isn\'t collected yet')
+                    else:
+                        logger.info('skipping isolation because of solorun profiling...')
+
+                    continue
+
+                # TODO: the first expression can lead to low reactivity
+                elif iteration_num % int(self._profile_interval / self._interval) == 0 and group.profile_needed():
+                    logger.info('Starting solorun profiling...')
+                    group.start_solorun_profiling()
+                    self._solorun_count[group] = iteration_num
+                    group.set_idle_isolator()
+                    logger.info('skipping isolation because of solorun profiling...')
+                    continue
+
                 if group.new_isolator_needed:
                     group.choose_next_isolator()
@@ -173,6 +97,10 @@ def _isolate_workloads(self) -> None:
             finally:
                 self._isolation_groups[group] += 1
 
+        if len(tuple(g for g in self._isolation_groups if g.safe_to_swap)) >= 2:
+            if self._swapper.swap_is_needed():
+                self._swapper.do_swap()
+
     def _register_pending_workloads(self) -> None:
         """
        This function detects and registers the spawned workloads (threads).
@@ -185,7 +113,6 @@ def _register_pending_workloads(self) -> None:
             logger.info(f'{pending_group} is created')
 
             self._isolation_groups[pending_group] = 0
-            pending_group.init_isolators()
 
     def _remove_ended_groups(self) -> None:
         """
@@ -203,9 +130,15 @@ def _remove_ended_groups(self) -> None:
             logger.info(f'{group} of {ended_workload.name} is ended')
 
             # remove from containers
             group.reset()
             del self._isolation_groups[group]
+            if group.in_solorun_profiling:
+                group.background_workload.resume()
+                del self._solorun_count[group]
 
     def run(self) -> None:
+        self._polling_thread.start()
+
         logger = logging.getLogger(__name__)
         logger.info('starting isolation loop')
@@ -214,7 +147,6 @@ def run(self) -> None:
             self._register_pending_workloads()
 
             time.sleep(self._interval)
-
             self._isolate_workloads()


@@ -223,24 +155,32 @@ def main() -> None:
     parser.add_argument('-b', '--metric-buf-size', dest='buf_size', default='50', type=int,
                         help='metric buffer size per thread. (default : 50)')
 
+    os.makedirs('logs', exist_ok=True)
+
     args = parser.parse_args()
 
+    formatter = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s')
     stream_handler = logging.StreamHandler()
-    stream_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s'))
+    file_handler = logging.FileHandler(f'logs/debug_{datetime.datetime.now().isoformat()}.log')
+    stream_handler.setFormatter(formatter)
+    file_handler.setFormatter(formatter)
 
     controller_logger = logging.getLogger(__name__)
     controller_logger.setLevel(logging.INFO)
     controller_logger.addHandler(stream_handler)
+    controller_logger.addHandler(file_handler)
 
     module_logger = logging.getLogger(isolating_controller.__name__)
    module_logger.setLevel(logging.DEBUG)
     module_logger.addHandler(stream_handler)
+    module_logger.addHandler(file_handler)
 
     monitoring_logger = logging.getLogger('monitoring')
     monitoring_logger.setLevel(logging.INFO)
     monitoring_logger.addHandler(stream_handler)
+    monitoring_logger.addHandler(file_handler)
 
-    controller = MainController(args.buf_size)
+    controller = Controller(args.buf_size)
     controller.run()


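The refactored Controller drives everything off a 0.2 s scheduling tick: every int(1.0 / 0.2) = 5th iteration a group is considered for solorun profiling, and a running profile is stopped once int(2.0 / 0.2) = 10 iterations have elapsed. A minimal sketch of that cadence arithmetic (constants mirror the ones above; per-group bookkeeping is simplified to plain ints, so this is an illustration, not the project's code):

# Sketch of the solorun-profiling cadence in _isolate_workloads().
INTERVAL = 0.2          # scheduling tick (sec), self._interval above
PROFILE_INTERVAL = 1.0  # phase-change check period (sec)
SOLORUN_INTERVAL = 2.0  # solorun profiling duration (sec)

PROFILE_EVERY = int(PROFILE_INTERVAL / INTERVAL)   # 5 iterations
SOLORUN_TICKS = int(SOLORUN_INTERVAL / INTERVAL)   # 10 iterations

def may_start_profiling(iteration: int) -> bool:
    # mirrors: iteration_num % int(self._profile_interval / self._interval) == 0
    return iteration % PROFILE_EVERY == 0

def should_stop_profiling(iteration: int, started_at: int) -> bool:
    # mirrors: iteration_num - self._solorun_count[group] >= SOLORUN_TICKS
    return iteration - started_at >= SOLORUN_TICKS

# A profile started at iteration 15 is stopped at iteration 25, i.e. 2.0 s later.
assert may_start_profiling(15)
assert not should_stop_profiling(24, 15)
assert should_stop_profiling(25, 15)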
7 changes: 7 additions & 0 deletions isolating_controller/isolation/__init__.py
@@ -8,3 +8,10 @@ class NextStep(IntEnum):
     WEAKEN = 2
     STOP = 3
     IDLE = 4
+
+
+class ResourceType(IntEnum):
+    CPU = 0
+    CACHE = 1
+    MEMORY = 2
+    Unknown = 3
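ResourceType gives policies and isolators a shared vocabulary for naming the dominant contended resource. A hypothetical illustration of how a policy might use it (the metric names and the heuristic are illustrative only, not the project's API; the enum is restated so the snippet runs standalone):

from enum import IntEnum

class ResourceType(IntEnum):  # as added above
    CPU = 0
    CACHE = 1
    MEMORY = 2
    Unknown = 3

def dominant_contention(cache_hit_drop: float, mem_bw_growth: float) -> ResourceType:
    # Hypothetical heuristic: blame whichever metric degraded more,
    # falling back to Unknown when neither clearly dominates.
    if cache_hit_drop > mem_bw_growth:
        return ResourceType.CACHE
    if mem_bw_growth > cache_hit_drop:
        return ResourceType.MEMORY
    return ResourceType.Unknown

print(dominant_contention(0.3, 0.1).name)  # CACHE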
4 changes: 3 additions & 1 deletion isolating_controller/isolation/isolators/__init__.py
@@ -1,8 +1,10 @@
 # coding: UTF-8
 
 
-from .base_isolator import Isolator
+from .affinity import AffinityIsolator
+from .base import Isolator
 from .cache import CacheIsolator
+from .core import CoreIsolator
 from .idle import IdleIsolator
 from .memory import MemoryIsolator
 from .schedule import SchedIsolator
57 changes: 57 additions & 0 deletions isolating_controller/isolation/isolators/affinity.py
@@ -0,0 +1,57 @@
+# coding: UTF-8
+
+import logging
+from typing import Optional
+
+from .base import Isolator
+from ...metric_container.basic_metric import MetricDiff
+from ...workload import Workload
+
+
+class AffinityIsolator(Isolator):
+    def __init__(self, foreground_wl: Workload, background_wl: Workload) -> None:
+        super().__init__(foreground_wl, background_wl)
+
+        self._cur_step: int = self._foreground_wl.orig_bound_cores[-1]
+
+        self._stored_config: Optional[int] = None
+
+    @classmethod
+    def _get_metric_type_from(cls, metric_diff: MetricDiff) -> float:
+        return metric_diff.instruction_ps
+
+    def strengthen(self) -> 'AffinityIsolator':
+        self._cur_step += 1
+        return self
+
+    @property
+    def is_max_level(self) -> bool:
+        # FIXME: hard coded
+        return self._cur_step + 1 == self._background_wl.bound_cores[0]
+
+    @property
+    def is_min_level(self) -> bool:
+        return self._foreground_wl.orig_bound_cores == self._foreground_wl.bound_cores
+
+    def weaken(self) -> 'AffinityIsolator':
+        self._cur_step -= 1
+        return self
+
+    def enforce(self) -> None:
+        logger = logging.getLogger(__name__)
+        logger.info(f'affinity of foreground is {self._foreground_wl.orig_bound_cores[0]}-{self._cur_step}')
+
+        self._foreground_wl.bound_cores = range(self._foreground_wl.orig_bound_cores[0], self._cur_step + 1)
+
+    def reset(self) -> None:
+        if self._foreground_wl.is_running:
+            self._foreground_wl.bound_cores = self._foreground_wl.orig_bound_cores
+
+    def store_cur_config(self) -> None:
+        self._stored_config = self._cur_step
+
+    def load_cur_config(self) -> None:
+        super().load_cur_config()
+
+        self._cur_step = self._stored_config
+        self._stored_config = None
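A rough sketch of the AffinityIsolator stepping behaviour. Real Workload objects wrap running processes and apply bound_cores to their CPU affinity; the stand-in below is hypothetical and only records the binding:

class FakeWorkload:
    # Hypothetical stand-in for isolating_controller.workload.Workload.
    def __init__(self, cores: range) -> None:
        self.orig_bound_cores = tuple(cores)
        self.bound_cores = tuple(cores)
        self.is_running = True

fg = FakeWorkload(range(0, 2))   # foreground on cores 0-1
bg = FakeWorkload(range(4, 8))   # background on cores 4-7; cores 2-3 idle

cur_step = fg.orig_bound_cores[-1]               # 1, as in __init__ above

# strengthen() grants the foreground one more core; enforce() applies it.
cur_step += 1
fg.bound_cores = tuple(range(fg.orig_bound_cores[0], cur_step + 1))
print(fg.bound_cores)                            # (0, 1, 2)

# is_max_level stops growth just below the background's first core.
print(cur_step + 1 == bg.bound_cores[0])         # False: one more step allowed
cur_step += 1
print(cur_step + 1 == bg.bound_cores[0])         # True: cores 0-3 is the limit

Each strengthen() widens the foreground's CPU affinity by one core toward the background's partition, weaken() walks it back, and reset() restores the original binding.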
