From b5ec1eb4c6d5e62037297ea50112f952b001a204 Mon Sep 17 00:00:00 2001 From: olampert Date: Tue, 17 Dec 2024 21:54:43 +0200 Subject: [PATCH 1/3] get elastic/memcache conf from env variables. add syncMemcache monitor. add liveness prob php script --- .../kava/monitorPlaysViewsMemcache.php | 243 ++++++++++++++++++ alpha/scripts/kava/playsViewsCommon.php | 36 +++ .../kava/playsViewsElasticConsumer.php | 9 +- .../kava/playsViewsMemcacheConsumer.php | 11 +- alpha/scripts/kava/systemUtils.php | 89 +++++++ 5 files changed, 377 insertions(+), 11 deletions(-) create mode 100644 alpha/scripts/kava/monitorPlaysViewsMemcache.php create mode 100644 alpha/scripts/kava/systemUtils.php diff --git a/alpha/scripts/kava/monitorPlaysViewsMemcache.php b/alpha/scripts/kava/monitorPlaysViewsMemcache.php new file mode 100644 index 0000000000..77c827345a --- /dev/null +++ b/alpha/scripts/kava/monitorPlaysViewsMemcache.php @@ -0,0 +1,243 @@ + self::DRUID_TOPN, + self::DRUID_DATASOURCE => self::DATASOURCE_HISTORICAL, + self::DRUID_INTERVALS => self::getIntervals($fromTime, $toTime), + self::DRUID_GRANULARITY => self::DRUID_GRANULARITY_ALL, + self::DRUID_CONTEXT => self::getDruidContext(), + self::DRUID_FILTER => self::getDruidFilter($eventType, $entryIds), + self::DRUID_DIMENSION => self::DIMENSION_ENTRY_ID, + self::DRUID_AGGR => array(self::getLongSumAggregator(self::METRIC_COUNT, self::METRIC_COUNT)), + self::DRUID_METRIC => self::METRIC_COUNT, + self::DRUID_THRESHOLD => $threshold + ); + + try + { + return self::runQuery($query); + } + catch (Exception $ex) + { + return false; + } + } + + public static function runPlaysViewsGraphQuery($fromTime, $toTime, $eventType, $entryIds = null) + { + $query = array( + self::DRUID_QUERY_TYPE => self::DRUID_TIMESERIES, + self::DRUID_DATASOURCE => self::DATASOURCE_HISTORICAL, + self::DRUID_INTERVALS => self::getIntervals($fromTime, $toTime), + self::DRUID_GRANULARITY => self::getGranularityPeriod('PT1H'), + self::DRUID_CONTEXT => self::getDruidContext(), + self::DRUID_FILTER => self::getDruidFilter($eventType, $entryIds), + self::DRUID_AGGR => array(self::getLongSumAggregator(self::METRIC_COUNT, self::METRIC_COUNT)), + ); + + $graph = self::runQuery($query); + $result = array(); + if ($graph) + { + foreach ($graph as $curItem) { + $timestamp = $curItem[self::DRUID_TIMESTAMP]; + $count = $curItem[self::DRUID_RESULT][self::METRIC_COUNT]; + $result[$timestamp] = $count; + } + } + return $result; + } + + protected static function getDruidContext() + { + return array( + self::DRUID_COMMENT => gethostname() . '[playsview]' + ); + } + + protected static function getDruidFilter($eventType, $entryIds) + { + $filter = self::getSelectorFilter(self::DIMENSION_EVENT_TYPE, $eventType); + if ($entryIds) + { + $filter = self::getAndFilter(array($filter, + self::getInFilter(self::DIMENSION_ENTRY_ID, $entryIds))); + } + + return $filter; + } +} +// parse the command line +if ($argc < 2) +{ + echo "Usage:\n\t" . basename(__file__) . " []\n"; + exit(1); +} + +$id = $argv[1]; +$memcache = getenv(MEMCACHE_VAR . "_$id"); +list($memcacheHost, $memcachePort) = explode(':', $memcache); + +$baseFolder = isset($argv[2]) ? $argv[2] : null; + +// connect to memcache +$memc = new kInfraMemcacheCacheWrapper(); +$ret = $memc->init(array('host'=>$memcacheHost, 'port'=>$memcachePort)); +if (!$ret) +{ + Utils::writeLog("Failed to connect to cache host {$memcacheHost} port {$memcachePort}"); + exit(1); +} + +// get last played at +$lastPlayedAt = $memc->get(MEMC_KEY_LAST_PLAYED_AT); +if (!$lastPlayedAt) +{ + Utils::writeLog('Error: failed to get last played at from memcache'); + exit(1); +} + +$lag = time() - $lastPlayedAt; +if ($lag > MAX_PLAYS_VIEWS_LAG) +{ + Utils::writeLog("Error: last played at is lagging $lag seconds"); +} + +$lastPlayedAt += 3600; + +$fromTime = $lastPlayedAt - 7 * 86400; +$toTime = $lastPlayedAt; + +// get top plays from druid (using 2 topn queries to get accurate results) +$druidPlays = playsViewsQueries::getTopPlays($fromTime, $toTime); +if (!$druidPlays) +{ + Utils::writeLog('Error: failed to get top plays from druid (1)'); + exit(1); +} + +$druidPlays = playsViewsQueries::getTopPlays($fromTime, $toTime, array_keys($druidPlays)); +if (!$druidPlays) +{ + Utils::writeLog('Error: failed to get top plays from druid (2)'); + exit(1); +} + +// get plays from memcache +$keys = array(); +foreach (array_keys($druidPlays) as $entryId) +{ + $keys[] = MEMC_KEY_PREFIX . $entryId; +} + +$memcValues = $memc->multiGet($keys); +if (!$memcValues) +{ + Utils::writeLog('Error: failed to get plays/views from memcache'); + exit(1); +} + +$memcPlays = array(); +foreach ($memcValues as $key => $value) +{ + $entryId = substr($key, strlen(MEMC_KEY_PREFIX)); + $fields = json_decode($value, true); + $memcPlays[$entryId] = isset($fields['plays_7days']) ? $fields['plays_7days'] : 0; +} + +// compare +$firstDiff = true; +foreach ($druidPlays as $entryId => $druidPlay) +{ + $entryId = normalizeEntryId($entryId); + if (!$entryId) + { + continue; + } + + $memcachePlay = $memcPlays[$entryId]; + if ($memcachePlay == $druidPlay) + { + continue; + } + + Utils::writeLog("Error: non-matching plays for entry $entryId, druid=$druidPlay memcache=$memcachePlay"); + + if (!$baseFolder) + { + continue; + } + + if (!$firstDiff) + { + continue; + } + + $firstDiff = false; + + // try to find the source of the discrepancy by comparing hour-by-hour + $graph = playsViewsQueries::runPlaysViewsGraphQuery($fromTime, $toTime, kKavaBase::EVENT_TYPE_PLAY, array($entryId)); + foreach ($graph as $timestamp => $druidCount) + { + $timestamp = strtr(substr($timestamp, 0, 13), 'T', '-'); + list($year, $month, $day, $hour) = explode('-', $timestamp); + $filePath = "$baseFolder/$year/$month/$day/playsviews-$year-$month-$day-$hour.gz"; + + if (substr($baseFolder, 0, strlen('s3://')) == "s3://") + { + $savedResult = shell_exec("aws s3 cp $filePath - | zgrep $entryId"); + } + else + { + $savedResult = shell_exec("zgrep $entryId $filePath"); + } + + if ($savedResult) + { + $savedResult = explode(',', trim($savedResult)); + $savedCount = $savedResult[1]; + } + else + { + $savedCount = 0; + } + + if ($savedCount != $druidCount) + { + Utils::writeLog("Error: found mismatch for entry $entryId in file $filePath, savedCount=$savedCount druidCount=$druidCount"); + } + } +} diff --git a/alpha/scripts/kava/playsViewsCommon.php b/alpha/scripts/kava/playsViewsCommon.php index a5346373d8..fc06a7eba6 100644 --- a/alpha/scripts/kava/playsViewsCommon.php +++ b/alpha/scripts/kava/playsViewsCommon.php @@ -2,3 +2,39 @@ define('CONF_TOPICS_PATH', 'analytics_plays_views_topics_path'); define('PLAYSVIEWS_TOPIC', 'playsViews'); +define('MEMC_KEY_LAST_PLAYED_AT', 'plays_views_last_played_at'); +define('MEMC_KEY_PREFIX', 'plays_views_'); +define('CLUSTER_ID_VAR', 'CLUSTER_ID'); +define('BULK_SIZE_VAR', 'BULK_SIZE'); +define('MEMCACHE_VAR', 'MEMCACHE'); +define('QC_MEMCACHE_VAR', 'QC_MEMCACHE'); + +function normalizeEntryId($entryId) +{ + // modern entry id - 0_abcd1234 + if (preg_match('/^[0-9]_[0-9a-z]{8}$/D', $entryId)) + { + return strtolower($entryId); + } + + // old entry id - a1b2c3d4e5 + if (preg_match('/^[0-9a-z]{10}$/D', $entryId)) + { + return strtolower($entryId); + } + + // antique entry id - 12345 + if (preg_match('/^[0-9]{1,6}$/D', $entryId)) + { + return null; + } + + // special entry id - _KMCLOGO + if (preg_match('/^_KMCLOGO\d?$/D', $entryId)) + { + return $entryId; + } + + return null; +} + diff --git a/alpha/scripts/kava/playsViewsElasticConsumer.php b/alpha/scripts/kava/playsViewsElasticConsumer.php index be76d97d5a..d52298b586 100644 --- a/alpha/scripts/kava/playsViewsElasticConsumer.php +++ b/alpha/scripts/kava/playsViewsElasticConsumer.php @@ -50,14 +50,15 @@ protected function processMessage($message) } // parse the command line -if ($argc < 3) +if ($argc < 2) { - echo "Usage:\n\t" . basename(__file__) . " \n"; + echo "Usage:\n\t" . basename(__file__) . " \n"; exit(1); } -$consumerId = $argv[1]; -$bulkSize = $argv[2]; +$id = $argv[1]; +$consumerId = getenv(CLUSTER_ID_VAR . "_$id"); +$bulkSize = getenv(BULK_SIZE_VAR); try { diff --git a/alpha/scripts/kava/playsViewsMemcacheConsumer.php b/alpha/scripts/kava/playsViewsMemcacheConsumer.php index 78bca2ca0e..8646ca9791 100644 --- a/alpha/scripts/kava/playsViewsMemcacheConsumer.php +++ b/alpha/scripts/kava/playsViewsMemcacheConsumer.php @@ -6,9 +6,6 @@ define('QUERY_CACHE_KEY_PREFIX', 'QCI-entry:id='); define('QUERY_CACHE_KEY_EXPIRY', 90000); -define('MEMC_KEY_PREFIX', 'plays_views_'); -define('MEMC_KEY_LAST_PLAYED_AT', 'plays_views_last_played_at'); - class playsViewsMemcacheConsumer extends BaseConsumer { @@ -58,13 +55,13 @@ protected function processMessage($message) // parse the command line if ($argc < 2) { - echo "Usage:\n\t" . basename(__file__) . " : [:,:]\n"; - echo "For example, php " . basename(__file__) . " localhost:11211\n"; + echo "Usage:\n\t" . basename(__file__) . " \n"; exit(1); } -$memcache = $argv[1]; -$queryCacheMemcaches = isset($argv[2]) ? $argv[2] : null; +$id = $argv[1]; +$memcache = getenv(MEMCACHE_VAR . "_$id"); +$queryCacheMemcaches = getenv(QC_MEMCACHE_VAR . "_$id"); try { diff --git a/alpha/scripts/kava/systemUtils.php b/alpha/scripts/kava/systemUtils.php new file mode 100644 index 0000000000..caafe44104 --- /dev/null +++ b/alpha/scripts/kava/systemUtils.php @@ -0,0 +1,89 @@ + "systemUtils::pingMemcache", + self::ELASTIC => "systemUtils::pingElastic", + ); + + public static function getHealthCheckInfo($checkedComponents) + { + $healthCheckArray = array(); + foreach ($checkedComponents as $component) { + if (isset(self::$checkFunc[$component])) { + $healthCheckArray[$component] = call_user_func(self::$checkFunc[$component]); + } + } + + $result = ''; + foreach ($healthCheckArray as $component => $res) { + if (!$res) { + $result .= "health check failed for $component " . PHP_EOL; + } + } + + return $result; + } + + public static function pingElastic() + { + $elasticHost = kConf::get('elasticHost', 'elastic', null); + if(!$elasticHost) + { + $elasticHost = self::ELASTIC_HOST; + } + $elasticPort = kConf::get('elasticPort', 'elastic', null); + if(!$elasticPort) + { + $elasticPort = self::ELASTIC_PORT; + } + try + { + $url = 'http://' . $elasticHost . ':' . $elasticPort . self::ELASTIC_HEALTH_CHECK; + $elasticHealth = json_decode(KCurlWrapper::getContent($url), true); + if(!isset($elasticHealth['status']) || $elasticHealth['status'] == 'red') + { + return 0; + } + } + catch(Exception $e) + { + return 0; + } + return 1; + } + + public static function pingMemcache() + { + $memcache = getenv('MEMCACHE_1'); + list($memcacheHost, $memcachePort) = explode(':', $memcache); + + $memc = new kInfraMemcacheCacheWrapper(); + return $memc->init(array('host'=>$memcacheHost, 'port'=>$memcachePort)); + } + +} + +if ($argc < 2) +{ + echo "Usage:\n\t" . basename(__file__) . " \n"; + exit(1); +} + +$components = explode(",", $argv[1]); +$errMessage = systemUtils::getHealthCheckInfo($components); +if ($errMessage) +{ + echo $errMessage; + exit(1); +} +exit(0); From 6801415e43908305ac81524b495c45dcea680626 Mon Sep 17 00:00:00 2001 From: olampert Date: Tue, 17 Dec 2024 21:58:45 +0200 Subject: [PATCH 2/3] get elastic/memcache conf from env variables. add syncMemcache monitor. add liveness prob php script --- alpha/scripts/kava/monitorPlaysViewsMemcache.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alpha/scripts/kava/monitorPlaysViewsMemcache.php b/alpha/scripts/kava/monitorPlaysViewsMemcache.php index 77c827345a..2801ad025d 100644 --- a/alpha/scripts/kava/monitorPlaysViewsMemcache.php +++ b/alpha/scripts/kava/monitorPlaysViewsMemcache.php @@ -1,6 +1,6 @@ Date: Wed, 25 Dec 2024 00:04:25 +0200 Subject: [PATCH 3/3] code review --- alpha/scripts/kava/systemUtils.php | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/alpha/scripts/kava/systemUtils.php b/alpha/scripts/kava/systemUtils.php index caafe44104..12d99ff455 100644 --- a/alpha/scripts/kava/systemUtils.php +++ b/alpha/scripts/kava/systemUtils.php @@ -5,28 +5,32 @@ class systemUtils { const MEMCACHE = 'MEMCACHE'; - const ELASTIC = "ELASTIC"; + const ELASTIC = 'ELASTIC'; const ELASTIC_HOST = '127.0.0.1'; const ELASTIC_PORT = '9200'; const ELASTIC_HEALTH_CHECK = '/_cluster/health?pretty'; protected static $checkFunc = array( - self::MEMCACHE => "systemUtils::pingMemcache", - self::ELASTIC => "systemUtils::pingElastic", + self::MEMCACHE => 'systemUtils::pingMemcache', + self::ELASTIC => 'systemUtils::pingElastic', ); public static function getHealthCheckInfo($checkedComponents) { $healthCheckArray = array(); - foreach ($checkedComponents as $component) { - if (isset(self::$checkFunc[$component])) { + foreach ($checkedComponents as $component) + { + if (isset(self::$checkFunc[$component])) + { $healthCheckArray[$component] = call_user_func(self::$checkFunc[$component]); } } $result = ''; - foreach ($healthCheckArray as $component => $res) { - if (!$res) { + foreach ($healthCheckArray as $component => $res) + { + if (!$res) + { $result .= "health check failed for $component " . PHP_EOL; } } @@ -37,12 +41,12 @@ public static function getHealthCheckInfo($checkedComponents) public static function pingElastic() { $elasticHost = kConf::get('elasticHost', 'elastic', null); - if(!$elasticHost) + if (!$elasticHost) { $elasticHost = self::ELASTIC_HOST; } $elasticPort = kConf::get('elasticPort', 'elastic', null); - if(!$elasticPort) + if (!$elasticPort) { $elasticPort = self::ELASTIC_PORT; } @@ -50,12 +54,12 @@ public static function pingElastic() { $url = 'http://' . $elasticHost . ':' . $elasticPort . self::ELASTIC_HEALTH_CHECK; $elasticHealth = json_decode(KCurlWrapper::getContent($url), true); - if(!isset($elasticHealth['status']) || $elasticHealth['status'] == 'red') + if (!isset($elasticHealth['status']) || $elasticHealth['status'] == 'red') { return 0; } } - catch(Exception $e) + catch (Exception $e) { return 0; }