-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aerospike ingestion source #11838
base: master
Are you sure you want to change the base?
Aerospike ingestion source #11838
Conversation
(reads entire set if not provided) | ||
""" | ||
|
||
query = client.query(as_set.ns, as_set.set) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
likely needs try.. except handling
if self.config.tls_cafile is not None: | ||
client_config["tls"]["cafile"] = self.config.tls_cafile | ||
|
||
self.aerospike_client = aerospike.client(client_config).connect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs try-except handling with self.report.report_failure if you are unable to connect (with clear error "message" and "title" fields which will inform the user of what has gone wrong -- e.g. bad username, bad password, etc)
""" | ||
try: | ||
type_string = PYTHON_TYPE_TO_AEROSPIKE_TYPE[field_type] | ||
except KeyError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
xdr_sets = {} | ||
|
||
# traverse sets in sorted order so output is consistent | ||
for curr_set in sorted(ns_sets, key=lambda x: x.set): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is getting a bit long / hard to read.. consider breaking down into multiple sub-methods
set_xdr = xdr_sets.get(curr_set.set) | ||
custom_properties = {} | ||
if set_xdr: | ||
custom_properties["xdr_dcs"] = ",".join(set_xdr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any other custom properties that would be useful?
custom_properties["xdr_dcs"] = ",".join(set_xdr) | ||
|
||
dataset_properties = DatasetPropertiesClass( | ||
name=curr_set.set, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No description / table comments? No last updated time, or created time?
key=lambda x: ( | ||
-x["count"], | ||
x["delimited_name"], | ||
), # Negate `count` for descending order, `delimited_name` stays the same for ascending |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these fields guaranteed to be present?
), # Negate `count` for descending order, `delimited_name` stays the same for ascending | ||
)[0:max_schema_size]: | ||
field = SchemaField( | ||
fieldPath=schema_field["delimited_name"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these fields guaranteed to be present? do we need more defensive coding here?
sets_dc: Dict[str, List[str]] = {key: [] for key in sets} | ||
shipped_sets = [] | ||
dcs = ( | ||
self.aerospike_client.info_random_node("get-config:context=xdr") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we try..except around these so if any single call fails we don't fail the ingestion source without any useful message about what we were trying to do?
logger.debug("No DCs found") | ||
return sets_dc | ||
for dc in dcs: | ||
xdr_info: str = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: would recommend splitting into a readable sub-method: process_dc
or whatever yu are doing in here..
.split("\n")[0] | ||
) | ||
xdr = { | ||
pair.split("=")[0]: pair.split("=")[1] for pair in xdr_info.split(";") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these splits + index accesses safe?
shipped_sets = [ | ||
as_set for as_set in sets if as_set not in ignored_sets | ||
] | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These nested if thens are quite hard to read in python. Any time we get beyond 2-3 layers on indentation, we should consider a new top-level, well-named method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looking quite good! I had some mostly stylistic comments on the primary aerospike file
Checklist