Skip to content

Commit

Permalink
Merge pull request #168 from BenediktMKuehne/fix-user-seperation
Browse files Browse the repository at this point in the history
increase user separation
  • Loading branch information
m-1-k-3 authored Nov 28, 2023
2 parents bca11a3 + dee8e80 commit f40cbc5
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 180 deletions.
1 change: 1 addition & 0 deletions embark/dashboard/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
path('dashboard/report/', views.report_dashboard, name='embark-ReportDashboard'),
path('dashboard/report/deleteAnalysis/<uuid:analysis_id>', views.delete_analysis, name='embark-dashboard-delete-analysis'),
path('dashboard/report/archive/<uuid:analysis_id>', views.archive_analysis, name='embark-dashboard-archive'),
path('dashboard/report/hide/<uuid:analysis_id>', views.hide_analysis, name='embark-dashboard-hide'),
path('dashboard/individualReport/<uuid:analysis_id>', views.individual_report_dashboard, name='embark-IndividualReportDashboard'),
path('dashboard/stop/', views.stop_analysis, name='embark-stop-analysis'),
path('dashboard/log/<uuid:analysis_id>', views.show_log, name='embark-show-log'),
Expand Down
46 changes: 39 additions & 7 deletions embark/dashboard/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
@login_required(login_url='/' + settings.LOGIN_URL)
def main_dashboard(request):
if request.user.is_authenticated:
if FirmwareAnalysis.objects.filter(finished=True, failed=False).count() > 0 and Result.objects.all().count() > 0:
if FirmwareAnalysis.objects.filter(finished=True, failed=False).count() > 0 and Result.objects.filter(restricted=False).count() > 0:
return render(request, 'dashboard/mainDashboard.html', {'nav_switch': True, 'username': request.user.username})
messages.info(request, "Redirected - There are no Results to display yet")
return redirect('embark-uploader-home')
Expand All @@ -50,18 +50,21 @@ def stop_analysis(request):
logger.debug("Posted Form is valid")
# get id
analysis = form.cleaned_data['analysis']
logger.info("Stopping analysis with id %s", analysis.id)
pid = FirmwareAnalysis.objects.get(id=analysis.id).pid
analysis_object_ = FirmwareAnalysis.objects.get(id=analysis.id)
# check if user auth
if request.user != analysis_object_.user:
return HttpResponseForbidden("You are not authorized!")
logger.info("Stopping analysis with id %s", analysis_object_.id)
pid = analysis_object_.pid
logger.debug("PID is %s", pid)
try:
BoundedExecutor.submit_kill(analysis.id)
os.killpg(os.getpgid(pid), signal.SIGTERM) # kill proc group too
form = StopAnalysisForm()
form.fields['analysis'].queryset = FirmwareAnalysis.objects.filter(finished=False)
form.fields['analysis'].queryset = FirmwareAnalysis.objects.filter(user=request.user).filter(finished=False)
return render(request, 'dashboard/serviceDashboard.html', {'username': request.user.username, 'form': form, 'success_message': True, 'message': "Stopped successfully"})
except builtins.Exception as error:
logger.error("Error %s", error)
analysis_object_ = FirmwareAnalysis.objects.get(id=analysis.id)
analysis_object_.failed = True
analysis_object_.save(update_fields=["failed"])
return HttpResponseServerError("Failed to stop process, but set its status to failed. Please handle EMBA process manually: PID=" + str(pid))
Expand All @@ -78,7 +81,7 @@ def service_dashboard(request):
:return httpresp: html servicedashboard
"""
form = StopAnalysisForm()
form.fields['analysis'].queryset = FirmwareAnalysis.objects.filter(finished=False)
form.fields['analysis'].queryset = FirmwareAnalysis.objects.filter(user=request.user).filter(finished=False)
return render(request, 'dashboard/serviceDashboard.html', {'username': request.user.username, 'form': form, 'success_message': False})


Expand All @@ -92,7 +95,8 @@ def report_dashboard(request):
:return: rendered ReportDashboard
"""
firmwares = FirmwareAnalysis.objects.all()
# show all not hidden by others and ALL of your own
firmwares = (FirmwareAnalysis.objects.filter(hidden=False) | FirmwareAnalysis.objects.filter(user=request.user)).distinct()
return render(request, 'dashboard/reportDashboard.html', {'firmwares': firmwares, 'username': request.user.username})


Expand Down Expand Up @@ -122,6 +126,9 @@ def show_log(request, analysis_id):
"""
logger.info("showing log for analyze_id: %s", analysis_id)
firmware = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != firmware.user:
return HttpResponseForbidden("You are not authorized!")
# get the file path
log_file_path_ = f"{Path(firmware.path_to_logs).parent}/emba_run.log"
logger.debug("Taking file at %s and render it", log_file_path_)
Expand All @@ -145,6 +152,9 @@ def show_logviewer(request, analysis_id):

logger.info("showing log viewer for analyze_id: %s", analysis_id)
firmware = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != firmware.user:
return HttpResponseForbidden("You are not authorized!")
# get the file path
log_file_path_ = f"{Path(firmware.path_to_logs).parent}/emba_run.log"
logger.debug("Taking file at %s and render it", log_file_path_)
Expand Down Expand Up @@ -200,10 +210,32 @@ def archive_analysis(request, analysis_id):
"""
logger.info("Archiving Analysis with id: %s", analysis_id)
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != analysis.user and not request.user.is_superuser:
return HttpResponseForbidden("You are not authorized!")
if analysis.zip_file is None:
# make archive for uuid
_ = make_zip(request, analysis_id)
analysis.do_archive()
analysis.archived = True
analysis.save(update_fields=["archived"])
messages.success(request, 'Analysis: ' + str(analysis_id) + ' successfully archived')
return redirect('..')


@login_required(login_url='/' + settings.LOGIN_URL)
@require_http_methods(["GET"])
def hide_analysis(request, analysis_id):
"""
hides the analysis
checks user
"""
logger.info("Hiding Analysis with id: %s", analysis_id)
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != analysis.user and not request.user.is_superuser:
return HttpResponseForbidden("You are not authorized!")
analysis.hidden = True
analysis.save(update_fields=["hidden"])
messages.success(request, 'Analysis: ' + str(analysis_id) + ' successfully hidden')
return redirect('..')
2 changes: 1 addition & 1 deletion embark/porter/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def import_read(request):
messages.error(request, 'import failed')
return redirect('..')
messages.error(request, 'form invalid')
return HttpResponseBadRequest
return HttpResponseBadRequest("invalid form")


@login_required(login_url='/' + settings.LOGIN_URL)
Expand Down
162 changes: 85 additions & 77 deletions embark/reporter/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from django.shortcuts import redirect
from django.contrib import messages
from django.template.loader import get_template
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
from django.http import HttpResponse, JsonResponse
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_http_methods
from django.urls import reverse
Expand All @@ -41,12 +41,13 @@ def html_report(request, analysis_id, html_file):
report_path = Path(f'{settings.EMBA_LOG_ROOT}{request.path[10:]}')
if FirmwareAnalysis.objects.filter(id=analysis_id).exists():
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.user == request.user:
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
html_body = get_template(report_path)
logger.debug("html_report - analysis_id: %s html_file: %s", analysis_id, html_file)
return HttpResponse(html_body.render({'embarkBackUrl': reverse('embark-ReportDashboard')}))
messages.error(request, "User not authorized")
logger.error("could not get template - %s", request)
return HttpResponseBadRequest
return redirect("..")


@require_http_methods(["GET"])
Expand All @@ -55,58 +56,62 @@ def html_report_path(request, analysis_id, html_path, html_file):
report_path = Path(f'{settings.EMBA_LOG_ROOT}{request.path[10:]}')
if FirmwareAnalysis.objects.filter(id=analysis_id).exists():
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.user == request.user:
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
html_body = get_template(report_path)
logger.debug("html_report - analysis_id: %s path: %s html_file: %s", analysis_id, html_path, html_file)
return HttpResponse(html_body.render({'embarkBackUrl': reverse('embark-ReportDashboard')}))
messages.error(request, "User not authorized")
logger.error("could not get path - %s", request)
return HttpResponseBadRequest
return redirect("..")


@require_http_methods(["GET"])
@login_required(login_url='/' + settings.LOGIN_URL)
def html_report_download(request, analysis_id, html_path, download_file): # Needed for EMBA?
base_path = f"{settings.EMBA_LOG_ROOT}"
if request.path.startswith('/'):
file_path = request.path[1:]
else:
file_path = request.path[2:]
full_path = os.path.normpath(os.path.join(base_path, file_path))
if full_path.startswith(base_path):
with open(full_path, 'rb') as requested_file:
response = HttpResponse(requested_file.read(), content_type="text/plain")
response['Content-Disposition'] = 'attachment; filename=' + os.path.basename(full_path)
logger.info("html_report - analysis_id: %s html_path: %s download_file: %s", analysis_id, html_path,
download_file)
return response
else:
response = Http404
return response
def html_report_download(request, analysis_id, html_path, download_file): # TODO Needed for EMBA?
response = Http404("Resource not found")
if FirmwareAnalysis.objects.filter(id=analysis_id).exists():
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
base_path = f"{settings.EMBA_LOG_ROOT}"
if request.path.startswith('/'):
file_path = request.path[1:]
else:
file_path = request.path[2:]
full_path = os.path.normpath(os.path.join(base_path, file_path))
if full_path.startswith(base_path):
with open(full_path, 'rb') as requested_file:
response = HttpResponse(requested_file.read(), content_type="text/plain")
response['Content-Disposition'] = 'attachment; filename=' + os.path.basename(full_path)
logger.info("html_report - analysis_id: %s html_path: %s download_file: %s", analysis_id, html_path,
download_file)
return response


@require_http_methods(["GET"])
@login_required(login_url='/' + settings.LOGIN_URL)
def html_report_resource(request, analysis_id, img_file):
content_type = "text/plain"

if img_file.endswith(".css"):
content_type = "text/css"
elif img_file.endswith(".svg"):
content_type = "image/svg+xml"
elif img_file.endswith(".png"):
content_type = "image/png"

resource_path = Path(f'{settings.EMBA_LOG_ROOT}{request.path[10:]}')
logger.info("html_report_resource - analysis_id: %s request.path: %s", analysis_id, request.path)

try:
# CodeQL issue is not relevant as the urls are defined via urls.py
with open(resource_path, "rb") as file_:
return HttpResponse(file_.read(), content_type=content_type)
except IOError as error:
logger.error(error)
logger.error(request.path)

if FirmwareAnalysis.objects.filter(id=analysis_id).exists():
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
content_type = "text/plain"

if img_file.endswith(".css"):
content_type = "text/css"
elif img_file.endswith(".svg"):
content_type = "image/svg+xml"
elif img_file.endswith(".png"):
content_type = "image/png"

resource_path = Path(f'{settings.EMBA_LOG_ROOT}{request.path[10:]}')
logger.info("html_report_resource - analysis_id: %s request.path: %s", analysis_id, request.path)

try:
# CodeQL issue is not relevant as the urls are defined via urls.py
with open(resource_path, "rb") as file_:
return HttpResponse(file_.read(), content_type=content_type)
except IOError as error:
logger.error(error)
logger.error(request.path)
# just in case -> back to report intro
report_path = Path(f'{settings.EMBA_LOG_ROOT}{request.path[10:]}')
html_body = get_template(report_path)
Expand All @@ -119,42 +124,43 @@ def get_individual_report(request, analysis_id):
"""
Get individual firmware report based on scan id (analysis_id)
"""
if not analysis_id:
logger.error('Bad request for get_individual_report')
return JsonResponse(data={'error': 'Bad request'}, status=HTTPStatus.BAD_REQUEST)
try:
if FirmwareAnalysis.objects.filter(id=analysis_id).exists():
analysis_object = FirmwareAnalysis.objects.get(id=analysis_id)
result = Result.objects.get(firmware_analysis=analysis_object)

logger.debug("getting individual report for %s", result)

return_dict = model_to_dict(instance=result, exclude=['vulnerability'])

return_dict['firmware_name'] = analysis_object.firmware_name
if analysis_object.firmware:
return_dict['id'] = analysis_object.firmware.id
else:
return_dict['id'] = "Firmware was deleted"
return_dict['device_list'] = [str(_device) for _device in analysis_object.device.all()]
return_dict['start_date'] = analysis_object.start_date
return_dict['end_date'] = analysis_object.end_date
return_dict['duration'] = analysis_object.duration
return_dict['notes'] = analysis_object.notes
return_dict['version'] = analysis_object.version
return_dict['path_to_logs'] = analysis_object.path_to_logs
return_dict['strcpy_bin'] = json.loads(return_dict['strcpy_bin'])
# architecture
if isinstance(return_dict['architecture_verified'], dict):
arch_ = json.loads(return_dict['architecture_verified'])
for key_, value_ in arch_.items():
return_dict['architecture_verified'] += f"{key_}-{value_} "
else:
return_dict['architecture_verified'] = str(return_dict['architecture_verified'])

return JsonResponse(data=return_dict, status=HTTPStatus.OK)
except Result.DoesNotExist:
logger.error('Report for firmware_id: %s not found in database', analysis_id)
return JsonResponse(data={'error': 'Not Found'}, status=HTTPStatus.NOT_FOUND)
if analysis_object.hidden is False or analysis_object.user == request.user or request.user.is_superuser:
try:
result = Result.objects.get(firmware_analysis=analysis_object)

logger.debug("getting individual report for %s", result)

return_dict = model_to_dict(instance=result, exclude=['vulnerability'])

return_dict['firmware_name'] = analysis_object.firmware_name
if analysis_object.firmware:
return_dict['id'] = analysis_object.firmware.id
else:
return_dict['id'] = "Firmware was deleted"
return_dict['device_list'] = [str(_device) for _device in analysis_object.device.all()]
return_dict['start_date'] = analysis_object.start_date
return_dict['end_date'] = analysis_object.end_date
return_dict['duration'] = analysis_object.duration
return_dict['notes'] = analysis_object.notes
return_dict['version'] = analysis_object.version
return_dict['path_to_logs'] = analysis_object.path_to_logs
return_dict['strcpy_bin'] = json.loads(return_dict['strcpy_bin'])
# architecture
if isinstance(return_dict['architecture_verified'], dict):
arch_ = json.loads(return_dict['architecture_verified'])
for key_, value_ in arch_.items():
return_dict['architecture_verified'] += f"{key_}-{value_} "
else:
return_dict['architecture_verified'] = str(return_dict['architecture_verified'])

return JsonResponse(data=return_dict, status=HTTPStatus.OK)
except Result.DoesNotExist:
logger.error('Report for firmware_id: %s not found in database', analysis_id)
return JsonResponse(data={'error': 'Not Found'}, status=HTTPStatus.NOT_FOUND)
logger.error('Bad request for get_individual_report')
return JsonResponse(data={'error': 'Bad request'}, status=HTTPStatus.BAD_REQUEST)


@require_http_methods(["GET"])
Expand Down Expand Up @@ -257,6 +263,7 @@ def download_zipped(request, analysis_id):
:return: HttpResponse with zipped log directory on success or HttpResponse including error message
"""
# TODO only user can download zips
logger.debug("entry download_zipped")
try:
firmware = FirmwareAnalysis.objects.get(id=analysis_id)
Expand All @@ -282,6 +289,7 @@ def make_zip(request, analysis_id):
"""
submit analysis for zipping log directory
"""
# TODO only user can make zips
try:
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# look for LogZipFile
Expand Down
Loading

0 comments on commit f40cbc5

Please sign in to comment.