diff --git a/Makefile b/Makefile index 8ee402bb..174e9d3a 100644 --- a/Makefile +++ b/Makefile @@ -124,6 +124,7 @@ test: \ test_append_vector \ test_attribute_store \ test_deque_map \ + test_helpers \ test_pbf_reader \ test_pooled_string \ test_relation_roles \ @@ -147,6 +148,11 @@ test_deque_map: \ test/deque_map.test.o $(CXX) $(CXXFLAGS) -o test.deque_map $^ $(INC) $(LIB) $(LDFLAGS) && ./test.deque_map +test_helpers: \ + src/helpers.o \ + test/helpers.test.o + $(CXX) $(CXXFLAGS) -o test.helpers $^ $(INC) $(LIB) $(LDFLAGS) && ./test.helpers + test_options_parser: \ src/options_parser.o \ test/options_parser.test.o @@ -158,7 +164,6 @@ test_pooled_string: \ test/pooled_string.test.o $(CXX) $(CXXFLAGS) -o test.pooled_string $^ $(INC) $(LIB) $(LDFLAGS) && ./test.pooled_string - test_relation_roles: \ src/relation_roles.o \ test/relation_roles.test.o diff --git a/include/geojson_processor.h b/include/geojson_processor.h index ffef3dba..51d45d01 100644 --- a/include/geojson_processor.h +++ b/include/geojson_processor.h @@ -31,6 +31,9 @@ class GeoJSONProcessor { OsmLuaProcessing &osmLuaProcessing; std::mutex attributeMutex; + void readFeatureCollection(class LayerDef &layer, uint layerNum); + void readFeatureLines(class LayerDef &layer, uint layerNum); + template void processFeature(rapidjson::GenericObject feature, class LayerDef &layer, uint layerNum); diff --git a/include/helpers.h b/include/helpers.h index de490874..9df8c825 100644 --- a/include/helpers.h +++ b/include/helpers.h @@ -28,6 +28,14 @@ inline std::vector<std::string> split_string(std::string &inputStr, char sep) { return res; } +struct OffsetAndLength { + uint64_t offset; + uint64_t length; +}; + +uint64_t getFileSize(std::string filename); +std::vector<OffsetAndLength> getNewlineChunks(const std::string &filename, uint64_t chunks); + void decompress_string(std::string& output, const char* input, uint32_t inputSize, bool asGzip = false); double bboxElementFromStr(const std::string& number); diff --git a/src/geojson_processor.cpp 
b/src/geojson_processor.cpp index 38965718..a175baf8 100644 --- a/src/geojson_processor.cpp +++ b/src/geojson_processor.cpp @@ -1,5 +1,6 @@ #include "geojson_processor.h" +#include "helpers.h" #include #include @@ -14,8 +15,14 @@ namespace geom = boost::geometry; // Read GeoJSON, and create OutputObjects for all objects within the specified bounding box void GeoJSONProcessor::read(class LayerDef &layer, uint layerNum) { + if (ends_with(layer.source, "JSONL") || ends_with(layer.source, "jsonl") || ends_with(layer.source, "jsonseq") || ends_with(layer.source, "JSONSEQ")) + return readFeatureLines(layer, layerNum); - // Parse the JSON file into a RapidJSON document + readFeatureCollection(layer, layerNum); +} + +void GeoJSONProcessor::readFeatureCollection(class LayerDef &layer, uint layerNum) { + // Read a JSON file containing a single GeoJSON FeatureCollection object. rapidjson::Document doc; FILE* fp = fopen(layer.source.c_str(), "r"); char readBuffer[65536]; @@ -38,6 +45,34 @@ void GeoJSONProcessor::read(class LayerDef &layer, uint layerNum) { pool.join(); } +void GeoJSONProcessor::readFeatureLines(class LayerDef &layer, uint layerNum) { + // Read a JSON file containing multiple GeoJSON items, newline-delimited. + std::vector<OffsetAndLength> chunks = getNewlineChunks(layer.source, threadNum * 4); + + // Process each feature + boost::asio::thread_pool pool(threadNum); + for (auto &chunk : chunks) { + boost::asio::post(pool, [&, chunk]() { + FILE* fp = fopen(layer.source.c_str(), "r"); + if (fseek(fp, chunk.offset, SEEK_SET) != 0) throw std::runtime_error("unable to seek to " + std::to_string(chunk.offset) + " in " + layer.source); + char readBuffer[65536]; + rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer)); + + while(is.Tell() < chunk.length) { + auto doc = rapidjson::Document(); + doc.ParseStream<rapidjson::kParseStopWhenDoneFlag>(is); + if (doc.HasParseError()) { throw std::runtime_error("Invalid JSON file."); } + processFeature(std::move(doc.GetObject()), layer, layerNum); + + // Skip whitespace. 
+ while(is.Tell() < chunk.length && isspace(is.Peek())) is.Take(); + } + fclose(fp); + }); + } + pool.join(); +} + template void GeoJSONProcessor::processFeature(rapidjson::GenericObject feature, class LayerDef &layer, uint layerNum) { diff --git a/src/helpers.cpp b/src/helpers.cpp index df210b95..69aa7853 100644 --- a/src/helpers.cpp +++ b/src/helpers.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -7,8 +8,13 @@ #include #include +#include #include "helpers.h" +#ifdef _MSC_VER +#define stat64 _stat64 +#endif + #define MOD_GZIP_ZLIB_WINDOWSIZE 15 #define MOD_GZIP_ZLIB_CFACTOR 9 #define MOD_GZIP_ZLIB_BSIZE 8096 @@ -152,3 +158,61 @@ std::string boost_validity_error(unsigned failure) { default: return "something mysterious wrong with it, Boost validity_failure_type " + to_string(failure); } } + +uint64_t getFileSize(std::string filename) { + struct stat64 statBuf; + int rc = stat64(filename.c_str(), &statBuf); + + if (rc == 0) return statBuf.st_size; + + throw std::runtime_error("unable to stat " + filename); +} + +// Given a file, attempt to divide it into N chunks, with each chunk separated +// by a newline. +// +// Useful for dividing a JSON lines file into blocks suitable for parallel processing. +std::vector<OffsetAndLength> getNewlineChunks(const std::string &filename, uint64_t chunks) { + std::vector<OffsetAndLength> rv; + + const uint64_t size = getFileSize(filename); + const uint64_t chunkSize = std::max<uint64_t>(size / chunks, 1); + FILE* fp = fopen(filename.c_str(), "r"); + + // Our approach is naive: skip chunkSize bytes, scan for a newline, repeat. + // + // Per UTF-8's ascii transparency property, a newline is guaranteed not to form + // part of any multi-byte character, so the byte '\n' reliably indicates a safe + // place to start a new chunk. + uint64_t offset = 0; + uint64_t length = 0; + char buffer[8192]; + while (offset < size) { + // The last chunk will not be a full `chunkSize`. 
+ length = std::min(chunkSize, size - offset); + + if (fseek(fp, offset + length, SEEK_SET) != 0) throw std::runtime_error("unable to seek to " + std::to_string(offset + length) + " in " + filename); + + bool foundNewline = false; + + while(!foundNewline) { + size_t read = fread(buffer, 1, sizeof(buffer), fp); + if (read == 0) break; + for (size_t i = 0; i < read; i++) { + if (buffer[i] == '\n') { + length += i; + foundNewline = true; + break; + } + } + + if (!foundNewline) length += read; + } + + rv.push_back({offset, length}); + offset += length; + } + + fclose(fp); + return rv; +} diff --git a/src/tilemaker.cpp b/src/tilemaker.cpp index b18b726f..ec61f84b 100644 --- a/src/tilemaker.cpp +++ b/src/tilemaker.cpp @@ -244,7 +244,7 @@ int main(const int argc, const char* argv[]) { if (!hasClippingBox) { cerr << "Can't read shapefiles unless a bounding box is provided." << endl; exit(EXIT_FAILURE); - } else if (ends_with(layer.source, "json") || ends_with(layer.source, "JSON")) { + } else if (ends_with(layer.source, "json") || ends_with(layer.source, "jsonl") || ends_with(layer.source, "JSON") || ends_with(layer.source, "JSONL") || ends_with(layer.source, "jsonseq") || ends_with(layer.source, "JSONSEQ")) { cout << "Reading GeoJSON " << layer.name << endl; geoJSONProcessor.read(layers.layers[layerNum], layerNum); } else { diff --git a/test/helpers.test.cpp b/test/helpers.test.cpp new file mode 100644 index 00000000..79532a97 --- /dev/null +++ b/test/helpers.test.cpp @@ -0,0 +1,59 @@ +#include +#include "external/minunit.h" +#include "helpers.h" + +MU_TEST(test_get_chunks) { + { + auto rv = getNewlineChunks("test/test.jsonl", 1); + mu_check(rv.size() == 1); + mu_check(rv[0].offset == 0); + mu_check(rv[0].length == 24); + } + + { + auto rv = getNewlineChunks("test/test.jsonl", 2); + mu_check(rv.size() == 2); + mu_check(rv[0].offset == 0); + mu_check(rv[0].length == 12); + mu_check(rv[1].offset == 12); + mu_check(rv[1].length == 12); + } + + // Dividing into 3 chunks gives a lop-sided 
result; one of the chunks + // consists only of whitespace. This is OK. + { + auto rv = getNewlineChunks("test/test.jsonl", 3); + mu_check(rv.size() == 3); + mu_check(rv[0].offset == 0); + mu_check(rv[0].length == 12); + mu_check(rv[1].offset == 12); + mu_check(rv[1].length == 11); + mu_check(rv[2].offset == 23); + mu_check(rv[2].length == 1); + } + + // Dividing into many more chunks than is possible devolves into + // one chunk per newline. + { + auto rv = getNewlineChunks("test/test.jsonl", 128); + mu_check(rv.size() == 4); + mu_check(rv[0].offset == 0); + mu_check(rv[0].length == 2); + mu_check(rv[1].offset == 2); + mu_check(rv[1].length == 10); + mu_check(rv[2].offset == 12); + mu_check(rv[2].length == 11); + mu_check(rv[3].offset == 23); + mu_check(rv[3].length == 1); + } +} + +MU_TEST_SUITE(test_suite_get_chunks) { + MU_RUN_TEST(test_get_chunks); +} + +int main() { + MU_RUN_SUITE(test_suite_get_chunks); + MU_REPORT(); + return MU_EXIT_CODE; +} diff --git a/test/test.jsonl b/test/test.jsonl new file mode 100644 index 00000000..915b080c --- /dev/null +++ b/test/test.jsonl @@ -0,0 +1,3 @@ +{} +{"foo":1} +{"x":true}