Skip to content

Commit

Permalink
Merge pull request DataDog#2560 from DataDog/yann/vsphere-support-folders
Browse files (browse the repository at this point in the history)

[vsphere] enhance topology support ✨
  • Loading branch information
yannmh authored Jun 13, 2016
2 parents 1d54b2d + c973f92 commit e8770f9
Show file tree
Hide file tree
Showing 3 changed files with 398 additions and 90 deletions.
239 changes: 149 additions & 90 deletions checks.d/vsphere.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from copy import deepcopy
from datetime import datetime, timedelta
from hashlib import md5
from Queue import Empty, Queue
Expand All @@ -14,7 +13,7 @@

# 3p
from pyVim import connect
from pyVmomi import vim # pylint: disable=E0611
from pyVmomi import vim # pylint: disable=E0611

# project
from config import _is_affirmative
Expand Down Expand Up @@ -289,6 +288,7 @@ def transform_vmreconfiguredevent(self):
self.payload['host'] = self.raw_event.vm.name
return self.payload


def atomic_method(method):
""" Decorator to catch the exceptions that happen in detached thread atomic tasks
and display them in the logs.
Expand All @@ -300,6 +300,7 @@ def wrapper(*args, **kwargs):
args[0].exceptionq.put("A worker thread crashed:\n" + traceback.format_exc())
return wrapper


class VSphereCheck(AgentCheck):
""" Get performance metrics from a vCenter server and upload them to Datadog
References:
Expand Down Expand Up @@ -530,112 +531,163 @@ def get_external_host_tags(self):

return external_host_tags

@atomic_method
def _cache_morlist_raw_atomic(self, i_key, obj_type, obj, tags, regexes=None, include_only_marked=False):
""" Compute tags for a single node in the vCenter rootFolder
and queue other such jobs for children nodes.
Usual hierarchy:
rootFolder
- datacenter1
- compute_resource1 == cluster
- host1
- host2
- host3
- compute_resource2
- host5
- vm1
- vm2
If it's a node we want to query metric for, queue it in self.morlist_raw
that will be processed by another job.
def _discover_mor(self, instance_key, obj, tags, regexes=None, include_only_marked=False):
"""
### <TEST-INSTRUMENTATION>
t = Timer()
self.log.debug("job_atomic: Exploring MOR {0} (type={1})".format(obj, obj_type))
### </TEST-INSTRUMENTATION>
tags_copy = deepcopy(tags)
Explore the vCenter infrastructure to discover hosts and virtual machines,
and compute their associated tags.
Start with the vCenter `rootFolder` and proceed recursively,
queueing a similar job for each child node.
Example topology:
```
rootFolder
- datacenter1
- compute_resource1 == cluster
- host1
- host2
- host3
- compute_resource2
- host5
- vm1
- vm2
```
If it's a node we want to query metrics for, queue it in `self.morlist_raw` so that
it is processed by another job.
"""
@atomic_method
def browse_mor(obj, prev_tags, depth):
self.log.debug(
u"job_atomic: Exploring MOR %s: name=%s, class=%s",
obj, obj.name, obj.__class__
)

if obj_type == 'rootFolder':
for datacenter in obj.childEntity:
# Skip non-datacenter
if not hasattr(datacenter, 'hostFolder'):
continue
self.pool.apply_async(
self._cache_morlist_raw_atomic,
args=(i_key, 'datacenter', datacenter, tags_copy, regexes, include_only_marked)
)
tags = list(prev_tags)

# Folder
if isinstance(obj, vim.Folder):
# Do not tag with root folder
if depth:
tags.append(obj.name)

for resource in obj.childEntity:
self.pool.apply_async(
browse_mor,
args=(resource, tags, depth + 1)
)

# Datacenter
elif isinstance(obj, vim.Datacenter):
tags.append(u"vsphere_datacenter:{0}".format(obj.name))

for resource in obj.hostFolder.childEntity:
self.pool.apply_async(
browse_mor,
args=(resource, tags, depth + 1)
)

# ClusterComputeResource
elif isinstance(obj, vim.ClusterComputeResource):
tags.append(u"vsphere_cluster:{0}".format(obj.name))

for host in obj.host:
# Skip non-host
if not hasattr(host, 'vm'):
continue

self.pool.apply_async(
browse_mor,
args=(host, tags, depth + 1)
)

# Host
elif isinstance(obj, vim.HostSystem):
if self._is_excluded(obj, regexes, include_only_marked):
self.log.debug(
u"Filtered out host '%s'.", obj.name
)
return

elif obj_type == 'datacenter':
dc_tag = "vsphere_datacenter:%s" % obj.name
tags_copy.append(dc_tag)
for compute_resource in obj.hostFolder.childEntity:
# Skip non-compute resource
if not hasattr(compute_resource, 'host'):
continue
self.pool.apply_async(
self._cache_morlist_raw_atomic,
args=(i_key, 'compute_resource', compute_resource, tags_copy, regexes, include_only_marked)
watched_mor = dict(
mor_type='host', mor=obj, hostname=obj.name, tags=tags + [u"vsphere_type:host"]
)
self.morlist_raw[instance_key].append(watched_mor)

tags.append(u"vsphere_host:{}".format(obj.name))
for vm in obj.vm:
if vm.runtime.powerState != 'poweredOn':
continue
self.pool.apply_async(
browse_mor,
args=(vm, tags, depth + 1)
)

# Virtual Machine
elif isinstance(obj, vim.VirtualMachine):
if self._is_excluded(obj, regexes, include_only_marked):
self.log.debug(
u"Filtered out VM '%s'.", obj.name
)
return

elif obj_type == 'compute_resource':
if obj.__class__ == vim.ClusterComputeResource:
cluster_tag = "vsphere_cluster:%s" % obj.name
tags_copy.append(cluster_tag)
for host in obj.host:
# Skip non-host
if not hasattr(host, 'vm'):
continue
self.pool.apply_async(
self._cache_morlist_raw_atomic,
args=(i_key, 'host', host, tags_copy, regexes, include_only_marked)
watched_mor = dict(
mor_type='vm', mor=obj, hostname=obj.name, tags=tags + ['vsphere_type:vm']
)
self.morlist_raw[instance_key].append(watched_mor)

else:
self.log.error(u"Unrecognized object %s", obj)

elif obj_type == 'host':
# Init recursion
self.pool.apply_async(
browse_mor,
args=(obj, tags, 0)
)

@staticmethod
def _is_excluded(obj, regexes, include_only_marked):
"""
Return `True` if the given host or virtual machine is excluded by the user configuration,
i.e. if it meets any of the following conditions:
* Its name does not match the corresponding `*_include_only_regex` regular expression
* It is not labeled (no custom value equal to `VM_MONITORING_FLAG`) while `include_only_marked` is enabled (virtual machines only)
"""
# Host
if isinstance(obj, vim.HostSystem):
# Based on `host_include_only_regex`
if regexes and regexes.get('host_include') is not None:
match = re.search(regexes['host_include'], obj.name)
if not match:
self.log.debug(u"Filtered out VM {0} because of host_include_only_regex".format(obj.name))
return
watched_mor = dict(mor_type='host', mor=obj, hostname=obj.name, tags=tags_copy+['vsphere_type:host'])
self.morlist_raw[i_key].append(watched_mor)

host_tag = "vsphere_host:%s" % obj.name
tags_copy.append(host_tag)
for vm in obj.vm:
if vm.runtime.powerState != 'poweredOn':
continue
self.pool.apply_async(
self._cache_morlist_raw_atomic,
args=(i_key, 'vm', vm, tags_copy, regexes, include_only_marked)
)
return True

elif obj_type == 'vm':
# VirtualMachine
elif isinstance(obj, vim.VirtualMachine):
# Based on `vm_include_only_regex`
if regexes and regexes.get('vm_include') is not None:
match = re.search(regexes['vm_include'], obj.name)
if not match:
self.log.debug(u"Filtered out VM {0} because of vm_include_only_regex".format(obj.name))
return
# Also, if include_only_marked is true, then check if there exists a
# custom field with the value DatadogMonitored
return True

# Based on `include_only_marked`
if include_only_marked:
monitored = False
for field in obj.customValue:
if field.value == VM_MONITORING_FLAG:
monitored = True
break # we shall monitor
if not monitored:
self.log.debug(u"Filtered out VM {0} because of include_only_marked".format(obj.name))
return

watched_mor = dict(mor_type='vm', mor=obj, hostname=obj.name, tags=tags_copy+['vsphere_type:vm'])
self.morlist_raw[i_key].append(watched_mor)
return True

### <TEST-INSTRUMENTATION>
self.histogram('datadog.agent.vsphere.morlist_raw_atomic.time', t.total())
### </TEST-INSTRUMENTATION>
return False

def _cache_morlist_raw(self, instance):
""" Initiate the first layer to refresh self.morlist by queueing
_cache_morlist_raw_atomic on the rootFolder in a recursive/async approach
"""
Initiate the first layer to refresh the list of MORs (`self.morlist`).
Resolve the vCenter `rootFolder` and initiate hosts and virtual machines discovery.
"""

i_key = self._instance_key(instance)
Expand All @@ -658,10 +710,10 @@ def _cache_morlist_raw(self, instance):
'vm_include': instance.get('vm_include_only_regex')
}
include_only_marked = _is_affirmative(instance.get('include_only_marked', False))
self.pool.apply_async(
self._cache_morlist_raw_atomic,
args=(i_key, 'rootFolder', root_folder, [instance_tag], regexes, include_only_marked)
)

# Discover hosts and virtual machines
self._discover_mor(i_key, root_folder, [instance_tag], regexes, include_only_marked)

self.cache_times[i_key][MORLIST][LAST] = time.time()

@atomic_method
Expand Down Expand Up @@ -800,12 +852,19 @@ def _collect_metrics_atomic(self, instance, mor):
value = self._transform_value(instance, result.id.counterId, result.value[0])

# Metric types are absolute, delta, and rate
if ALL_METRICS[self.metrics_metadata[i_key][result.id.counterId]['name']]['s_type'] == 'rate':
metric_name = self.metrics_metadata[i_key][result.id.counterId]['name']

if metric_name not in ALL_METRICS:
self.log.debug(u"Skipping unknown `%s` metric.", metric_name)
continue

if ALL_METRICS[metric_name]['s_type'] == 'rate':
record_metric = self.rate
else:
record_metric = self.gauge

record_metric(
"vsphere.%s" % self.metrics_metadata[i_key][result.id.counterId]['name'],
"vsphere.%s" % metric_name,
value,
hostname=mor['hostname'],
tags=['instance:%s' % instance_name]
Expand Down
1 change: 1 addition & 0 deletions tests/checks/fixtures/vsphere/vsphere_topology.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"childEntity": [{"hostFolder": {"childEntity": [{"host": [{"spec": "HostSystem", "name": "host1", "vm": []}, {"spec": "HostSystem", "name": "host2", "vm": []}], "spec": "ClusterComputeResource", "name": "compute_resource1"}]}, "spec": "Datacenter", "name": "datacenter1"}, {"childEntity": [{"hostFolder": {"childEntity": [{"host": [{"vm": [{"runtime": {"powerState": "poweredOn"}, "spec": "VirtualMachine", "name": "vm1"}, {"runtime": {"powerState": "poweredOn"}, "spec": "VirtualMachine", "name": "vm2", "label": true}, {"runtime": {"powerState": "poweredOff"}, "spec": "VirtualMachine", "name": "vm3", "label": true}, {"runtime": {"powerState": "poweredOn"}, "spec": "VirtualMachine", "name": "vm4", "label": true}], "spec": "HostSystem", "name": "host3"}], "spec": "ClusterComputeResource", "name": "compute_resource2"}]}, "spec": "Datacenter", "name": "datacenter2"}], "spec": "Folder", "name": "folder1"}], "spec": "Folder", "name": "rootFolder"}
Loading

0 comments on commit e8770f9

Please sign in to comment.