Skip to content

Commit

Permalink
Merge pull request #651 from cldellow/geojsonl
Browse files Browse the repository at this point in the history
support GeoJSON lines format
  • Loading branch information
systemed authored Jan 23, 2024
2 parents deae9ef + f7f53b3 commit 3af62f6
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 3 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ test: \
test_append_vector \
test_attribute_store \
test_deque_map \
test_helpers \
test_pbf_reader \
test_pooled_string \
test_relation_roles \
Expand All @@ -147,6 +148,11 @@ test_deque_map: \
test/deque_map.test.o
$(CXX) $(CXXFLAGS) -o test.deque_map $^ $(INC) $(LIB) $(LDFLAGS) && ./test.deque_map

# Build and run the helpers unit test (src/helpers.o + test/helpers.test.o).
test_helpers: \
	src/helpers.o \
	test/helpers.test.o
	$(CXX) $(CXXFLAGS) -o test.helpers $^ $(INC) $(LIB) $(LDFLAGS) && ./test.helpers

test_options_parser: \
src/options_parser.o \
test/options_parser.test.o
Expand All @@ -158,7 +164,6 @@ test_pooled_string: \
test/pooled_string.test.o
$(CXX) $(CXXFLAGS) -o test.pooled_string $^ $(INC) $(LIB) $(LDFLAGS) && ./test.pooled_string


test_relation_roles: \
src/relation_roles.o \
test/relation_roles.test.o
Expand Down
3 changes: 3 additions & 0 deletions include/geojson_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class GeoJSONProcessor {
OsmLuaProcessing &osmLuaProcessing;
std::mutex attributeMutex;

void readFeatureCollection(class LayerDef &layer, uint layerNum);
void readFeatureLines(class LayerDef &layer, uint layerNum);

template <bool Flag, typename T>
void processFeature(rapidjson::GenericObject<Flag, T> feature, class LayerDef &layer, uint layerNum);

Expand Down
8 changes: 8 additions & 0 deletions include/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ inline std::vector<std::string> split_string(std::string &inputStr, char sep) {
return res;
}

// A byte range [offset, offset + length) within a file.
struct OffsetAndLength {
	uint64_t offset;
	uint64_t length;
};

// Return the size of `filename` in bytes; throws std::runtime_error on failure.
uint64_t getFileSize(std::string filename);
// Divide `filename` into up to `chunks` newline-aligned byte ranges, suitable
// for parallel processing of newline-delimited data (e.g. GeoJSON lines).
std::vector<OffsetAndLength> getNewlineChunks(const std::string &filename, uint64_t chunks);

void decompress_string(std::string& output, const char* input, uint32_t inputSize, bool asGzip = false);
double bboxElementFromStr(const std::string& number);

Expand Down
37 changes: 36 additions & 1 deletion src/geojson_processor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "geojson_processor.h"

#include "helpers.h"
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>

Expand All @@ -14,8 +15,14 @@ namespace geom = boost::geometry;

// Read GeoJSON, and create OutputObjects for all objects within the specified bounding box
void GeoJSONProcessor::read(class LayerDef &layer, uint layerNum) {
	// Newline-delimited sources (JSON Lines / GeoJSON Text Sequences) get the
	// streaming reader; everything else is parsed as one FeatureCollection.
	const char* lineSuffixes[] = { "JSONL", "jsonl", "jsonseq", "JSONSEQ" };
	for (const char* suffix : lineSuffixes) {
		if (ends_with(layer.source, suffix)) {
			readFeatureLines(layer, layerNum);
			return;
		}
	}

	readFeatureCollection(layer, layerNum);
}

void GeoJSONProcessor::readFeatureCollection(class LayerDef &layer, uint layerNum) {
// Read a JSON file containing a single GeoJSON FeatureCollection object.
rapidjson::Document doc;
FILE* fp = fopen(layer.source.c_str(), "r");
char readBuffer[65536];
Expand All @@ -38,6 +45,34 @@ void GeoJSONProcessor::read(class LayerDef &layer, uint layerNum) {
pool.join();
}

void GeoJSONProcessor::readFeatureLines(class LayerDef &layer, uint layerNum) {
// Read a JSON file containing multiple GeoJSON items, newline-delimited.
std::vector<OffsetAndLength> chunks = getNewlineChunks(layer.source, threadNum * 4);

// Process each feature
boost::asio::thread_pool pool(threadNum);
for (auto &chunk : chunks) {
boost::asio::post(pool, [&]() {
FILE* fp = fopen(layer.source.c_str(), "r");
if (fseek(fp, chunk.offset, SEEK_SET) != 0) throw std::runtime_error("unable to seek to " + std::to_string(chunk.offset) + " in " + layer.source);
char readBuffer[65536];
rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));

while(is.Tell() < chunk.length) {
auto doc = rapidjson::Document();
doc.ParseStream<rapidjson::kParseStopWhenDoneFlag>(is);
if (doc.HasParseError()) { throw std::runtime_error("Invalid JSON file."); }
processFeature(std::move(doc.GetObject()), layer, layerNum);

// Skip whitespace.
while(is.Tell() < chunk.length && isspace(is.Peek())) is.Take();
}
fclose(fp);
});
}
pool.join();
}

template <bool Flag, typename T>
void GeoJSONProcessor::processFeature(rapidjson::GenericObject<Flag, T> feature, class LayerDef &layer, uint layerNum) {

Expand Down
64 changes: 64 additions & 0 deletions src/helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
#include <string>
#include <stdexcept>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <cstring>
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string.hpp>

#include <sys/stat.h>
#include "helpers.h"

#ifdef _MSC_VER
#define stat64 __stat64
#endif

#define MOD_GZIP_ZLIB_WINDOWSIZE 15
#define MOD_GZIP_ZLIB_CFACTOR 9
#define MOD_GZIP_ZLIB_BSIZE 8096
Expand Down Expand Up @@ -152,3 +158,61 @@ std::string boost_validity_error(unsigned failure) {
default: return "something mysterious wrong with it, Boost validity_failure_type " + to_string(failure);
}
}

// Return the size of `filename` in bytes, throwing if the file cannot be stat'd.
uint64_t getFileSize(std::string filename) {
	struct stat64 statBuf;
	if (stat64(filename.c_str(), &statBuf) != 0)
		throw std::runtime_error("unable to stat " + filename);

	return statBuf.st_size;
}

// Given a file, attempt to divide it into N chunks, with each chunk separated
// by a newline.
//
// Useful for dividing a JSON lines file into blocks suitable for parallel processing.
//
// Throws std::runtime_error if the file cannot be stat'd, opened, or seeked.
std::vector<OffsetAndLength> getNewlineChunks(const std::string &filename, uint64_t chunks) {
	std::vector<OffsetAndLength> rv;

	const uint64_t size = getFileSize(filename);
	// Guard against division by zero when chunks == 0; each chunk is >= 1 byte.
	const uint64_t chunkSize = std::max<uint64_t>(size / std::max<uint64_t>(chunks, 1), 1);
	FILE* fp = fopen(filename.c_str(), "r");
	if (fp == nullptr) throw std::runtime_error("unable to open " + filename);

	// Our approach is naive: skip chunkSize bytes, scan for a newline, repeat.
	//
	// Per UTF-8's ascii transparency property, a newline is guaranteed not to form
	// part of any multi-byte character, so the byte '\n' reliably indicates a safe
	// place to start a new chunk.
	uint64_t offset = 0;
	uint64_t length = 0;
	char buffer[8192];
	while (offset < size) {
		// The last chunk will not be a full `chunkSize`.
		length = std::min(chunkSize, size - offset);

		if (fseek(fp, offset + length, SEEK_SET) != 0) {
			// Close before throwing so fp doesn't leak.
			fclose(fp);
			throw std::runtime_error("unable to seek to " + std::to_string(offset + length) + " in " + filename);
		}

		// Extend the chunk up to (but not including) the next newline, or EOF.
		bool foundNewline = false;

		while (!foundNewline) {
			size_t read = fread(buffer, 1, sizeof(buffer), fp);
			if (read == 0) break;
			for (size_t i = 0; i < read; i++) {
				if (buffer[i] == '\n') {
					length += i;
					foundNewline = true;
					break;
				}
			}

			if (!foundNewline) length += read;
		}

		rv.push_back({offset, length});
		offset += length;
	}

	fclose(fp);
	return rv;
}
2 changes: 1 addition & 1 deletion src/tilemaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ int main(const int argc, const char* argv[]) {
if (!hasClippingBox) {
cerr << "Can't read shapefiles unless a bounding box is provided." << endl;
exit(EXIT_FAILURE);
} else if (ends_with(layer.source, "json") || ends_with(layer.source, "JSON")) {
} else if (ends_with(layer.source, "json") || ends_with(layer.source, "jsonl") || ends_with(layer.source, "JSON") || ends_with(layer.source, "JSONL") || ends_with(layer.source, "jsonseq") || ends_with(layer.source, "JSONSEQ")) {
cout << "Reading GeoJSON " << layer.name << endl;
geoJSONProcessor.read(layers.layers[layerNum], layerNum);
} else {
Expand Down
59 changes: 59 additions & 0 deletions test/helpers.test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include <iostream>
#include "external/minunit.h"
#include "helpers.h"

// Exercises getNewlineChunks() against test/test.jsonl, a 24-byte fixture of
// three newline-terminated JSON objects. Each case checks that the returned
// (offset, length) pairs tile the file exactly and split only at newlines.
MU_TEST(test_get_chunks) {
	// One chunk: the whole file.
	{
		auto rv = getNewlineChunks("test/test.jsonl", 1);
		mu_check(rv.size() == 1);
		mu_check(rv[0].offset == 0);
		mu_check(rv[0].length == 24);
	}

	// Two chunks: an even 12/12 split, boundary on a newline.
	{
		auto rv = getNewlineChunks("test/test.jsonl", 2);
		mu_check(rv.size() == 2);
		mu_check(rv[0].offset == 0);
		mu_check(rv[0].length == 12);
		mu_check(rv[1].offset == 12);
		mu_check(rv[1].length == 12);
	}

	// Dividing into 3 chunks gives a lop-sided result; one of the chunks
	// consists only of whitespace. This is OK.
	{
		auto rv = getNewlineChunks("test/test.jsonl", 3);
		mu_check(rv.size() == 3);
		mu_check(rv[0].offset == 0);
		mu_check(rv[0].length == 12);
		mu_check(rv[1].offset == 12);
		mu_check(rv[1].length == 11);
		mu_check(rv[2].offset == 23);
		mu_check(rv[2].length == 1);
	}

	// Dividing into many more chunks than is possible devolves into
	// one chunk per newline.
	{
		auto rv = getNewlineChunks("test/test.jsonl", 128);
		mu_check(rv.size() == 4);
		mu_check(rv[0].offset == 0);
		mu_check(rv[0].length == 2);
		mu_check(rv[1].offset == 2);
		mu_check(rv[1].length == 10);
		mu_check(rv[2].offset == 12);
		mu_check(rv[2].length == 11);
		mu_check(rv[3].offset == 23);
		mu_check(rv[3].length == 1);
	}
}

// Registers the chunking tests with minunit.
MU_TEST_SUITE(test_suite_get_chunks) {
	MU_RUN_TEST(test_get_chunks);
}

// Entry point: run the suite and report; exit code is non-zero on failure.
int main() {
	MU_RUN_SUITE(test_suite_get_chunks);
	MU_REPORT();
	return MU_EXIT_CODE;
}
3 changes: 3 additions & 0 deletions test/test.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{}
{"foo":1}
{"x":true}

0 comments on commit 3af62f6

Please sign in to comment.