From 83eb6e7f79944989d3480871794846b3821bdc42 Mon Sep 17 00:00:00 2001 From: Yevgeni Tsodikov Date: Sun, 14 Jan 2024 15:43:02 +0200 Subject: [PATCH] Separate `AerospikeBatchOps` into another namespace --- CHANGELOG.md | 10 +++++ project.clj | 4 +- .../clojure/aerospike_clj/batch_client.clj | 42 +++++++++++++++++++ src/main/clojure/aerospike_clj/client.clj | 35 +++------------- 4 files changed, 60 insertions(+), 31 deletions(-) create mode 100644 src/main/clojure/aerospike_clj/batch_client.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index 10d3f8d..81c998d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +### [4.0.0] - 2024-01-14 + +#### Changed + +* Introduce a `aerospike-clj.batch-client` namespace. + This namespace extends the `aerospike-clj.client` namespace with batch operations using the `AerospikeBatchOps` + protocol. +* This will allow clients to use the `aerospike-clj.client` namespace with older versions of the Aerospike Java client + library, and use the `aerospike-clj.batch-client` namespace with newer versions of the Aerospike Java client library. + ### [3.1.0] - 2023-08-22 #### Added diff --git a/project.clj b/project.clj index fcd7902..7eefb1f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/aerospike-clj "3.1.0" +(defproject com.appsflyer/aerospike-clj "4.0.0-SNAPSHOT" :description "An Aerospike Clojure client." :url "https://github.com/AppsFlyer/aerospike-clj" :license {:name "Eclipse Public License" @@ -23,7 +23,7 @@ [cheshire "5.11.0"] [tortue/spy "2.14.0"] [com.fasterxml.jackson.core/jackson-databind "2.11.2"] - [clj-kondo "2023.09.07"] + [clj-kondo "2023.12.15"] [com.clojure-goes-fast/clj-java-decompiler "0.3.4"]] :eftest {:multithread? false :report eftest.report.junit/report diff --git a/src/main/clojure/aerospike_clj/batch_client.clj b/src/main/clojure/aerospike_clj/batch_client.clj new file mode 100644 index 0000000..0238c8b --- /dev/null +++ b/src/main/clojure/aerospike_clj/batch_client.clj @@ -0,0 +1,42 @@ +(ns aerospike-clj.batch-client + (:require [aerospike-clj.aerospike-record :as record] + [aerospike-clj.client :as client] + [aerospike-clj.collections :as collections] + [aerospike-clj.protocols :as pt] + [promesa.core :as p]) + (:import (aerospike_clj.client SimpleAerospikeClient) + (aerospike_clj.listeners AsyncBatchOperateListListener) + (com.aerospike.client AerospikeClient BatchRecord) + (com.aerospike.client.async EventLoop EventLoops) + (com.aerospike.client.listener BatchOperateListListener) + (com.aerospike.client.policy BatchPolicy) + (java.util List))) + +(defn- batch-record->map [^BatchRecord batch-record] + (let [k (.key batch-record)] + (-> (record/record->map (.record batch-record)) + (assoc :index (.toString (.userKey k))) + (assoc :set (.setName k)) + (assoc :result-code (.resultCode batch-record))))) + +(extend-type SimpleAerospikeClient + pt/AerospikeBatchOps + (batch-operate [this batch-records] + (pt/batch-operate this batch-records {})) + + (batch-operate [this batch-records conf] + (let [op-future (p/deferred) + policy (:policy conf) + batch-list (if (list? batch-records) + batch-records + (into [] batch-records)) + start-time (System/nanoTime) + transcoder (:transcoder conf identity)] + (.operate ^AerospikeClient (.-client this) + ^EventLoop (.next ^EventLoops (.-el this)) + ^BatchOperateListListener (AsyncBatchOperateListListener. op-future) + ^BatchPolicy policy + ^List batch-list) + (-> op-future + (p/then' (comp transcoder #(collections/->list batch-record->map %)) (.-completion-executor this)) + (client/register-events (.-client-events this) :batch-operate nil start-time conf))))) diff --git a/src/main/clojure/aerospike_clj/client.clj b/src/main/clojure/aerospike_clj/client.clj index 7345ea7..a1ccc28 100644 --- a/src/main/clojure/aerospike_clj/client.clj +++ b/src/main/clojure/aerospike_clj/client.clj @@ -13,19 +13,18 @@ [clojure.tools.logging :as log] [promesa.core :as p] [promesa.exec :as p-exec]) - (:import (aerospike_clj.listeners AsyncBatchListListener AsyncBatchOperateListListener AsyncDeleteListener + (:import (aerospike_clj.listeners AsyncBatchListListener AsyncDeleteListener AsyncExistsArrayListener AsyncExistsListener AsyncInfoListener AsyncRecordListener AsyncRecordSequenceListener AsyncWriteListener) - (com.aerospike.client BatchRecord Host Key) + (com.aerospike.client Host Key) (com.aerospike.client AerospikeClient BatchRead Bin Key Operation) (com.aerospike.client.async EventLoop EventLoops NioEventLoops) (com.aerospike.client.cluster Node) - (com.aerospike.client.listener BatchOperateListListener) (com.aerospike.client.policy BatchPolicy ClientPolicy InfoPolicy Policy RecordExistsAction ScanPolicy WritePolicy) (java.time Instant) - (java.util Arrays List) + (java.util Arrays) (java.util.concurrent Executor))) (def @@ -66,7 +65,7 @@ (p/catch (fn [op-exception] (pt/on-failure client-events op-name op-exception index op-start-time)))))) -(defn- register-events [op-future default-client-events op-name index op-start-time conf] +(defn register-events [op-future default-client-events op-name index op-start-time conf] (let [client-events (:client-events conf default-client-events)] (if (empty? client-events) op-future @@ -80,7 +79,7 @@ (create-key ^Key [this as-namespace set-name] (as-key/create-key this as-namespace set-name))) -(defn- batch-record->map [^BatchRecord batch-record] +(defn batch-read->map [^BatchRead batch-record] (let [k (.key batch-record)] (-> (record/record->map (.record batch-record)) (assoc :index (.toString (.userKey k))) @@ -183,7 +182,7 @@ ^BatchPolicy (:policy conf) batch-reads-arr) (-> op-future - (p/then' #(collections/->list batch-record->map %) completion-executor) + (p/then' #(collections/->list batch-read->map %) completion-executor) (p/then' (:transcoder conf identity)) (register-events client-events :read-batch nil start-time conf)))) @@ -367,28 +366,6 @@ (p/then' record/record->map completion-executor) (register-events client-events :operate index start-time conf))))) - pt/AerospikeBatchOps - (batch-operate [this batch-records] - (pt/batch-operate this batch-records {})) - - (batch-operate [_this batch-records conf] - (let [op-future (p/deferred) - policy (:policy conf) - batch-list (if (list? batch-records) - batch-records - (into [] batch-records)) - start-time (System/nanoTime) - transcoder (:transcoder conf identity)] - (.operate ^AerospikeClient client - ^EventLoop (.next ^EventLoops el) - ^BatchOperateListListener (AsyncBatchOperateListListener. op-future) - ^BatchPolicy policy - ^List batch-list) - (-> op-future - (p/then' (comp transcoder #(collections/->list batch-record->map %)) completion-executor) - (register-events client-events :batch-operate nil start-time conf)))) - - pt/AerospikeSetOps (scan-set [_this aero-namespace set-name conf] (when-not (fn? (:callback conf))