Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Parquet as an output format for collections #394

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 104 additions & 3 deletions task/collect.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import { mkdirp } from 'mkdirp';
import S3 from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';
import parquet from '@dsnp/parquetjs';
import minimist from 'minimist';
import { Transform } from 'stream';
import wkx from 'wkx';

const s3 = new S3.S3Client({
region: process.env.AWS_DEFAULT_REGION
Expand Down Expand Up @@ -106,9 +108,15 @@ async function collect(tmp, collection, oa) {
const zip = await zip_datas(tmp, collection_data, collection.name);

console.error(`ok - zip created: ${zip}`);
await upload_collection(zip, collection.name);
await upload_zip_collection(zip, collection.name);
console.error('ok - archive uploaded');

const pq = await parquet_datas(tmp, collection_data, collection.name);

console.error(`ok - parquet created: ${pq}`);
await upload_parquet_collection(pq, collection.name);
console.error('ok - parquet uploaded');

await oa.cmd('collection', 'update', {
':collection': collection.id,
size: fs.statSync(zip).size
Expand Down Expand Up @@ -191,7 +199,7 @@ async function get_source(oa, tmp, data, stats) {
return path.resolve(tmp, 'sources', dir, source);
}

async function upload_collection(file, name) {
async function upload_zip_collection(file, name) {
const s3uploader = new Upload({
client: s3,
params: {
Expand All @@ -215,7 +223,6 @@ async function upload_collection(file, name) {
endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
});


const r2uploader = new Upload({
client: r2,
params: {
Expand All @@ -229,7 +236,45 @@ async function upload_collection(file, name) {
await r2uploader.done();

console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
}

async function upload_parquet_collection(file, name) {
const s3uploader = new Upload({
client: s3,
params: {
ContentType: 'application/vnd.apache.parquet',
Body: fs.createReadStream(file),
Bucket: process.env.Bucket,
Key: `${process.env.StackName}/collection-${name}.parquet`
}
});

await s3uploader.done();

console.error(`ok - s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);

const r2 = new S3.S3Client({
region: 'auto',
credentials: {
accessKeyId: process.env.R2_ACCESS_KEY_ID,
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
},
endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
});

const r2uploader = new Upload({
client: r2,
params: {
ContentType: 'application/vnd.apache.parquet',
Body: fs.createReadStream(file),
Bucket: process.env.R2Bucket,
Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
}
});

await r2uploader.done();

console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
}

function zip_datas(tmp, datas, name) {
Expand Down Expand Up @@ -271,3 +316,59 @@ function zip_datas(tmp, datas, name) {
archive.finalize();
});
}

function parquet_datas(tmp, datas, name) {
return new Promise((resolve) => {
const schema = {
source_name: { type: 'UTF8' },
geometry: { type: 'BINARY' },
id: { type: 'UTF8' },
pid: { type: 'UTF8' },
number: { type: 'UTF8' },
street: { type: 'UTF8' },
unit: { type: 'UTF8' },
city: { type: 'UTF8' },
postcode: { type: 'UTF8' },
district: { type: 'UTF8' },
region: { type: 'UTF8' },
addrtype: { type: 'UTF8' },
notes: { type: 'UTF8' }
};
const writer = parquet.ParquetWriter.openFile(schema, path.resolve(tmp, `${name}.parquet`));

for (const data of datas) {
const resolved_data_filename = path.resolve(tmp, 'sources', data);

// Read the file and parse it as linefeed-delimited JSON
const data_stream = fs.createReadStream(resolved_data_filename);
const data_lines = data_stream.pipe(split());
data_lines.on('data', (line) => {
const record = JSON.parse(line);
const properties = record.properties;
const wkbGeometry = wkx.Geometry.parseGeoJSON(record.geometry).toWkb();

writer.appendRow({
source_name: data,
geometry: wkbGeometry,
id: properties.id,
pid: properties.pid,
number: properties.number,
street: properties.street,
unit: properties.unit,
city: properties.city,
postcode: properties.postcode,
district: properties.district,
region: properties.region,
addrtype: properties.addrtype,
notes: properties.notes
});
});
data_lines.on('end', () => {
console.error(`ok - ${resolved_data_filename} processed and appended to parquet file`);
});
}

writer.close();
return resolve(path.resolve(tmp, `${name}.parquet`));
});
}
Loading
Loading