Skip to content

Commit

Permalink
Merge pull request #263 from ianmcorvidae/group-data-sharing
Browse files Browse the repository at this point in the history
Enable sharing analysis inputs and outputs with groups
  • Loading branch information
ianmcorvidae authored Mar 7, 2023
2 parents 8a8ce9e + a9fd509 commit 251680e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 74 deletions.
11 changes: 8 additions & 3 deletions src/apps/clients/async_tasks.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [apps.util.config :as config]
[clojure.string :as string]
[clojure.tools.logging :as log]
[otel.otel :as otel]
[async-tasks-client.core :as async-tasks-client]))

(defn get-by-id
Expand Down Expand Up @@ -34,9 +35,13 @@

(defn run-async-thread
[async-task-id thread-function prefix]
(let [^Runnable task-thread (fn [] (thread-function async-task-id))]
(.start (Thread. task-thread (str prefix "-" (string/replace async-task-id #".*/tasks/" "")))))
async-task-id)
(otel/with-span [outer-span ["run-async-thread" {:kind :producer :attributes {"async-task-id" (str async-task-id)}}]]
(let [^Runnable task-thread (fn []
(with-open [_ (otel/span-scope outer-span)]
(otel/with-span [s ["async thread" {:kind :consumer :attributes {"async-task-id" (str async-task-id)}}]]
(thread-function async-task-id))))]
(.start (Thread. task-thread (str prefix "-" (string/replace async-task-id #".*/tasks/" "")))))
async-task-id))

(defn new-task
[type user data & [{:keys [timeout] :or {timeout "10m"}}]]
Expand Down
154 changes: 83 additions & 71 deletions src/apps/service/apps/jobs/sharing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
[apps.service.apps.jobs.params :as job-params]
[apps.service.apps.jobs.permissions :as job-permissions]
[apps.util.service :as service]
[otel.otel :as otel]
[clojure.string :as string]
[clojure.tools.logging :as log]
[clojure-commons.error-codes :as ce]))
Expand Down Expand Up @@ -106,37 +107,43 @@
(when-not (job-permissions/job-supports-job-sharing? apps-client job-id)
(job-sharing-error (job-sharing-msg :not-supported job-id))))

(defn- verify-not-group
[{subject-source-id :source_id subject-id :id} job-id]
(when-not (ipg/user-source? subject-source-id)
(job-sharing-error (job-sharing-msg :is-group job-id (str subject-id " is a group")))))

(defn- share-app-for-job
[apps-client sharer sharee job-id {system-id :system_id app-id :app_id}]
(when-not (.hasAppPermission apps-client sharee system-id app-id "read")
(let [response (.shareAppWithSubject apps-client false {} sharee system-id app-id "read")]
(when-not (:success response)
(get-in response [:error :reason] "unable to share app")))))
(otel/with-span [s ["share-app-for-job"]]
(when-not (.hasAppPermission apps-client sharee system-id app-id "read")
(let [response (.shareAppWithSubject apps-client false {} sharee system-id app-id "read")]
(when-not (:success response)
(get-in response [:error :reason] "unable to share app"))))))

(defn- get-user-from-subject
[subject]
(condp = (:source_id subject)
"ldap" (:id subject)
"g:gsa" (str "@grouper-" (:id subject))
nil))

(defn- share-output-folder
[sharer {sharee :id} {:keys [result_folder_path]}]
(try+
(data-info/share-path sharer result_folder_path sharee "read")
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to share result folder: " (:error_code (service/parse-json body))))))
[sharer sharee {:keys [result_folder_path]}]
(let [sharee (get-user-from-subject sharee)]
(try+
(data-info/share-path sharer result_folder_path sharee "read")
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to share result folder: " (:error_code (service/parse-json body)))))))

(defn- share-input-file
[sharer {sharee :id} path]
(try+
(data-info/share-path sharer path sharee "read")
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to share input file, " path ": " (:error_code (service/parse-json body))))))
[sharer sharee path]
(let [sharee (get-user-from-subject sharee)]
(try+
(data-info/share-path sharer path sharee "read")
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to share input file, " path ": " (:error_code (service/parse-json body)))))))

(defn- process-child-jobs
[f job-id]
(remove nil? (mapcat f (jp/list-child-jobs job-id))))
(otel/with-span [s ["process-child-jobs"]]
(doall (remove nil? (mapcat f (jp/list-child-jobs job-id))))))

(defn- list-job-inputs
[apps-client {system-id :system_id app-id :app_id app-version-id :app_version_id :as job}]
Expand All @@ -148,7 +155,8 @@

(defn- process-job-inputs
[f apps-client job]
(remove nil? (map f (list-job-inputs apps-client job))))
(otel/with-span [s ["process-job-inputs"]]
(doall (remove nil? (map f (list-job-inputs apps-client job))))))

(defn- share-analysis
[job-id sharee level]
Expand All @@ -162,35 +170,37 @@

(defn- share-job
[update-fn apps-client sharer sharee {job-id :analysis_id level :permission}]
(let [job-id (uuidify job-id)
job (jp/get-job-by-id job-id)]
(try+
(share-analysis job-id sharee level)

(let [child-input-share-errs (process-child-jobs (partial share-child-job apps-client sharer sharee level) job-id)
input-share-errs (process-job-inputs (partial share-input-file sharer sharee) apps-client job)
output-share-err-msg (share-output-folder sharer sharee job)
app-share-err-msg (share-app-for-job apps-client sharer sharee job-id job)]
(update-fn (format "shared job ID %s with %s" job-id sharee))
(job-sharing-success job-id
job
level
(concat input-share-errs child-input-share-errs)
output-share-err-msg
app-share-err-msg))
(catch [:type ::job-sharing-failure] {:keys [failure-reason]}
(update-fn (format "failed to share job ID %s with %s: %s" job-id sharee failure-reason))
(job-sharing-failure job-id job level failure-reason))
(catch Object _
(update-fn (format "failed to share job ID %s with %s: %s" job-id sharee (str (:throwable &throw-context))))
(job-sharing-failure job-id job level)))))
(otel/with-span [s ["share-job"]]
(let [job-id (uuidify job-id)
job (jp/get-job-by-id job-id)]
(try+
(share-analysis job-id sharee level)

(let [child-input-share-errs (process-child-jobs (partial share-child-job apps-client sharer sharee level) job-id)
input-share-errs (process-job-inputs (partial share-input-file sharer sharee) apps-client job)
output-share-err-msg (share-output-folder sharer sharee job)
app-share-err-msg (share-app-for-job apps-client sharer sharee job-id job)]
(update-fn (format "shared job ID %s with %s" job-id sharee))
(job-sharing-success job-id
job
level
(concat input-share-errs child-input-share-errs)
output-share-err-msg
app-share-err-msg))
(catch [:type ::job-sharing-failure] {:keys [failure-reason]}
(update-fn (format "failed to share job ID %s with %s: %s" job-id sharee failure-reason))
(job-sharing-failure job-id job level failure-reason))
(catch Object _
(update-fn (format "failed to share job ID %s with %s: %s" job-id sharee (str (:throwable &throw-context))))
(job-sharing-failure job-id job level))))))

(defn- share-jobs-with-user
[update-fn apps-client sharer {sharee :subject :keys [analyses]}]
(let [responses (mapv (partial share-job update-fn apps-client sharer sharee) analyses)]
(cn/send-analysis-sharing-notifications (:shortUsername sharer) sharee responses)
{:subject sharee
:analyses responses}))
(otel/with-span [s ["share-jobs-with-user"]]
(let [responses (mapv (partial share-job update-fn apps-client sharer sharee) analyses)]
(cn/send-analysis-sharing-notifications (:shortUsername sharer) sharee responses)
{:subject sharee
:analyses responses})))

(defn- share-jobs-thread
[async-task-id]
Expand All @@ -209,10 +219,11 @@

(defn share-jobs
[apps-client {username :shortUsername} sharing-requests]
(-> (async-tasks/new-task "analysis-sharing" username sharing-requests)
(async-tasks/run-async-thread share-jobs-thread "analysis-sharing")
(string/replace #".*/tasks/" "")
uuidify))
(otel/with-span [s ["share-jobs"]]
(-> (async-tasks/new-task "analysis-sharing" username sharing-requests)
(async-tasks/run-async-thread share-jobs-thread "analysis-sharing")
(string/replace #".*/tasks/" "")
uuidify)))

(defn- job-sharing-validation-response
[job-id job level & [failure-reason]]
Expand All @@ -232,7 +243,6 @@
(verify-not-subjob job)
(verify-accessible sharer job-id)
(verify-support apps-client job-id)
(verify-not-group sharee job-id)
(job-sharing-validation-response job-id job level)
(catch [:type ::job-sharing-failure] {:keys [failure-reason]}
(job-sharing-validation-response job-id job level failure-reason)))
Expand All @@ -251,20 +261,22 @@
(update request-body :sharing (partial mapv (partial validate-subject-job-sharing-requests apps-client user)))))

(defn- unshare-output-folder
[sharer {sharee :id} {:keys [result_folder_path]}]
(try+
(data-info/unshare-path sharer result_folder_path sharee)
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to unshare result folder: " (:error_code (service/parse-json body))))))
[sharer sharee {:keys [result_folder_path]}]
(let [sharee (get-user-from-subject sharee)]
(try+
(data-info/unshare-path sharer result_folder_path sharee)
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to unshare result folder: " (:error_code (service/parse-json body)))))))

(defn- unshare-input-file
[sharer {sharee :id} path]
(try+
(data-info/unshare-path sharer path sharee)
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to unshare input file, " path ": " (:error_code (service/parse-json body))))))
[sharer sharee path]
(let [sharee (get-user-from-subject sharee)]
(try+
(data-info/unshare-path sharer path sharee)
nil
(catch ce/clj-http-error? {:keys [body]}
(str "unable to unshare input file, " path ": " (:error_code (service/parse-json body)))))))

(defn- unshare-analysis
[job-id sharee]
Expand Down Expand Up @@ -322,10 +334,11 @@

(defn unshare-jobs
[apps-client {username :shortUsername} sharing-requests]
(-> (async-tasks/new-task "analysis-unsharing" username sharing-requests)
(async-tasks/run-async-thread unshare-jobs-thread "analysis-unsharing")
(string/replace #".*/tasks/" "")
uuidify))
(otel/with-span [s ["unshare-jobs"]]
(-> (async-tasks/new-task "analysis-unsharing" username sharing-requests)
(async-tasks/run-async-thread unshare-jobs-thread "analysis-unsharing")
(string/replace #".*/tasks/" "")
uuidify)))

(defn- job-unsharing-validation-response
[job-id job & [failure-reason]]
Expand All @@ -344,7 +357,6 @@
(verify-not-subjob job)
(verify-accessible sharer job-id)
(verify-support apps-client job-id)
(verify-not-group sharee job-id)
(job-unsharing-validation-response job-id job)
(catch [:type ::job-sharing-failure] {:keys [failure-reason]}
(job-unsharing-validation-response job-id job failure-reason)))
Expand Down

0 comments on commit 251680e

Please sign in to comment.