diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9d62cba..9fa061de 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ on: - 'v*' pull_request: branches: - - 'master' + - 'main' jobs: test: diff --git a/doc/index.md b/doc/index.md index 2337b0e2..e994a4df 100644 --- a/doc/index.md +++ b/doc/index.md @@ -9,6 +9,7 @@ This process is one-way and any statements in the target LRS will not be replica - [Installation](install.md) - [Usage](usage.md) - [Persistence Config](persistence.md) +- [OAuth Support](oauth.md) - [All Options](options.md) - [JSON Config](json.md) - [Metrics](metrics.md) diff --git a/doc/oauth.md b/doc/oauth.md new file mode 100644 index 00000000..3bfc06ba --- /dev/null +++ b/doc/oauth.md @@ -0,0 +1,46 @@ +[<- Back to Index](index.md) +# OAuth Support + +LRSPipe supports the use of [OAuth 2.0](https://oauth.net/2/) with LRS endpoints that support it via the [Client Credentials Grant](https://tools.ietf.org/html/rfc6749#section-4.4) + +## Client Credentials Grant + +To use OAuth, specify a source/target `auth-uri`, `client-id` and `client-secret`: + +``` shell +bin/run.sh --source-url http://0.0.0.0:8080/xapi \ + --source-auth-uri http://0.0.0.0:8083/auth/realms/test/protocol/openid-connect \ + --source-client-id a_client_id \ + --source-client-secret 1234 \ + --target-url http://0.0.0.0:8081/xapi \ + --target-auth-uri http://0.0.0.0:8083/auth/realms/test/protocol/openid-connect \ + --target-client-id b_client_id \ + --target-client-secret 1234 +``` + +LRSPipe will connect to the specified auth provider(s) and provide up-to-date tokens for LRS requests as needed. + +### Scope + +According to OAuth 2.0 an optional `scope` parameter can be provided on Client Credentials Grant requests. To set this value for the source/target LRS: + +``` shell +bin/run.sh ... \ + --source-scope "lrs:read" \ + --target-scope "lrs:write" +``` + +Note that the configuration of claims like scope should be done on the OAuth client itself. This option is provided for backwards compatibility only. + +## Manual Bearer Token Usage + +If you have a bearer token that will be valid for the duration of your job, you can pass it directly: + +``` shell +bin/run.sh --source-url http://0.0.0.0:8080/xapi \ + --source-token eyJhbGciOi... + --target-url http://0.0.0.0:8081/xapi \ + --target-token eyJhbGciOi... +``` + +[<- Back to Index](index.md) diff --git a/doc/options.md b/doc/options.md index 5962a0a6..cb4b8336 100644 --- a/doc/options.md +++ b/doc/options.md @@ -48,6 +48,11 @@ All options: -p, --xapi-get-param KEY=VALUE {} xAPI GET Parameters --source-username USERNAME Source LRS BASIC Auth username --source-password PASSWORD Source LRS BASIC Auth password + --source-auth-uri URI Source LRS OAuth autentication URI + --source-client-id ID Source LRS OAuth client ID + --source-client-secret SECRET Source LRS OAuth client secret + --source-scope SCOPE Source LRS OAuth scope + --source-token TOKEN Source LRS OAuth Bearer token --source-backoff-budget BUDGET 10000 Source LRS Retry Backoff Budget in ms --source-backoff-max-attempt MAX 10 Source LRS Retry Backoff Max Attempts, set to -1 for no retry --source-backoff-j-range RANGE Source LRS Retry Backoff Jitter Range in ms @@ -56,6 +61,11 @@ All options: --target-batch-size SIZE 50 Target LRS POST desired batch size --target-username USERNAME Target LRS BASIC Auth username --target-password PASSWORD Target LRS BASIC Auth password + --target-auth-uri URI Target LRS OAuth autentication URI + --target-client-id ID Target LRS OAuth client ID + --target-client-secret SECRET Target LRS OAuth client secret + --target-scope SCOPE Target LRS OAuth scope + --target-token TOKEN Target LRS OAuth Bearer token --target-backoff-budget BUDGET 10000 Target LRS Retry Backoff Budget in ms --target-backoff-max-attempt MAX 10 Target LRS Retry Backoff Max Attempts, set to -1 for no retry --target-backoff-j-range RANGE Target LRS Retry Backoff Jitter Range in ms diff --git a/src/cli/com/yetanalytics/xapipe/cli.clj b/src/cli/com/yetanalytics/xapipe/cli.clj index 5bf9a628..6bbeff68 100644 --- a/src/cli/com/yetanalytics/xapipe/cli.clj +++ b/src/cli/com/yetanalytics/xapipe/cli.clj @@ -187,124 +187,81 @@ (assoc-in [:io-config :io-thread-count] conn-io-thread-count))}) +(def option-paths + {;; root config + :get-buffer-size [:get-buffer-size] + :cleanup-buffer-size [:cleanup-buffer-size] + :statement-buffer-size [:statement-buffer-size] + :batch-buffer-size [:batch-buffer-size] + :batch-timeout [:batch-timeout] + + ;; Source LRS + :source-batch-size [:source :batch-size] + :source-poll-interval [:source :poll-interval] + :get-params [:source :get-params] + :source-username [:source :request-config :username] + :source-password [:source :request-config :password] + :source-auth-uri [:source :request-config :oauth-params :auth-uri] + :source-client-id [:source :request-config :oauth-params :client-id] + :source-client-secret [:source :request-config :oauth-params :client-secret] + :source-scope [:source :request-config :oauth-params :scope] + :source-token [:source :request-config :token] + :source-backoff-budget [:source :backoff-opts :budget] + :source-backoff-max-attempt [:source :backoff-opts :max-attempt] + :source-backoff-j-range [:source :backoff-opts :j-range] + :source-backoff-initial [:source :backoff-opts :initial] + + ;; Target LRS + :target-batch-size [:target :batch-size] + :target-username [:target :request-config :username] + :target-password [:target :request-config :password] + :target-auth-uri [:target :request-config :oauth-params :auth-uri] + :target-client-id [:target :request-config :oauth-params :client-id] + :target-client-secret [:target :request-config :oauth-params :client-secret] + :target-scope [:target :request-config :oauth-params :scope] + :target-token [:target :request-config :token] + :target-backoff-budget [:target :backoff-opts :budget] + :target-backoff-max-attempt [:target :backoff-opts :max-attempt] + :target-backoff-j-range [:target :backoff-opts :j-range] + :target-backoff-initial [:target :backoff-opts :initial] + + ;; Filters + :filter-template-profile-urls [:filter :template :profile-urls] + :filter-template-ids [:filter :template :template-ids] + :filter-pattern-profile-urls [:filter :pattern :profile-urls] + :filter-pattern-ids [:filter :pattern :pattern-ids] + :filter-ensure-paths [:filter :path :ensure-paths] + :filter-match-paths [:filter :path :match-paths] + :filter-concept-profile-urls [:filter :concept :profile-urls] + :filter-concept-types [:filter :concept :concept-types] + :filter-activity-type-ids [:filter :concept :activity-type-ids] + :filter-verb-ids [:filter :concept :verb-ids] + :filter-attachment-usage-types [:filter :concept :attachment-usage-types]}) + (s/fdef options->config :args (s/cat :options ::opts/all-options) :ret ::job/config) (defn options->config - [{:keys [job-id - - source-url - - source-batch-size - source-poll-interval - get-params - source-username - source-password - source-backoff-budget - source-backoff-max-attempt - source-backoff-j-range - source-backoff-initial - - target-url - - target-batch-size - target-username - target-password - target-backoff-budget - target-backoff-max-attempt - target-backoff-j-range - target-backoff-initial - - get-buffer-size - batch-timeout - cleanup-buffer-size - - filter-template-profile-urls - filter-template-ids - filter-pattern-profile-urls - filter-pattern-ids - filter-ensure-paths - filter-match-paths - filter-concept-profile-urls - filter-concept-types - filter-activity-type-ids - filter-verb-ids - filter-attachment-usage-types - - statement-buffer-size - batch-buffer-size]}] - (cond-> {:get-buffer-size get-buffer-size - :batch-timeout batch-timeout - :source - {:request-config (cond-> (parse-lrs-url source-url) - (and source-username - source-password) - (assoc :username source-username - :password source-password)) - :get-params get-params - :poll-interval source-poll-interval - :batch-size source-batch-size - :backoff-opts - (cond-> {:budget source-backoff-budget - :max-attempt source-backoff-max-attempt} - source-backoff-j-range - (assoc :j-range source-backoff-j-range) - source-backoff-initial - (assoc :initial source-backoff-initial))} - :target - {:request-config (cond-> (parse-lrs-url target-url) - (and target-username - target-password) - (assoc :username target-username - :password target-password)) - :batch-size target-batch-size - :backoff-opts - (cond-> {:budget target-backoff-budget - :max-attempt target-backoff-max-attempt} - target-backoff-j-range - (assoc :j-range target-backoff-j-range) - target-backoff-initial - (assoc :initial target-backoff-initial))} - :filter {}} - statement-buffer-size - (assoc :statement-buffer-size statement-buffer-size) - - batch-buffer-size - (assoc :batch-buffer-size batch-buffer-size) - cleanup-buffer-size - (assoc :cleanup-buffer-size cleanup-buffer-size) - - (not-empty filter-template-profile-urls) - (assoc-in [:filter :template] {:profile-urls filter-template-profile-urls - :template-ids (into [] - filter-template-ids)}) - - (not-empty filter-pattern-profile-urls) - (assoc-in [:filter :pattern] {:profile-urls filter-pattern-profile-urls - :pattern-ids (into [] - filter-pattern-ids)}) - - (not-empty filter-ensure-paths) - (assoc-in [:filter :path :ensure-paths] filter-ensure-paths) - - (not-empty filter-match-paths) - (assoc-in [:filter :path :match-paths] filter-match-paths) - - (or (not-empty filter-concept-profile-urls) - (not-empty filter-activity-type-ids) - (not-empty filter-verb-ids) - (not-empty filter-attachment-usage-types)) - (assoc-in [:filter :concept] {:profile-urls - (into [] filter-concept-profile-urls) - :concept-types - (into [] filter-concept-types) - :activity-type-ids - (into [] filter-activity-type-ids) - :verb-ids - (into [] filter-verb-ids) - :attachment-usage-types - (into [] filter-attachment-usage-types)}))) + [{:keys [source-url + target-url] + :as options}] + (reduce-kv + (fn [m k v] + (if-let [path (get option-paths k)] + (if (= :filter (first path)) + ;; filters take collections + (if (not-empty v) + (assoc-in m path v) + m) + ;; All other opts are scalar + (assoc-in m path v)) + ;; ignore unknown + m)) + {:source {:request-config (parse-lrs-url source-url)} + :target {:request-config (parse-lrs-url target-url)} + :filter {}} + options)) (s/fdef create-job :args (s/cat :options ::opts/all-options) @@ -337,6 +294,48 @@ (.toString (java.util.UUID/randomUUID)))] (job/init-job job-id config)))) +(s/fdef only-auth + :args (s/cat + :config (s/with-gen ::job/config + (fn [] + (sgen/fmap + config/ensure-defaults + (s/gen ::job/config)))) + :lrs-type #{:source :target} + :auth-type #{:basic :token :oauth}) + :ret ::xapipe/job) + +(defn only-auth + "Given a job config, LRS type :source or :target and auth type of :basic, + :token, or :oauth, remove other auth from the LRS." + [config + lrs-type + auth-type] + (apply update-in + config + [lrs-type :request-config] + dissoc + (case auth-type + :basic [:token :oauth-params] + :token [:username :password :oauth-params] + :oauth [:username :password :token]))) + +(def auth-options + {:source-username [:source :basic] + :source-password [:source :basic] + :source-auth-uri [:source :oauth] + :source-client-id [:source :oauth] + :source-client-secret [:source :oauth] + :source-scope [:source :oauth] + :source-token [:source :token] + :target-username [:target :basic] + :target-password [:target :basic] + :target-auth-uri [:target :oauth] + :target-client-id [:target :oauth] + :target-client-secret [:target :oauth] + :target-scope [:target :oauth] + :target-token [:target :token]}) + (s/fdef reconfigure-with-options :args (s/cat :config (s/with-gen ::job/config (fn [] @@ -349,160 +348,43 @@ (defn reconfigure-with-options "Given an extant job and CLI options, apply any overriding options" [config - {:keys [source-url - source-username - source-password - - source-batch-size - source-poll-interval - get-params - source-backoff-budget - source-backoff-max-attempt - source-backoff-j-range - source-backoff-initial - - target-url - target-username - target-password - target-backoff-budget - target-backoff-max-attempt - target-backoff-j-range - target-backoff-initial - - target-batch-size - - get-buffer-size - batch-timeout - cleanup-buffer-size - - statement-buffer-size - batch-buffer-size]}] - (cond-> config - source-url - (update-in - [:source :request-config] - merge (parse-lrs-url source-url)) - - source-username - (assoc-in - [:source :request-config :username] - source-username) - - source-password - (assoc-in - [:source :request-config :password] - source-password) - - ;; if there's a default, only update on change + {:keys [source-batch-size + get-params] + :as options}] + (cond-> (reduce-kv + (fn [m k v] + (case k + :source-url (update-in m + [:source :request-config] + merge (parse-lrs-url v)) + :target-url (update-in m + [:target :request-config] + merge (parse-lrs-url v)) + (if-let [path (get option-paths k)] + (if (= :filter (first path)) + ;; filters are not reconfigurable + m + ;; All other opts + (let [m' (assoc-in m path v)] + (if-let [only-auth-args (get auth-options k)] + (apply only-auth m' only-auth-args) + m'))) + ;; ignore unknown + m))) + config + (dissoc options :get-params :source-batch-size)) + + ;; Special handling + ;; get params overwrite if not empty + (not-empty get-params) + (assoc-in [:source :get-params] get-params) + ;; a provided batch size also overwrites limit param (and source-batch-size (not= source-batch-size (get-in config [:source :batch-size]))) - (assoc-in - [:source :batch-size] - source-batch-size) - - (not= source-poll-interval - (get-in config [:source :poll-interval])) - (assoc-in - [:source :poll-interval] - source-poll-interval) - - ;; With no args, get-params is an empty map, so ignore - (and (not-empty get-params) - (not= get-params - (get-in config [:source :get-params]))) (-> - (assoc-in - [:source :get-params] - get-params) - (cond-> - (and source-batch-size - (not= source-batch-size - (get-in config [:source :batch-size]))) - (assoc-in - [:source :get-params :limit] - source-batch-size))) - - (not= source-backoff-budget - (get-in config [:source :backoff-opts :budget])) - (assoc-in - [:source :backoff-opts :budget] - source-backoff-budget) - - (not= source-backoff-max-attempt - (get-in config [:source :backoff-opts :max-attempt])) - (assoc-in - [:source :backoff-opts :max-attempt] - source-backoff-max-attempt) - - source-backoff-j-range - (assoc-in - [:source :backoff-opts :j-range] - source-backoff-j-range) - - source-backoff-initial - (assoc-in - [:source :backoff-opts :initial] - source-backoff-initial) - - target-url - (update-in - [:target :request-config] - merge (parse-lrs-url target-url)) - - target-username - (assoc-in - [:target :request-config :username] - target-username) - - target-password - (assoc-in - [:target :request-config :password] - target-password) - - target-batch-size - (assoc-in - [:target :batch-size] - target-batch-size) - - (not= target-backoff-budget - (get-in config [:target :backoff-opts :budget])) - (assoc-in - [:target :backoff-opts :budget] - target-backoff-budget) - - (not= target-backoff-max-attempt - (get-in config [:target :backoff-opts :max-attempt])) - (assoc-in - [:target :backoff-opts :max-attempt] - target-backoff-max-attempt) - - target-backoff-j-range - (assoc-in - [:target :backoff-opts :j-range] - target-backoff-j-range) - - target-backoff-initial - (assoc-in - [:target :backoff-opts :initial] - target-backoff-initial) - - (not= get-buffer-size - (get config :get-buffer-size)) - (assoc :get-buffer-size get-buffer-size) - - (not= batch-timeout - (get config :batch-timeout)) - (assoc :batch-timeout batch-timeout) - - statement-buffer-size - (assoc :statement-buffer-size statement-buffer-size) - - batch-buffer-size - (assoc :batch-buffer-size batch-buffer-size) - - cleanup-buffer-size - (assoc :cleanup-buffer-size cleanup-buffer-size))) + (assoc-in [:source :batch-size] source-batch-size) + (assoc-in [:source :get-params :limit] source-batch-size)))) (s/fdef list-store-jobs :args (s/cat :store :com.yetanalytics.xapipe/store) diff --git a/src/cli/com/yetanalytics/xapipe/cli/options.clj b/src/cli/com/yetanalytics/xapipe/cli/options.clj index 6c9c3c97..c8e44efe 100644 --- a/src/cli/com/yetanalytics/xapipe/cli/options.clj +++ b/src/cli/com/yetanalytics/xapipe/cli/options.clj @@ -191,6 +191,24 @@ :parse-fn #(Long/parseLong %) :validate [pos-int? "Must be a positive integer"]]]) +(defn oauth-opts + [tag] + [[nil + (format "--%s-auth-uri URI" tag) + (format "%s LRS OAuth autentication URI" (cs/capitalize tag))] + [nil + (format "--%s-client-id ID" tag) + (format "%s LRS OAuth client ID" (cs/capitalize tag))] + [nil + (format "--%s-client-secret SECRET" tag) + (format "%s LRS OAuth client secret" (cs/capitalize tag))] + [nil + (format "--%s-scope SCOPE" tag) + (format "%s LRS OAuth scope" (cs/capitalize tag))] + [nil + (format "--%s-token TOKEN" tag) + (format "%s LRS OAuth Bearer token" (cs/capitalize tag))]]) + ;; a set for filtering (def valid-get-params #{:agent @@ -234,7 +252,9 @@ m)))] [nil "--source-username USERNAME" "Source LRS BASIC Auth username"] [nil "--source-password PASSWORD" "Source LRS BASIC Auth password"]] - (backoff-opts "source"))) + (concat + (oauth-opts "source") + (backoff-opts "source")))) (def-option-specs source-options) @@ -247,7 +267,9 @@ :default 50] [nil "--target-username USERNAME" "Target LRS BASIC Auth username"] [nil "--target-password PASSWORD" "Target LRS BASIC Auth password"]] - (backoff-opts "target"))) + (concat + (oauth-opts "target") + (backoff-opts "target")))) (def-option-specs target-options) @@ -355,19 +377,25 @@ :target ::target-options-args :job ::job-options-args))) +(s/def ::no-defaults boolean?) + (s/fdef args->options - :args (s/cat :args (s/spec ::all-args)) + :args (s/cat :args (s/spec ::all-args) + :kwargs (s/keys* :opt-un [::no-defaults])) :ret ::all-options) (defn args->options - [args] + [args + & {:keys [no-defaults] + :or {no-defaults false}}] (let [{:keys [errors] :as ret} (cli/parse-opts args (concat common-options source-options target-options - job-options))] + job-options) + :no-defaults no-defaults)] (if (not-empty errors) (throw (ex-info (format "Options Error: %s" (cs/join \, errors)) diff --git a/src/cli/com/yetanalytics/xapipe/main.clj b/src/cli/com/yetanalytics/xapipe/main.clj index e22f1865..9069838b 100644 --- a/src/cli/com/yetanalytics/xapipe/main.clj +++ b/src/cli/com/yetanalytics/xapipe/main.clj @@ -93,12 +93,19 @@ Delete a Job: (job/reconfigure-job (cli/reconfigure-with-options (:config (or ?from-json ?from-storage)) - options))) + ;; reparse the args w/o defaults + (:options + (opts/args->options args + :no-defaults true))))) ;; Json is provided ?from-json (-> ?from-json job/upgrade-job - (update :config cli/reconfigure-with-options options)) + (update :config cli/reconfigure-with-options + ;; reparse args w/o defaults + (:options + (opts/args->options args + :no-defaults true)))) ;; New from options! :else diff --git a/src/lib/com/yetanalytics/xapipe.clj b/src/lib/com/yetanalytics/xapipe.clj index 5a34531d..4054ad77 100644 --- a/src/lib/com/yetanalytics/xapipe.clj +++ b/src/lib/com/yetanalytics/xapipe.clj @@ -607,3 +607,46 @@ (def stop-fn stop)) (stop-fn) ) + +(comment + ;; Use OAuth source LRS + (require '[com.yetanalytics.xapipe.store.impl.memory :as mem]) + + (def store (mem/new-store)) + + (def job-id (str (java.util.UUID/randomUUID))) + + (let [job (job/init-job + job-id + {:source + {:request-config + {:url-base "http://localhost:8080" + :xapi-prefix "/xapi" + :oauth-params + {:auth-uri "http://0.0.0.0:8081/auth/realms/test/protocol/openid-connect" + :client-id "lrs_client" + :client-secret "vGxvFpk9CLtfQwGCSJlb9SvUoDByuZjN"}} + :get-params {} + :poll-interval 1000 + :batch-size 50} + :target + {:request-config {:url-base "http://localhost:8082" + :xapi-prefix "/xapi"} + :batch-size 50}}) + + + {:keys [states] + stop :stop-fn} (run-job job) + + store-result + (-> states + (log-states :info) + (store-states store))] + (a/go + (let [result (a/ - ;; support basic auth if provided - (and (not-empty username) - (not-empty password)) - (assoc :basic-auth [username password]))))) + ;; support token if provided + (not-empty token) + (assoc :oauth-token token) + ;; support basic auth if provided + (and (not-empty username) + (not-empty password)) + (assoc :basic-auth [username password]) + ;; If OAuth support is enabled, pass through on a namespaced + ;; keyword to be picked up in async-request + oauth-params + (assoc ::oauth/oauth-params oauth-params))))) (def rate-limit-status? #{420 429}) @@ -195,13 +218,20 @@ (def retryable-error? #{502 503 504}) +(def retryable-oauth-error? + #{401}) + (defn retryable-status? "Is the HTTP status code one we care to retry?" - [status] + [status + & {:keys [oauth?] + :or {oauth? false}}] (and status (or (rate-limit-status? status) - (retryable-error? status)))) + (retryable-error? status) + (when oauth? + (retryable-oauth-error? status))))) (defn retryable-exception? "Is this a client exception we can retry?" @@ -257,50 +287,64 @@ :or {attempt 0 backoff-opts {:budget 10000 :max-attempt 10}}}] - (let [ret (or ret-chan (a/promise-chan)) - req (assoc request - :throw-exceptions false ;; don't throw so we handle resp as data - :async true - :async? true) ;; docs mention this but it is probably not needed - ] - (client/request - req - (fn [{:keys [status] - :as resp}] - (cond - ;; Both our GET and POST expect 200 - ;; If status is 200, pass the response - (= status 200) - (a/put! ret [:response resp]) - - ;; Retry based on retryable status - (retryable-status? status) - (maybe-retry - ret - reporter - req - attempt - backoff-opts) - - :else - (a/put! - ret - [:exception - (ex-info "Non-200 Request Status" - {:type ::request-fail - :response resp})]))) - (fn [exception] - (if (retryable-exception? exception) - (maybe-retry - ret - reporter - req - attempt - backoff-opts - exception) - (a/put! ret - [:exception - exception])))) + (let [ret (or ret-chan (a/promise-chan))] + (a/go + (let [[oauth-tag oauth-v] (if-let [oauth-params (::oauth/oauth-params request)] + (a/ (assoc request + ;; don't throw so we handle resp as data + :throw-exceptions false + :async true + ;; docs mention this but it is probably not needed + :async? true) + ;; If there is an oauth token result, use it, overwriting + ;; any existing token + oauth-v (assoc :oauth-token oauth-v))] + (client/request + req + (fn [{:keys [status] + :as resp}] + (cond + ;; Both our GET and POST expect 200 + ;; If status is 200, pass the response + (= status 200) + (a/put! ret [:response resp]) + + ;; Retry based on retryable status + (retryable-status? status :oauth? (some? oauth-v)) + (maybe-retry + ret + reporter + req + attempt + backoff-opts) + + :else + (a/put! + ret + [:exception + (ex-info "Non-200 Request Status" + {:type ::request-fail + :response resp})]))) + (fn [exception] + (if (retryable-exception? exception) + (maybe-retry + ret + reporter + req + attempt + backoff-opts + exception) + (a/put! ret + [:exception + exception])))))))) ret)) (s/def ::poll-interval diff --git a/src/lib/com/yetanalytics/xapipe/client/oauth.clj b/src/lib/com/yetanalytics/xapipe/client/oauth.clj new file mode 100644 index 00000000..6fd30f93 --- /dev/null +++ b/src/lib/com/yetanalytics/xapipe/client/oauth.clj @@ -0,0 +1,145 @@ +(ns com.yetanalytics.xapipe.client.oauth + "OAuth Client Credentials Grant Support" + (:require + [clj-http.client :as client] + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [clojure.string :as cs] + [clojure.tools.logging :as log])) + +;; Derive token URLs from OAuth2 endpoint/OIDC Issuer +(s/fdef token-url + :args (s/cat :auth-uri ::auth-uri) + :ret string?) + +(defn token-url* + [auth-uri] + (str auth-uri + (when-not (cs/ends-with? auth-uri "/") + "/") + "token")) + +(def token-url (memoize token-url*)) + +(s/def ::auth-uri string?) +(s/def ::client-id string?) +(s/def ::client-secret string?) +(s/def ::scope string?) + +(s/def ::oauth-params + (s/keys :req-un [::auth-uri + ::client-id + ::client-secret] + :opt-un [::scope])) + +(s/fdef token-request + :args (s/cat :params ::oauth-params) + :ret map?) + +(defn token-request + [{:keys [auth-uri + client-id + client-secret + scope]}] + {:url (token-url auth-uri) + :method :post + :basic-auth [client-id client-secret] + :form-params (cond-> {:grant_type "client_credentials"} + (not-empty scope) (assoc :scope scope)) + :as :json}) + +(defonce token-cache + (atom {})) + +(defn- token-cache-key + [{:keys [auth-uri + client-id]}] + (format "%s|%s" auth-uri client-id)) + +(defn get-token! + "Given client credentials grant params and kwarg options, attempt to get a + token, either from the cache or remote. Expires cache entries based on + :expires_in on the response. + + Returns a promise channel containing a tuple: + [:result ] + or + [:exception ] + + Options: + * bump-exp-ms time to bump up expiry of a token from the cache, with the + assumption that some time has already passed since issuance. + " + [{:keys [auth-uri + client-id + scope] :as params} + & {:keys [bump-exp-ms] + :or {bump-exp-ms 500}}] + (let [ret (a/promise-chan) + cache-key (token-cache-key params)] + (if-let [extant-token (get @token-cache cache-key)] + ;; If a token is already cached, return it + (a/put! ret [:result extant-token]) + ;; If not, go get it + (do + (log/debugf "Token request for %s" + cache-key) + (client/request + (merge (token-request params) + {:async true}) + (fn [{:keys [status + body] + :as resp}] + (if (= 200 status) + (let [{:keys [access_token + expires_in]} body] + ;; update the cache + (swap! token-cache assoc cache-key access_token) + ;; return to the user + (a/put! ret [:result access_token]) + (when expires_in + ;; later, remove from cache when expired + (let [remove-in (max + (- (* expires_in 1000) bump-exp-ms) + ;; don't go negative if exp is super 1s or below + ;; for some weird reason + 0)] + (a/go + (log/debugf "Waiting %d ms before removing token %s" + remove-in cache-key) + (a/options [] :no-defaults true)))))) diff --git a/src/test/com/yetanalytics/xapipe/cli_test.clj b/src/test/com/yetanalytics/xapipe/cli_test.clj index 18f37f26..4a6bd514 100644 --- a/src/test/com/yetanalytics/xapipe/cli_test.clj +++ b/src/test/com/yetanalytics/xapipe/cli_test.clj @@ -31,68 +31,80 @@ :conn-io-thread-count 16})))) (deftest options->config-test - (is (= {:get-buffer-size 100, - :batch-timeout 200, - :cleanup-buffer-size 100 - :source - {:request-config - {:url-base "http://0.0.0.0:8080", - :xapi-prefix "/xapi", - :username "foo", - :password "bar"}, - :get-params {}, - :poll-interval 1000, - :batch-size 50, - :backoff-opts - {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, - :target - {:request-config - {:url-base "http://0.0.0.0:8081", - :xapi-prefix "/xapi", - :username "foo", - :password "bar"}, - :batch-size 50, - :backoff-opts - {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, - :filter {}, - :statement-buffer-size 1000, - :batch-buffer-size 100} - (options->config - {:job-id "foo" + (testing "minimal" + (is (= {:source + {:request-config + {:url-base "http://0.0.0.0:8080", :xapi-prefix "/xapi"}}, + :target + {:request-config + {:url-base "http://0.0.0.0:8081", :xapi-prefix "/xapi"}}, + :filter {}} + (options->config + {:source-url "http://0.0.0.0:8080/xapi" + :target-url "http://0.0.0.0:8081/xapi"})))) + (testing "complex" + (is (= {:get-buffer-size 100, + :batch-timeout 200, + :cleanup-buffer-size 100 + :source + {:request-config + {:url-base "http://0.0.0.0:8080", + :xapi-prefix "/xapi", + :username "foo", + :password "bar"}, + :get-params {}, + :poll-interval 1000, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :target + {:request-config + {:url-base "http://0.0.0.0:8081", + :xapi-prefix "/xapi", + :username "foo", + :password "bar"}, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :filter {}, + :statement-buffer-size 1000, + :batch-buffer-size 100} + (options->config + {:job-id "foo" - :source-url "http://0.0.0.0:8080/xapi" + :source-url "http://0.0.0.0:8080/xapi" - :source-batch-size 50 - :source-poll-interval 1000 - :get-params {} - :source-username "foo" - :source-password "bar" - :source-backoff-budget 1000 - :source-backoff-max-attempt 10 - :source-backoff-j-range 10 - :source-backoff-initial 1 + :source-batch-size 50 + :source-poll-interval 1000 + :get-params {} + :source-username "foo" + :source-password "bar" + :source-backoff-budget 1000 + :source-backoff-max-attempt 10 + :source-backoff-j-range 10 + :source-backoff-initial 1 - :target-url "http://0.0.0.0:8081/xapi" + :target-url "http://0.0.0.0:8081/xapi" - :target-batch-size 50 - :target-username "foo" - :target-password "bar" - :target-backoff-budget 1000 - :target-backoff-max-attempt 10 - :target-backoff-j-range 10 - :target-backoff-initial 1 + :target-batch-size 50 + :target-username "foo" + :target-password "bar" + :target-backoff-budget 1000 + :target-backoff-max-attempt 10 + :target-backoff-j-range 10 + :target-backoff-initial 1 - :get-buffer-size 100 - :batch-timeout 200 - :cleanup-buffer-size 100 + :get-buffer-size 100 + :batch-timeout 200 + :cleanup-buffer-size 100 - :filter-template-profile-urls [] - :filter-template-ids [] - :filter-pattern-profile-urls [] - :filter-pattern-ids [] + :filter-template-profile-urls [] + :filter-template-ids [] + :filter-pattern-profile-urls [] + :filter-pattern-ids [] - :statement-buffer-size 1000 - :batch-buffer-size 100})))) + :statement-buffer-size 1000 + :batch-buffer-size 100}))))) (deftest create-job-test (is (= {:id "foo", @@ -108,7 +120,10 @@ {:url-base "http://0.0.0.0:8080", :xapi-prefix "/xapi", :username "foo", - :password "bar"}, + :password "bar", + :oauth-params {:auth-uri "http://example.com/token" + :client-id "foo" + :client-secret "bar"}}, :get-params {:limit 50}, :poll-interval 1000, :batch-size 50, @@ -119,7 +134,10 @@ {:url-base "http://0.0.0.0:8081", :xapi-prefix "/xapi", :username "foo", - :password "bar"}, + :password "bar", + :oauth-params {:auth-uri "http://example.com/token" + :client-id "foo" + :client-secret "bar"}}, :batch-size 50, :backoff-opts {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, @@ -140,6 +158,9 @@ :get-params {} :source-username "foo" :source-password "bar" + :source-auth-uri "http://example.com/token" + :source-client-id "foo" + :source-client-secret "bar" :source-backoff-budget 1000 :source-backoff-max-attempt 10 :source-backoff-j-range 10 @@ -149,6 +170,9 @@ :target-batch-size 50 :target-username "foo" :target-password "bar" + :target-auth-uri "http://example.com/token" + :target-client-id "foo" + :target-client-secret "bar" :target-backoff-budget 1000 :target-backoff-max-attempt 10 :target-backoff-j-range 10 @@ -166,87 +190,143 @@ :statement-buffer-size 1000 :batch-buffer-size 100})))) +(deftest only-auth-test + (let [config {:get-buffer-size 100, + :batch-timeout 200, + :cleanup-buffer-size 100 + :source + {:request-config + {:url-base "http://0.0.0.0:8080", + :xapi-prefix "/xapi"}, + :get-params {}, + :poll-interval 1000, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :target + {:request-config + {:url-base "http://0.0.0.0:8081", + :xapi-prefix "/xapi"}, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :filter {}, + :statement-buffer-size 1000, + :batch-buffer-size 100} + ambig-config (-> config + (update-in [:source :request-config] + merge + {:username "foo" + :password "bar" + :token "foobar" + :oauth-params + {:auth-uri "http://example.com/token" + :client-id "foo" + :client-secret "bar"}}))] + (are [auth-type req-config-after] + (-> (only-auth ambig-config :source auth-type) + (get-in [:source :request-config]) + (select-keys [:username + :password + :token + :oauth-params])) + :basic {:username "foo" + :password "bar"} + :token {:token "foobar"} + :oauth {:oauth-params + {:auth-uri "http://example.com/token" + :client-id "foo" + :client-secret "bar"}}))) + (deftest reconfigure-with-options-test - (let [reconfigured - (reconfigure-with-options - {:get-buffer-size 100, - :statement-buffer-size 1000, - :batch-buffer-size 100, - :batch-timeout 200, - :cleanup-buffer-size 50, - :source - {:request-config - {:url-base "http://0.0.0.0:8080", - :xapi-prefix "/xapi", - :username "foo", - :password "bar"}, - :get-params {:limit 50}, - :poll-interval 1000, - :batch-size 50, - :backoff-opts - {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, - :target - {:request-config - {:url-base "http://0.0.0.0:8081", - :xapi-prefix "/xapi", - :username "foo", - :password "bar"}, - :batch-size 50, - :backoff-opts - {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, - :filter {}} - {:job-id "foo" - :source-url "http://0.0.0.0:8082/xapi2" - :source-batch-size 100 - :source-poll-interval 3000 - :get-params {:format "exact"} - :source-username "baz" - :source-password "quxx" - :source-backoff-budget 999 - :source-backoff-max-attempt 9 - :source-backoff-j-range 9 - :source-backoff-initial 2 - - :target-url "http://0.0.0.0:8083/xapi2" - :target-batch-size 100 - :target-username "baz" - :target-password "quxx" - :target-backoff-budget 999 - :target-backoff-max-attempt 9 - :target-backoff-j-range 9 - :target-backoff-initial 2 - - :get-buffer-size 200 - :batch-timeout 300 - :cleanup-buffer-size 100 - - :statement-buffer-size 10000 - :batch-buffer-size 1000})] - (is (= {:get-buffer-size 200, - :batch-timeout 300, - :statement-buffer-size 10000, - :batch-buffer-size 1000, - :cleanup-buffer-size 100, - :source - {:request-config - {:url-base "http://0.0.0.0:8082", - :xapi-prefix "/xapi2", - :username "baz", - :password "quxx"}, - :get-params {:format "exact" - :limit 100}, - :poll-interval 3000, - :batch-size 100, - :backoff-opts - {:budget 999, :max-attempt 9, :j-range 9, :initial 2}}, - :target - {:request-config - {:url-base "http://0.0.0.0:8083", - :xapi-prefix "/xapi2", - :username "baz", - :password "quxx"}, - :batch-size 100, - :backoff-opts - {:budget 999, :max-attempt 9, :j-range 9, :initial 2}}, - :filter {}} - reconfigured)))) + (let [config {:get-buffer-size 100, + :statement-buffer-size 1000, + :batch-buffer-size 100, + :batch-timeout 200, + :cleanup-buffer-size 50, + :source + {:request-config + {:url-base "http://0.0.0.0:8080", + :xapi-prefix "/xapi", + :username "foo", + :password "bar"}, + :get-params {:limit 50}, + :poll-interval 1000, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :target + {:request-config + {:url-base "http://0.0.0.0:8081", + :xapi-prefix "/xapi", + :username "foo", + :password "bar"}, + :batch-size 50, + :backoff-opts + {:budget 1000, :max-attempt 10, :j-range 10, :initial 1}}, + :filter {}}] + (testing "General reconfig" + (is (= {:get-buffer-size 200, + :batch-timeout 300, + :statement-buffer-size 10000, + :batch-buffer-size 1000, + :cleanup-buffer-size 100, + :source + {:request-config + {:url-base "http://0.0.0.0:8082", + :xapi-prefix "/xapi2", + :username "baz", + :password "quxx"}, + :get-params {:format "exact" + :limit 100}, + :poll-interval 3000, + :batch-size 100, + :backoff-opts + {:budget 999, :max-attempt 9, :j-range 9, :initial 2}}, + :target + {:request-config + {:url-base "http://0.0.0.0:8083", + :xapi-prefix "/xapi2", + :username "baz", + :password "quxx"}, + :batch-size 100, + :backoff-opts + {:budget 999, :max-attempt 9, :j-range 9, :initial 2}}, + :filter {}} + (reconfigure-with-options + config + {:job-id "foo" + :source-url "http://0.0.0.0:8082/xapi2" + :source-batch-size 100 + :source-poll-interval 3000 + :get-params {:format "exact"} + :source-username "baz" + :source-password "quxx" + :source-backoff-budget 999 + :source-backoff-max-attempt 9 + :source-backoff-j-range 9 + :source-backoff-initial 2 + + :target-url "http://0.0.0.0:8083/xapi2" + :target-batch-size 100 + :target-username "baz" + :target-password "quxx" + :target-backoff-budget 999 + :target-backoff-max-attempt 9 + :target-backoff-j-range 9 + :target-backoff-initial 2 + + :get-buffer-size 200 + :batch-timeout 300 + :cleanup-buffer-size 100 + + :statement-buffer-size 10000 + :batch-buffer-size 1000})))) + (testing "Auth change" + (is (= {:url-base "http://0.0.0.0:8080", + :xapi-prefix "/xapi", + :token "foobar"} + (-> (reconfigure-with-options + config + {:source-token "foobar"}) + (get-in [:source :request-config])))))))