Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: Add libmagic & detections in response #391

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions python/src/vaas/vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,25 @@ def get_ssl_context(url, verify):
return ssl.create_default_context()
return ssl._create_unverified_context() # pylint: disable=W0212


def problem_details_to_error(problem_details):
type = problem_details.get("type")
details = problem_details.get("details")
if type == "VaasClientException":
return VaasClientError(details)
return VaasServerError(details)


def map_response(verdict_response):
return {
"Sha256": verdict_response.get("sha256"),
"Guid": verdict_response.get("guid"),
"Verdict": verdict_response.get("verdict"),
"LibMagic": verdict_response.get("lib_magic"),
"Detections": verdict_response.get("detections"),
}


class Vaas:
"""Verdict-as-a-Service client"""

Expand Down Expand Up @@ -167,11 +179,7 @@ async def for_sha256(self, sha256, verdict_request_attributes=None, guid=None):
verdict_response = await self.__for_sha256(
sha256, verdict_request_attributes, guid
)
return {
"Sha256": verdict_response.get("sha256"),
"Guid": verdict_response.get("guid"),
"Verdict": verdict_response.get("verdict"),
}
return map_response(verdict_response)

async def __for_stream(self, verdict_request_attributes=None, guid=None):
if verdict_request_attributes is not None and not isinstance(
Expand Down Expand Up @@ -254,7 +262,7 @@ async def __receive_loop(self):
future.set_result(vaas_message)
if message_kind == "Error":
problem_details = vaas_message.get("problem_details")
if guid is None or problem_details is None :
if guid is None or problem_details is None:
# Error: Server sent guid we are not waiting for, or problem details are null, ignore it
continue
future = self.results.get(guid)
Expand All @@ -281,12 +289,8 @@ async def for_buffer(self, buffer, verdict_request_attributes=None, guid=None):
verdict_response, buffer, len(buffer)
)

return {
"Sha256": verdict_response.get("sha256"),
"Guid": verdict_response.get("guid"),
"Verdict": verdict_response.get("verdict"),
}

return map_response(verdict_response)

async def _for_unknown_buffer(self, response, buffer, buffer_len):
start = time.time()
guid = response.get("guid")
Expand Down Expand Up @@ -315,10 +319,10 @@ async def for_stream(self, asyncBufferedReader, len, verdict_request_attributes=

if verdict != "Unknown":
raise VaasServerError("server returned verdict without receiving content")

if token == None:
raise VaasServerError("VerdictResponse missing UploadToken for stream upload")

if url == None:
raise VaasServerError("VerdictResponse missing URL for stream upload")

Expand All @@ -332,11 +336,7 @@ async def for_stream(self, asyncBufferedReader, len, verdict_request_attributes=
raise VaasTimeoutError() from ex
self.tracing.trace_upload_request(time.time() - start, len)

return {
"Sha256": verdict_response.get("sha256"),
"Guid": verdict_response.get("guid"),
"Verdict": verdict_response.get("verdict"),
}
return map_response(verdict_response)

async def for_file(self, path, verdict_request_attributes=None, guid=None):
"""Returns the verdict for a file"""
Expand All @@ -356,11 +356,7 @@ async def for_file(self, path, verdict_request_attributes=None, guid=None):
verdict_response, buffer, len(buffer)
)

return {
"Sha256": verdict_response.get("sha256"),
"Guid": verdict_response.get("guid"),
"Verdict": verdict_response.get("verdict"),
}
return map_response(verdict_response)

async def __upload(self, token, upload_uri, buffer_or_file, content_length):
jwt = PyJWT()
Expand Down Expand Up @@ -411,8 +407,4 @@ async def for_url(self, url, verdict_request_attributes=None, guid=None):

self.tracing.trace_url_request(time.time() - start)

return {
"Sha256": result.get("sha256"),
"Guid": result.get("guid"),
"Verdict": result.get("verdict"),
}
return map_response(result)
16 changes: 10 additions & 6 deletions python/tests/test_vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ async def test_for_buffer_returns_unknown_for_random_buffer(self):
self.assertEqual(verdict["Sha256"].casefold(), sha256.casefold())
self.assertEqual(verdict["Guid"].casefold(), guid)


async def test_for_file_returns_verdict(self):
async with await create_and_connect() as vaas:
with open("eicar.txt", "wb") as f:
Expand All @@ -173,7 +172,6 @@ async def test_for_file_returns_verdict(self):
self.assertEqual(verdict["Sha256"].casefold(), sha256.casefold())
self.assertEqual(verdict["Guid"].casefold(), guid)


async def test_for_file_returns_verdict_if_no_cache_or_shed(self):
options = get_disabled_options()

Expand All @@ -187,7 +185,6 @@ async def test_for_file_returns_verdict_if_no_cache_or_shed(self):
self.assertEqual(verdict["Sha256"].casefold(), sha256.casefold())
self.assertEqual(verdict["Guid"].casefold(), guid)


async def test_for_url_returns_malicious_for_eicar(self):
options = get_disabled_options()
async with await create_and_connect(options=options) as vaas:
Expand All @@ -196,7 +193,6 @@ async def test_for_url_returns_malicious_for_eicar(self):
self.assertEqual(verdict["Verdict"], "Malicious")
self.assertEqual(verdict["Guid"].casefold(), guid)


async def test_for_url_without_shed_and_cache_returns_clean_for_robots_txt(self):
options = get_disabled_options()
async with await create_and_connect(options=options) as vaas:
Expand All @@ -205,7 +201,6 @@ async def test_for_url_without_shed_and_cache_returns_clean_for_robots_txt(self)
self.assertEqual(verdict["Verdict"], "Clean")
self.assertEqual(verdict["Guid"].casefold(), guid)


async def test_for_url_without_cache_returns_clean_for_robots_txt(self):
options = VaasOptions()
options.use_cache = False
Expand Down Expand Up @@ -238,7 +233,6 @@ async def test_for_buffer_traces(self):
tracing.trace_hash_request.assert_called_with(ANY)
tracing.trace_upload_request.assert_called_with(ANY, 1024)


async def test_for_empty_buffer_returns_clean(self):
async with await create_and_connect() as vaas:
buffer = bytes("", "utf-8")
Expand All @@ -249,6 +243,16 @@ async def test_for_empty_buffer_returns_clean(self):
self.assertEqual(verdict["Sha256"].casefold(), sha256.casefold())
self.assertEqual(verdict["Guid"].casefold(), guid)

async def test_for_url_returns_detections_and_mime_type(self):
options = get_disabled_options()
async with await create_and_connect(options=options) as vaas:
guid = str(uuid.uuid4())
verdict = await vaas.for_url("https://secure.eicar.org/eicar.com.txt", guid=guid)
self.assertEqual(verdict["Verdict"], "Malicious")
self.assertIsNotNone(verdict["Detections"])
self.assertEqual(verdict["LibMagic"]['file_type'], "EICAR virus test files")
self.assertEqual(verdict["LibMagic"]['mime_type'], "text/plain")


if __name__ == "__main__":
unittest.main()
Loading