Skip to content

Commit

Permalink
Separate AerospikeBatchOps into another namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
evg-tso committed Jan 14, 2024
1 parent 3d58d08 commit 83eb6e7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 31 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### [4.0.0] - 2024-01-14

#### Changed

* Introduce a `aerospike-clj.batch-client` namespace.
This namespace extends the `aerospike-clj.client` namespace with batch operations using the `AerospikeBatchOps`
protocol.
* This will allow clients to use the `aerospike-clj.client` namespace with older versions of the Aerospike Java client
library, and use the `aerospike-clj.batch-client` namespace with newer versions of the Aerospike Java client library.

### [3.1.0] - 2023-08-22

#### Added
Expand Down
4 changes: 2 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/aerospike-clj "3.1.0"
(defproject com.appsflyer/aerospike-clj "4.0.0-SNAPSHOT"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand All @@ -23,7 +23,7 @@
[cheshire "5.11.0"]
[tortue/spy "2.14.0"]
[com.fasterxml.jackson.core/jackson-databind "2.11.2"]
[clj-kondo "2023.09.07"]
[clj-kondo "2023.12.15"]
[com.clojure-goes-fast/clj-java-decompiler "0.3.4"]]
:eftest {:multithread? false
:report eftest.report.junit/report
Expand Down
42 changes: 42 additions & 0 deletions src/main/clojure/aerospike_clj/batch_client.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
(ns aerospike-clj.batch-client
(:require [aerospike-clj.aerospike-record :as record]
[aerospike-clj.client :as client]
[aerospike-clj.collections :as collections]
[aerospike-clj.protocols :as pt]
[promesa.core :as p])
(:import (aerospike_clj.client SimpleAerospikeClient)
(aerospike_clj.listeners AsyncBatchOperateListListener)
(com.aerospike.client AerospikeClient BatchRecord)
(com.aerospike.client.async EventLoop EventLoops)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy)
(java.util List)))

(defn- batch-record->map [^BatchRecord batch-record]
(let [k (.key batch-record)]
(-> (record/record->map (.record batch-record))
(assoc :index (.toString (.userKey k)))
(assoc :set (.setName k))
(assoc :result-code (.resultCode batch-record)))))

(extend-type SimpleAerospikeClient
pt/AerospikeBatchOps
(batch-operate [this batch-records]
(pt/batch-operate this batch-records {}))

(batch-operate [this batch-records conf]
(let [op-future (p/deferred)
policy (:policy conf)
batch-list (if (list? batch-records)
batch-records
(into [] batch-records))
start-time (System/nanoTime)
transcoder (:transcoder conf identity)]
(.operate ^AerospikeClient (.-client this)
^EventLoop (.next ^EventLoops (.-el this))
^BatchOperateListListener (AsyncBatchOperateListListener. op-future)
^BatchPolicy policy
^List batch-list)
(-> op-future
(p/then' (comp transcoder #(collections/->list batch-record->map %)) (.-completion-executor this))
(client/register-events (.-client-events this) :batch-operate nil start-time conf)))))
35 changes: 6 additions & 29 deletions src/main/clojure/aerospike_clj/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
[clojure.tools.logging :as log]
[promesa.core :as p]
[promesa.exec :as p-exec])
(:import (aerospike_clj.listeners AsyncBatchListListener AsyncBatchOperateListListener AsyncDeleteListener
(:import (aerospike_clj.listeners AsyncBatchListListener AsyncDeleteListener
AsyncExistsArrayListener AsyncExistsListener AsyncInfoListener
AsyncRecordListener AsyncRecordSequenceListener AsyncWriteListener)
(com.aerospike.client BatchRecord Host Key)
(com.aerospike.client Host Key)
(com.aerospike.client AerospikeClient BatchRead Bin Key Operation)
(com.aerospike.client.async EventLoop EventLoops NioEventLoops)
(com.aerospike.client.cluster Node)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy ClientPolicy InfoPolicy
Policy RecordExistsAction ScanPolicy
WritePolicy)
(java.time Instant)
(java.util Arrays List)
(java.util Arrays)
(java.util.concurrent Executor)))

(def
Expand Down Expand Up @@ -66,7 +65,7 @@
(p/catch (fn [op-exception]
(pt/on-failure client-events op-name op-exception index op-start-time))))))

(defn- register-events [op-future default-client-events op-name index op-start-time conf]
(defn register-events [op-future default-client-events op-name index op-start-time conf]
(let [client-events (:client-events conf default-client-events)]
(if (empty? client-events)
op-future
Expand All @@ -80,7 +79,7 @@
(create-key ^Key [this as-namespace set-name]
(as-key/create-key this as-namespace set-name)))

(defn- batch-record->map [^BatchRecord batch-record]
(defn batch-read->map [^BatchRead batch-record]
(let [k (.key batch-record)]
(-> (record/record->map (.record batch-record))
(assoc :index (.toString (.userKey k)))
Expand Down Expand Up @@ -183,7 +182,7 @@
^BatchPolicy (:policy conf)
batch-reads-arr)
(-> op-future
(p/then' #(collections/->list batch-record->map %) completion-executor)
(p/then' #(collections/->list batch-read->map %) completion-executor)
(p/then' (:transcoder conf identity))
(register-events client-events :read-batch nil start-time conf))))

Expand Down Expand Up @@ -367,28 +366,6 @@
(p/then' record/record->map completion-executor)
(register-events client-events :operate index start-time conf)))))

pt/AerospikeBatchOps
(batch-operate [this batch-records]
(pt/batch-operate this batch-records {}))

(batch-operate [_this batch-records conf]
(let [op-future (p/deferred)
policy (:policy conf)
batch-list (if (list? batch-records)
batch-records
(into [] batch-records))
start-time (System/nanoTime)
transcoder (:transcoder conf identity)]
(.operate ^AerospikeClient client
^EventLoop (.next ^EventLoops el)
^BatchOperateListListener (AsyncBatchOperateListListener. op-future)
^BatchPolicy policy
^List batch-list)
(-> op-future
(p/then' (comp transcoder #(collections/->list batch-record->map %)) completion-executor)
(register-events client-events :batch-operate nil start-time conf))))


pt/AerospikeSetOps
(scan-set [_this aero-namespace set-name conf]
(when-not (fn? (:callback conf))
Expand Down

0 comments on commit 83eb6e7

Please sign in to comment.