-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add BulkResponse
wrapper for improved decoding of HTTP bulk responses
#649
Draft
amotl
wants to merge
4
commits into
master
Choose a base branch
from
bulk-response-wrapper
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 2 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
a96e375
BulkResponse: Add wrapper for improved decoding of HTTP bulk responses
amotl 6a58353
BulkResponse: Be more strict on empty records or results
amotl af32409
BulkResponse: Add documentation
amotl aaf9ca6
Chore: Fix documentation `Makefile` for machines w/o the `python` cmd
amotl File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import typing as t | ||
from functools import cached_property | ||
|
||
|
||
class BulkResultItem(t.TypedDict): | ||
""" | ||
Define the shape of a CrateDB bulk request response item. | ||
""" | ||
|
||
rowcount: int | ||
|
||
|
||
class BulkResponse: | ||
""" | ||
Manage a response to a CrateDB bulk request. | ||
Accepts a list of bulk arguments (parameter list) and a list of bulk response items. | ||
|
||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations | ||
""" | ||
|
||
def __init__( | ||
self, | ||
records: t.List[t.Dict[str, t.Any]], | ||
results: t.List[BulkResultItem]): | ||
if records is None: | ||
raise ValueError("Processing a bulk response without records is an invalid operation") | ||
if results is None: | ||
raise ValueError("Processing a bulk response without results is an invalid operation") | ||
self.records = records | ||
self.results = results | ||
|
||
@cached_property | ||
def failed_records(self) -> t.List[t.Dict[str, t.Any]]: | ||
""" | ||
Compute list of failed records. | ||
|
||
CrateDB signals failed inserts using `rowcount=-2`. | ||
|
||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling | ||
""" | ||
errors: t.List[t.Dict[str, t.Any]] = [] | ||
for record, status in zip(self.records, self.results): | ||
if status["rowcount"] == -2: | ||
errors.append(record) | ||
return errors | ||
|
||
@cached_property | ||
def record_count(self) -> int: | ||
""" | ||
Compute bulk size / length of parameter list. | ||
""" | ||
if not self.records: | ||
return 0 | ||
return len(self.records) | ||
|
||
@cached_property | ||
def success_count(self) -> int: | ||
""" | ||
Compute number of succeeding records within a batch. | ||
""" | ||
return self.record_count - self.failed_count | ||
|
||
@cached_property | ||
def failed_count(self) -> int: | ||
""" | ||
Compute number of failed records within a batch. | ||
""" | ||
return len(self.failed_records) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import sys | ||
import unittest | ||
|
||
from crate import client | ||
from crate.client.exceptions import ProgrammingError | ||
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline | ||
from crate.testing.settings import crate_host | ||
|
||
|
||
class BulkOperationTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
setUpCrateLayerBaseline(self) | ||
|
||
def tearDown(self): | ||
tearDownDropEntitiesBaseline(self) | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_executemany_with_bulk_response_partial(self): | ||
|
||
# Import at runtime is on purpose, to permit skipping the test case. | ||
from crate.client.result import BulkResponse | ||
|
||
connection = client.connect(crate_host) | ||
cursor = connection.cursor() | ||
|
||
# Run SQL DDL. | ||
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);") | ||
|
||
# Run a batch insert that only partially succeeds. | ||
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")] | ||
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records) | ||
|
||
# Verify CrateDB response. | ||
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}]) | ||
|
||
# Verify decoded response. | ||
bulk_response = BulkResponse(invalid_records, result) | ||
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")]) | ||
self.assertEqual(bulk_response.record_count, 2) | ||
self.assertEqual(bulk_response.success_count, 1) | ||
self.assertEqual(bulk_response.failed_count, 1) | ||
|
||
cursor.execute("REFRESH TABLE foobar;") | ||
cursor.execute("SELECT * FROM foobar;") | ||
result = cursor.fetchall() | ||
self.assertEqual(result, [[1, "Hotzenplotz 1"]]) | ||
|
||
cursor.close() | ||
connection.close() | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_executemany_empty(self): | ||
|
||
connection = client.connect(crate_host) | ||
cursor = connection.cursor() | ||
|
||
# Run SQL DDL. | ||
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);") | ||
|
||
# Run a batch insert that is empty. | ||
with self.assertRaises(ProgrammingError) as cm: | ||
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", []) | ||
self.assertEqual( | ||
str(cm.exception), | ||
"SQLParseException[The query contains a parameter placeholder $1, " | ||
"but there are only 0 parameter values]") | ||
|
||
cursor.close() | ||
connection.close() | ||
|
||
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher") | ||
def test_bulk_response_empty_records_or_results(self): | ||
|
||
# Import at runtime is on purpose, to permit skipping the test case. | ||
from crate.client.result import BulkResponse | ||
|
||
with self.assertRaises(ValueError) as cm: | ||
BulkResponse(records=None, results=None) | ||
self.assertEqual( | ||
str(cm.exception), | ||
"Processing a bulk response without records is an invalid operation") | ||
|
||
with self.assertRaises(ValueError) as cm: | ||
BulkResponse(records=[], results=None) | ||
self.assertEqual( | ||
str(cm.exception), | ||
"Processing a bulk response without results is an invalid operation") |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this more carefully, I don't like that BulkResponse is something you need con construct manually. Couldn't we directly return it from the insert execution, instead of a list of
BulkResultItem
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi. I don't think we can do anything like this here, because the Python database driver must adhere to the Python Database API Specification, so the
BulkResponse
is just meant as an optional extension to it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
af32409, just added, provides a bit of documentation for that extension in the section about bulk operations.