Skip to content

Commit

Permalink
Merge branch 'master' into add-buildrecord-cache-store
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed May 31, 2024
2 parents f0e5f64 + b1bdb94 commit 5e42f63
Show file tree
Hide file tree
Showing 19 changed files with 351 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,25 @@ class MetricsController {
@Get(uri = "/v1alpha2/metrics/builds", produces = MediaType.APPLICATION_JSON)
HttpResponse<?> getBuildsMetrics(@Nullable @QueryValue String date, @Nullable @QueryValue String org) {
if(!date && !org)
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_BUILDS))
return HttpResponse.ok(metricsService.getAllOrgCount(MetricConstants.PREFIX_BUILDS))
validateQueryParams(date)
final count = metricsService.getBuildsMetrics(date, org)
return HttpResponse.ok(new GetBuildsCountResponse(count))
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_BUILDS, date, org))
}

@Get(uri = "/v1alpha2/metrics/pulls", produces = MediaType.APPLICATION_JSON)
HttpResponse<?> getPullsMetrics(@Nullable @QueryValue String date, @Nullable @QueryValue String org) {
if(!date && !org)
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_PULLS))
return HttpResponse.ok(metricsService.getAllOrgCount(MetricConstants.PREFIX_PULLS))
validateQueryParams(date)
final count = metricsService.getPullsMetrics(date, org)
return HttpResponse.ok(new GetPullsCountResponse(count))
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_PULLS, date, org))
}

@Get(uri = "/v1alpha2/metrics/fusion/pulls", produces = MediaType.APPLICATION_JSON)
HttpResponse<?> getFusionPullsMetrics(@Nullable @QueryValue String date, @Nullable @QueryValue String org) {
if(!date && !org)
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_FUSION))
return HttpResponse.ok(metricsService.getAllOrgCount(MetricConstants.PREFIX_FUSION))
validateQueryParams(date)
final count = metricsService.getFusionPullsMetrics(date, org)
return HttpResponse.ok(new GetFusionPullsCountResponse(count))
return HttpResponse.ok(metricsService.getOrgCount(MetricConstants.PREFIX_FUSION, date, org))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,9 @@ class RegistryProxyController {
}

MutableHttpResponse<?> fromDownloadResponse(final DelegateResponse resp, RoutePath route, Map<String, List<String>> headers) {
final blobCache = blobCacheService .retrieveBlobCache(route, headers)
log.debug "Blob cache $blobCache"
log.debug "== Blob cache upstream $resp"
final blobCache = blobCacheService .retrieveBlobCache(route, headers, resp.headers)
log.debug "== Blob cache response [succeeded=${blobCache.succeeded()}] $blobCache"
if( !blobCache.succeeded() ) {
final String msg = blobCache.logs ?: "Unable to cache blob ${blobCache.locationUri}"
return badRequest(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.seqera.wave.core

import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import io.micronaut.cache.annotation.Cacheable
import io.micronaut.context.annotation.Context
Expand Down Expand Up @@ -220,6 +221,7 @@ class RegistryProxyService {
: null
}

@ToString(includeNames = true, includePackage = false)
static class DelegateResponse {
int statusCode
Map<String,List<String>> headers
Expand Down
69 changes: 53 additions & 16 deletions src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.transform.ToString
import groovy.util.logging.Slf4j

/**
* Model a blob cache metadata entry
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@ToString(includePackage = false, includeNames = true)
@Canonical
@CompileStatic
Expand All @@ -43,6 +46,21 @@ class BlobCacheInfo {
*/
final Map<String,String> headers

/**
* The blob length
*/
final Long contentLength

/**
* The content type of this blob
*/
final String contentType

/**
* The blob cache control directive
*/
final String cacheControl

/**
* The instant when the cache request was created
*/
Expand Down Expand Up @@ -73,29 +91,37 @@ class BlobCacheInfo {
locationUri && completionTime!=null
}

String getContentType() {
headers?.find(it-> it.key.toLowerCase()=='content-type')?.value
}

String getCacheControl() {
headers?.find(it-> it.key.toLowerCase()=='cache-control')?.value
}

static BlobCacheInfo create(String locationUrl, Map<String,List<String>> headers) {
static BlobCacheInfo create(String locationUrl, Map<String,List<String>> request, Map<String,List<String>> response) {
final headers0 = new LinkedHashMap<String,String>()
for( Map.Entry<String,List<String>> it : headers )
for( Map.Entry<String,List<String>> it : request )
headers0.put( it.key, it.value.join(',') )
new BlobCacheInfo(locationUrl, headers0, Instant.now())
final length = headerLong0(response, 'Content-Length')
final type = headerString0(response, 'Content-Type')
final cache = headerString0(response, 'Cache-Control')
new BlobCacheInfo(locationUrl, headers0, length, type, cache, Instant.now(), null, null, null)
}

static String headerString0(Map<String,List<String>> headers, String name) {
headers?.find(it-> it.key.toLowerCase()==name.toLowerCase())?.value?.first()
}

static BlobCacheInfo create1(String locationUrl, Map<String,String> headers) {
new BlobCacheInfo(locationUrl, headers, Instant.now())
static Long headerLong0(Map<String,List<String>> headers, String name) {
try {
return headerString0(headers,name) as Long
}
catch (NumberFormatException e) {
log.warn "Unexpected content length value - cause: $e"
return null
}
}

BlobCacheInfo cached() {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
creationTime,
0)
Expand All @@ -105,6 +131,9 @@ class BlobCacheInfo {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
Instant.now(),
status,
Expand All @@ -115,25 +144,33 @@ class BlobCacheInfo {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
Instant.now(),
null,
logs)
logs
)
}

BlobCacheInfo withLocation(String uri) {
new BlobCacheInfo(
uri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
completionTime,
exitStatus,
logs)
logs
)
}

@Memoized
static BlobCacheInfo unknown() {
new BlobCacheInfo(null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null) {
new BlobCacheInfo(null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null) {
@Override
BlobCacheInfo withLocation(String uri) {
// prevent the change of location for unknown status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ interface BlobCacheService {
* information.
*
* @param route The HTTP request of a container layer blob
* @param headers The HTTP headers of a container layer blob
* @param requestHeaders The HTTP headers of the upstream request
* @param responseHeaders The HTTP headers of the response providing the blob to be cached
* @return
*/
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> headers)
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> requestHeaders, Map<String,List<String>> responseHeaders)

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ class BlobCacheServiceImpl implements BlobCacheService {
}

@Override
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> headers) {
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> requestHeaders, Map<String,List<String>> responseHeaders) {
final uri = blobDownloadUri(route)
log.trace "Container blob download uri: $uri"

final info = BlobCacheInfo.create(uri, headers)
final info = BlobCacheInfo.create(uri, requestHeaders, responseHeaders)
final target = route.targetPath
if( blobStore.storeIfAbsent(target, info) ) {
// start download and caching job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.counter.impl

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern

import groovy.transform.CompileStatic
import io.micronaut.context.annotation.Requires
Expand Down Expand Up @@ -49,10 +50,11 @@ class LocalCounterProvider implements CounterProvider {

@Override
Map<String, Long> getAllMatchingEntries(String key, String pattern) {
def keyStore = store.get(key)
Pattern compiledPattern = Pattern.compile(pattern.replace('*', '.*'))
Map keyStore = store.get(key)
Map<String, Long> result = [:]
if (keyStore){
def matchingPairs = keyStore.findAll { k, v -> k =~ pattern }
def matchingPairs = keyStore.findAll {entry -> compiledPattern.matcher(entry.key).matches()}
matchingPairs.each { k, v ->
result.put(k, v as Long)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,6 @@ import io.seqera.wave.tower.PlatformId
* @author Munish Chouhan <[email protected]>
*/
interface MetricsService {
/**
* get the Wave builds metrics
*
* @param date, date of the required metrics
* @param org, org of the required metrics
* @return Long, builds counts
*/
Long getBuildsMetrics(String date, String org)

/**
* get the Wave pulls metrics
*
* @param date, date of the required metrics
* @param org, org of the required metrics
* @return Long, pulls counts
*/
Long getPullsMetrics(String date, String org)

/**
* get the Wave fusion pulls metrics
*
* @param date, date of the required metrics
* @param org, org of the required metrics
* @return Long, fusion pulls counts
*/
Long getFusionPullsMetrics(String date, String org)

/**
* increment wave fusion pulls count
*
Expand Down Expand Up @@ -80,5 +53,13 @@ interface MetricsService {
* @param metric
* @return GetOrgCountResponse
*/
GetOrgCountResponse getOrgCount(String metric)
GetOrgCountResponse getAllOrgCount(String metric)

/**
* Get counts per organisations or per date or per both
*
* @param metric
* @return GetOrgCountResponse
*/
GetOrgCountResponse getOrgCount(String metric, String date, String org)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package io.seqera.wave.service.metric.impl

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.regex.Matcher
import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -40,28 +42,15 @@ import jakarta.inject.Singleton
@CompileStatic
class MetricsServiceImpl implements MetricsService {

@Inject
private MetricsCounterStore metricsCounterStore

DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

@Override
Long getBuildsMetrics(String date, String org) {
return metricsCounterStore.get(getKey(MetricConstants.PREFIX_BUILDS, date, org)) ?: 0
}
static final private DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd")

@Override
Long getPullsMetrics(String date, String org) {
return metricsCounterStore.get(getKey(MetricConstants.PREFIX_PULLS, date, org)) ?: 0
}
static final private Pattern ORG_DATE_KEY_PATTERN = Pattern.compile('(builds|pulls|fusion)/o/([^/]+)/d/\\d{4}-\\d{2}-\\d{2}')

@Override
Long getFusionPullsMetrics(String date, String org) {
return metricsCounterStore.get(getKey(MetricConstants.PREFIX_FUSION, date, org)) ?: 0
}
@Inject
private MetricsCounterStore metricsCounterStore

@Override
GetOrgCountResponse getOrgCount(String metric){
GetOrgCountResponse getAllOrgCount(String metric){
final response = new GetOrgCountResponse(metric, 0, [:])
final orgCounts = metricsCounterStore.getAllMatchingEntries("$metric/$MetricConstants.PREFIX_ORG/*")
for(def entry : orgCounts) {
Expand All @@ -75,6 +64,28 @@ class MetricsServiceImpl implements MetricsService {
return response
}

@Override
GetOrgCountResponse getOrgCount(String metric, String date, String org) {
final response = new GetOrgCountResponse(metric, 0, [:])

// count is stored per date and per org, so it can be extracted from get method
response.count = metricsCounterStore.get(getKey(metric, date, org)) ?: 0L

//when org and date is provided, return the org count for given date
if (org) {
response.orgs.put(org, response.count)
}else{
// when only date is provide, scan the store and return the count for all orgs on given date
final orgCounts = metricsCounterStore.getAllMatchingEntries("$metric/$MetricConstants.PREFIX_ORG/*/$MetricConstants.PREFIX_DAY/$date")
for(def entry : orgCounts) {
response.orgs.put(extractOrgFromKey(entry.key), entry.value)
}
}

return response

}

@Override
void incrementFusionPullsCounter(PlatformId platformId){
incrementCounter(MetricConstants.PREFIX_FUSION, platformId?.user?.email)
Expand All @@ -92,14 +103,14 @@ class MetricsServiceImpl implements MetricsService {

protected void incrementCounter(String prefix, String email) {
def org = getOrg(email)
def key = getKey(prefix, LocalDate.now().format(dateFormatter), null)
def key = getKey(prefix, LocalDate.now().format(DATE_FORMATTER), null)
metricsCounterStore.inc(key)
log.trace("increment metrics count of: $key")
if ( org ) {
key = getKey(prefix, null, org)
metricsCounterStore.inc(key)
log.trace("increment metrics count of: $key")
key = getKey(prefix, LocalDate.now().format(dateFormatter), org)
key = getKey(prefix, LocalDate.now().format(DATE_FORMATTER), org)
metricsCounterStore.inc(key)
log.trace("increment metrics count of: $key")
}
Expand Down Expand Up @@ -127,4 +138,8 @@ class MetricsServiceImpl implements MetricsService {
return null
}

protected static String extractOrgFromKey(String key) {
Matcher matcher = ORG_DATE_KEY_PATTERN.matcher(key)
return matcher.matches() ? matcher.group(2) : "unknown"
}
}
Loading

0 comments on commit 5e42f63

Please sign in to comment.