From 7d0202320f6095850d6b0d196a9ce710a3cad6f2 Mon Sep 17 00:00:00 2001 From: To-om Date: Tue, 24 May 2022 17:12:19 +0200 Subject: [PATCH 1/8] #410 Check the job timeout from the main threadpool --- .../cortex/services/DockerJobRunnerSrv.scala | 6 +- .../thp/cortex/services/JobRunnerSrv.scala | 59 +++++++++---------- .../cortex/services/ProcessJobRunnerSrv.scala | 7 ++- 3 files changed, 38 insertions(+), 34 deletions(-) diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index b92d1dfe6..fe92ab568 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -62,7 +62,9 @@ class DockerJobRunnerSrv( false }.get - def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { + def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + ec: ExecutionContext + ): Future[Unit] = { import scala.collection.JavaConverters._ if (autoUpdate) client.pull(dockerImage) // ContainerConfig.builder().addVolume() @@ -113,7 +115,7 @@ class DockerJobRunnerSrv( client.startContainer(containerCreation.id()) client.waitContainer(containerCreation.id()) () - }.andThen { + }(jobExecutor).andThen { case r => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index 018215d43..6913e8050 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -191,12 +191,10 @@ class JobRunnerSrv @Inject() ( case Failure(e) => endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e")) case _ => endJob(job, JobStatus.Success) } - } else { + } else endJob(job, JobStatus.Failure, (report \ "errorMessage").asOpt[String], (report \ "input").asOpt[JsValue].map(_.toString)) - } - } else { + } else endJob(job, JobStatus.Failure, Some(s"no output")) - } } def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = @@ -207,31 +205,32 @@ class JobRunnerSrv @Inject() ( } val finishedJob = for { _ <- startJob(job) - j <- runners - .foldLeft[Option[Future[Unit]]](None) { - case (None, "docker") => - worker - .dockerImage() - .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") - None - } - case (None, "process") => - worker - .command() - .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))(executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") - None - } - case (j: Some[_], _) => j - case (None, runner) => - logger.warn(s"Unknown job runner: $runner") - None + j <- + runners + .foldLeft[Option[Future[Unit]]](None) { + case (None, "docker") => + worker + .dockerImage() + .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext)) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") + None + } + case (None, "process") => + worker + .command() + .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), executionContext)) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") + None + } + case (j: Some[_], _) => j + case (None, runner) => + logger.warn(s"Unknown job runner: $runner") + None - } - .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) + } + .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) } yield j finishedJob .transformWith { r => @@ -253,8 +252,8 @@ class JobRunnerSrv @Inject() ( updateSrv(job, fields, ModifyConfig(retryOnConflict = 0)) } - private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)( - implicit authContext: AuthContext + private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit + authContext: AuthContext ): Future[Job] = { val fields = Fields .empty diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index ec981a358..0030805d7 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -31,7 +31,10 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)( + implicit + ec: ExecutionContext + ): Future[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent val output = StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") @@ -46,7 +49,7 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { .apply { process.exitValue() () - } + }(jobExecutor) .map { _ => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { From 01c6b2ade264e231b754eaba5256f652a7162fcc Mon Sep 17 00:00:00 2001 From: To-om Date: Wed, 25 May 2022 16:55:24 +0200 Subject: [PATCH 2/8] #412 Improve logs --- .../thp/cortex/controllers/AnalyzerCtrl.scala | 4 +- .../cortex/controllers/AttachmentCtrl.scala | 13 +-- .../cortex/controllers/ResponderCtrl.scala | 2 +- .../thp/cortex/controllers/StreamCtrl.scala | 10 +- app/org/thp/cortex/models/Audit.scala | 15 +-- .../thp/cortex/services/AccessLogFilter.scala | 32 ++++++ app/org/thp/cortex/services/CustomWSAPI.scala | 22 ++-- .../cortex/services/DockerJobRunnerSrv.scala | 6 +- .../thp/cortex/services/ErrorHandler.scala | 103 ++++++++---------- .../thp/cortex/services/JobRunnerSrv.scala | 44 +++++--- app/org/thp/cortex/services/JobSrv.scala | 3 +- app/org/thp/cortex/services/KeyAuthSrv.scala | 19 ++-- .../thp/cortex/services/LocalAuthSrv.scala | 8 +- app/org/thp/cortex/services/OAuth2Srv.scala | 17 +-- .../cortex/services/ProcessJobRunnerSrv.scala | 12 +- app/org/thp/cortex/services/WorkerSrv.scala | 25 +++-- app/org/thp/cortex/util/JsonConfig.scala | 2 +- conf/reference.conf | 4 +- 18 files changed, 185 insertions(+), 156 deletions(-) create mode 100644 app/org/thp/cortex/services/AccessLogFilter.scala diff --git a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala index 644f2c180..9a82029e2 100644 --- a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala +++ b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala @@ -44,7 +44,9 @@ class AnalyzerCtrl @Inject() ( private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): JsObject = if (isAdmin) - analyzer.toJson + ("configuration" -> Json.parse(analyzer.configuration())) + ("analyzerDefinitionId" -> JsString(analyzer.workerDefinitionId())) + analyzer.toJson + ("configuration" -> Json.parse(analyzer.configuration())) + ("analyzerDefinitionId" -> JsString( + analyzer.workerDefinitionId() + )) else analyzer.toJson + ("analyzerDefinitionId" -> JsString(analyzer.workerDefinitionId())) diff --git a/app/org/thp/cortex/controllers/AttachmentCtrl.scala b/app/org/thp/cortex/controllers/AttachmentCtrl.scala index 2f849db8e..2aeaf0270 100644 --- a/app/org/thp/cortex/controllers/AttachmentCtrl.scala +++ b/app/org/thp/cortex/controllers/AttachmentCtrl.scala @@ -18,8 +18,7 @@ import play.api.libs.Files.DefaultTemporaryFileCreator import play.api.mvc._ import play.api.{mvc, Configuration} -/** - * Controller used to access stored attachments (plain or zipped) +/** Controller used to access stored attachments (plain or zipped) */ @Singleton class AttachmentCtrl( @@ -41,8 +40,7 @@ class AttachmentCtrl( ) = this(configuration.get[String]("datastore.attachment.password"), tempFileCreator, attachmentSrv, authenticated, components, executionContextSrv) - /** - * Download an attachment, identified by its hash, in plain format + /** Download an attachment, identified by its hash, in plain format * File name can be specified. This method is not protected : browser will * open the document directly. It must be used only for safe file */ @@ -51,7 +49,7 @@ class AttachmentCtrl( executionContextSrv.withDefault { implicit ec => if (hash.startsWith("{{")) // angularjs hack NoContent - else if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + else if (name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).nonEmpty) mvc.Results.BadRequest("File name is invalid") else Result( @@ -69,15 +67,14 @@ class AttachmentCtrl( } } - /** - * Download an attachment, identified by its hash, in zip format. + /** Download an attachment, identified by its hash, in zip format. * Zip file is protected by the password "malware" * File name can be specified (zip extension is append) */ @Timed("controllers.AttachmentCtrl.downloadZip") def downloadZip(hash: String, name: Option[String]): Action[AnyContent] = authenticated(Roles.read) { _ => executionContextSrv.withDefault { implicit ec => - if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + if (name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).nonEmpty) BadRequest("File name is invalid") else { val f = tempFileCreator.create("zip", hash).path diff --git a/app/org/thp/cortex/controllers/ResponderCtrl.scala b/app/org/thp/cortex/controllers/ResponderCtrl.scala index e74648061..89dd52268 100644 --- a/app/org/thp/cortex/controllers/ResponderCtrl.scala +++ b/app/org/thp/cortex/controllers/ResponderCtrl.scala @@ -69,7 +69,7 @@ class ResponderCtrl @Inject() ( def listForType(dataType: String): Action[AnyContent] = authenticated(Roles.read).async { request => import org.elastic4play.services.QueryDSL._ val (responderList, responderCount) = workerSrv.findRespondersForUser(request.userId, "dataTypeList" ~= dataType, Some("all"), Nil) - renderer.toOutput(OK, responderList.map(responderJson(false)), responderCount) + renderer.toOutput(OK, responderList.map(responderJson(isAdmin = false)), responderCount) } def create(responderDefinitionId: String): Action[Fields] = authenticated(Roles.orgAdmin).async(fieldsBodyParser) { implicit request => diff --git a/app/org/thp/cortex/controllers/StreamCtrl.scala b/app/org/thp/cortex/controllers/StreamCtrl.scala index 2a0450f5c..0aa42ab86 100644 --- a/app/org/thp/cortex/controllers/StreamCtrl.scala +++ b/app/org/thp/cortex/controllers/StreamCtrl.scala @@ -70,8 +70,7 @@ class StreamCtrl( ) private[StreamCtrl] lazy val logger = Logger(getClass) - /** - * Create a new stream entry with the event head + /** Create a new stream entry with the event head */ @Timed("controllers.StreamCtrl.create") def create: Action[AnyContent] = authenticated(Roles.read) { @@ -85,8 +84,7 @@ class StreamCtrl( private[controllers] def isValidStreamId(streamId: String): Boolean = streamId.length == 10 && streamId.forall(alphanumeric.contains) - /** - * Get events linked to the identified stream entry + /** Get events linked to the identified stream entry * This call waits up to "refresh", if there is no event, return empty response */ @Timed("controllers.StreamCtrl.get") @@ -113,10 +111,10 @@ class StreamCtrl( } @Timed("controllers.StreamCtrl.status") - def status = Action { implicit request => + def status: Action[AnyContent] = Action { implicit request => val status = authenticated.expirationStatus(request) match { case ExpirationWarning(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> true) - case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) + case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) case ExpirationOk(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> false) } Ok(status) diff --git a/app/org/thp/cortex/models/Audit.scala b/app/org/thp/cortex/models/Audit.scala index cf94b1eca..4d8dfd99b 100644 --- a/app/org/thp/cortex/models/Audit.scala +++ b/app/org/thp/cortex/models/Audit.scala @@ -70,13 +70,14 @@ class AuditModel(auditName: String, auditedModels: immutable.Set[AuditedModel]) def mergeAttributeFormat(context: String, format1: AttributeFormat[_], format2: AttributeFormat[_]): Option[AttributeFormat[_]] = (format1, format2) match { - case (OptionalAttributeFormat(f1), f2) => mergeAttributeFormat(context, f1, f2) - case (f1, OptionalAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2) - case (MultiAttributeFormat(f1), MultiAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2).map(MultiAttributeFormat(_)) - case (f1, EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_)) => mergeAttributeFormat(context, f1, StringAttributeFormat) - case (EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_), f2) => mergeAttributeFormat(context, StringAttributeFormat, f2) - case (ObjectAttributeFormat(subAttributes1), ObjectAttributeFormat(subAttributes2)) => mergeAttributes(context, subAttributes1 ++ subAttributes2) - case (f1, f2) if f1 == f2 => Some(f1) + case (OptionalAttributeFormat(f1), f2) => mergeAttributeFormat(context, f1, f2) + case (f1, OptionalAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2) + case (MultiAttributeFormat(f1), MultiAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2).map(MultiAttributeFormat(_)) + case (f1, EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_)) => mergeAttributeFormat(context, f1, StringAttributeFormat) + case (EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_), f2) => mergeAttributeFormat(context, StringAttributeFormat, f2) + case (ObjectAttributeFormat(subAttributes1), ObjectAttributeFormat(subAttributes2)) => + mergeAttributes(context, subAttributes1 ++ subAttributes2) + case (f1, f2) if f1 == f2 => Some(f1) case (f1, f2) => logger.warn(s"Attribute $f1 != $f2") None diff --git a/app/org/thp/cortex/services/AccessLogFilter.scala b/app/org/thp/cortex/services/AccessLogFilter.scala new file mode 100644 index 000000000..af0d0ef06 --- /dev/null +++ b/app/org/thp/cortex/services/AccessLogFilter.scala @@ -0,0 +1,32 @@ +package org.thp.cortex.services + +import play.api.Logger +import play.api.http.HttpErrorHandler +import play.api.mvc.{EssentialAction, EssentialFilter, RequestHeader} + +import javax.inject.Inject +import scala.concurrent.ExecutionContext + +class AccessLogFilter @Inject() (errorHandler: HttpErrorHandler)(implicit ec: ExecutionContext) extends EssentialFilter { + + val logger: Logger = Logger(getClass) + + override def apply(next: EssentialAction): EssentialAction = + (requestHeader: RequestHeader) => { + val startTime = System.currentTimeMillis + next(requestHeader) + .recoverWith { case error => errorHandler.onServerError(requestHeader, error) } + .map { result => + val endTime = System.currentTimeMillis + val requestTime = endTime - startTime + + logger.info( + s"${requestHeader.remoteAddress} ${requestHeader.method} ${requestHeader.uri} took ${requestTime}ms and returned ${result.header.status} ${result + .body + .contentLength + .fold("")(b => s"$b bytes")}" + ) + result.withHeaders("Request-Time" -> requestTime.toString) + } + } +} diff --git a/app/org/thp/cortex/services/CustomWSAPI.scala b/app/org/thp/cortex/services/CustomWSAPI.scala index 9452066cf..f59053eed 100644 --- a/app/org/thp/cortex/services/CustomWSAPI.scala +++ b/app/org/thp/cortex/services/CustomWSAPI.scala @@ -19,7 +19,7 @@ object CustomWSAPI { .parse() def parseProxyConfig(config: Configuration): Option[WSProxyServer] = - config.getOptional[Configuration]("play.ws.proxy").map { proxyConfig ⇒ + config.getOptional[Configuration]("play.ws.proxy").map { proxyConfig => DefaultWSProxyServer( proxyConfig.get[String]("host"), proxyConfig.get[Int]("port"), @@ -35,7 +35,7 @@ object CustomWSAPI { def getWS(config: Configuration)(implicit mat: Materializer): AhcWSClient = { val clientConfig = parseWSConfig(config) val clientConfigWithTruststore = config.getOptional[String]("play.cert") match { - case Some(p) ⇒ + case Some(p) => logger.warn("""Use of "cert" parameter in configuration file is deprecated. Please use: | ws.ssl { | trustManager = { @@ -67,7 +67,7 @@ object CustomWSAPI { ) ) ) - case None ⇒ clientConfig + case None => clientConfig } AhcWSClient(clientConfigWithTruststore, None) } @@ -84,13 +84,13 @@ object CustomWSAPI { @Singleton class CustomWSAPI( - ws: AhcWSClient, - val proxy: Option[WSProxyServer], - config: Configuration, - environment: Environment, - lifecycle: ApplicationLifecycle, - mat: Materializer - ) extends WSClient { + ws: AhcWSClient, + val proxy: Option[WSProxyServer], + config: Configuration, + environment: Environment, + lifecycle: ApplicationLifecycle, + mat: Materializer +) extends WSClient { private[CustomWSAPI] lazy val logger = Logger(getClass) @Inject() def this(config: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, mat: Materializer) = @@ -110,7 +110,7 @@ class CustomWSAPI( try { new CustomWSAPI(Configuration(subConfig.underlying.atKey("play").withFallback(config.underlying)), environment, lifecycle, mat) } catch { - case NonFatal(e) ⇒ + case NonFatal(e) => logger.error(s"WSAPI configuration error, use default values", e) this } diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index fe92ab568..9415815f5 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -50,10 +50,11 @@ class DockerJobRunnerSrv( system: ActorSystem ) - lazy val logger = Logger(getClass) + lazy val logger: Logger = Logger(getClass) lazy val isAvailable: Boolean = Try { + logger.debug(s"Retrieve docker information ...") logger.info(s"Docker is available:\n${client.info()}") true }.recover { @@ -103,6 +104,8 @@ class DockerJobRunnerSrv( else containerConfigBuilder.build() val containerCreation = client.createContainer(containerConfig) // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) + + logger.debug(s"Container configuration: $containerConfig") logger.info( s"Execute container ${containerCreation.id()}\n" + s" timeout: ${timeout.fold("none")(_.toString)}\n" + @@ -119,6 +122,7 @@ class DockerJobRunnerSrv( case r => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't generate output file, use output stream.") val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) .fold(e => s"Container logs can't be read (${e.getMessage})", identity) val message = r.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) diff --git a/app/org/thp/cortex/services/ErrorHandler.scala b/app/org/thp/cortex/services/ErrorHandler.scala index fefa006b4..5fb22295b 100644 --- a/app/org/thp/cortex/services/ErrorHandler.scala +++ b/app/org/thp/cortex/services/ErrorHandler.scala @@ -1,82 +1,75 @@ package org.thp.cortex.services -import java.net.ConnectException - -import scala.concurrent.Future - +import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites +import org.elastic4play._ import org.thp.cortex.models.{JobNotFoundError, RateLimitExceeded, WorkerNotFoundError} import play.api.Logger +import play.api.http.Status.{BAD_REQUEST, FORBIDDEN, NOT_FOUND} import play.api.http.{HttpErrorHandler, Status, Writeable} -import play.api.libs.json.{JsNull, JsValue, Json} -import play.api.mvc.{RequestHeader, ResponseHeader, Result, Results} +import play.api.libs.json.{JsNull, JsString, JsValue, Json} +import play.api.mvc.{RequestHeader, ResponseHeader, Result} -import org.elastic4play.{ - AttributeCheckingError, - AuthenticationError, - AuthorizationError, - BadRequestError, - CreateError, - ErrorWithObject, - GetError, - IndexNotFoundException, - InternalError, - MultiError, - NotFoundError, - SearchError, - UpdateError -} -import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites +import java.net.ConnectException +import scala.concurrent.Future -/** - * This class handles errors. It traverses all causes of exception to find known error and shows the appropriate message +/** This class handles errors. It traverses all causes of exception to find known error and shows the appropriate message */ class ErrorHandler extends HttpErrorHandler { private[ErrorHandler] lazy val logger = Logger(getClass) - def onClientError(request: RequestHeader, statusCode: Int, message: String): Future[Result] = Future.successful { - Results.Status(statusCode)(s"A client error occurred on ${request.method} ${request.uri} : $message") + def onClientError(request: RequestHeader, statusCode: Int, message: String): Future[Result] = { + val tpe = statusCode match { + case BAD_REQUEST => "BadRequest" + case FORBIDDEN => "Forbidden" + case NOT_FOUND => "NotFound" + case _ => "Unknown" + } + Future.successful(toResult(statusCode, Json.obj("type" -> tpe, "message" -> message))) } - def toErrorResult(ex: Throwable): Option[(Int, JsValue)] = + def toErrorResult(ex: Throwable): (Int, JsValue) = ex match { - case AuthenticationError(message) => Some(Status.UNAUTHORIZED -> Json.obj("type" -> "AuthenticationError", "message" -> message)) - case AuthorizationError(message) => Some(Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message)) + case AuthenticationError(message) => Status.UNAUTHORIZED -> Json.obj("type" -> "AuthenticationError", "message" -> message) + case AuthorizationError(message) => Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message) case UpdateError(_, message, attributes) => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "UpdateError", "message" -> message, "object" -> attributes)) - case rle: RateLimitExceeded => Some(Status.TOO_MANY_REQUESTS -> Json.obj("type" -> "RateLimitExceeded", "message" -> rle.getMessage)) - case InternalError(message) => Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message)) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "UpdateError", "message" -> message, "object" -> attributes) + case rle: RateLimitExceeded => Status.TOO_MANY_REQUESTS -> Json.obj("type" -> "RateLimitExceeded", "message" -> rle.getMessage) + case InternalError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message) case nfe: NumberFormatException => - Some(Status.BAD_REQUEST -> Json.obj("type" -> "NumberFormatException", "message" -> ("Invalid format " + nfe.getMessage))) - case NotFoundError(message) => Some(Status.NOT_FOUND -> Json.obj("type" -> "NotFoundError", "message" -> message)) - case BadRequestError(message) => Some(Status.BAD_REQUEST -> Json.obj("type" -> "BadRequest", "message" -> message)) - case SearchError(message) => Some(Status.BAD_REQUEST -> Json.obj("type" -> "SearchError", "message" -> s"$message")) - case ace: AttributeCheckingError => Some(Status.BAD_REQUEST -> Json.toJson(ace)) - case iae: IllegalArgumentException => Some(Status.BAD_REQUEST -> Json.obj("type" -> "IllegalArgument", "message" -> iae.getMessage)) + Status.BAD_REQUEST -> Json.obj("type" -> "NumberFormatException", "message" -> ("Invalid format " + nfe.getMessage)) + case NotFoundError(message) => Status.NOT_FOUND -> Json.obj("type" -> "NotFoundError", "message" -> message) + case BadRequestError(message) => Status.BAD_REQUEST -> Json.obj("type" -> "BadRequest", "message" -> message) + case SearchError(message) => Status.BAD_REQUEST -> Json.obj("type" -> "SearchError", "message" -> s"$message") + case ace: AttributeCheckingError => Status.BAD_REQUEST -> Json.toJson(ace) + case iae: IllegalArgumentException => Status.BAD_REQUEST -> Json.obj("type" -> "IllegalArgument", "message" -> iae.getMessage) case _: ConnectException => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "NoNodeAvailable", "message" -> "ElasticSearch cluster is unreachable")) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "NoNodeAvailable", "message" -> "ElasticSearch cluster is unreachable") case CreateError(_, message, attributes) => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "CreateError", "message" -> message, "object" -> attributes)) - case ErrorWithObject(tpe, message, obj) => Some(Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj)) - case GetError(message) => Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "GetError", "message" -> message)) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "CreateError", "message" -> message, "object" -> attributes) + case ErrorWithObject(tpe, message, obj) => Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj) + case GetError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "GetError", "message" -> message) case MultiError(message, exceptions) => - val suberrors = exceptions.map(e => toErrorResult(e)).collect { - case Some((_, j)) => j - } - Some(Status.MULTI_STATUS -> Json.obj("type" -> "MultiError", "error" -> message, "suberrors" -> suberrors)) - case JobNotFoundError(jobId) => Some(Status.NOT_FOUND -> Json.obj("type" -> "JobNotFoundError", "message" -> s"Job $jobId not found")) + val suberrors = exceptions.map(e => toErrorResult(e)) + Status.MULTI_STATUS -> Json.obj("type" -> "MultiError", "error" -> message, "suberrors" -> suberrors) + case JobNotFoundError(jobId) => Status.NOT_FOUND -> Json.obj("type" -> "JobNotFoundError", "message" -> s"Job $jobId not found") case WorkerNotFoundError(analyzerId) => - Some(Status.NOT_FOUND -> Json.obj("type" -> "AnalyzerNotFoundError", "message" -> s"analyzer $analyzerId not found")) - case IndexNotFoundException => Some(520 -> JsNull) - case t: Throwable => Option(t.getCause).flatMap(toErrorResult) + Status.NOT_FOUND -> Json.obj("type" -> "AnalyzerNotFoundError", "message" -> s"analyzer $analyzerId not found") + case IndexNotFoundException => 520 -> JsNull + case _ if Option(ex.getCause).isDefined => toErrorResult(ex.getCause) + case _ => + logger.error("Internal error", ex) + val json = Json.obj("type" -> ex.getClass.getName, "message" -> ex.getMessage) + Status.INTERNAL_SERVER_ERROR -> (if (ex.getCause == null) json else json + ("cause" -> JsString(ex.getCause.getMessage))) } - def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]) = Result(header = ResponseHeader(status), body = writeable.toEntity(c)) + def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]): Result = + Result(header = ResponseHeader(status), body = writeable.toEntity(c)) def onServerError(request: RequestHeader, exception: Throwable): Future[Result] = { - val (status, body) = toErrorResult(exception).getOrElse( - Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> exception.getClass.getName, "message" -> exception.getMessage) - ) - logger.info(s"${request.method} ${request.uri} returned $status", exception) + val (status, body) = toErrorResult(exception) + if (!exception.isInstanceOf[AuthenticationError]) + if (logger.isDebugEnabled) logger.warn(s"${request.method} ${request.uri} returned $status", exception) + else logger.warn(s"${request.method} ${request.uri} returned $status") Future.successful(toResult(status, body)) } } diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index 6913e8050..e649470ec 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -5,20 +5,17 @@ import java.nio.charset.StandardCharsets import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import java.util.Date - import scala.concurrent.duration.DurationLong import scala.concurrent.{ExecutionContext, Future} -import scala.util.Failure - +import scala.util.{Failure, Success} import play.api.libs.json._ import play.api.{Configuration, Logger} - import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.FileIO + import javax.inject.Inject import org.thp.cortex.models._ - import org.elastic4play.BadRequestError import org.elastic4play.controllers.{Fields, FileInputValue} import org.elastic4play.database.ModifyConfig @@ -39,10 +36,11 @@ class JobRunnerSrv @Inject() ( implicit val mat: Materializer ) { - val logger = Logger(getClass) + val logger: Logger = Logger(getClass) lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer") lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder") val jobDirectory: Path = Paths.get(config.get[String]("job.directory")) + private val globalKeepJobFolder: Boolean = config.get[Boolean]("job.keepJobFolder") private val runners: Seq[String] = config .getOptional[Seq[String]]("job.runners") @@ -90,7 +88,8 @@ class JobRunnerSrv @Inject() ( } private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = { - val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") + val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") + logger.debug(s"Job folder is $jobFolder") val inputJobFolder = Files.createDirectories(jobFolder.resolve("input")) Files.createDirectories(jobFolder.resolve("output")) @@ -138,22 +137,26 @@ class JobRunnerSrv @Inject() ( ("config" -> config) } .map { input => + logger.debug(s"Write worker input: $input") Files.write(inputJobFolder.resolve("input.json"), input.toString.getBytes(StandardCharsets.UTF_8)) jobFolder } .recoverWith { case error => - delete(jobFolder) + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + delete(jobFolder) Future.failed(error) } } - private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext) = { + private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext): Future[Job] = { val outputFile = jobFolder.resolve("output").resolve("output.json") if (Files.exists(outputFile)) { - val is = Files.newInputStream(outputFile) - val report = Json.parse(is) - is.close() + logger.debug(s"Job output: ${new String(Files.readAllBytes(outputFile))}") + val is = Files.newInputStream(outputFile) + val report = + try Json.parse(is) + finally is.close() val success = (report \ "success").asOpt[Boolean].getOrElse(false) if (success) { @@ -233,13 +236,18 @@ class JobRunnerSrv @Inject() ( .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) } yield j finishedJob - .transformWith { r => - r.fold( - error => endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))), - _ => extractReport(jobFolder, job) - ) + .transformWith { + case _: Success[_] => + extractReport(jobFolder, job) + case Failure(error) => + endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))) + + } + .andThen { + case _ => + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + delete(jobFolder) } - .andThen { case _ => delete(jobFolder) } } private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8) diff --git a/app/org/thp/cortex/services/JobSrv.scala b/app/org/thp/cortex/services/JobSrv.scala index cbf470bdb..915e16ca4 100644 --- a/app/org/thp/cortex/services/JobSrv.scala +++ b/app/org/thp/cortex/services/JobSrv.scala @@ -190,7 +190,8 @@ class JobSrv( case (dt, Right(fiv)) => dt -> attachmentSrv.save(fiv).map(Right.apply) case (dt, Left(data)) => dt -> Future.successful(Left(data)) }.fold( - typeDataAttachment => typeDataAttachment._2.flatMap(da => create(worker, typeDataAttachment._1, da, tlp, pap, message, parameters, label, force)), + typeDataAttachment => + typeDataAttachment._2.flatMap(da => create(worker, typeDataAttachment._1, da, tlp, pap, message, parameters, label, force)), errors => { val attributeError = AttributeCheckingError("job", errors) logger.error("legacy job create fails", attributeError) diff --git a/app/org/thp/cortex/services/KeyAuthSrv.scala b/app/org/thp/cortex/services/KeyAuthSrv.scala index a8dc8e130..62eb29417 100644 --- a/app/org/thp/cortex/services/KeyAuthSrv.scala +++ b/app/org/thp/cortex/services/KeyAuthSrv.scala @@ -1,20 +1,17 @@ package org.thp.cortex.services -import java.util.Base64 -import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random - -import play.api.libs.json.JsArray -import play.api.mvc.RequestHeader - import akka.stream.Materializer import akka.stream.scaladsl.Sink - import org.elastic4play.controllers.Fields import org.elastic4play.services.{AuthCapability, AuthContext, AuthSrv} import org.elastic4play.{AuthenticationError, BadRequestError} +import play.api.libs.json.JsArray +import play.api.mvc.RequestHeader + +import java.util.Base64 +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random @Singleton class KeyAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { @@ -26,7 +23,7 @@ class KeyAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, Base64.getEncoder.encodeToString(bytes) } - override val capabilities = Set(AuthCapability.authByKey) + override val capabilities: Set[AuthCapability.Type] = Set(AuthCapability.authByKey) override def authenticate(key: String)(implicit request: RequestHeader): Future[AuthContext] = { import org.elastic4play.services.QueryDSL._ diff --git a/app/org/thp/cortex/services/LocalAuthSrv.scala b/app/org/thp/cortex/services/LocalAuthSrv.scala index ae1af07b6..d45b9425d 100644 --- a/app/org/thp/cortex/services/LocalAuthSrv.scala +++ b/app/org/thp/cortex/services/LocalAuthSrv.scala @@ -1,15 +1,11 @@ package org.thp.cortex.services import javax.inject.{Inject, Singleton} - import scala.concurrent.{ExecutionContext, Future} import scala.util.Random - import play.api.mvc.RequestHeader - import akka.stream.Materializer import org.thp.cortex.models.User - import org.elastic4play.controllers.Fields import org.elastic4play.services.{AuthCapability, AuthContext, AuthSrv} import org.elastic4play.utils.Hasher @@ -18,8 +14,8 @@ import org.elastic4play.{AuthenticationError, AuthorizationError} @Singleton class LocalAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { - val name = "local" - override val capabilities = Set(AuthCapability.changePassword, AuthCapability.setPassword) + val name: String = "local" + override val capabilities: Set[AuthCapability.Type] = Set(AuthCapability.changePassword, AuthCapability.setPassword) private[services] def doAuthenticate(user: User, password: String): Boolean = user.password().map(_.split(",", 2)).fold(false) { diff --git a/app/org/thp/cortex/services/OAuth2Srv.scala b/app/org/thp/cortex/services/OAuth2Srv.scala index 27ea7f9b6..dfbed6d1d 100644 --- a/app/org/thp/cortex/services/OAuth2Srv.scala +++ b/app/org/thp/cortex/services/OAuth2Srv.scala @@ -87,7 +87,7 @@ class OAuth2Srv( if (!isSecuredAuthCode(request)) { logger.debug("Code or state is not provided, redirect to authorizationUrl") Future.successful(Left(authRedirect(oauth2Config))) - } else { + } else (for { token <- getToken(oauth2Config, request) userData <- getUserData(oauth2Config, token) @@ -95,14 +95,12 @@ class OAuth2Srv( } yield Right(authContext)).recoverWith { case error => Future.failed(AuthenticationError(s"OAuth2 authentication failure: ${error.getMessage}")) } - } } private def isSecuredAuthCode(request: RequestHeader): Boolean = request.queryString.contains("code") && request.queryString.contains("state") - /** - * Filter checking whether we initiate the OAuth2 process + /** Filter checking whether we initiate the OAuth2 process * and redirecting to OAuth2 server if necessary * @return */ @@ -122,12 +120,11 @@ class OAuth2Srv( .withSession("state" -> state) } - /** - * Enriching the initial request with OAuth2 token gotten + /** Enriching the initial request with OAuth2 token gotten * from OAuth2 code * @return */ - private def getToken[A](oauth2Config: OAuth2Config, request: RequestHeader): Future[String] = { + private def getToken(oauth2Config: OAuth2Config, request: RequestHeader): Future[String] = { val token = for { state <- request.session.get("state") @@ -147,8 +144,7 @@ class OAuth2Srv( token.getOrElse(Future.failed(BadRequestError("OAuth2 states mismatch"))) } - /** - * Querying the OAuth2 server for a token + /** Querying the OAuth2 server for a token * @param code the previously obtained code * @return */ @@ -181,8 +177,7 @@ class OAuth2Srv( } } - /** - * Client query for user data with OAuth2 token + /** Client query for user data with OAuth2 token * @param token the token * @return */ diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index 0030805d7..d7db36e48 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -2,14 +2,15 @@ package org.thp.cortex.services import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths} - import akka.actor.ActorSystem + import javax.inject.{Inject, Singleton} import org.elastic4play.utils.RichFuture import org.thp.cortex.models._ import play.api.Logger import play.api.libs.json.Json +import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} import scala.sys.process.{Process, ProcessLogger, _} @@ -18,9 +19,9 @@ import scala.util.Try @Singleton class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { - lazy val logger = Logger(getClass) + lazy val logger: Logger = Logger(getClass) - private val pythonPackageVersionRegex = "^Version: ([0-9]*)\\.([0-9]*)\\.([0-9]*)".r + private val pythonPackageVersionRegex = "^Version: (\\d*)\\.(\\d*)\\.(\\d*)".r def checkCortexUtilsVersion(pythonVersion: String): Option[(Int, Int, Int)] = Try { @@ -31,12 +32,11 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)( - implicit + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit ec: ExecutionContext ): Future[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent - val output = StringBuilder.newBuilder + val output = mutable.StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") val env = if (Files.exists(cacertsFile)) Seq("REQUESTS_CA_BUNDLE" -> cacertsFile.toString) else Nil diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala index 690a99e41..8b4a37cdd 100644 --- a/app/org/thp/cortex/services/WorkerSrv.scala +++ b/app/org/thp/cortex/services/WorkerSrv.scala @@ -50,10 +50,11 @@ class WorkerSrv @Inject() ( rescan() - def getDefinition(workerId: String): Try[WorkerDefinition] = workerMap.get(workerId) match { - case Some(worker) => Success(worker) - case None => Failure(NotFoundError(s"Worker $workerId not found")) - } + def getDefinition(workerId: String): Try[WorkerDefinition] = + workerMap.get(workerId) match { + case Some(worker) => Success(worker) + case None => Failure(NotFoundError(s"Worker $workerId not found")) + } // def listDefinitions: (Source[WorkerDefinition, NotUsed], Future[Long]) = Source(workerMap.values.toList) → Future.successful(workerMap.size.toLong) @@ -150,8 +151,9 @@ class WorkerSrv @Inject() ( case "file" => Future.successful(readFile(Paths.get(url.toURI), workerType)) case "http" | "https" => val reads = WorkerDefinition.reads(workerType) - ws.url(url.toString) - .get() + val query = ws.url(url.toString).get() + logger.debug(s"Read catalog using query $query") + query .map(response => response.json.as(reads)) .map(_.filterNot(_.command.isDefined)) } @@ -198,12 +200,12 @@ class WorkerSrv @Inject() ( Future(new URL(workerUrl)) .flatMap(readUrl(_, workerType)) .recover { - case _ => + case error => val path = Paths.get(workerUrl) if (Files.isRegularFile(path)) readFile(path, workerType) else if (Files.isDirectory(path)) readDirectory(path, workerType) else { - logger.warn(s"Worker path ($workerUrl) is not found") + logger.warn(s"Worker path ($workerUrl) is not found", error) Nil } } @@ -216,8 +218,8 @@ class WorkerSrv @Inject() ( } - def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)( - implicit authContext: AuthContext + def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit + authContext: AuthContext ): Future[Worker] = { val rawConfig = workerFields.getValue("configuration").fold(JsObject.empty)(_.as[JsObject]) val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe, config).items ++ BaseConfig @@ -252,7 +254,8 @@ class WorkerSrv @Inject() ( .set("configuration", cfg.toString) .set("type", workerDefinition.tpe.toString) .addIfAbsent("dataTypeList", StringInputValue(workerDefinition.dataTypeList)) - ), { + ), + { case One(e) => Future.failed(e) case Every(es @ _*) => Future.failed(AttributeCheckingError(s"worker(${workerDefinition.name}).configuration", es)) } diff --git a/app/org/thp/cortex/util/JsonConfig.scala b/app/org/thp/cortex/util/JsonConfig.scala index 33bce0f5b..382b682fd 100644 --- a/app/org/thp/cortex/util/JsonConfig.scala +++ b/app/org/thp/cortex/util/JsonConfig.scala @@ -19,7 +19,7 @@ object JsonConfig { } ) - implicit def configWrites = OWrites { (cfg: Configuration) => + implicit def configWrites: OWrites[Configuration] = OWrites { (cfg: Configuration) => JsObject(cfg.subKeys.map(key => key -> configValueWrites.writes(cfg.underlying.getValue(key))).toSeq) } } diff --git a/conf/reference.conf b/conf/reference.conf index 7f457257b..a96f0f1e1 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -14,6 +14,7 @@ job { runners = [docker, process] directory = ${java.io.tmpdir} dockerDirectory = ${job.directory} + keepJobFolder = false } # HTTP filters @@ -26,7 +27,8 @@ play.filters { enabled = [ org.thp.cortex.services.StreamFilter, org.elastic4play.services.TempFilter, - org.thp.cortex.services.CSRFFilter + org.thp.cortex.services.CSRFFilter, + org.thp.cortex.services.AccessLogFilter ] } From b6925bdce7b854552514ff4ee838885c4c2ba4dc Mon Sep 17 00:00:00 2001 From: To-om Date: Mon, 30 May 2022 17:54:11 +0200 Subject: [PATCH 3/8] #410 Handle job in one thread --- .../cortex/services/DockerJobRunnerSrv.scala | 40 +++--- .../thp/cortex/services/JobRunnerSrv.scala | 120 ++++++++++-------- .../cortex/services/ProcessJobRunnerSrv.scala | 49 +++---- 3 files changed, 106 insertions(+), 103 deletions(-) diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index 9415815f5..2979a794b 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -63,9 +63,9 @@ class DockerJobRunnerSrv( false }.get - def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext - ): Future[Unit] = { + ): Try[Unit] = { import scala.collection.JavaConverters._ if (autoUpdate) client.pull(dockerImage) // ContainerConfig.builder().addVolume() @@ -114,28 +114,30 @@ class DockerJobRunnerSrv( Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString) ) - val execution = Future { + val timeoutSched = timeout.map(to => + system.scheduler.scheduleOnce(to) { + logger.info("Timeout reached, stopping the container") + client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) + } + ) + val execution = Try { client.startContainer(containerCreation.id()) client.waitContainer(containerCreation.id()) () - }(jobExecutor).andThen { - case r => - val outputFile = jobDirectory.resolve("output").resolve("output.json") - if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { - logger.warn(s"The worker didn't generate output file, use output stream.") - val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) - .fold(e => s"Container logs can't be read (${e.getMessage})", identity) - val message = r.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) + } + timeoutSched.foreach(_.cancel()) + val outputFile = jobDirectory.resolve("output").resolve("output.json") + if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't generate output file.") + val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) + .fold(e => s"Container logs can't be read (${e.getMessage})", identity) + val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) - val report = Json.obj("success" -> false, "errorMessage" -> message) - Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8)) - } + val report = Json.obj("success" -> false, "errorMessage" -> message) + Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) } - timeout - .fold(execution)(t => execution.withTimeout(t, client.stopContainer(containerCreation.id(), 3))) - .andThen { - case _ => client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) - } + client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) + execution } } diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index e649470ec..1731eec4a 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -6,8 +6,8 @@ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import java.util.Date import scala.concurrent.duration.DurationLong -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} import play.api.libs.json._ import play.api.{Configuration, Logger} import akka.actor.ActorSystem @@ -87,7 +87,7 @@ class JobRunnerSrv @Inject() ( case t: Throwable => logger.warn(s"Fail to remove temporary files ($directory) : $t") } - private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = { + private def prepareJobFolder(worker: Worker, job: Job): Try[Path] = { val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") logger.debug(s"Job folder is $jobFolder") val inputJobFolder = Files.createDirectories(jobFolder.resolve("input")) @@ -97,12 +97,17 @@ class JobRunnerSrv @Inject() ( .attachment() .map { attachment => val attachmentFile = Files.createTempFile(inputJobFolder, "attachment", "") - attachmentSrv - .source(attachment.id) - .runWith(FileIO.toPath(attachmentFile)) + Try( + Await.result( + attachmentSrv + .source(attachment.id) + .runWith(FileIO.toPath(attachmentFile)), + 10.minutes + ) + ) .map(_ => Some(attachmentFile)) } - .getOrElse(Future.successful(None)) + .getOrElse(Success(None)) .map { case Some(file) => Json.obj("file" -> file.getFileName.toString, "filename" -> job.attachment().get.name, "contentType" -> job.attachment().get.contentType) @@ -145,7 +150,7 @@ class JobRunnerSrv @Inject() ( case error => if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) delete(jobFolder) - Future.failed(error) + Failure(error) } } @@ -200,64 +205,67 @@ class JobRunnerSrv @Inject() ( endJob(job, JobStatus.Failure, Some(s"no output")) } - def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = - prepareJobFolder(worker, job).flatMap { jobFolder => - val executionContext = worker.tpe() match { - case WorkerType.analyzer => analyzerExecutionContext - case WorkerType.responder => responderExecutionContext - } - val finishedJob = for { - _ <- startJob(job) - j <- - runners - .foldLeft[Option[Future[Unit]]](None) { - case (None, "docker") => - worker - .dockerImage() - .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") - None - } - case (None, "process") => - worker - .command() - .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") - None - } - case (j: Some[_], _) => j - case (None, runner) => - logger.warn(s"Unknown job runner: $runner") - None + def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = { + val executionContext = worker.tpe() match { + case WorkerType.analyzer => analyzerExecutionContext + case WorkerType.responder => responderExecutionContext + } + var maybeJobFolder: Option[Path] = None - } - .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) - } yield j - finishedJob - .transformWith { - case _: Success[_] => - extractReport(jobFolder, job) - case Failure(error) => - endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))) + Future { + syncStartJob(job).get + val jobFolder = prepareJobFolder(worker, job).get + maybeJobFolder = Some(jobFolder) + runners + .foldLeft[Option[Try[Unit]]](None) { + case (None, "docker") => + worker + .dockerImage() + .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, worker.jobTimeout().map(_.minutes))) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") + None + } + case (None, "process") => + worker + .command() + .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with process (doesn't have command)") + None + } + case (j: Some[_], _) => j + case (None, runner) => + logger.warn(s"Unknown job runner: $runner") + None } - .andThen { - case _ => - if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) - delete(jobFolder) - } - } + .getOrElse(throw BadRequestError("Worker cannot be run")) + }(executionContext) + .transformWith { + case _: Success[_] => + extractReport(maybeJobFolder.get /* can't be none */, job) + case Failure(error) => + endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json")))) + + } + .andThen { + case _ => + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + maybeJobFolder.foreach(delete) + } + } private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8) - private def startJob(job: Job)(implicit authContext: AuthContext): Future[Job] = { + private def syncStartJob(job: Job)(implicit authContext: AuthContext): Try[Job] = { val fields = Fields .empty .set("status", JobStatus.InProgress.toString) .set("startDate", Json.toJson(new Date)) - updateSrv(job, fields, ModifyConfig(retryOnConflict = 0)) + Try( + Await.result(updateSrv(job, fields, ModifyConfig(retryOnConflict = 0, seqNoAndPrimaryTerm = Some((job.seqNo, job.primaryTerm)))), 1.minute) + ) } private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index d7db36e48..5c33c79d8 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -32,9 +32,9 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext - ): Future[Unit] = { + ): Try[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent val output = mutable.StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") @@ -45,33 +45,26 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { logger.info(s" Job ${job.id}: $s") output ++= s }) - val execution = Future - .apply { - process.exitValue() - () - }(jobExecutor) - .map { _ => - val outputFile = jobDirectory.resolve("output").resolve("output.json") - if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { - val report = Json.obj("success" -> false, "errorMessage" -> output.toString) - Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) - } - () - } - .recoverWith { - case error => - logger.error(s"Execution of command $command failed", error) - Future.apply { - val report = Json.obj("success" -> false, "errorMessage" -> s"${error.getMessage}\n$output") - Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8)) - () - } + val timeoutSched = timeout.map(to => + system.scheduler.scheduleOnce(to) { + logger.info("Timeout reached, killing process") + process.destroy() } - timeout.fold(execution)(t => execution.withTimeout(t, killProcess(process))) - } + ) - def killProcess(process: Process): Unit = { - logger.info("Timeout reached, killing process") - process.destroy() + val execution = Try { + process.exitValue() + () + } + timeoutSched.foreach(_.cancel()) + val outputFile = jobDirectory.resolve("output").resolve("output.json") + if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't generate output file, use output stream.") + val message = execution.fold(e => s"Process execution error: ${e.getMessage}\n$output.result()", _ => output.result()) + val report = Json.obj("success" -> false, "errorMessage" -> message) + Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) + } + execution } + } From cc29f71ba1bbe1bcbf1d984b5f99361d37d98239 Mon Sep 17 00:00:00 2001 From: To-om Date: Tue, 21 Jun 2022 07:44:15 +0200 Subject: [PATCH 4/8] #416 Update dependencies --- app/org/thp/cortex/Module.scala | 30 +++++++++---------- .../cortex/controllers/AttachmentCtrl.scala | 15 +++++----- app/org/thp/cortex/models/Roles.scala | 5 +--- .../cortex/services/DockerJobRunnerSrv.scala | 21 +++++-------- .../cortex/services/ProcessJobRunnerSrv.scala | 10 +++---- app/org/thp/cortex/services/StreamSrv.scala | 17 ++++------- project/Dependencies.scala | 10 +++---- project/plugins.sbt | 6 ++-- 8 files changed, 48 insertions(+), 66 deletions(-) diff --git a/app/org/thp/cortex/Module.scala b/app/org/thp/cortex/Module.scala index 640120a84..dcf54bf31 100644 --- a/app/org/thp/cortex/Module.scala +++ b/app/org/thp/cortex/Module.scala @@ -1,25 +1,23 @@ package org.thp.cortex -import java.lang.reflect.Modifier - import com.google.inject.AbstractModule -import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder} -import play.api.libs.concurrent.AkkaGuiceSupport -import play.api.{Configuration, Environment, Logger, Mode} -import scala.collection.JavaConverters._ - import com.google.inject.name.Names +import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder} +import org.elastic4play.models.BaseModelDef +import org.elastic4play.services.auth.MultiAuthSrv +import org.elastic4play.services.{AuthSrv, MigrationOperations, UserSrv => EUserSrv} import org.reflections.Reflections -import org.reflections.scanners.SubTypesScanner +import org.reflections.scanners.Scanners import org.reflections.util.ConfigurationBuilder +import org.thp.cortex.controllers.{AssetCtrl, AssetCtrlDev, AssetCtrlProd} import org.thp.cortex.models.{AuditedModel, Migration} import org.thp.cortex.services._ +import org.thp.cortex.services.mappers.{MultiUserMapperSrv, UserMapper} +import play.api.libs.concurrent.AkkaGuiceSupport +import play.api.{Configuration, Environment, Logger, Mode} -import org.elastic4play.models.BaseModelDef -import org.elastic4play.services.auth.MultiAuthSrv -import org.elastic4play.services.{UserSrv => EUserSrv, AuthSrv, MigrationOperations} -import org.thp.cortex.controllers.{AssetCtrl, AssetCtrlDev, AssetCtrlProd} -import services.mappers.{MultiUserMapperSrv, UserMapper} +import java.lang.reflect.Modifier +import scala.collection.JavaConverters._ class Module(environment: Environment, configuration: Configuration) extends AbstractModule with ScalaModule with AkkaGuiceSupport { @@ -31,11 +29,11 @@ class Module(environment: Environment, configuration: Configuration) extends Abs val reflectionClasses = new Reflections( new ConfigurationBuilder() .forPackages("org.elastic4play") - .addClassLoader(getClass.getClassLoader) - .addClassLoader(environment.getClass.getClassLoader) + .addClassLoaders(getClass.getClassLoader) + .addClassLoaders(environment.getClass.getClassLoader) .forPackages("org.thp.cortex") .setExpandSuperTypes(false) - .setScanners(new SubTypesScanner(false)) + .setScanners(Scanners.SubTypes) ) reflectionClasses diff --git a/app/org/thp/cortex/controllers/AttachmentCtrl.scala b/app/org/thp/cortex/controllers/AttachmentCtrl.scala index 2aeaf0270..635cb7ca8 100644 --- a/app/org/thp/cortex/controllers/AttachmentCtrl.scala +++ b/app/org/thp/cortex/controllers/AttachmentCtrl.scala @@ -2,12 +2,12 @@ package org.thp.cortex.controllers import java.net.URLEncoder import java.nio.file.Files - import akka.stream.scaladsl.FileIO + import javax.inject.{Inject, Singleton} -import net.lingala.zip4j.core.ZipFile +import net.lingala.zip4j.ZipFile import net.lingala.zip4j.model.ZipParameters -import net.lingala.zip4j.util.Zip4jConstants +import net.lingala.zip4j.model.enums.{CompressionLevel, EncryptionMethod} import org.elastic4play.Timed import org.elastic4play.controllers.Authenticated import org.elastic4play.models.AttachmentAttributeFormat @@ -79,14 +79,13 @@ class AttachmentCtrl( else { val f = tempFileCreator.create("zip", hash).path Files.delete(f) - val zipFile = new ZipFile(f.toFile) + val zipFile = new ZipFile(f.toFile) + zipFile.setPassword(password.toCharArray) val zipParams = new ZipParameters - zipParams.setCompressionLevel(Zip4jConstants.DEFLATE_LEVEL_FASTEST) + zipParams.setCompressionLevel(CompressionLevel.FASTEST) zipParams.setEncryptFiles(true) - zipParams.setEncryptionMethod(Zip4jConstants.ENC_METHOD_STANDARD) - zipParams.setPassword(password) + zipParams.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD) zipParams.setFileNameInZip(name.getOrElse(hash)) - zipParams.setSourceExternalStream(true) zipFile.addStream(attachmentSrv.stream(hash), zipParams) Result( diff --git a/app/org/thp/cortex/models/Roles.scala b/app/org/thp/cortex/models/Roles.scala index 3116d4be0..241ccd453 100644 --- a/app/org/thp/cortex/models/Roles.scala +++ b/app/org/thp/cortex/models/Roles.scala @@ -1,16 +1,13 @@ package org.thp.cortex.models import play.api.libs.json.{JsString, JsValue} - import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.requests.mappings.KeywordField +import com.sksamuel.elastic4s.fields.KeywordField import org.scalactic.{Every, Good, One, Or} - import org.elastic4play.{AttributeError, InvalidFormatAttributeError} import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} import org.elastic4play.models.AttributeFormat import org.elastic4play.services.Role - import org.thp.cortex.models.JsonFormat.roleFormat object Roles { diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index 2979a794b..830e1e62b 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -1,24 +1,19 @@ package org.thp.cortex.services -import java.nio.charset.StandardCharsets -import java.nio.file._ - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try - -import play.api.libs.json.Json -import play.api.{Configuration, Logger} - import akka.actor.ActorSystem import com.spotify.docker.client.DockerClient.LogsParam import com.spotify.docker.client.messages.HostConfig.Bind import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} import com.spotify.docker.client.{DefaultDockerClient, DockerClient} -import javax.inject.{Inject, Singleton} -import org.thp.cortex.models._ +import play.api.libs.json.Json +import play.api.{Configuration, Logger} -import org.elastic4play.utils.RichFuture +import java.nio.charset.StandardCharsets +import java.nio.file._ +import javax.inject.{Inject, Singleton} +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import scala.util.Try @Singleton class DockerJobRunnerSrv( diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index 5c33c79d8..d5edcb275 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -1,18 +1,16 @@ package org.thp.cortex.services -import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Path, Paths} import akka.actor.ActorSystem - -import javax.inject.{Inject, Singleton} -import org.elastic4play.utils.RichFuture import org.thp.cortex.models._ import play.api.Logger import play.api.libs.json.Json +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import javax.inject.{Inject, Singleton} import scala.collection.mutable +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} import scala.sys.process.{Process, ProcessLogger, _} import scala.util.Try diff --git a/app/org/thp/cortex/services/StreamSrv.scala b/app/org/thp/cortex/services/StreamSrv.scala index de418907b..3695199af 100644 --- a/app/org/thp/cortex/services/StreamSrv.scala +++ b/app/org/thp/cortex/services/StreamSrv.scala @@ -1,8 +1,6 @@ package org.thp.cortex.services -import javax.inject.{Inject, Singleton} - -import akka.actor.{actorRef2Scala, Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, DeadLetter, PoisonPill} +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Cancellable, DeadLetter, PoisonPill} import akka.stream.Materializer import org.elastic4play.services._ import org.elastic4play.utils.Instance @@ -10,11 +8,11 @@ import play.api.Logger import play.api.libs.json.JsObject import play.api.mvc.{Filter, RequestHeader, Result} +import javax.inject.{Inject, Singleton} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} -/** - * This actor monitors dead messages and log them +/** This actor monitors dead messages and log them */ @Singleton class DeadLetterMonitoringActor @Inject() (system: ActorSystem) extends Actor { @@ -75,8 +73,7 @@ class StreamActor( private class WaitingRequest(senderRef: ActorRef, itemCancellable: Cancellable, globalCancellable: Cancellable, hasResult: Boolean) { def this(senderRef: ActorRef) = this(senderRef, FakeCancellable, context.system.scheduler.scheduleOnce(refresh, self, Submit), false) - /** - * Renew timers + /** Renew timers */ def renew: WaitingRequest = if (itemCancellable.cancel()) { @@ -92,8 +89,7 @@ class StreamActor( } else this - /** - * Send message + /** Send message */ def submit(messages: Seq[JsObject]): Unit = { itemCancellable.cancel() @@ -104,8 +100,7 @@ class StreamActor( var killCancel: Cancellable = FakeCancellable - /** - * renew global timer and rearm it + /** renew global timer and rearm it */ def renewExpiration(): Unit = if (killCancel.cancel()) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3ee6e24ef..096a1daf1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,7 @@ import sbt._ object Dependencies { - val scalaVersion = "2.12.12" + val scalaVersion = "2.12.16" object Play { val version = play.core.PlayVersion.current @@ -14,11 +14,11 @@ object Dependencies { val guice = "com.typesafe.play" %% "play-guice" % version } - val scalaGuice = "net.codingwell" %% "scala-guice" % "4.1.0" + val scalaGuice = "net.codingwell" %% "scala-guice" % "5.1.0" - val reflections = "org.reflections" % "reflections" % "0.9.11" - val zip4j = "net.lingala.zip4j" % "zip4j" % "1.3.2" - val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.3" + val reflections = "org.reflections" % "reflections" % "0.10.2" + val zip4j = "net.lingala.zip4j" % "zip4j" % "2.10.0" + val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.4" val dockerClient = "com.spotify" % "docker-client" % "8.14.4" val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion diff --git a/project/plugins.sbt b/project/plugins.sbt index e25cbb8ad..7207b3c38 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,6 +2,6 @@ logLevel := Level.Info // The Play plugin -addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.3") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") -addSbtPlugin("org.thehive-project" % "sbt-github-changelog" % "0.3.0") +addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.8.16") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") +addSbtPlugin("org.thehive-project" % "sbt-github-changelog" % "0.4.0") From 57e58a3f5163e9f1179176f7a9c07ac25de2ccf4 Mon Sep 17 00:00:00 2001 From: To-om Date: Tue, 21 Jun 2022 08:00:36 +0200 Subject: [PATCH 5/8] #413 Add missing python dependencies in cortex image --- project/DockerSettings.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/project/DockerSettings.scala b/project/DockerSettings.scala index ebb1b7eb3..a2e631f01 100644 --- a/project/DockerSettings.scala +++ b/project/DockerSettings.scala @@ -29,11 +29,16 @@ object DockerSettings { case (_, filepath) => filepath == "/opt/cortex/conf/application.conf" }), dockerCommands := Seq( - Cmd("FROM", "openjdk:8"), + Cmd("FROM", "openjdk:8-slim"), Cmd("LABEL", "MAINTAINER=\"TheHive Project \"", "repository=\"https://github.com/TheHive-Project/TheHive\""), Cmd("WORKDIR", "/opt/cortex"), // format: off Cmd("RUN", + "apt", "update", "&&", + "apt", "upgrade", "-y", "&&", + "apt", "install", "-y", "iptables", "lxc", "wget", "&&", + "apt", "autoclean", "-y", "-q", "&&", + "apt", "autoremove", "-y", "-q", "&&", "wget", "-q", "-O", "-", "https://download.docker.com/linux/static/stable/x86_64/docker-18.09.0.tgz", "|", "tar", "-xzC", "/usr/local/bin/", "--strip-components", "1", "&&", "addgroup", "--system", "dockremap", "&&", @@ -41,11 +46,6 @@ object DockerSettings { "addgroup", "--system", "docker", "&&", "echo", "dockremap:165536:65536", ">>", "/etc/subuid", "&&", "echo", "dockremap:165536:65536", ">>", "/etc/subgid", "&&", - "apt", "update", "&&", - "apt", "upgrade", "-y", "&&", - "apt", "install", "-y", "iptables", "lxc", "&&", - "apt", "autoclean", "-y", "-q", "&&", - "apt", "autoremove", "-y", "-q", "&&", "rm", "-rf", "/var/lib/apt/lists/*", "&&", "(", "type", "groupadd", "1>/dev/null", "2>&1", "&&", "groupadd", "-g", "1001", "cortex", "||", @@ -112,6 +112,11 @@ object DockerSettings { | pip2 install $I || true && | pip3 install $I || true ; | done && + | for I in $(find /tmp/analyzers -name requirements.txt) ; + | do + | pip2 install -r $I || true && + | pip3 install -r $I || true ; + | done && | rm -rf /tmp/analyzers """.stripMargin.split("\\s").filter(_.nonEmpty): _* ) From 9c6196a8ca882143fc8b6b287a462173210eb8f2 Mon Sep 17 00:00:00 2001 From: To-om Date: Tue, 21 Jun 2022 08:10:26 +0200 Subject: [PATCH 6/8] #417 Add API to check status of several jobs --- app/org/thp/cortex/controllers/JobCtrl.scala | 20 +++++++++++++++++--- conf/routes | 1 + 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/app/org/thp/cortex/controllers/JobCtrl.scala b/app/org/thp/cortex/controllers/JobCtrl.scala index f3a762217..fcc22bc2b 100644 --- a/app/org/thp/cortex/controllers/JobCtrl.scala +++ b/app/org/thp/cortex/controllers/JobCtrl.scala @@ -3,21 +3,20 @@ package org.thp.cortex.controllers import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.DurationInt - import play.api.http.Status import play.api.libs.json.{JsObject, JsString, JsValue, Json} import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} - import akka.actor.{ActorRef, ActorSystem} import akka.pattern.ask import akka.stream.Materializer import akka.stream.scaladsl.Sink import akka.util.Timeout +import org.elastic4play.NotFoundError + import javax.inject.{Inject, Named, Singleton} import org.thp.cortex.models.{Job, JobStatus, Roles} import org.thp.cortex.services.AuditActor.{JobEnded, Register} import org.thp.cortex.services.JobSrv - import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} import org.elastic4play.models.JsonFormat.baseModelEntityWrites import org.elastic4play.services.JsonFormat.queryReads @@ -159,6 +158,21 @@ class JobCtrl @Inject() ( .map(Ok(_)) } + def getJobStatus: Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request => + val jobIds = request.body.getStrings("jobIds").getOrElse(Nil) + Future + .traverse(jobIds) { jobId => + jobSrv + .getForUser(request.userId, jobId) + .map(j => jobId -> JsString(j.status().toString)) + .recover { + case _: NotFoundError => jobId -> JsString("NotFound") + case error => jobId -> JsString(s"Error($error)") + } + } + .map(statuses => Ok(JsObject(statuses))) + } + def listArtifacts(jobId: String): Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request => val query = request.body.getValue("query").fold[QueryDef](QueryDSL.any)(_.as[QueryDef]) val range = request.body.getString("range") diff --git a/conf/routes b/conf/routes index 8389c614d..20f7519fd 100644 --- a/conf/routes +++ b/conf/routes @@ -65,6 +65,7 @@ POST /api/job/_search org.thp.cort DELETE /api/job/:id org.thp.cortex.controllers.JobCtrl.delete(id) GET /api/job/:id org.thp.cortex.controllers.JobCtrl.get(id) GET /api/job/:id/report org.thp.cortex.controllers.JobCtrl.report(id) +POST /api/job/status org.thp.cortex.controllers.JobCtrl.getJobStatus POST /api/stream org.thp.cortex.controllers.StreamCtrl.create() From 5fbb7a6eee130a8276c1fa3178c117f24ecfe187 Mon Sep 17 00:00:00 2001 From: To-om Date: Wed, 22 Jun 2022 09:50:50 +0200 Subject: [PATCH 7/8] #416 Fix serialisation on null values --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 096a1daf1..4c3c75af6 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,7 +18,7 @@ object Dependencies { val reflections = "org.reflections" % "reflections" % "0.10.2" val zip4j = "net.lingala.zip4j" % "zip4j" % "2.10.0" - val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.4" + val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.5" val dockerClient = "com.spotify" % "docker-client" % "8.14.4" val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion From d522504839e860007aaaedb074274e3735bca7c7 Mon Sep 17 00:00:00 2001 From: To-om Date: Wed, 22 Jun 2022 10:26:13 +0200 Subject: [PATCH 8/8] Release 3.1.5 --- CHANGELOG.md | 106 +++++++++++++++++++++++++++-------------------- version.sbt | 2 +- www/package.json | 2 +- 3 files changed, 63 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73c0568fd..66bae0adc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,25 @@ # Change Log -## 3.1.4 (2021-12-20) +## [3.1.5](https://github.com/TheHive-Project/Cortex/milestone/29) (2022-06-22) + +**Implemented enhancements:** + +- Improve logs for troubleshooting [\#412](https://github.com/TheHive-Project/Cortex/issues/412) +- Add API to check status of several jobs [\#417](https://github.com/TheHive-Project/Cortex/issues/417) + +**Fixed bugs:** + +- Job timeout doesn't work if threadpool is full [\#410](https://github.com/TheHive-Project/Cortex/issues/410) +- Update libraries [\#416](https://github.com/TheHive-Project/Cortex/issues/416) + +**Closed issues:** + +- Add missing dependencies on Docker image [\#413](https://github.com/TheHive-Project/Cortex/issues/413) +- [Bug ] Authentication Bypass Vulnerability [\#418](https://github.com/TheHive-Project/Cortex/issues/418) + +## [3.1.4 - Update library log4j-to-slf4j to version 2.17.0](https://github.com/TheHive-Project/Cortex/milestone/33) (2022-05-24) -**Dependency update**: -- Update library `log4j-to-slf4j` to version 2.17.0 ## [3.1.3](https://github.com/TheHive-Project/Cortex/milestone/31) (2021-11-10) @@ -14,25 +29,25 @@ ## [3.1.2](https://github.com/TheHive-Project/Cortex/milestone/30) (2021-11-05) +**Implemented enhancements:** + +- Create a docker image with all dependencies [\#388](https://github.com/TheHive-Project/Cortex/issues/388) + **Closed issues:** - More settings on docker containers instantiated by Cortex [\#387](https://github.com/TheHive-Project/Cortex/issues/387) -**Implemented enhancements:** +## [3.1.1](https://github.com/TheHive-Project/Cortex/milestone/28) (2021-03-01) -- Create a docker image with all dependencies [\#388](https://github.com/TheHive-Project/Cortex/issues/388) +**Implemented enhancements:** -## [3.1.1](https://github.com/TheHive-Project/Cortex/milestone/28) (2021-03-01) +- [Improvement] Create logfile after installation [\#341](https://github.com/TheHive-Project/Cortex/issues/341) **Fixed bugs:** - [BUG] Certificate not taken into account when running neurons with process [\#317](https://github.com/TheHive-Project/Cortex/issues/317) - [Bug] Update doesn't work on Elasticsearch 7.11 [\#346](https://github.com/TheHive-Project/Cortex/issues/346) -**Implemented enhancements:** - -- [Improvement] Create logfile after installation [\#341](https://github.com/TheHive-Project/Cortex/issues/341) - ## [3.1.0](https://github.com/TheHive-Project/Cortex/milestone/27) (2020-10-30) **Implemented enhancements:** @@ -47,19 +62,20 @@ ## [3.1.0-RC1](https://github.com/TheHive-Project/Cortex/milestone/21) (2020-08-13) -**Fixed bugs:** - -- OAuth2 SSO Login Broken [\#264](https://github.com/TheHive-Project/Cortex/issues/264) - **Implemented enhancements:** - Support of ElasticSearch 7 [\#279](https://github.com/TheHive-Project/Cortex/issues/279) +**Fixed bugs:** + +- OAuth2 SSO Login Broken [\#264](https://github.com/TheHive-Project/Cortex/issues/264) + ## [3.0.1](https://github.com/TheHive-Project/Cortex/milestone/24) (2020-04-24) **Implemented enhancements:** - Handle second/minute-rates limits on Flavors and Analyzers [\#164](https://github.com/TheHive-Project/Cortex/issues/164) +- Remove Elasticsearch cluster configuration option [\#230](https://github.com/TheHive-Project/Cortex/pull/230) - Docker image has many CVE's open against it [\#238](https://github.com/TheHive-Project/Cortex/issues/238) - Analyzer reports "no output" when it fails [\#241](https://github.com/TheHive-Project/Cortex/issues/241) - Cortex logs the Play secret key at startup. [\#244](https://github.com/TheHive-Project/Cortex/issues/244) @@ -130,10 +146,10 @@ ## [2.1.3](https://github.com/TheHive-Project/Cortex/milestone/18) (2019-02-05) -**Closed issues:** +**Implemented enhancements:** -- conf/logback.xml: Rotate logs [\#62](https://github.com/TheHive-Project/Cortex/issues/62) -- Build Error on NodeJS 8 [\#142](https://github.com/TheHive-Project/Cortex/issues/142) +- Add PAP property to jobs list [\#146](https://github.com/TheHive-Project/Cortex/issues/146) +- Add configuration for drone continuous integration [\#156](https://github.com/TheHive-Project/Cortex/issues/156) **Fixed bugs:** @@ -143,10 +159,10 @@ - Unable to disable invalid responders [\#157](https://github.com/TheHive-Project/Cortex/issues/157) - Wrong checks of role when an user is created [\#158](https://github.com/TheHive-Project/Cortex/issues/158) -**Implemented enhancements:** +**Closed issues:** -- Add PAP property to jobs list [\#146](https://github.com/TheHive-Project/Cortex/issues/146) -- Add configuration for drone continuous integration [\#156](https://github.com/TheHive-Project/Cortex/issues/156) +- conf/logback.xml: Rotate logs [\#62](https://github.com/TheHive-Project/Cortex/issues/62) +- Build Error on NodeJS 8 [\#142](https://github.com/TheHive-Project/Cortex/issues/142) ## [2.1.2](https://github.com/TheHive-Project/Cortex/milestone/17) (2018-10-12) @@ -156,18 +172,18 @@ ## [2.1.1](https://github.com/TheHive-Project/Cortex/milestone/16) (2018-10-12) -**Fixed bugs:** - -- Console output should not be logged in syslog [\#136](https://github.com/TheHive-Project/Cortex/issues/136) -- RPM update replace configuration file [\#137](https://github.com/TheHive-Project/Cortex/issues/137) -- Fix Cache column in analyzers admin page [\#139](https://github.com/TheHive-Project/Cortex/issues/139) - **Implemented enhancements:** - Publish stable versions in beta package channels [\#138](https://github.com/TheHive-Project/Cortex/issues/138) - Allow Cortex to use a custom root context [\#140](https://github.com/TheHive-Project/Cortex/issues/140) - Change Debian dependencies [\#141](https://github.com/TheHive-Project/Cortex/issues/141) +**Fixed bugs:** + +- Console output should not be logged in syslog [\#136](https://github.com/TheHive-Project/Cortex/issues/136) +- RPM update replace configuration file [\#137](https://github.com/TheHive-Project/Cortex/issues/137) +- Fix Cache column in analyzers admin page [\#139](https://github.com/TheHive-Project/Cortex/issues/139) + ## [2.1.0](https://github.com/TheHive-Project/Cortex/milestone/15) (2018-09-25) **Implemented enhancements:** @@ -189,19 +205,23 @@ - Automated response via Cortex [\#110](https://github.com/TheHive-Project/Cortex/issues/110) - New TheHive-Project repository [\#112](https://github.com/TheHive-Project/Cortex/issues/112) -**Closed issues:** - -- Unable to update user [\#106](https://github.com/TheHive-Project/Cortex/issues/106) -- Refreshing analyzers does not refresh definition if already defined [\#115](https://github.com/TheHive-Project/Cortex/issues/115) - **Fixed bugs:** - Fix redirection from Migration page to login on 401 error [\#114](https://github.com/TheHive-Project/Cortex/issues/114) - Analyzers filter in Jobs History view is limited to 25 analyzers [\#116](https://github.com/TheHive-Project/Cortex/issues/116) - First analyze of a "file" always fail, must re-run the analyze a second time [\#117](https://github.com/TheHive-Project/Cortex/issues/117) +**Closed issues:** + +- Unable to update user [\#106](https://github.com/TheHive-Project/Cortex/issues/106) +- Refreshing analyzers does not refresh definition if already defined [\#115](https://github.com/TheHive-Project/Cortex/issues/115) + ## [2.0.4](https://github.com/TheHive-Project/Cortex/milestone/13) (2018-04-13) +**Implemented enhancements:** + +- Let a Read/Analyze User Display/Change their API Key [\#89](https://github.com/TheHive-Project/Cortex/issues/89) + **Fixed bugs:** - Install python3 requirements for analyzers in public docker image [\#58](https://github.com/TheHive-Project/Cortex/issues/58) @@ -212,17 +232,8 @@ - Updating users by orgAdmin users fails silently [\#94](https://github.com/TheHive-Project/Cortex/issues/94) - Strictly filter the list of analyzers in the run dialog [\#95](https://github.com/TheHive-Project/Cortex/issues/95) -**Implemented enhancements:** - -- Let a Read/Analyze User Display/Change their API Key [\#89](https://github.com/TheHive-Project/Cortex/issues/89) - ## [2.0.3](https://github.com/TheHive-Project/Cortex/milestone/12) (2018-04-12) -**Fixed bugs:** - -- Version Upgrade of Analyzer makes all Analyzers invisible for TheHive (Cortex2) [\#75](https://github.com/TheHive-Project/Cortex/issues/75) -- Refresh Analyzers button not working [\#83](https://github.com/TheHive-Project/Cortex/issues/83) - **Implemented enhancements:** - Allow configuring auto artifacts extraction per analyzer [\#80](https://github.com/TheHive-Project/Cortex/issues/80) @@ -231,6 +242,11 @@ - Allow specifying a cache period per analyzer [\#85](https://github.com/TheHive-Project/Cortex/issues/85) - Allow arbitrary parameters for a job [\#86](https://github.com/TheHive-Project/Cortex/issues/86) +**Fixed bugs:** + +- Version Upgrade of Analyzer makes all Analyzers invisible for TheHive (Cortex2) [\#75](https://github.com/TheHive-Project/Cortex/issues/75) +- Refresh Analyzers button not working [\#83](https://github.com/TheHive-Project/Cortex/issues/83) + ## [2.0.2](https://github.com/TheHive-Project/Cortex/milestone/11) (2018-04-04) **Fixed bugs:** @@ -285,17 +301,17 @@ ## [1.1.2](https://github.com/TheHive-Project/Cortex/milestone/6) (2017-06-12) +**Implemented enhancements:** + +- Initialize MISP modules at startup [\#28](https://github.com/TheHive-Project/Cortex/issues/28) +- Add page loader [\#30](https://github.com/TheHive-Project/Cortex/issues/30) + **Fixed bugs:** - Error 500 in TheHive when a job is submited to Cortex [\#27](https://github.com/TheHive-Project/Cortex/issues/27) - Cortex and MISP unclear and error-loop [\#29](https://github.com/TheHive-Project/Cortex/issues/29) - jobstatus from jobs within cortex are not updated when status changes [\#31](https://github.com/TheHive-Project/Cortex/issues/31) -**Implemented enhancements:** - -- Initialize MISP modules at startup [\#28](https://github.com/TheHive-Project/Cortex/issues/28) -- Add page loader [\#30](https://github.com/TheHive-Project/Cortex/issues/30) - ## [1.1.1](https://github.com/TheHive-Project/Cortex/milestone/5) (2017-05-17) **Implemented enhancements:** diff --git a/version.sbt b/version.sbt index c3e8a03cf..070087c2e 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "3.1.4-1" +ThisBuild / version := "3.1.5-1" diff --git a/www/package.json b/www/package.json index b5f7eea54..e10f574aa 100755 --- a/www/package.json +++ b/www/package.json @@ -1,6 +1,6 @@ { "name": "cortex", - "version": "3.1.3", + "version": "3.1.5", "description": "A powerfull observable analysis engine", "license": "AGPL-3.0-or-later", "homepage": "https://github.com/TheHive-Project/Cortex",