Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync playsviews to memcache/elastic kava scripts #13021

Open
wants to merge 3 commits into
base: Ursa-21.6.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions alpha/scripts/kava/monitorPlaysViewsMemcache.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
<?php

require_once(__DIR__ . '/../../apps/kaltura/lib/reports/kDruidBase.php');
require_once(__DIR__ . '/playsViewsCommon.php');
require_once(__DIR__ . '/../../../../../kava-utils/lib/Utils.php');

require_once (dirname(__FILE__).'/../bootstrap.php');

define('MAX_PLAYS_VIEWS_LAG', 129600); // 36 hours

class playsViewsQueries extends kKavaBase
{
public static function getTopPlays($fromTime, $toTime, $entryIds = null, $threshold = 10)
{
$response = self::runPlaysViewsQuery($fromTime, $toTime, self::EVENT_TYPE_PLAY, $threshold, $entryIds);
if (!isset($response[0][self::DRUID_RESULT]))
{
return false;
}

$result = array();
foreach ($response[0][self::DRUID_RESULT] as $cur)
{
$entryId = $cur[self::DIMENSION_ENTRY_ID];
$count = $cur[self::METRIC_COUNT];

$result[$entryId] = $count;
}

return $result;
}

protected static function runPlaysViewsQuery($fromTime, $toTime, $eventType, $threshold = 1000000, $entryIds = null)
{

$query = array(
self::DRUID_QUERY_TYPE => self::DRUID_TOPN,
self::DRUID_DATASOURCE => self::DATASOURCE_HISTORICAL,
self::DRUID_INTERVALS => self::getIntervals($fromTime, $toTime),
self::DRUID_GRANULARITY => self::DRUID_GRANULARITY_ALL,
self::DRUID_CONTEXT => self::getDruidContext(),
self::DRUID_FILTER => self::getDruidFilter($eventType, $entryIds),
self::DRUID_DIMENSION => self::DIMENSION_ENTRY_ID,
self::DRUID_AGGR => array(self::getLongSumAggregator(self::METRIC_COUNT, self::METRIC_COUNT)),
self::DRUID_METRIC => self::METRIC_COUNT,
self::DRUID_THRESHOLD => $threshold
);

try
{
return self::runQuery($query);
}
catch (Exception $ex)
{
return false;
}
}

public static function runPlaysViewsGraphQuery($fromTime, $toTime, $eventType, $entryIds = null)
{
$query = array(
self::DRUID_QUERY_TYPE => self::DRUID_TIMESERIES,
self::DRUID_DATASOURCE => self::DATASOURCE_HISTORICAL,
self::DRUID_INTERVALS => self::getIntervals($fromTime, $toTime),
self::DRUID_GRANULARITY => self::getGranularityPeriod('PT1H'),
self::DRUID_CONTEXT => self::getDruidContext(),
self::DRUID_FILTER => self::getDruidFilter($eventType, $entryIds),
self::DRUID_AGGR => array(self::getLongSumAggregator(self::METRIC_COUNT, self::METRIC_COUNT)),
);

$graph = self::runQuery($query);
$result = array();
if ($graph)
{
foreach ($graph as $curItem) {
$timestamp = $curItem[self::DRUID_TIMESTAMP];
$count = $curItem[self::DRUID_RESULT][self::METRIC_COUNT];
$result[$timestamp] = $count;
}
}
return $result;
}

protected static function getDruidContext()
{
return array(
self::DRUID_COMMENT => gethostname() . '[playsview]'
);
}

protected static function getDruidFilter($eventType, $entryIds)
{
$filter = self::getSelectorFilter(self::DIMENSION_EVENT_TYPE, $eventType);
if ($entryIds)
{
$filter = self::getAndFilter(array($filter,
self::getInFilter(self::DIMENSION_ENTRY_ID, $entryIds)));
}

return $filter;
}
}
// parse the command line
if ($argc < 2)
{
echo "Usage:\n\t" . basename(__file__) . " <id> [<plays views base>]\n";
exit(1);
}

$id = $argv[1];
$memcache = getenv(MEMCACHE_VAR . "_$id");
list($memcacheHost, $memcachePort) = explode(':', $memcache);

$baseFolder = isset($argv[2]) ? $argv[2] : null;

// connect to memcache
$memc = new kInfraMemcacheCacheWrapper();
$ret = $memc->init(array('host'=>$memcacheHost, 'port'=>$memcachePort));
if (!$ret)
{
Utils::writeLog("Failed to connect to cache host {$memcacheHost} port {$memcachePort}");
exit(1);
}

// get last played at
$lastPlayedAt = $memc->get(MEMC_KEY_LAST_PLAYED_AT);
if (!$lastPlayedAt)
{
Utils::writeLog('Error: failed to get last played at from memcache');
exit(1);
}

$lag = time() - $lastPlayedAt;
if ($lag > MAX_PLAYS_VIEWS_LAG)
{
Utils::writeLog("Error: last played at is lagging $lag seconds");
}

$lastPlayedAt += 3600;

$fromTime = $lastPlayedAt - 7 * 86400;
$toTime = $lastPlayedAt;

// get top plays from druid (using 2 topn queries to get accurate results)
$druidPlays = playsViewsQueries::getTopPlays($fromTime, $toTime);
if (!$druidPlays)
{
Utils::writeLog('Error: failed to get top plays from druid (1)');
exit(1);
}

$druidPlays = playsViewsQueries::getTopPlays($fromTime, $toTime, array_keys($druidPlays));
if (!$druidPlays)
{
Utils::writeLog('Error: failed to get top plays from druid (2)');
exit(1);
}

// get plays from memcache
$keys = array();
foreach (array_keys($druidPlays) as $entryId)
{
$keys[] = MEMC_KEY_PREFIX . $entryId;
}

$memcValues = $memc->multiGet($keys);
if (!$memcValues)
{
Utils::writeLog('Error: failed to get plays/views from memcache');
exit(1);
}

$memcPlays = array();
foreach ($memcValues as $key => $value)
{
$entryId = substr($key, strlen(MEMC_KEY_PREFIX));
$fields = json_decode($value, true);
$memcPlays[$entryId] = isset($fields['plays_7days']) ? $fields['plays_7days'] : 0;
}

// compare
$firstDiff = true;
foreach ($druidPlays as $entryId => $druidPlay)
{
$entryId = normalizeEntryId($entryId);
if (!$entryId)
{
continue;
}

$memcachePlay = $memcPlays[$entryId];
if ($memcachePlay == $druidPlay)
{
continue;
}

Utils::writeLog("Error: non-matching plays for entry $entryId, druid=$druidPlay memcache=$memcachePlay");

if (!$baseFolder)
{
continue;
}

if (!$firstDiff)
{
continue;
}

$firstDiff = false;

// try to find the source of the discrepancy by comparing hour-by-hour
$graph = playsViewsQueries::runPlaysViewsGraphQuery($fromTime, $toTime, kKavaBase::EVENT_TYPE_PLAY, array($entryId));
foreach ($graph as $timestamp => $druidCount)
{
$timestamp = strtr(substr($timestamp, 0, 13), 'T', '-');
list($year, $month, $day, $hour) = explode('-', $timestamp);
$filePath = "$baseFolder/$year/$month/$day/playsviews-$year-$month-$day-$hour.gz";

if (substr($baseFolder, 0, strlen('s3://')) == "s3://")
{
$savedResult = shell_exec("aws s3 cp $filePath - | zgrep $entryId");
}
else
{
$savedResult = shell_exec("zgrep $entryId $filePath");
}

if ($savedResult)
{
$savedResult = explode(',', trim($savedResult));
$savedCount = $savedResult[1];
}
else
{
$savedCount = 0;
}

if ($savedCount != $druidCount)
{
Utils::writeLog("Error: found mismatch for entry $entryId in file $filePath, savedCount=$savedCount druidCount=$druidCount");
}
}
}
36 changes: 36 additions & 0 deletions alpha/scripts/kava/playsViewsCommon.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,39 @@

define('CONF_TOPICS_PATH', 'analytics_plays_views_topics_path');
define('PLAYSVIEWS_TOPIC', 'playsViews');
define('MEMC_KEY_LAST_PLAYED_AT', 'plays_views_last_played_at');
define('MEMC_KEY_PREFIX', 'plays_views_');
define('CLUSTER_ID_VAR', 'CLUSTER_ID');
define('BULK_SIZE_VAR', 'BULK_SIZE');
define('MEMCACHE_VAR', 'MEMCACHE');
define('QC_MEMCACHE_VAR', 'QC_MEMCACHE');

function normalizeEntryId($entryId)
{
// modern entry id - 0_abcd1234
if (preg_match('/^[0-9]_[0-9a-z]{8}$/D', $entryId))
{
return strtolower($entryId);
}

// old entry id - a1b2c3d4e5
if (preg_match('/^[0-9a-z]{10}$/D', $entryId))
{
return strtolower($entryId);
}

// antique entry id - 12345
if (preg_match('/^[0-9]{1,6}$/D', $entryId))
{
return null;
}

// special entry id - _KMCLOGO
if (preg_match('/^_KMCLOGO\d?$/D', $entryId))
{
return $entryId;
}

return null;
}

9 changes: 5 additions & 4 deletions alpha/scripts/kava/playsViewsElasticConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ protected function processMessage($message)
}

// parse the command line
if ($argc < 3)
if ($argc < 2)
{
echo "Usage:\n\t" . basename(__file__) . " <consumerId> <bulk_size>\n";
echo "Usage:\n\t" . basename(__file__) . " <id>\n";
exit(1);
}

$consumerId = $argv[1];
$bulkSize = $argv[2];
$id = $argv[1];
$consumerId = getenv(CLUSTER_ID_VAR . "_$id");
$bulkSize = getenv(BULK_SIZE_VAR);

try
{
Expand Down
11 changes: 4 additions & 7 deletions alpha/scripts/kava/playsViewsMemcacheConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

define('QUERY_CACHE_KEY_PREFIX', 'QCI-entry:id=');
define('QUERY_CACHE_KEY_EXPIRY', 90000);
define('MEMC_KEY_PREFIX', 'plays_views_');
define('MEMC_KEY_LAST_PLAYED_AT', 'plays_views_last_played_at');


class playsViewsMemcacheConsumer extends BaseConsumer
{
Expand Down Expand Up @@ -58,13 +55,13 @@ protected function processMessage($message)
// parse the command line
if ($argc < 2)
{
echo "Usage:\n\t" . basename(__file__) . " <memcache host>:<memcache port> [<qc1 memcache host>:<qc1 memcache port>,<qc2 memcache host>:<qc2 memcache port>]\n";
echo "For example, php " . basename(__file__) . " localhost:11211\n";
echo "Usage:\n\t" . basename(__file__) . " <id>\n";
exit(1);
}

$memcache = $argv[1];
$queryCacheMemcaches = isset($argv[2]) ? $argv[2] : null;
$id = $argv[1];
$memcache = getenv(MEMCACHE_VAR . "_$id");
$queryCacheMemcaches = getenv(QC_MEMCACHE_VAR . "_$id");

try
{
Expand Down
Loading