Skip to content

Commit

Permalink
Fix handling blob response headers (#504)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: munishchouhan <[email protected]>
  • Loading branch information
pditommaso and munishchouhan authored May 31, 2024
1 parent c98db8f commit de97575
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,9 @@ class RegistryProxyController {
}

MutableHttpResponse<?> fromDownloadResponse(final DelegateResponse resp, RoutePath route, Map<String, List<String>> headers) {
final blobCache = blobCacheService .retrieveBlobCache(route, headers)
log.debug "Blob cache $blobCache"
log.debug "== Blob cache upstream $resp"
final blobCache = blobCacheService .retrieveBlobCache(route, headers, resp.headers)
log.debug "== Blob cache response [succeeded=${blobCache.succeeded()}] $blobCache"
if( !blobCache.succeeded() ) {
final String msg = blobCache.logs ?: "Unable to cache blob ${blobCache.locationUri}"
return badRequest(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.seqera.wave.core

import groovy.transform.CompileStatic
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import io.micronaut.cache.annotation.Cacheable
import io.micronaut.context.annotation.Context
Expand Down Expand Up @@ -220,6 +221,7 @@ class RegistryProxyService {
: null
}

@ToString(includeNames = true, includePackage = false)
static class DelegateResponse {
int statusCode
Map<String,List<String>> headers
Expand Down
69 changes: 53 additions & 16 deletions src/main/groovy/io/seqera/wave/service/blob/BlobCacheInfo.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.transform.ToString
import groovy.util.logging.Slf4j

/**
* Model a blob cache metadata entry
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@ToString(includePackage = false, includeNames = true)
@Canonical
@CompileStatic
Expand All @@ -43,6 +46,21 @@ class BlobCacheInfo {
*/
final Map<String,String> headers

/**
* The blob length
*/
final Long contentLength

/**
* The content type of this blob
*/
final String contentType

/**
* The blob cache control directive
*/
final String cacheControl

/**
* The instant when the cache request was created
*/
Expand Down Expand Up @@ -73,29 +91,37 @@ class BlobCacheInfo {
locationUri && completionTime!=null
}

String getContentType() {
headers?.find(it-> it.key.toLowerCase()=='content-type')?.value
}

String getCacheControl() {
headers?.find(it-> it.key.toLowerCase()=='cache-control')?.value
}

static BlobCacheInfo create(String locationUrl, Map<String,List<String>> headers) {
static BlobCacheInfo create(String locationUrl, Map<String,List<String>> request, Map<String,List<String>> response) {
final headers0 = new LinkedHashMap<String,String>()
for( Map.Entry<String,List<String>> it : headers )
for( Map.Entry<String,List<String>> it : request )
headers0.put( it.key, it.value.join(',') )
new BlobCacheInfo(locationUrl, headers0, Instant.now())
final length = headerLong0(response, 'Content-Length')
final type = headerString0(response, 'Content-Type')
final cache = headerString0(response, 'Cache-Control')
new BlobCacheInfo(locationUrl, headers0, length, type, cache, Instant.now(), null, null, null)
}

static String headerString0(Map<String,List<String>> headers, String name) {
headers?.find(it-> it.key.toLowerCase()==name.toLowerCase())?.value?.first()
}

static BlobCacheInfo create1(String locationUrl, Map<String,String> headers) {
new BlobCacheInfo(locationUrl, headers, Instant.now())
static Long headerLong0(Map<String,List<String>> headers, String name) {
try {
return headerString0(headers,name) as Long
}
catch (NumberFormatException e) {
log.warn "Unexpected content length value - cause: $e"
return null
}
}

BlobCacheInfo cached() {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
creationTime,
0)
Expand All @@ -105,6 +131,9 @@ class BlobCacheInfo {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
Instant.now(),
status,
Expand All @@ -115,25 +144,33 @@ class BlobCacheInfo {
new BlobCacheInfo(
locationUri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
Instant.now(),
null,
logs)
logs
)
}

BlobCacheInfo withLocation(String uri) {
new BlobCacheInfo(
uri,
headers,
contentLength,
contentType,
cacheControl,
creationTime,
completionTime,
exitStatus,
logs)
logs
)
}

@Memoized
static BlobCacheInfo unknown() {
new BlobCacheInfo(null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null) {
new BlobCacheInfo(null, null, null, null, null, Instant.ofEpochMilli(0), Instant.ofEpochMilli(0), null) {
@Override
BlobCacheInfo withLocation(String uri) {
// prevent the change of location for unknown status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ interface BlobCacheService {
* information.
*
* @param route The HTTP request of a container layer blob
* @param headers The HTTP headers of a container layer blob
* @param requestHeaders The HTTP headers of the upstream request
* @param responseHeaders The HTTP headers of the response providing the blob to be cached
* @return
*/
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> headers)
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> requestHeaders, Map<String,List<String>> responseHeaders)

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ class BlobCacheServiceImpl implements BlobCacheService {
}

@Override
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> headers) {
BlobCacheInfo retrieveBlobCache(RoutePath route, Map<String,List<String>> requestHeaders, Map<String,List<String>> responseHeaders) {
final uri = blobDownloadUri(route)
log.trace "Container blob download uri: $uri"

final info = BlobCacheInfo.create(uri, headers)
final info = BlobCacheInfo.create(uri, requestHeaders, responseHeaders)
final target = route.targetPath
if( blobStore.storeIfAbsent(target, info) ) {
// start download and caching job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class StreamServiceImpl implements StreamService {
// otherwise cache the blob and stream the resulting uri
if( blobCacheService ) {
log.debug "Streaming blob cache for route: $route"
final blobCache = blobCacheService .retrieveBlobCache(route, empty)
final blobCache = blobCacheService .retrieveBlobCache(route, empty, resp.headers)
if( blobCache.succeeded() ) {
return httpStream0(blobCache.locationUri)
}
Expand Down
107 changes: 74 additions & 33 deletions src/test/groovy/io/seqera/wave/service/blob/BlobCacheInfoTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,73 +30,88 @@ class BlobCacheInfoTest extends Specification {

def 'should create blob info' () {
expect:
BlobCacheInfo.create('http://foo.com', [:])
BlobCacheInfo.create('http://foo.com', [:], [:])
.locationUri == 'http://foo.com'
and:
BlobCacheInfo.create('http://foo.com', [:])
BlobCacheInfo.create('http://foo.com', [:], [:])
.headers == [:]
and:
BlobCacheInfo.create('http://foo.com', [Foo:['alpha'], Bar:['delta', 'gamma', 'omega']])
BlobCacheInfo.create('http://foo.com', [Foo:['alpha'], Bar:['delta', 'gamma', 'omega']], [:])
.headers == [Foo:'alpha', Bar: 'delta,gamma,omega']

and:
BlobCacheInfo.create1('http://foo.com', [Foo:'alpha', Bar:'beta'])
.headers == [Foo:'alpha', Bar: 'beta']

}

def 'should find content type' () {
expect:
BlobCacheInfo.create1('http:/foo', HEADERS ).getContentType() == EXPECTED
BlobCacheInfo.create('http://foo', [:], HEADERS ).getContentType() == EXPECTED

where:
HEADERS | EXPECTED
['Content-Type': 'alpha'] | 'alpha'
['Content-type': 'delta'] | 'delta'
['content-type': 'gamma'] | 'gamma'
['Content-Type': ['alpha']] | 'alpha'
['Content-type': ['delta']] | 'delta'
['content-type': ['gamma']] | 'gamma'

}

def 'should find content type' () {
def 'should find cache control' () {
expect:
BlobCacheInfo.create1('http:/foo', HEADERS ).getCacheControl() == EXPECTED
BlobCacheInfo.create('http://foo', [:], HEADERS ).getCacheControl() == EXPECTED

where:
HEADERS | EXPECTED
['Cache-Control': 'alpha'] | 'alpha'
['cache-control': 'delta'] | 'delta'
['CACHE-CONTROL': 'gamma'] | 'gamma'
HEADERS | EXPECTED
['Cache-Control': ['alpha']] | 'alpha'
['cache-control': ['delta']] | 'delta'
['CACHE-CONTROL': ['gamma']] | 'gamma'

}

def 'should find content length' () {
expect:
BlobCacheInfo.create('http://foo', [:], HEADERS ).getContentLength() == EXPECTED

where:
HEADERS | EXPECTED
[:] | null
['Content-Length': ['']] | null
['Content-Length': ['100']] | 100L
['content-length': ['200']] | 200L

}

def 'should complete blob info' () {
given:
def location = 'http://foo.com'
def headers = [Foo:'something']
def cache = BlobCacheInfo.create1(location, headers)
def headers = [Foo:['something']]
def response = ['Content-Length':['100'], 'Content-Type':['text'], 'Cache-Control': ['12345']]
def cache = BlobCacheInfo.create(location, headers, response)

when:
def result = cache.completed(0, 'OK')
then:
result.headers == headers
result.headers == [Foo:'something']
result.locationUri == 'http://foo.com'
result.creationTime == cache.creationTime
result.completionTime >= cache.creationTime
result.exitStatus == 0
result.logs == 'OK'
result.contentLength == 100L
result.contentType == 'text'
result.cacheControl == '12345'
and:
result.done()
result.succeeded()


when:
result = cache.completed(1, 'Oops')
then:
result.headers == headers
result.headers == [Foo:'something']
result.locationUri == 'http://foo.com'
result.creationTime == cache.creationTime
result.completionTime >= cache.creationTime
result.exitStatus == 1
result.contentLength == 100L
result.contentType == 'text'
result.cacheControl == '12345'
and:
result.done()
!result.succeeded()
Expand All @@ -105,17 +120,22 @@ class BlobCacheInfoTest extends Specification {
def 'should fail blob info' () {
given:
def location = 'http://foo.com'
def headers = [Foo:'something']
def cache = BlobCacheInfo.create1(location, headers)
def headers = [Foo:['something']]
def response = ['Content-Length':['100'], 'Content-Type':['text'], 'Cache-Control': ['12345']]
def cache = BlobCacheInfo.create(location, headers, response)

when:
def result = cache.failed('Oops')
then:
result.headers == headers
result.headers == [Foo:'something']
result.locationUri == 'http://foo.com'
result.creationTime == cache.creationTime
result.completionTime >= cache.creationTime
result.exitStatus == null
result.logs == 'Oops'
result.contentLength == 100L
result.contentType == 'text'
result.cacheControl == '12345'
and:
result.done()
!result.succeeded()
Expand All @@ -124,17 +144,22 @@ class BlobCacheInfoTest extends Specification {
def 'should cache blob info' () {
given:
def location = 'http://foo.com'
def headers = [Foo:'something']
def cache = BlobCacheInfo.create1(location, headers)
def headers = [Foo:['something']]
def response = ['Content-Length':['100'], 'Content-Type':['text'], 'Cache-Control': ['12345']]
and:
def cache = BlobCacheInfo.create(location, headers, response)
when:
def result = cache.cached()
then:
result.headers == headers
result.headers == [Foo:'something']
result.locationUri == 'http://foo.com'
result.creationTime == cache.creationTime
result.completionTime == cache.creationTime
result.exitStatus == 0
result.logs == null
result.cacheControl == '12345'
result.contentType == 'text'
result.contentLength == 100L
and:
result.done()
result.succeeded()
Expand All @@ -160,11 +185,27 @@ class BlobCacheInfoTest extends Specification {

def 'should change location uri' () {
given:
def result = BlobCacheInfo.create('http://foo.com', [:])
def headers = [Foo:['something']]
def response = ['Content-Length':['100'], 'Content-Type':['text'], 'Cache-Control': ['12345']]

expect:
result.locationUri == 'http://foo.com'
result.withLocation('http://bar.com')
.locationUri == 'http://bar.com'
when:
def result1 = BlobCacheInfo.create('http://foo.com', headers, response)
then:
result1.locationUri == 'http://foo.com'
and:
result1.headers == [Foo:'something']
result1.contentType == 'text'
result1.contentLength == 100L
result1.cacheControl == '12345'

when:
def result2 = result1.withLocation('http://bar.com')
then:
result2.locationUri == 'http://bar.com'
and:
result2.headers == [Foo:'something']
result2.contentType == 'text'
result2.contentLength == 100L
result2.cacheControl == '12345'
}
}
Loading

0 comments on commit de97575

Please sign in to comment.