Skip to content

Commit

Permalink
add avro container example
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Baele authored and Frank Baele committed May 21, 2020
1 parent c411947 commit 778de6a
Show file tree
Hide file tree
Showing 4 changed files with 730 additions and 1 deletion.
117 changes: 117 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*

# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json

# Runtime data
pids
*.pid
*.seed
*.pid.lock

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage
*.lcov

# nyc test coverage
.nyc_output

# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# Bower dependency directory (https://bower.io/)
bower_components

# node-waf configuration
.lock-wscript

# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release

# Dependency directories
avro_container/node_modules/
jspm_packages/

# Snowpack dependency directory (https://snowpack.dev/)
web_modules/

# TypeScript cache
*.tsbuildinfo

# Optional npm cache directory
.npm

# Optional eslint cache
.eslintcache

# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/

# Optional REPL history
.node_repl_history

# Output of 'npm pack'
*.tgz

# Yarn Integrity file
.yarn-integrity

# dotenv environment variables file
.env
.env.test

# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache

# Next.js build output
.next
out

# Nuxt.js build / generate output
.nuxt
dist

# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public

# vuepress build output
.vuepress/dist

# Serverless directories
.serverless/

# FuseBox cache
.fusebox/

# DynamoDB Local files
.dynamodb/

# TernJS port file
.tern-port

# Stores VSCode versions used for testing VSCode extensions
.vscode-test

# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*
.idea
129 changes: 129 additions & 0 deletions avro_container/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import avsc from 'avsc';

const schema = {
type: 'record',
name: 'user',
fields: [
{name: 'first_name', type: 'string'},
{name: 'last_name', type: 'string'},
// By setting the default to null and allowing the null value
// we can create an optional field in Avro
{name: 'middle_name', default: null, type: ['null', 'string']},
// Array with nested objects
{
name: 'children',
default: null,
type: [
'null',
{
type: 'array',
items: [
{
type: 'record',
name: 'child',
fields: [
{name: 'first_name', type: 'string'}
],
},
],
},
],
},
]
}
const type = avsc.Type.forSchema(schema);

const encoder = avsc.createFileEncoder('./users.avro', schema);

function write(item) {
return new Promise((resolve, reject) => {
const result = type.isValid(item, {
errorHook: err => {
err.forEach(key => {
console.log('Invalid field name: ' + key);
console.log('Invalid field content: ' + JSON.stringify(item[key], null, 2));
});
reject();
},
});
if (result) {
encoder.write(item, resolve);
}
});
}

write({
"first_name": "Frank",
"last_name": "Baele",
"children": [{
"first_name": "Bort"
}]
})

write({
"first_name": "Peter",
"last_name": "Peterson",
"children": [{
"first_name": "Jef"
}]
})

encoder.end();

// Reading part
import fs from 'fs';

const src = fs.createReadStream('./users.avro');
// src.pipe(new avsc.streams.BlockDecoder())
// .on('data', function (item) {
// console.log(item);
// })
// .on('end', () => {
// console.log('All records read')
// });

// Snappy example

import snappy from 'snappy';
import crc32 from 'buffer-crc32';

const snappyEncoder = avsc.createFileEncoder('./snappy.avro', schema, {
snappy: function (buf, cb) {
const checksum = crc32(buf);
snappy.compress(buf, function (err, deflated) {
if (err) {
cb(err);
return;
}
const block = Buffer.alloc(deflated.length + 4);
deflated.copy(block);
checksum.copy(block, deflated.length);
cb(null, block);
});
}
})

snappyEncoder.write({
"first_name": "Frank",
"last_name": "Baele",
"children": [{
"first_name": "Bort"
}]
});

snappyEncoder.end();

avsc.createFileDecoder('./snappy.avro', {
snappy: function (buf, cb) {
// Avro appends checksums to compressed blocks, which we skip here.
return snappy.uncompress(buf.slice(0, buf.length - 4), cb);
}
})
.on('metadata', function (type) {
console.log(type)
})
.on('data', function (val) {
console.log(val)
});

// Gzip example
Loading

0 comments on commit 778de6a

Please sign in to comment.