Skip to content

Commit

Permalink
Merge pull request #281 from github/kag728/FixType
Browse files Browse the repository at this point in the history
Stop removing unprefixed parameters from document in bulk requests
  • Loading branch information
kag728 authored Sep 29, 2023
2 parents 9cc893c + b49c8cf commit 960dce3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 108 deletions.
47 changes: 1 addition & 46 deletions lib/elastomer_client/client/bulk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,7 @@ def call
# and apply any override document parameters.
def prepare_params(document, params)
params = convert_special_keys(params)
if document.is_a? Hash
params = from_document(document).merge(params)
end

params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
params.delete("_id") if params["_id"].nil? || params["_id"].to_s.empty?

Expand All @@ -343,49 +341,6 @@ def prepare_params(document, params)
params
end

# Internal: Extract special keys for bulk indexing from the given
# `document`. The keys and their values are returned as a Hash from this
# method. If a value is `nil` then it will be ignored.
# This will cover all cases to properly convert parameters for ES5-ES8,
# whether or not the prefix should be underscored or not.
#
# document - The document Hash
#
# Returns extracted key/value pairs as a Hash.
def from_document(document)
opts = {}

SPECIAL_KEYS.each do |key|
next if key == "type" && client.version_support.es_version_8_plus?

omit_prefix = (
client.version_support.es_version_7_plus? &&
UNPREFIXED_SPECIAL_KEYS.include?(key)
)

prefixed_key = "_" + key
converted_key = (omit_prefix ? "" : "_") + key

if document.key?(prefixed_key)
opts[converted_key.to_sym] = document.delete(prefixed_key)
end

if document.key?(prefixed_key.to_sym)
opts[converted_key.to_sym] = document.delete(prefixed_key.to_sym)
end

if document.key?(key)
opts[converted_key.to_sym] = document.delete(key)
end

if document.key?(key.to_sym)
opts[converted_key.to_sym] = document.delete(key.to_sym)
end
end

opts
end

# Internal: Convert incoming Ruby symbol keys to their special underscore
# versions. Maintains API compaibility with the `Docs` API for `index`,
# `create`, `update` and `delete`.
Expand Down
97 changes: 36 additions & 61 deletions test/client/bulk_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@

it "supports a nice block syntax" do
h = @index.bulk do |b|
b.index ({_id: 1, _type: "book", author: "Author 1", title: "Book 1"})
b.index ({_id: nil, _type: "book", author: "Author 2", title: "Book 2"})
b.index({ author: "Author 1", title: "Book 1" }, { _id: 1, _type: "book" })
b.index({ author: "Author 2", title: "Book 2" }, { _id: nil, _type: "book" })
end
items = h["items"]

Expand All @@ -104,7 +104,7 @@
assert_equal "Author 2", h["_source"]["author"]

h = @index.bulk do |b|
b.index ({_id: "", _type: "book", author: "Author 3", title: "Book 3"})
b.index({ author: "Author 3", title: "Book 3" }, _id: "", _type: "book")
b.delete ({_id: book_id, _type: "book"})
end
items = h["items"]
Expand Down Expand Up @@ -176,23 +176,23 @@
book_title = $client.version_support.es_version_7_plus? ? "A"*52 : "A"*34
ary << @index.bulk(request_size: 300) do |b|
2.times { |num|
document = {_id: num, _type: "book", author: "Author 1", title: book_title}
ary << b.index(document)
document = { author: "Author 1", title: book_title}
ary << b.index(document, { _id: num, _type: "book" })
}
ary.compact!

assert_equal 0, ary.length

7.times { |num|
document = {_id: num+2, _type: "book", author: "Author 1", title: book_title}
ary << b.index(document)
document = { author: "Author 1", title: book_title }
ary << b.index(document, { _id: num+2, _type: "book" })
}
ary.compact!

assert_equal 4, ary.length

document = {_id: 10, _type: "book", author: "Author 1", title: book_title}
ary << b.index(document)
document = {author: "Author 1", title: book_title}
ary << b.index(document, {_id: 10, _type: "book"})
end
ary.compact!

Expand All @@ -213,23 +213,23 @@
ary = []
ary << @index.bulk(action_count: 3) do |b|
2.times { |num|
document = {_id: num, _type: "book", author: "Author 1", title: "This is book number #{num}"}
ary << b.index(document)
document = {author: "Author 1", title: "This is book number #{num}"}
ary << b.index(document, {_id: num, _type: "book"})
}
ary.compact!

assert_equal 0, ary.length

7.times { |num|
document = {_id: num+2, _type: "book", author: "Author 1", title: "This is book number #{num+2}"}
ary << b.index(document)
document = {author: "Author 1", title: "This is book number #{num+2}"}
ary << b.index(document, {_id: num+2, _type: "book"})
}
ary.compact!

assert_equal 2, ary.length

document = {_id: 10, _type: "book", author: "Author 1", title: "This is book number 10"}
ary << b.index(document)
document = {author: "Author 1", title: "This is book number 10"}
ary << b.index(document, {_id: 10, _type: "book"})
end
ary.compact!

Expand All @@ -246,23 +246,23 @@
end
end

it "rejects documents that excceed the maximum request size" do
it "rejects documents that exceed the maximum request size" do
client = ElastomerClient::Client.new(**$client_params.merge(max_request_size: 300))
index = client.index(@name)

ary = []
book_title = $client.version_support.es_version_7_plus? ? "A"*52 : "A"*34
ary << index.bulk(request_size: 300) do |b|
2.times { |num|
document = document_wrapper("book", {_id: num, author: "Author 1", title: book_title})
ary << b.index(document)
document = {author: "Author 1", title: book_title}
ary << b.index(document, document_wrapper("book", { _id: num }))
}
ary.compact!

assert_equal 0, ary.length

document = document_wrapper("book", {_id: 342, author: "Author 1", message: "A"*290})
assert_raises(ElastomerClient::Client::RequestSizeError) { b.index(document) }
document = { author: "Author 1", message: "A"*290 }
assert_raises(ElastomerClient::Client::RequestSizeError) { b.index(document, document_wrapper("book", { _id: 342 })) }
end
ary.compact!

Expand Down Expand Up @@ -324,29 +324,11 @@
assert_equal "Book 2", @index.docs("book").get(id: id2)["_source"]["title"]
end

it "doesn't override parameters with properties from the document" do
document = { _id: 1, _type: "book", author: "Author 1", title: "Book 1" }
params = { id: 2 }

response = @index.bulk do |b|
b.index document, params
end

assert_kind_of Integer, response["took"]

items = response["items"]

assert_bulk_index(items[0])

refute_found @index.docs("book").get(id: 1)
assert_equal "Book 1", @index.docs("book").get(id: 2)["_source"]["title"]
end

it "supports the routing parameter on index actions" do
document = { _id: 1, _type: "book", title: "Book 1" }
document = { title: "Book 1" }

response = @index.bulk do |b|
b.index document, { routing: "custom" }
b.index document, { routing: "custom", _id: 1, _type: "book" }
end

items = response["items"]
Expand All @@ -356,25 +338,18 @@
assert_equal "custom", @index.docs("book").get(id: 1)["_routing"]
end

it "supports the routing parameter within documents with underscore" do
document = { _id: 1, _type: "book", _routing: "custom", title: "Book 1" }
it "supports the routing parameter within params in ES5 and ES8" do
document = { title: "Book 1" }

response = @index.bulk do |b|
b.index document
params = { _id: 1, _type: "book" }
if $client.version_support.es_version_7_plus?
params[:routing] = "custom"
else
params[:_routing] = "custom"
end

items = response["items"]

assert_kind_of Integer, response["took"]
assert_bulk_index(items[0])
assert_equal "custom", @index.docs("book").get(id: 1)["_routing"]
end

it "supports the routing parameter within documents without underscore" do
document = { _id: 1, _type: "book", routing: "custom", title: "Book 1" }

response = @index.bulk do |b|
b.index document
b.index document, params
end

items = response["items"]
Expand All @@ -386,9 +361,9 @@

it "streams bulk responses" do
ops = [
[:index, document_wrapper("book", { title: "Book 1" }), { _id: 1, _index: @index.name }],
[:index, document_wrapper("book", { title: "Book 2" }), { _id: 2, _index: @index.name }],
[:index, document_wrapper("book", { title: "Book 3" }), { _id: 3, _index: @index.name }]
[:index, { title: "Book 1" }, document_wrapper("book", { _id: 1, _index: @index.name })],
[:index, { title: "Book 2" }, document_wrapper("book", { _id: 2, _index: @index.name })],
[:index, { title: "Book 3" }, document_wrapper("book", { _id: 3, _index: @index.name })],
]
responses = $client.bulk_stream_responses(ops, { action_count: 2 }).to_a

Expand All @@ -400,9 +375,9 @@

it "streams bulk items" do
ops = [
[:index, document_wrapper("book", { title: "Book 1" }), { _id: 1, _index: @index.name }],
[:index, document_wrapper("book", { title: "Book 2" }), { _id: 2, _index: @index.name }],
[:index, document_wrapper("book", { title: "Book 3" }), { _id: 3, _index: @index.name }]
[:index, { title: "Book 1" }, document_wrapper("book", { _id: 1, _index: @index.name })],
[:index, { title: "Book 2" }, document_wrapper("book", { _id: 2, _index: @index.name })],
[:index, { title: "Book 3" }, document_wrapper("book", { _id: 3, _index: @index.name })],
]
items = []
$client.bulk_stream_items(ops, { action_count: 2 }) { |item| items << item }
Expand Down
16 changes: 15 additions & 1 deletion test/client/docs_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@

it "supports bulk operations with the same parameters as docs" do
response = @docs.bulk do |b|
populate!(b)
populate_with_params!(b)
end

assert_kind_of Integer, response["took"]
Expand Down Expand Up @@ -823,6 +823,20 @@ def populate!(docs = @docs)

@index.refresh
end

def populate_with_params!(docs = @docs)
docs.index({
title: "Book1 by author 1",
author: "Author1"
}, { _id: 1, _type: "book" })

docs.index({
title: "Book2 by author 2",
author: "Author2"
}, { _id: 2, _type: "book" })

@index.refresh
end
# rubocop:enable Metrics/MethodLength

end

0 comments on commit 960dce3

Please sign in to comment.