Skip to content

Commit

Permalink
Active-Active Topology - Distributor open_flow back-end
Browse files Browse the repository at this point in the history
Distributor open_flow back-end with ovs state and heartbeat
Implements: blueprint https://review.openstack.org/#/c/234639

Change-Id: Ifd01d908edd4e245e18651db998c07b857d46190
Co-Authored-By: Dean Lorenz <[email protected]>
  • Loading branch information
2 people authored and abedad committed Mar 6, 2017
1 parent a3895c5 commit c9f8737
Show file tree
Hide file tree
Showing 18 changed files with 1,716 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ def finalize_amphora_cluster(self):
provides=constants.LISTENERS))
new_amphora_net_subflow.add(amphora_driver_tasks.ListenersUpdate(
requires=(constants.LISTENERS)))
# we dont need to update listeners here!!!!
new_amphora_net_subflow.add(CreateAmphoraClusterAlgExtraTask(
name=constants.ADD_CLUSTER_ALG_EXTRA,
provides=(constants.CLUSTER_ALG_TYPE, constants.CLUSTER_MIN_SIZE)
))

for amphoraCount in range(self._cluster_size):
sf_name = 'FINALIZE_AMP_' + str(amphoraCount)
Expand Down Expand Up @@ -205,9 +200,9 @@ def finalize_amphora_cluster(self):
name=sf_name + '-' + constants.DISTRIBUTOR_REGISTER_AMP,
requires=(constants.DISTRIBUTOR,
constants.LOADBALANCER,
constants.AMPHORA,
constants.CLUSTER_ALG_TYPE,
constants.CLUSTER_MIN_SIZE)))
constants.AMPHORA),
inject={constants.CLUSTER_ALG_TYPE:'active_active',
constants.CLUSTER_SLOT: amphoraCount}))
new_amphora_net_subflow.add(finalize_amp_for_lb_subflow)

return new_amphora_net_subflow
Expand Down
8 changes: 7 additions & 1 deletion octavia/amphorae/drivers/health/heartbeat_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ class UDPStatusGetter(object):
The heartbeats are transmitted via UDP and this class will bind to a port
and absorb them
"""
def __init__(self, health_update, stats_update):
def __init__(self, health_update, stats_update, distributor_update):
self.stats_update = stats_update
self.health_update = health_update
self.distributo_update = distributor_update
self.key = cfg.CONF.health_manager.heartbeat_key
self.ip = cfg.CONF.health_manager.bind_ip
self.port = cfg.CONF.health_manager.bind_port
Expand Down Expand Up @@ -171,6 +172,11 @@ def dorecv(self, *args, **kw):
def check(self):
try:
(obj, _) = self.dorecv()
if self.distributo_update and 'distributor_id' in obj:
self.executor.submit(
self.distributo_update.update_distributor_health,
obj)
return
if self.health_update:
self.executor.submit(self.health_update.update_health, obj)
if self.stats_update:
Expand Down
22 changes: 19 additions & 3 deletions octavia/cmd/distributor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,28 @@
# License for the specific language governing permissions and limitations
# under the License.

import logging
import multiprocessing as multiproc

import sys

import gunicorn.app.base
from oslo_config import cfg
from oslo_reports import guru_meditation_report as gmr
import six

from octavia.distributor.backend.agent.api_server import server
from octavia.distributor.backend.health_daemon import health_daemon
from octavia.common import service
from octavia.common import utils
from octavia.distributor.backend.agent.api_server import server
from octavia import version

LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_group('distributor', 'octavia.common.config')
CONF.import_group('haproxy_amphora', 'octavia.common.config')
CONF.import_group('amphora_agent', 'octavia.common.config')
HM_SENDER_CMD_QUEUE = multiproc.Queue()


class DistributorAgent(gunicorn.app.base.BaseApplication):
Expand All @@ -50,12 +59,18 @@ def main():
service.prepare_service(sys.argv)

gmr.TextGuruMeditation.setup_autorun(version)
health_sender_proc = multiproc.Process(name='HM_sender',
target=health_daemon.run_sender,
args=(HM_SENDER_CMD_QUEUE,))
health_sender_proc.daemon = True
health_sender_proc.start()

# Initiate server class
server_instance = server.Server()

bind_ip_port = utils.ip_port_str(CONF.haproxy_amphora.bind_host,
CONF.distributor.bind_port)

options = {
'bind': bind_ip_port,
'workers': 1,
Expand All @@ -64,8 +79,9 @@ def main():
'ca_certs': CONF.amphora_agent.agent_server_ca,
'cert_reqs': True,
'preload_app': True,
'accesslog': '-',
'errorlog': '-',
'accesslog': '/var/log/distributor-agent.log',
'errorlog': '/var/log/distributor-agent.log',
'loglevel': 'debug',
}

DistributorAgent(server_instance.app, options).run()
3 changes: 3 additions & 0 deletions octavia/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,14 @@
DISTRIBUTOR_PENDING_UPDATE = 'DISTRIBUTOR_PENDING_UPDATE'
DISTRIBUTOR_PENDING_CREATE = 'DISTRIBUTOR_PENDING_CREATE'
DISTRIBUTOR_DELETED = 'DISTRIBUTOR_DELETED'
DISTRIBUTOR_FULL = 'FULL'
DISTRIBUTOR_ERROR = 'DISTRIBUTOR_ERROR'
SUPPORTED_DISTRIBUTOR_PROVISIONING_STATUSES = (DISTRIBUTOR_ACTIVE,
DISTRIBUTOR_ALLOCATED,
DISTRIBUTOR_BOOTING,
DISTRIBUTOR_READY,
DISTRIBUTOR_DELETED,
DISTRIBUTOR_FULL,
DISTRIBUTOR_ERROR,
DISTRIBUTOR_PENDING_CREATE,
DISTRIBUTOR_PENDING_DELETE,
Expand All @@ -446,5 +448,6 @@
DISTRIBUTOR_PENDING_DELETE)
CLUSTER_ALG_TYPE = 'cluster_alg_type'
CLUSTER_MIN_SIZE = 'cluster_min_size'
CLUSTER_SLOT = 'cluster_slot'
ADD_CLUSTER_ALG_EXTRA = (
'octavia-create-amp-cluster-alg-extra')
107 changes: 107 additions & 0 deletions octavia/controller/healthmanager/update_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,113 @@ def update_health(self, health):
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)


class UpdateDistributorHealthDb(object):
    """Consume distributor heartbeat messages and sync load balancer state.

    Heartbeats carry the distributor's provisioning state plus a
    per-load-balancer status map.  This class reconciles that report
    against the DB and emits an event for every operating-status change
    it persists.
    """

    def __init__(self):
        super(UpdateDistributorHealthDb, self).__init__()
        # The event streamer is pluggable; which driver to load comes from
        # [health_manager] event_streamer_driver in the config.
        self.event_streamer = stevedore_driver.DriverManager(
            namespace='octavia.controller.queues',
            name=cfg.CONF.health_manager.event_streamer_driver,
            invoke_on_load=True).driver
        self.loadbalancer_repo = repo.LoadBalancerRepository()
        self.distributor_repo = repo.DistributorRepository()

    def emit(self, info_type, info_id, info_obj):
        """Send one status-change event on the configured event streamer.

        :param info_type: entity type constant (e.g. constants.LOADBALANCER)
        :param info_id: id of the entity the event refers to
        :param info_obj: payload dict describing the change
        """
        cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
        self.event_streamer.emit(cnt)

    def update_distributor_health(self, health):
        """This function is to update db info based on distributor status

        :type health: dict
        :param health: map object that contains distributor, amphora info
            The input health data structure is shown as below (note the
            keys use underscores, matching what this method reads):

            health = {
                "distributor_id": self.FAKE_UUID_1,
                "provisioning_state": {
                    "state": service provisioning status,
                    "reason": str
                }
                "loadbalancers": {
                    "lb-id-1": {
                        "status": instance provisioning status
                        "size": int
                        "registered": int
                    }
                }
            }
        """
        session = db_api.get_session()
        distributor_id = health['distributor_id']
        distributor_state = health['provisioning_state']['state']
        if distributor_state == constants.DISTRIBUTOR_BOOTING:
            LOG.debug("Distributor %s is still booting -- skipping"
                      " health message.", distributor_id)
            # nothing to report
            return
        elif distributor_state == constants.DISTRIBUTOR_FULL:
            # @TODO handle FULL state -- eg, set quota limit
            LOG.warning(_LW("Distributor %s is unavailable for new"
                            " requests."),
                        distributor_id)
        elif distributor_state == constants.DISTRIBUTOR_ERROR:
            LOG.warning(_LW("Distributor %s is in error state"),
                        distributor_id)
            # @TODO set state of distributor so it is recycled

        # The repository returns load balancer data models while the
        # heartbeat keys are load balancer id strings, so compare ids to
        # ids.  (Comparing the id strings against the raw data models
        # never matched, which made every reported lb look "unexpected"
        # and backfilled NO_MONITOR entries keyed on model objects.)
        expected_lb_ids = set(
            lb.id for lb in self.distributor_repo.get_all_lbs_on_distributor(
                session, distributor_id))
        reported_lbs = health.get('loadbalancers', {})

        if any(lb_id not in expected_lb_ids for lb_id in reported_lbs):
            LOG.warning(_LW("Distributor %s reported unexpected"
                            " loadbalancer ids"), distributor_id)
            # @TODO something is wrong here we should recycle

        # NO_MONITOR for all lbs this distributor should host but did not
        # report on
        reported_lbs.update(
            (lb_id, {'status': constants.NO_MONITOR})
            for lb_id in expected_lb_ids if lb_id not in reported_lbs)

        # do actual update per lb
        for lb_id, lb in six.iteritems(reported_lbs):
            try:
                lb_in_db = self.loadbalancer_repo.get(session, id=lb_id)
            except sqlalchemy.orm.exc.NoResultFound:
                LOG.error(_LE("Load balancer %s is not in DB"), lb_id)
                continue

            reported_lb_op_status = lb['status']
            if (reported_lb_op_status == constants.ERROR or
                    distributor_state == constants.DISTRIBUTOR_ERROR):
                # Distributor error currently sets all LBs to error too.
                # This is conservative. Could try to recycle Distributor
                # without recycling the LBs
                new_lb_op_status = constants.ERROR
            elif (reported_lb_op_status == constants.DEGRADED and
                    lb_in_db.operating_status == constants.ONLINE):
                new_lb_op_status = constants.DEGRADED
            elif (reported_lb_op_status == constants.NO_MONITOR and
                    lb_in_db.operating_status == constants.ONLINE):
                # @TODO Ignoring for now. Should we do anything here?
                new_lb_op_status = None
            else:
                new_lb_op_status = None
            # @TODO verify size and registered
            if new_lb_op_status:
                LOG.debug("%s %s status has changed from %s to "
                          "%s. Updating db and sending event.",
                          constants.LOADBALANCER, lb_id,
                          lb_in_db.operating_status,
                          new_lb_op_status)
                self.loadbalancer_repo.update(
                    session, lb_id, operating_status=new_lb_op_status)
                self.emit(constants.LOADBALANCER, lb_id,
                          {constants.OPERATING_STATUS: new_lb_op_status})


class UpdateStatsDb(stats.StatsMixin):

def __init__(self):
Expand Down
9 changes: 4 additions & 5 deletions octavia/controller/worker/tasks/distributor_driver_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ def revert(self, result, loadbalancer, *args, **kwargs):

class DistributorRegisterAmphora(BaseDistributorTask):
def execute(self, distributor, loadbalancer, amphora,
cluster_alg_type, cluster_min_size):
cluster_alg_type, cluster_slot):
load_balancer = self.loadbalancer_repo.get(
db_apis.get_session(), id=loadbalancer.id)
self.distributor_driver.register_amphora(
distributor, load_balancer,
amphora, cluster_alg_type,
cluster_min_size)
cluster_slot)

def revert(self, result, loadbalancer, *args, **kwargs):
if isinstance(result, failure.Failure):
Expand All @@ -98,12 +98,11 @@ def revert(self, result, loadbalancer, *args, **kwargs):

class DistributorUnregisterAmphora(BaseDistributorTask):
def execute(self, distributor, loadbalancer, amphora,
cluster_alg_type, cluster_min_size):
cluster_alg_type):
load_balancer = self.loadbalancer_repo.get(db_apis.get_session(),
id=loadbalancer.id)
self.distributor_driver.register_amphora(distributor, load_balancer,
amphora, cluster_alg_type,
cluster_min_size)
amphora, cluster_alg_type)

def revert(self, result, loadbalancer, *args, **kwargs):
if isinstance(result, failure.Failure):
Expand Down
17 changes: 17 additions & 0 deletions octavia/db/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,23 @@ def get_shared_ready_distributor(self, session):
return None
return distributor.to_data_model()

def get_all_lbs_on_distributor(self, session, distributor_id):
"""Get all of the load balancers on a Distributor.
:param session: A Sql Alchemy database session.
:param distributor_id: The Distributor id to list the load
balancers from
:returns: [octavia.common.data_model]
"""
with session.begin(subtransactions=True):
lb_subquery = (
session.query(models.AmphoraCluster.load_balancer_id).
filter_by(distributor_id=distributor_id).subquery())
lb_list = (session.query(models.LoadBalancer).
filter(models.LoadBalancer.id.in_(lb_subquery)).all())
data_model_list = [model.to_data_model() for model in lb_list]
return data_model_list


class AmphoraClusterRepository(BaseRepository):
model_class = models.AmphoraCluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def _get_version_of_installed_package(name):
return m.group(0)[len('Version: '):]


def _get_network_bytes(interface, type):
def _get_network_bytes(interface, byte_type):
    """Read a sysfs byte counter for a network interface.

    :param interface: network interface name (e.g. "eth0")
    :param byte_type: counter direction, e.g. "rx" or "tx"
    :returns: the raw first line of the statistics file as a string
    """
    stats_path = "/sys/class/net/{interface}/statistics/{type}_bytes".format(
        interface=interface, type=byte_type)
    with open(stats_path, 'r') as stats_file:
        return stats_file.readline()

Expand Down
Loading

0 comments on commit c9f8737

Please sign in to comment.