QLDB streaming to Kinesis sends invalid ION data #405
Comments
Could you please provide the strategy you are using to obtain the base64 blob? I assume you're using the aws-kcl package. It looks like you might have the Kinesis partition key and sequence number inside of that blob. |
I am just using the ion-js library:

/**
 * A lambda that streams to Redshift from QLDB via Kinesis stream
 */
export const lambdaHandler: KinesisStreamHandler = async (event) => {
  const envStage = process.env.envStage ?? '';
  await initClient();
  await executeQuery(SQL_CODE);
  logger.info(`Records count for ${envStage}`, { count: event.Records.length });
  event.Records.forEach(async (record, index) => {
    logger.info('DEBUG: Received event:', JSON.stringify(event, null, 2));
    await processRecords([record.kinesis]);
  });
  logger.info(`DEBUG: Successfully processed ${event.Records.length} records.`);
};

export async function processRecords(records: KinesisStreamRecordPayload[]) {
  await Promise.all(
    records.map(async (record) => {
      // Kinesis data is base64 encoded so decode here
      let payload;
      let ionRecord;
      try {
        logger.info(`processRecords attempting to load ion record: ${record.data}`);
        payload = Buffer.from(record.data, 'base64');
        // payload is the actual ion binary record published by QLDB to the stream
        ionRecord = load(payload) as Value;
      } catch (error) {
        logger.error(`processRecords failed to load ion record error:${error}`);
        throw error;
      }
      logger.info(`ionRecord is ${JSON.stringify(ionRecord, null, 2)}`);
      const recordType = ionRecord?.get('recordType')?.stringValue();
      // Only process records where the record type is REVISION_DETAILS
      if (recordType !== REVISION_DETAILS) {
        logger.info(
          `processRecords Skipping record of type ${dumpPrettyText(
            recordType,
          )}`,
        );
        logger.info(`processRecords The other record is:`, ionRecord);
      } else {
        logger.info('processRecords The Ion Record is:', ionRecord);
        logger.info('processRecords Revision_details record found');
        await processIon(ionRecord);
      }
    }),
  );
}

Ok, so not using the aws-kcl library is the issue then? |
I think so. Lambda is likely interpreting your |
oh take a look at the full code snippet now updated above @battesonb, I included the lambda handler that includes I couldn't find how kcl library would handle this any different |
Are you using Kinesis Data Stream Record aggregations? The stream in the QLDB console will list whether it's enabled. There's an NPM package called aws-kinesis-agg for deaggregating batched results. This would explain why it's an intermittent issue. You'd essentially just do the following:

import { deaggregateSync, UserRecord } from "aws-kinesis-agg";

// inside your handler
deaggregateSync(record.kinesis, true, (err, records) => {
  if (records) {
    processRecords(records); // Note the type change
  } // else handle the error
});

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-deaggregation.html

Alternatively, you can turn off record aggregations. |
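The snippet above is callback-based, while processRecords returns a promise, so a handler that wants to wait for the work to finish needs a small wrapper. Below is a minimal sketch of one way to do that. It is not part of aws-kinesis-agg: `UserRecordLike`, `DeaggregateFn`, `deaggregateAsync`, and `processAll` are hypothetical names, and the callback shape is assumed from the call shown above, so check it against the package's own typings before relying on it.

```typescript
// Minimal shape of a deaggregated user record; only `data` is used here (assumed field).
export interface UserRecordLike {
  data: string; // base64-encoded payload of one user record
}

// Callback-style deaggregator with the call shape used in the snippet above (assumption).
export type DeaggregateFn<R> = (
  kinesisRecord: R,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error | null, records?: UserRecordLike[]) => void,
) => void;

// Wrap the callback API in a promise so a handler can await it.
export function deaggregateAsync<R>(
  deaggregate: DeaggregateFn<R>,
  kinesisRecord: R,
): Promise<UserRecordLike[]> {
  return new Promise((resolve, reject) => {
    deaggregate(kinesisRecord, true, (err, records) => {
      if (err) reject(err);
      else resolve(records ?? []);
    });
  });
}

// Process every Kinesis record in order, awaiting each step before the next.
export async function processAll<R>(
  deaggregate: DeaggregateFn<R>,
  kinesisRecords: R[],
  processRecords: (records: UserRecordLike[]) => Promise<void>,
): Promise<void> {
  for (const kinesisRecord of kinesisRecords) {
    const userRecords = await deaggregateAsync(deaggregate, kinesisRecord);
    await processRecords(userRecords);
  }
}
```

In a handler this would presumably be called as `await processAll(deaggregateSync, event.Records.map((r) => r.kinesis), processRecords)`, assuming the types line up. The `for...of` loop is used instead of `forEach` because `forEach` does not wait for async callbacks.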
The first few bytes of your buffer line up with the magic number specified in the repository: https://github.com/awslabs/kinesis-aggregation/blob/master/node/lib/common.js#L17
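A quick way to confirm this from inside the Lambda is to look at the first four bytes of the decoded payload. The sketch below is illustrative only: `classifyPayload` and `PayloadKind` are hypothetical names, not part of ion-js or aws-kinesis-agg. It compares the prefix against the KPL aggregation magic number (F3 89 9A C2) and the Ion 1.0 binary version marker (E0 01 00 EA).

```typescript
import { Buffer } from "node:buffer";

// Magic prefix of a KPL-aggregated Kinesis record (bytes F3 89 9A C2).
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);

// Ion 1.0 binary streams start with this version marker (bytes E0 01 00 EA).
const ION_BVM = Buffer.from([0xe0, 0x01, 0x00, 0xea]);

export type PayloadKind = "kpl-aggregated" | "ion-binary" | "unknown";

// Classify a base64-encoded Kinesis payload by its first four bytes.
export function classifyPayload(base64Data: string): PayloadKind {
  const head = Buffer.from(base64Data, "base64").subarray(0, 4);
  if (head.equals(KPL_MAGIC)) return "kpl-aggregated";
  if (head.equals(ION_BVM)) return "ion-binary";
  return "unknown";
}
```

Logging `classifyPayload(record.data)` before calling `load` would show whether a given record arrived aggregated, which would match the intermittent failures described in this issue.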
|
@battesonb I see that I have this enabled. I will try your suggestion and get back to you. I appreciate your help on this |
Ok looks like that was definitely the root cause. Thank you @battesonb! |
I am currently working with QLDB -> Kinesis stream -> AWS Lambda (Node.js 18+), and I am encountering the following when loading an Ion record in the Lambda:
Package versions are:
Here is example code to reproduce this issue:
After talking to the ion-js team, they mentioned the following is the root cause: amazon-ion/ion-js#753 (comment)
This issue happens intermittently as well. Any help or insight would be greatly appreciated.