From 7af037a064826ca8fc40bbe3b81e804d127208e4 Mon Sep 17 00:00:00 2001 From: wdhowe Date: Mon, 14 Sep 2020 11:35:16 -0500 Subject: [PATCH] issue/95/scheduler-calls-missing: Implement missing scheduler calls Added the following scheduler protocols and calls: * acknowledge-operation-status * reconcile-operations * suppress * update-framework Also enabled clj-fmt on save, which made the formatting changes. --- src/meson/client/impl/master/scheduler.clj | 310 ++++++++++++--------- src/meson/protocols/master/scheduler.clj | 51 +++- 2 files changed, 223 insertions(+), 138 deletions(-) diff --git a/src/meson/client/impl/master/scheduler.clj b/src/meson/client/impl/master/scheduler.clj index f14020d..639c897 100644 --- a/src/meson/client/impl/master/scheduler.clj +++ b/src/meson/client/impl/master/scheduler.clj @@ -16,13 +16,13 @@ (defn start "" ([this] - (start this {})) + (start this {})) ([this opts] - (log/debug "Starting connection manager for the scheduler ...") - (->> opts - (into default-scheduler-opts) - (http-conn-mgr/make-reusable-conn-manager) - (assoc this :conn-mgr)))) + (log/debug "Starting connection manager for the scheduler ...") + (->> opts + (into default-scheduler-opts) + (http-conn-mgr/make-reusable-conn-manager) + (assoc this :conn-mgr)))) (defn stop "" @@ -35,187 +35,247 @@ Some API endpoints may break the 1 to 1 pattern of :type and payload key." [^Keyword type] - (condp = type - :request (str (util/keyword->lower type) "s") - (util/keyword->lower type))) + (condp = type + :request (str (util/keyword->lower type) "s") + (util/keyword->lower type))) (comment (payload-key :request) ; plural expected (payload-key :accept) ; singular expected - (payload-key :subscribe)) ; singular expected + (payload-key :subscribe) ; singular expected + (payload-key :acknowledge_operation_status)) (defn call ([this ^Keyword type] - (call this type nil nil)) + (call this type nil nil)) ([this ^Keyword type payload framework-id] - (call this type payload framework-id http/json-content-type)) + (call this type payload framework-id http/json-content-type)) ([this ^Keyword type payload framework-id content-type] - (call this type payload framework-id content-type {})) + (call this type payload framework-id content-type {})) ([this ^Keyword type payload framework-id content-type opts] - (let [data {:type (util/keyword->upper type) - (payload-key type) payload}] - (http/post - this - scheduler-path - :body (json/write-str (merge data (or framework-id {}))) - :opts (into opts {:content-type content-type - :accept content-type}))))) + (let [data {:type (util/keyword->upper type) + (payload-key type) payload}] + (http/post + this + scheduler-path + :body (json/write-str (merge data (or framework-id {}))) + :opts (into opts {:content-type content-type + :accept content-type}))))) (defn accept ([this payload stream-id framework-id] - (accept this payload stream-id framework-id http/json-content-type)) + (accept this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :accept - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :accept + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn acknowledge ([this payload stream-id framework-id] - (acknowledge this payload stream-id framework-id http/json-content-type)) + (acknowledge this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :acknowledge - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :acknowledge + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) + +(defn acknowledge-operation-status + ([this payload stream-id framework-id] + (acknowledge-operation-status this payload stream-id framework-id http/json-content-type)) + ([this payload stream-id framework-id content-type] + (call + this + :acknowledge_operation_status + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn decline ([this payload stream-id framework-id] - (decline this payload stream-id framework-id http/json-content-type)) + (decline this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :decline - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :decline + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn kill-task ([this payload stream-id framework-id] - (kill-task this payload stream-id framework-id http/json-content-type)) + (kill-task this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :kill - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :kill + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn message ([this payload stream-id framework-id] - (message this payload stream-id framework-id http/json-content-type)) + (message this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :message - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :message + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn reconcile ([this payload stream-id framework-id] - (reconcile this payload stream-id framework-id http/json-content-type)) + (reconcile this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :reconcile - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :reconcile + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) + +(defn reconcile-operations + ([this payload stream-id framework-id] + (reconcile-operations this payload stream-id framework-id http/json-content-type)) + ([this payload stream-id framework-id content-type] + (call + this + :reconcile_operations + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn request ([this payload stream-id framework-id] - (request this payload stream-id framework-id http/json-content-type)) + (request this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :request ; type is request, json keyword is requests (plural) - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :request ; type is request, json keyword is requests (plural) + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn revive ([this payload stream-id framework-id] - (revive this payload stream-id framework-id http/json-content-type)) + (revive this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :revive - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :revive + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn shutdown-executor ([this payload stream-id framework-id] - (shutdown-executor this payload stream-id framework-id http/json-content-type)) + (shutdown-executor this payload stream-id framework-id http/json-content-type)) ([this payload stream-id framework-id content-type] - (call - this - :shutdown - payload - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :shutdown + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn subscribe ([this payload] - (subscribe this payload http/json-content-type)) + (subscribe this payload http/json-content-type)) ([this payload content-type] - (call - this - :subscribe - payload - nil - content-type - {:as :stream - :streaming? true - :chunked? true - :connection http/keep-alive - :connection-manager (:conn-mgr this)}))) + (call + this + :subscribe + payload + nil + content-type + {:as :stream + :streaming? true + :chunked? true + :connection http/keep-alive + :connection-manager (:conn-mgr this)}))) + +(defn suppress + ([this payload stream-id framework-id] + (suppress this payload stream-id framework-id http/json-content-type)) + ([this payload stream-id framework-id content-type] + (call + this + :suppress + payload + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) (defn teardown ([this stream-id framework-id] - (teardown this stream-id framework-id http/json-content-type)) + (teardown this stream-id framework-id http/json-content-type)) ([this stream-id framework-id content-type] - (call - this - :teardown - nil - framework-id - content-type - {:connection-manager (:conn-mgr this) - :headers {:mesos-stream-id stream-id}}))) + (call + this + :teardown + nil + framework-id + content-type + {:connection-manager (:conn-mgr this) + :headers {:mesos-stream-id stream-id}}))) + +(defn update-framework + ([this payload] + (update-framework this payload http/json-content-type)) + ([this payload content-type] + (call + this + :update_framework + payload + nil + content-type + {:as :stream + :streaming? true + :chunked? true + :connection http/keep-alive + :connection-manager (:conn-mgr this)}))) (def behaviour {:accept accept :acknowledge acknowledge + :acknowledge-operation-status acknowledge-operation-status :decline decline :kill-task kill-task :message message :reconcile reconcile + :reconcile-operations reconcile-operations :request request :revive revive - :subscribe subscribe :shutdown-executor shutdown-executor - :teardown teardown}) + :subscribe subscribe + :suppress suppress + :teardown teardown + :update-framework update-framework}) diff --git a/src/meson/protocols/master/scheduler.clj b/src/meson/protocols/master/scheduler.clj index 26048a7..bf24397 100644 --- a/src/meson/protocols/master/scheduler.clj +++ b/src/meson/protocols/master/scheduler.clj @@ -5,36 +5,61 @@ * http://mesos.apache.org/documentation/latest/scheduler-http-api/") (defprotocol IScheduler - "" - (accept [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + "Scheduler API Calls accepted by the master." + (accept [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler when it accepts offer(s) sent by the master. The `ACCEPT` request includes the type of operations (e.g., launch task, reserve resources, create volumes) that the scheduler wants to perform on the offers.") - (acknowledge [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (acknowledge [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to acknowledge a status update.") - (decline [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (acknowledge-operation-status [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] + "Sent by the scheduler to acknowledge an operation status update.") + (decline [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to explicitly decline offer(s) received. Note that this is same as sending an `ACCEPT` call with no operations.") - (kill-task [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (kill-task [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to kill a specific task. If the scheduler has a custom executor, the kill is forwarded to the executor; it is up to the executor to kill the task and send a `TASK_KILLED` (or `TASK_FAILED`) update.") - (message [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (message [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to send arbitrary binary data to the executor.") - (reconcile [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (reconcile [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to query the status of non-terminal tasks.") - (request [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (reconcile-operations [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] + "Sent by the scheduler to query the status of non-terminal and + terminal-but-unacknowledged operations.") + (request [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to request resources from the master/allocator.") - (revive [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (revive [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to remove any/all filters that it has previously set via `ACCEPT` or `DECLINE` calls.") - (shutdown-executor [this payload stream-id framework-id] [this payload stream-id framework-id content-type] + (shutdown-executor [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] "Sent by the scheduler to shutdown a specific custom executor.") - (subscribe [this payload] [this payload content-type] + (subscribe [this payload] + [this payload content-type] "This is the first step in the communication process between the scheduler and the master. This is also to be considered as subscription to the “/scheduler” events stream.") - (teardown [this stream-id framework-id] [this stream-id framework-id content-type] - "Sent by the scheduler when it wants to tear itself down.")) + (suppress [this payload stream-id framework-id] + [this payload stream-id framework-id content-type] + "Sent by the scheduler when it doesn’t need offers for a given set of its roles.") + (teardown [this stream-id framework-id] + [this stream-id framework-id content-type] + "Sent by the scheduler when it wants to tear itself down.") + (update-framework [this payload] + [this payload content-type] + "Sent by the scheduler to change fields of its FrameworkInfo and/or the set of + suppressed roles."))