Skip to content

Latest commit

 

History

History
813 lines (720 loc) · 24.8 KB

core.org

File metadata and controls

813 lines (720 loc) · 24.8 KB

Contents

Namespace: thi.ng.fabric.core

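This namespace provides the core protocols, the default vertex and in-memory graph implementations, the signal and collect helper functions, and the synchronous, scheduled and async execution contexts.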

Protocols

;; Contract of a single graph vertex. The per-method notes describe what
;; the `Vertex` deftype in this file does for each operation.
(defprotocol IVertex
  ;; Returns the vertex id.
  (vertex-id [_])
  ;; Registers an outgoing edge to vertex `v` together with its signal fn
  ;; and edge options; also bumps the new-edge counter.
  (connect-to! [_ v sig-fn edge-opts])
  ;; Removes the outgoing edge to `v` (no-op when `v` is nil).
  (disconnect-neighbor! [_ v])
  ;; Removes all outgoing edges.
  (disconnect-all! [_])
  ;; Returns the target vertices of all outgoing edges.
  (neighbors [_])
  ;; Applies the vertex' collect fn and clears the uncollected signals.
  (collect! [_])
  ;; Applies the optional final collect fn, if one was configured.
  (collect-final! [_])
  ;; Returns the collect score computed by the vertex' scoring fn.
  (score-collect [_])
  ;; Remembers the current value as previous value, resets the new-edge
  ;; counter and calls `handler` with the vertex and its outgoing edges.
  (signal! [_ handler])
  ;; Returns the signal score computed by the vertex' scoring fn.
  (score-signal [_])
  ;; Accepts signal `sig` from source `src`; returns true if it differs
  ;; from the last signal recorded for that source.
  (receive-signal [_ src sig])
  ;; Returns the signals received since the last collect!.
  (uncollected-signals [_])
  ;; Returns the map of last signal per source.
  (signal-map [_])
  ;; Returns the number of edges added since the last signal!.
  (new-edge-count [_])
  ;; Replaces the vertex value.
  (set-value! [_ val])
  ;; Updates the vertex value by applying `f` to it.
  (update-value! [_ f])
  ;; Returns the value stored at the time of the last signal!.
  (previous-value [_]))

;; Contract of a vertex/edge container. The per-method notes describe
;; what the `InMemoryGraph` record in this file does.
(defprotocol IComputeGraph
  ;; Creates a vertex for `val` & `vspec` (optionally via a custom
  ;; constructor fn `ctor`), registers it and returns it.
  (add-vertex! [_ val vspec] [_ val vspec ctor])
  ;; Applies `f` to the value of vertex `v` and notifies watches.
  (update-vertex! [_ v f])
  ;; Removes vertex `v`; returns true if it was part of the graph.
  (remove-vertex! [_ v])
  ;; Looks up a vertex by its id.
  (vertex-for-id [_ id])
  ;; Returns all vertices of the graph.
  (vertices [_])
  ;; Connects `src` to `dest` using signal fn `f` and edge options `opts`.
  (add-edge! [_ src dest f opts])
  ;; Disconnects `dest` from `src`.
  (remove-edge! [_ src dest]))

;; Access to auxiliary state attached to an object. `Vertex` keeps this
;; state in an atom, separate from its value.
(defprotocol IStateful
  ;; Returns the current state.
  (state [_])
  ;; Replaces the state with the result of applying `f` to it.
  (update-state! [_ f]))

;; Event watch registry. In `InMemoryGraph`, watches are stored as a
;; nested map of event type -> watch id -> handler fn.
(defprotocol IWatch
  ;; Calls every handler registered for the type of event `evt`
  ;; (an event is a vector whose first element is its type).
  (notify-watches [_ evt])
  ;; Registers handler `f` under `id` for events of `type`.
  (add-watch! [_ type id f])
  ;; Removes the handler registered under `id` for events of `type`.
  (remove-watch! [_ type id]))

;; NOTE(review): no implementation of this protocol is visible in this
;; file; judging by the method names only, it presumably describes
;; composite components which can add/remove themselves to/from a
;; graph - confirm against the implementing namespaces.
(defprotocol IGraphComponent
  (component-type [_])
  (add-to-graph! [_ g])
  (remove-from-graph! [_ g] [_ g parent]))

;; Contract of an execution context processing a graph's vertices.
(defprotocol IGraphExecutor
  ;; Starts processing. The sync & scheduled contexts in this file return
  ;; a result map, the async context returns its result channel.
  (execute! [_])
  ;; Re-triggers a waiting context. Only supported by the async context
  ;; in this file; the other two call err/unsupported!.
  (notify! [_])
  ;; Stops a running context. Only supported by the async context in this
  ;; file; the other two call err/unsupported!.
  (stop! [_]))

Type implementations

Vertex

;; Default vertex implementation. All mutable fields (value, prev-val,
;; state, uncollected, signal-map, new-edges, outs) are dereferenced and
;; updated via swap!/reset!, i.e. are expected to be atoms (the `vertex`
;; constructor fn in this file supplies atoms). The four *-fn fields are
;; plain functions taking the vertex as argument. Dereferencing a
;; vertex yields its current value.
(deftype Vertex
    [id value prev-val state uncollected signal-map new-edges outs
     score-signal-fn score-collect-fn collect-fn collect-final-fn]
  #?@(:clj
       [clojure.lang.IDeref
        (deref
         [_] @value)]
       :cljs
       [IDeref
        (-deref
         [_] @value)])
  IVertex
  (vertex-id
    [_] id)
  (set-value!
    [_ val] (reset! value val) _)
  (update-value!
    [_ f] (swap! value f) _)
  (previous-value
    [_] @prev-val)
  ;; Runs the collect fn (which is responsible for consuming the
  ;; uncollected signals), then empties the uncollected signal vector.
  (collect!
    [_]
    (collect-fn _)
    (reset! uncollected [])
    _)
  ;; Only returns the vertex if a final collect fn is present, else nil.
  (collect-final!
    [_]
    (when collect-final-fn
      (collect-final-fn _)
      _))
  (score-collect
    [_] (score-collect-fn _))
  ;; Edges are stored in `outs` as target vertex -> [sig-fn opts].
  ;; Connecting to the same target again replaces the previous entry,
  ;; but still increments the new-edge counter.
  (connect-to!
    [_ v sig-fn opts]
    (swap! outs assoc v [sig-fn opts])
    (swap! new-edges inc)
    (debug id "edge to" (thi.ng.fabric.core/vertex-id v) "(" (pr-str opts) ") new:" @new-edges)
    _)
  (neighbors
    [_] (keys @outs))
  (disconnect-neighbor!
    [_ v]
    (when v
      (debug id "disconnect from" (thi.ng.fabric.core/vertex-id v))
      (swap! outs dissoc v))
    _)
  (disconnect-all!
    [_]
    (run! #(disconnect-neighbor! _ %) (keys @outs))
    _)
  (new-edge-count
    [_] @new-edges)
  (score-signal
    [_] (score-signal-fn _))
  ;; Snapshots the current value as previous value and resets the
  ;; new-edge counter *before* invoking the handler. Returns the
  ;; handler's result.
  (signal!
    [_ handler]
    (reset! prev-val @value)
    (reset! new-edges 0)
    (handler _ @outs))
  ;; A signal equal to the last one stored for `src` is ignored (the
  ;; result is then whatever the debug call returns); otherwise the
  ;; signal is queued for collection, remembered for `src` and true is
  ;; returned.
  (receive-signal
    [_ src sig]
    (if-not (= sig (@signal-map src))
      (do (swap! uncollected conj sig)
          (swap! signal-map assoc src sig)
          true)
      (debug id " ignoring unchanged signal: " (pr-str sig))))
  (signal-map
    [_] @signal-map)
  (uncollected-signals
    [_] @uncollected)
  IStateful
  (state
    [_] @state)
  (update-state!
    [_ f] (swap! state f) _))

ComputeGraph

;; In-memory graph implementation. Both fields are dereferenced and
;; updated via swap!, i.e. are expected to be atoms:
;; - `state` holds a map with the keys ::vertices (id -> vertex),
;;   ::next-id and ::id-gen-fn (fn computing the next id from the
;;   current one)
;; - `watches` holds a nested map of event type -> watch id -> handler
(defrecord InMemoryGraph [state watches]
  IComputeGraph
  ;; Without explicit constructor the `vertex` fn of this namespace is used.
  (add-vertex!
    [_ val vspec]
    (add-vertex! _ val vspec vertex))
  ;; The vertex is constructed inside the swap! fn (it needs the id
  ;; taken from the state) and handed out via the volatile `nv`.
  ;; Watches receive [:add-vertex v] after the state update.
  (add-vertex!
    [_ val vspec ctor]
    (let [nv (volatile! nil)]
      (swap! state
             #(let [id (::next-id %)
                    v  (ctor id val vspec)]
                (vreset! nv v)
                (-> %
                    (update ::next-id (::id-gen-fn %))
                    (update ::vertices assoc id v))))
      (notify-watches* @watches [:add-vertex @nv])
      @nv))
  ;; Watches receive [:remove-vertex v] *before* the vertex is removed
  ;; from the state. Returns true if the vertex was present, else nil.
  (remove-vertex!
    [_ v]
    (when (get-in @state [::vertices (vertex-id v)])
      (notify-watches* @watches [:remove-vertex v])
      (swap! state update ::vertices dissoc (vertex-id v))
      ;;(disconnect-all! v)
      true))
  (update-vertex!
    [_ v f]
    (update-value! v f)
    (notify-watches* @watches [:update-vertex v])
    _)
  (vertex-for-id
    [_ id] (get-in @state [::vertices id]))
  (vertices
    [_] (-> @state ::vertices vals))
  ;; Watches receive [:add-edge src dest sig-fn opts].
  (add-edge!
    [_ src dest sig-fn opts]
    (connect-to! src dest sig-fn opts)
    (notify-watches* @watches [:add-edge src dest sig-fn opts])
    _)
  ;; Watches receive [:remove-edge src dest].
  (remove-edge!
    [_ src dest]
    (disconnect-neighbor! src dest)
    (notify-watches* @watches [:remove-edge src dest])
    _)
  IWatch
  (add-watch!
    [_ type id f]
    (info "adding watch" type id f)
    (swap! watches assoc-in [type id] f)
    _)
  (remove-watch!
    [_ type id]
    (info "removing watch" type id)
    (swap! watches update type dissoc id)
    _)
  (notify-watches
    [_ evt] (notify-watches* @watches evt)))

Type constructors

Default Vertex

(def default-vertex-opts
  "Fallback behaviors used by `vertex` for every option key the
  caller does not supply: collecting via `collect-into` and scoring
  via `default-score-signal` / `default-score-collect`."
  {::collect-fn       collect-into
   ::score-signal-fn  default-score-signal
   ::score-collect-fn default-score-collect})

(defn vertex
  "Default vertex constructor. Merges `opts` over
  `default-vertex-opts`, pulls the four behavior fns out of the
  result and stores the remaining entries as the vertex' initial
  state. Returns a new `Vertex` with the given id and initial value."
  [id val opts]
  (let [spec       (merge default-vertex-opts opts)
        {score-sig     ::score-signal-fn
         score-coll    ::score-collect-fn
         collect       ::collect-fn
         collect-final ::collect-final-fn} spec
        ;; everything which isn't a behavior fn becomes vertex state
        user-state (dissoc spec
                           ::collect-final-fn ::collect-fn
                           ::score-collect-fn ::score-signal-fn)
        value       (atom val)
        prev-val    (atom nil)
        state       (atom user-state)
        uncollected (atom [])
        sig-map     (atom {})
        new-edges   (atom 0)
        outs        (atom {})]
    (Vertex.
     id value prev-val state uncollected sig-map new-edges outs
     score-sig score-coll collect collect-final)))

Graph

TODO: refactor ID generation and support data.int-map for vertex lookups

(defn compute-graph
  "Creates a new, empty `InMemoryGraph`.

  `opts` is a map which becomes part of the graph state. Its
  `::id-gen-fn` entry is a fn computing the next vertex id from the
  current one; it is also called once with nil to obtain the first
  id. If `opts` is nil or has no `::id-gen-fn`, a generator producing
  0, 1, 2... is used (previously a missing generator caused a
  NullPointerException when computing the first id)."
  ([]
   (compute-graph {}))
  ([opts]
   (let [opts     (update opts ::id-gen-fn #(or % (fnil inc -1)))
         start-id ((::id-gen-fn opts) nil)
         ;; ::vertices and ::next-id always start fresh, even if
         ;; present in the given opts
         state    (merge opts {::vertices {} ::next-id start-id})]
     (InMemoryGraph.
      (atom state)
      (atom {})))))

Signal & collection functions

(defn collect-pure
  "Takes a pure fn of two args (current vertex value, seq of
  uncollected signals) and wraps it as a vertex collect fn which
  replaces the vertex value with that fn's result."
  [collect-fn]
  (fn [vertex]
    (let [step (fn [curr] (collect-fn curr (uncollected-signals vertex)))]
      (update-value! vertex step))))

;; Collect fn which adds all uncollected signals to the vertex value
;; using `into` (i.e. the vertex value must be a collection).
(def collect-into (collect-pure into))

(defn signal-forward
  "Signal fn which simply emits the current (dereferenced) value of
  the given vertex. The second argument (edge options) is ignored."
  [vertex _]
  (deref vertex))

(defn default-score-signal
  "Default signal scoring fn: yields 1 when the vertex value differs
  from its previous value, otherwise 0."
  [vertex]
  (let [curr @vertex
        prev (previous-value vertex)]
    (if (not= curr prev) 1 0)))

(defn score-signal-with-new-edges
  "Signal scoring fn taking fresh edges into account: the result is
  the number of outgoing edges added since the last signal! call,
  plus 1 if the vertex value differs from its previous value."
  [vertex]
  (let [fresh-edges (new-edge-count vertex)
        changed     (if (= @vertex (previous-value vertex)) 0 1)]
    (+ fresh-edges changed)))

(defn default-score-collect
  "Default collect scoring fn: the score is the number of signals the
  vertex has received but not yet collected."
  [vertex]
  (-> vertex uncollected-signals count))

(defn sync-vertex-signal
  "Signal handler (for use with signal!). Walks the outgoing edges
  `outs` (target vertex -> [signal-fn edge-opts]), computes each
  edge's signal by calling its signal fn with the source vertex and
  the edge options and delivers every non-nil signal to the edge's
  target. Returns a vector of the targets which accepted their
  signal."
  [vertex outs]
  (let [src-id (vertex-id vertex)
        step   (fn [accepted [target [sig-fn edge-opts]]]
                 (if-not target
                   ;; a falsy target terminates the walk
                   (reduced accepted)
                   (let [sig (sig-fn vertex edge-opts)]
                     (cond
                       (nil? sig)
                       (do (debug "signal fn for" (vertex-id target) "returned nil, skipping...")
                           accepted)

                       (receive-signal target src-id sig)
                       (conj! accepted target)

                       :else accepted))))]
    (persistent! (reduce step (transient []) (seq outs)))))

Execution contexts

Shared context helpers

(defn execution-result
  "Builds the result map of an execution run: event `type`, number of
  signal and collect operations, runtime since `t0` (as computed by
  `millis-since`) and the average time per operation (0 if no
  operation was counted). The entries of the optional `opts` map are
  included too, but never override the computed keys."
  [type sigs colls t0 & [opts]]
  (let [elapsed (millis-since t0)
        per-op  (if (or (pos? sigs) (pos? colls))
                  (/ elapsed (+ sigs colls))
                  0)]
    (merge opts
           {:type        type
            :signals     sigs
            :collections colls
            :runtime     elapsed
            :time-per-op per-op})))

(defn async-execution-result
  "Async version of `execution-result`: builds the result map for the
  given event `type`, signal count `sigs`, collect count `colls`,
  start time `t0` and optional extra `opts` and puts it onto channel
  `out`. Returns the channel of the go block performing the put."
  [type out sigs colls t0 & [opts]]
  ;; execution-result expects the signal count *before* the collect
  ;; count (the two were previously passed in swapped order)
  (go (>! out (execution-result type sigs colls t0 opts))))

(defn collect-final-all!
  "Calls collect-final! on each of the given vertices. Returns nil."
  [vertices]
  (doseq [v vertices]
    (collect-final! v)))

(defn sub-pass-combine
  "Combining fn for the folds of the two-pass processors. Operates on
  pairs of [active-vertex-set op-count]: without args returns the
  empty pair, with one arg returns it unchanged, with two args merges
  the vertex sets and sums the counts."
  ([] [#{} 0])
  ([acc] acc)
  ([left right]
   (let [[l-active l-count] left
         [r-active r-count] right]
     [(into l-active r-active)
      (+ l-count r-count)])))

(defn single-pass-combine
  "Combining fn for the fold of the single-pass scheduler. Operates
  on triples of [active-vertex-set signal-count collect-count]:
  without args returns the empty triple, with one arg returns it
  unchanged, with two args merges the vertex sets and sums both
  counters."
  ([] [#{} 0 0])
  ([acc] acc)
  ([left right]
   (let [[l-active l-sigs l-colls] left
         [r-active r-sigs r-colls] right]
     [(into l-active r-active)
      (+ l-sigs r-sigs)
      (+ l-colls r-colls)])))

(defn add-context-watches!
  "Registers the watches of an execution context with graph `g`.
  `watches` maps event types to fns which, given `queue`, return the
  actual event handler. All handlers are registered under the same
  `watch-id`. Returns nil."
  [g watch-id queue watches]
  (doseq [[evt-type make-handler] watches]
    (add-watch! g evt-type watch-id (make-handler queue))))

(defn remove-context-watches!
  "Unregisters the handlers stored under `watch-id` with graph `g`
  for every event type present in the `watches` map. Returns nil."
  [g watch-id watches]
  (doseq [[evt-type] watches]
    (remove-watch! g evt-type watch-id)))

Synchronous execution

Processing passes

(defn sync-signal-pass-simple
  "Returns a single-threaded signal pass fn for the given score
  threshold. The returned fn takes a collection of vertices, calls
  signal! (with `sync-vertex-signal` as handler) on every vertex
  whose signal score exceeds `thresh` and returns the total number of
  neighbors which accepted a signal. An empty (or nil) collection
  yields 0; previously an empty non-nil collection such as [] led to
  scoring a nil vertex."
  [thresh]
  (fn [vertices]
    (reduce
     (fn [sigs v]
       (if (> (score-signal v) thresh)
         (do (debug (vertex-id v) "signaling")
             (+ sigs (count (signal! v sync-vertex-signal))))
         sigs))
     0 vertices)))

(defn sync-collect-pass-simple
  "Returns a single-threaded collect pass fn for the given score
  threshold. The returned fn takes a collection of vertices, calls
  collect! on every vertex whose collect score exceeds `thresh` and
  returns the number of vertices collected. An empty (or nil)
  collection yields 0; previously an empty non-nil collection such as
  [] led to scoring a nil vertex."
  [thresh]
  (fn [vertices]
    (reduce
     (fn [colls v]
       (if (> (score-collect v) thresh)
         (do (debug (vertex-id v) "collecting")
             (collect! v)
             (inc colls))
         colls))
     0 vertices)))

(defn parallel-signal-pass-simple
  "Returns a signal pass fn based on a reducers fold. The returned fn
  takes a collection of vertices, signals (via `sync-vertex-signal`)
  every vertex whose signal score exceeds `thresh` and returns the
  total number of neighbors which accepted a signal."
  [thresh]
  (letfn [(signal-step [total v]
            (if (> (score-signal v) thresh)
              (let [receivers (signal! v sync-vertex-signal)]
                (+ total (count receivers)))
              total))]
    (fn [vertices]
      (r/fold + signal-step vertices))))

(defn parallel-collect-pass-simple
  "Returns a collect pass fn based on a reducers fold. The returned
  fn takes a collection of vertices, calls collect! on every vertex
  whose collect score exceeds `thresh` and returns the number of
  vertices collected."
  [thresh]
  (letfn [(collect-step [total v]
            (if (> (score-collect v) thresh)
              (do (collect! v)
                  (inc total))
              total))]
    (fn [vertices]
      (r/fold + collect-step vertices))))

Default config

(defn default-sync-context-opts
  "Returns the default option map of `sync-execution-context`: both
  score thresholds at 0, max. 1e6 iterations, the fold based signal &
  collect passes, no final collection and a :remove-vertex watch
  which lets the removed vertex signal its neighbors."
  []
  (let [signal-removed (fn [queue]
                         ;; the sync context has no queue, arg is unused
                         (fn [[_ removed]]
                           (signal! removed sync-vertex-signal)))]
    {:signal-thresh  0
     :collect-thresh 0
     :signal-fn      parallel-signal-pass-simple
     :collect-fn     parallel-collect-pass-simple
     :max-iter       1e6
     :collect-final? false
     :watches        {:remove-vertex signal-removed}}))

Implementation

;; Creates a synchronous execution context. `opts` is merged over
;; (default-sync-context-opts) and must provide the graph to process
;; under :graph. Dereferencing the context yields the merged option map.
;;
;; execute! repeatedly runs a signal pass followed by a collect pass
;; over all vertices of the graph until a full iteration performs
;; neither signal nor collect operations (result type :converged) or
;; the iteration limit is exceeded (:max-iter-reached). It returns the
;; map built by `execution-result`, extended with :iterations.
;; stop! and notify! are not supported.
(defn sync-execution-context
  [opts]
  (let [ctx       (merge (default-sync-context-opts) opts)
        g         (:graph ctx)
        ;; the configured pass fns are factories taking the threshold
        coll-fn   ((:collect-fn ctx) (:collect-thresh ctx))
        sig-fn    ((:signal-fn ctx) (:signal-thresh ctx))
        max-iter  (:max-iter ctx)
        ;; id under which this context's watches are registered
        watch-id  (keyword (gensym))
        ;; removes the context watches again, then builds the result map
        ->result  (fn [type iter sigs colls t0]
                    (remove-context-watches! g watch-id (:watches ctx))
                    (execution-result type sigs colls t0 {:iterations iter}))]
    (reify
      #?@(:clj
           [clojure.lang.IDeref
            (deref [_] ctx)]
           :cljs
           [IDeref
            (-deref [_] ctx)])
      IGraphExecutor
      (stop! [_] (err/unsupported!))
      (notify! [_] (err/unsupported!))
      (execute!
        [_]
        ;; this context has no queue, hence nil is given to the watch fns
        (add-context-watches! g watch-id nil (:watches ctx))
        (let [t0 (now)]
          (loop [i 0, colls 0, sigs 0]
            ;; i starts at 0 and the check is inclusive, so up to
            ;; (inc max-iter) iterations are executed
            (if (<= i max-iter)
              ;; vertices are re-read each iteration
              (let [verts  (vertices g)
                    sigs'  (sig-fn verts)
                    colls' (coll-fn verts)]
                (if (or (pos? sigs') (pos? colls'))
                  (recur (inc i) (long (+ colls colls')) (long (+ sigs sigs')))
                  (do (when (:collect-final? ctx) (collect-final-all! verts))
                      (->result :converged i sigs colls t0))))
              (->result :max-iter-reached i sigs colls t0))))))))

Scheduled

Default config

(def ^:private default-context-watches
  "Default graph watches of the scheduled & async contexts. Each
  entry maps an event type to a fn which takes the context's queue
  atom and returns the event handler: removed vertices signal their
  neighbors and the vertices accepting a signal are queued; both
  vertices of a newly added edge are queued."
  {:add-edge
   (fn [queue]
     (fn [[_ src dest]]
       (swap! queue into [src dest])))
   :remove-vertex
   (fn [queue]
     (fn [[_ removed]]
       (let [receivers (signal! removed sync-vertex-signal)]
         (swap! queue into receivers))))})

(defn default-scheduled-context-opts
  "Returns the default option map of `scheduled-execution-context`:
  both score thresholds at 0, the two-pass scheduler & processor, a
  limit of 1e6 operations, a thread count of available processors + 2
  (1 in CLJS), no final collection and the default context watches."
  []
  (let [thread-count #?(:clj (+ 2 (.availableProcessors (Runtime/getRuntime))) :cljs 1)]
    {:signal-thresh  0
     :collect-thresh 0
     :scheduler      two-pass-scheduler
     :processor      parallel-two-pass-processor
     ;; alternative single-pass setup:
     ;;:scheduler      single-pass-scheduler
     ;;:processor      eager-probabilistic-single-pass-processor
     :threads        thread-count
     :max-ops        1e6
     :collect-final? false
     :watches        default-context-watches}))

Implementation

;; Creates a scheduled execution context. `opts` is merged over
;; (default-scheduled-context-opts) and must provide the graph to
;; process under :graph. Dereferencing the context yields the merged
;; option map.
;;
;; Other than the sync context, execute! only processes the currently
;; active vertices (those with a signal or collect score above the
;; thresholds), using the configured scheduler. It runs until no active
;; vertices remain (result type :converged) or the total number of
;; operations exceeds :max-ops (:max-ops-reached) and returns the map
;; built by `execution-result`. stop! and notify! are not supported.
(defn scheduled-execution-context
  [opts]
  (let [ctx       (merge (default-scheduled-context-opts) opts)
        g         (:graph ctx)
        c-thresh  (:collect-thresh ctx)
        s-thresh  (:signal-thresh ctx)
        max-ops   (:max-ops ctx)
        ;; the configured scheduler is a factory taking the whole ctx
        scheduler ((:scheduler ctx) ctx)
        threads   (:threads ctx)
        ;; id under which this context's watches are registered
        watch-id  (keyword (gensym))
        ;; transducer only keeping vertices which can signal or collect
        v-filter  (filter #(or (> (score-signal %) s-thresh) (> (score-collect %) c-thresh)))
        ;; removes the context watches again, then builds the result map
        ->result  (fn [type sigs colls t0]
                    (remove-context-watches! g watch-id (:watches ctx))
                    (execution-result type sigs colls t0))]
    (reify
      #?@(:clj
           [clojure.lang.IDeref
            (deref [_] ctx)]
           :cljs
           [IDeref
            (-deref [_] ctx)])
      IGraphExecutor
      (stop! [_] (err/unsupported!))
      (notify! [_] (err/unsupported!))
      (execute!
        [_]
        (let [t0     (now)
              active (atom (sequence v-filter (vertices g)))
              ;; set of vertices added by the watch handlers while running
              queue  (atom #{})]
          (add-context-watches! g watch-id queue (:watches ctx))
          (loop [colls 0, sigs 0]
            ;;(warn :active-count (count @active))
            (if (seq @active)
              (if (<= (+ colls sigs) max-ops)
                ;; work group size passed to the scheduler: active
                ;; vertices per thread, but never less than 512
                (let [grp-size            (max 512 (long (/ (count @active) threads)))
                      [act' sigs' colls'] (scheduler grp-size @active)]
                  ;;(warn :auto (count @queue))
                  ;; next active set: still-active vertices returned by
                  ;; the scheduler plus everything queued by the watches
                  (reset! active (into (into #{} v-filter act') @queue))
                  (reset! queue #{})
                  (recur (long (+ colls colls')) (long (+ sigs sigs'))))
                (->result :max-ops-reached sigs colls t0))
              (do (when (:collect-final? ctx) (collect-final-all! (vertices g)))
                  (->result :converged sigs colls t0)))))))))

Async

Default config

(defn default-async-context-opts
  "Returns the default option map of `async-execution-context`: both
  score thresholds at 0, the two-pass scheduler & processor, a limit
  of 1e6 operations, no auto-stop, a thread count of available
  processors + 1 (1 in CLJS), no final collection and the default
  context watches."
  []
  (let [thread-count #?(:clj (inc (.availableProcessors (Runtime/getRuntime))) :cljs 1)]
    {:signal-thresh  0
     :collect-thresh 0
     :scheduler      two-pass-scheduler
     :processor      parallel-two-pass-processor
     ;; alternative single-pass setup:
     ;;:scheduler      single-pass-scheduler
     ;;:processor      eager-probabilistic-single-pass-processor
     :threads        thread-count
     :max-ops        1e6
     :auto-stop      false
     :collect-final? false
     :watches        default-context-watches}))

Implementation

;; Creates an asynchronous execution context. `opts` is merged over
;; (default-async-context-opts) and must provide the graph to process
;; under :graph. Results are delivered on the channel given as :result
;; (a new channel is created if absent). Dereferencing the context
;; yields the option map plus a :running flag.
;;
;; execute! starts a go-loop processing the active vertices (same scheme
;; as the scheduled context) and immediately returns the result
;; channel. Each time the graph has converged a :converged result is
;; put on the channel; the loop then either stops (:auto-stop) or waits
;; until notify! is called and starts over. Exceeding :max-ops stops
;; the context with a :max-ops-reached result; a :stopped result is
;; produced when the loop notices that stop! has been called.
(defn async-execution-context
  [opts]
  (let [ctx         (merge (default-async-context-opts) opts)
        ctx         (update ctx :result #(or % (async/chan)))
        g           (:graph ctx)
        c-thresh    (:collect-thresh ctx)
        s-thresh    (:signal-thresh ctx)
        max-ops     (:max-ops ctx)
        ;; the configured scheduler is a factory taking the whole ctx
        scheduler   ((:scheduler ctx) ctx)
        threads     (:threads ctx)
        res-chan    (:result ctx)
        ;; dropping buffer of size 1: surplus notifications are discarded
        notify-chan (async/chan (async/dropping-buffer 1))
        running?    (volatile! false)
        ;; id under which this context's watches are registered
        watch-id    (keyword (gensym))
        ;; transducer only keeping vertices which can signal or collect
        v-filter    (filter #(or (> (score-signal %) s-thresh) (> (score-collect %) c-thresh)))
        ;; puts the result map on the result channel (in its own go block)
        ->result    (fn [type sigs colls t0] (go (>! res-chan (execution-result type sigs colls t0))))]
    (reify
      #?@(:clj
           [clojure.lang.IDeref
            (deref [_] (assoc ctx :running @running?))]
           :cljs
           [IDeref
            (-deref [_] (assoc ctx :running @running?))])
      IGraphExecutor
      ;; NOTE(review): notify-chan is created once per context and closed
      ;; here, so it looks like a stopped context can't wait for
      ;; notifications again after a later execute! - confirm intent.
      (stop!
        [_]
        (remove-context-watches! g watch-id (:watches ctx))
        (async/close! notify-chan)
        (vreset! running? false))
      (notify!
        [_] (go (>! notify-chan true)) nil)
      (execute!
        [_]
        (if-not @running?
          (do
            (vreset! running? true)
            (let [t0     (volatile! (now))
                  active (volatile! (sequence v-filter (vertices g)))
                  ;; set of vertices added by the watch handlers while running
                  queue  (atom #{})]
              (add-context-watches! g watch-id queue (:watches ctx))
              (go-loop [colls 0, sigs 0]
                ;;(warn :active-count (count @active))
                (if (and @running? (seq @active))
                  (if (<= (+ colls sigs) max-ops)
                    ;; work group size passed to the scheduler: active
                    ;; vertices per thread, but never less than 512
                    (let [grp-size            (max 512 (long (/ (count @active) threads)))
                          [act' sigs' colls'] (scheduler grp-size @active)
                          curr-q              @queue]
                      ;;(warn :queue (count curr-q))
                      (vreset! active (into (into #{} v-filter act') curr-q))
                      ;; only clears the queue if it still equals the
                      ;; snapshot taken above; otherwise its contents
                      ;; are kept for the next iteration
                      (compare-and-set! queue curr-q #{})
                      (recur (long (+ colls colls')) (long (+ sigs sigs'))))
                    (do (stop! _)
                        (->result :max-ops-reached sigs colls @t0)))
                  (if @running?
                    ;; still running, but no active vertices left
                    (do (when (:collect-final? ctx) (collect-final-all! (vertices g)))
                        (->result :converged sigs colls @t0)
                        (if (:auto-stop ctx)
                          (stop! _)
                          ;; wait for notify!; a closed notify-chan
                          ;; yields nil and ends the loop
                          (when (<! notify-chan)
                            (vreset! t0 (now))
                            (vreset! active (sequence v-filter (vertices g)))
                            (reset! queue #{})
                            ;;(warn :rerun (count @active))
                            (recur 0 0))))
                    (->result :stopped sigs colls @t0))))
              res-chan))
          (do (warn "execution context already running, use notify! to re-trigger...")
              res-chan))))))

Scheduled & async context processing

Single-pass scheduler

(defn probabilistic-single-pass-processor
  "Returns a reducing fn for the single-pass scheduler. The
  accumulator is a triple of [active-vertices signal-count
  collect-count]. For each vertex a coin flip decides whether it gets
  the chance to signal or to collect (each only if the related score
  exceeds its threshold). The vertex itself and any neighbors
  accepting its signal are added to the active vertices."
  [s-thresh c-thresh]
  (fn [[active sigs colls] v]
    (let [seen         (conj active v)
          signal-turn? (< (rand) 0.5)]
      (cond
        (and signal-turn? (> (score-signal v) s-thresh))
        (let [receivers (signal! v sync-vertex-signal)]
          [(into seen receivers) (+ sigs (count receivers)) colls])

        (and (not signal-turn?) (> (score-collect v) c-thresh))
        (do (collect! v)
            [seen sigs (inc colls)])

        :else [seen sigs colls]))))

(defn eager-probabilistic-single-pass-processor
  "Like `probabilistic-single-pass-processor`, but a vertex which has
  just collected is immediately given the chance to signal as well
  (if its signal score then exceeds the threshold)."
  [s-thresh c-thresh]
  (fn [[active sigs colls] v]
    (let [seen        (conj active v)
          can-signal? (fn [] (> (score-signal v) s-thresh))
          ;; signals v, yields the accumulator with given collect count
          signal-step (fn [colls']
                        (let [receivers (signal! v sync-vertex-signal)]
                          [(into seen receivers) (+ sigs (count receivers)) colls']))]
      (if (< (rand) 0.5)
        (if (can-signal?)
          (signal-step colls)
          [seen sigs colls])
        (if (> (score-collect v) c-thresh)
          (do (collect! v)
              (if (can-signal?)
                (signal-step (inc colls))
                [seen sigs (inc colls)]))
          [seen sigs colls])))))

(defn single-pass-scheduler
  "Takes an execution context option map and returns a scheduler fn
  of [workgroup-size vertices]. The scheduler folds the vertices with
  the context's processor (instantiated with the signal & collect
  thresholds) and combines partial results via
  `single-pass-combine`."
  [ctx]
  (let [{:keys [processor signal-thresh collect-thresh]} ctx
        step (processor signal-thresh collect-thresh)]
    (fn [workgroup-size vertices]
      (let [coll #?(:clj vertices :cljs (seq vertices))]
        (r/fold workgroup-size single-pass-combine step coll)))))

Two-pass scheduler

(defn parallel-signal-pass
  "Returns a signal pass fn of [workgroup-size vertices]. It folds
  the vertices, signaling (via `sync-vertex-signal`) each one whose
  signal score exceeds `thresh`. The result is a pair of [vertices
  which accepted a signal, number of accepted signals]."
  [thresh]
  (let [step (fn [[active total :as acc] v]
               (if (> (score-signal v) thresh)
                 (let [receivers (signal! v sync-vertex-signal)]
                   [(into active receivers) (+ total (count receivers))])
                 acc))]
    (fn [workgroup-size vertices]
      (r/fold workgroup-size sub-pass-combine step vertices))))

(defn parallel-collect-pass
  "Returns a collect pass fn of [workgroup-size vertices]. It folds
  the vertices, calling collect! on each one whose collect score
  exceeds `thresh`. The result is a pair of [collected vertices,
  number of collect operations]."
  [thresh]
  (let [step (fn [[active total :as acc] v]
               (if (> (score-collect v) thresh)
                 (do (collect! v)
                     [(conj active v) (inc total)])
                 acc))]
    (fn [workgroup-size vertices]
      (r/fold workgroup-size sub-pass-combine step vertices))))

(defn parallel-two-pass-processor
  "Returns a processor fn of [workgroup-size vertices] which first
  runs a signal pass and then a collect pass over the same vertices.
  The result is a triple of [union of the vertices reported by both
  passes, signal count, collect count]."
  [s-thresh c-thresh]
  (let [signal-pass  (parallel-signal-pass s-thresh)
        collect-pass (parallel-collect-pass c-thresh)]
    (fn [workgroup-size vertices]
      (let [#?@(:cljs [vertices (seq vertices)])
            [signaled sig-count]   (signal-pass workgroup-size vertices)
            [collected coll-count] (collect-pass workgroup-size vertices)]
        [(into signaled collected) sig-count coll-count]))))

(defn two-pass-scheduler
  "Takes an execution context option map and returns its scheduler
  fn, obtained by calling the context's :processor with the signal
  and collect thresholds."
  [ctx]
  (let [{:keys [processor signal-thresh collect-thresh]} ctx]
    (processor signal-thresh collect-thresh)))

General helper functions

(defn now
  "Returns the current timestamp: `System/nanoTime` in Clojure, the
  `getTime` value of a new `js/Date` in ClojureScript."
  []
  #?(:clj  (System/nanoTime)
     :cljs (.getTime (js/Date.))))

(defn millis-since
  "Returns the time elapsed since timestamp `t0` (as returned by
  `now`). In Clojure the difference is scaled by 1e-6, in
  ClojureScript it is returned as is."
  [t0]
  (let [elapsed (- (now) t0)]
    #?(:clj  (* elapsed 1e-6)
       :cljs elapsed)))

(defn notify-watches*
  "Calls all handlers registered for the type of event `evt` (its
  first element) with the event as sole argument. `watches` is a map
  of event type -> watch id -> handler fn. Always returns nil."
  [watches evt]
  (when-let [handlers (watches (first evt))]
    (doseq [handler (vals handlers)]
      (handler evt))))

(defn none-or-single-user?
  "Returns true if vertex `src` has no neighbors at all, or exactly
  one neighbor which (unless `user` is nil) equals `user`."
  [src user]
  (let [nbrs (neighbors src)]
    (or (not (seq nbrs))
        (and (== 1 (count nbrs))
             (or (nil? user)
                 (= user (first nbrs)))))))

(defn random-id
  "Returns a new keyword created from a gensym'd symbol."
  []
  (-> (gensym) keyword))

CLJS polyfill

;; ClojureScript only: if LazyTransformer does not satisfy IReduce,
;; extend it with a seq based reduce implementation.
#?(:cljs
   (when-not (satisfies? IReduce LazyTransformer)
     ;; Reduces `coll` by walking its seq. Without initial value the
     ;; first element is used (or the result of calling `f` without
     ;; args, if the collection is empty).
     (defn- seq-reduce*
       ([f coll]
        (if-let [s (seq coll)]
          (reduce f (first s) (next s))
          (f)))
       ([f val coll]
        (loop [val val, coll (seq coll)]
          (if coll
            (let [nval (f val (first coll))]
              ;; honor early termination via `reduced`
              (if (reduced? nval)
                @nval
                (recur nval (next coll))))
            val))))

     (extend-type LazyTransformer
       IReduce
       (-reduce
         ([coll f] (seq-reduce* f coll))
         ([coll f start] (seq-reduce* f start coll))))))

Complete namespace definition

(ns thi.ng.fabric.core
  #?@(:clj
      [(:require
        [thi.ng.xerror.core :as err]
        [taoensso.timbre :refer [debug info warn]]
        [clojure.core.reducers :as r]
        [clojure.core.async :as async :refer [go go-loop <! >!]])]
      :cljs
      [(:require-macros
        [cljs.core.async.macros :refer [go go-loop]]
        [cljs-log.core :refer [debug info warn]])
       (:require
        [thi.ng.xerror.core :as err]
        [clojure.core.reducers :as r]
        [cljs.core.async :as async :refer [<! >!]])]))

<<polyfill>>

<<protos>>

<<helpers>>

<<vertex>>

<<sig-coll>>

<<ctor-vertex>>

<<graph>>

<<ctor-graph>>

<<ctx-processing>>

<<ctx-sync>>

<<ctx-scheduled>>

<<ctx-async>>