Skip to content

Commit

Permalink
Merge pull request #532 from 0xPolygonHermez/DatabaseKV
Browse files Browse the repository at this point in the history
Database key Value
  • Loading branch information
rickb80 authored Sep 4, 2023
2 parents 0f134a2 + 0c1b7c4 commit 07fce30
Show file tree
Hide file tree
Showing 21 changed files with 2,278 additions and 341 deletions.
8 changes: 8 additions & 0 deletions src/config/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void Config::load(json &config)
ParseU16(config, "hashDBServerPort", "HASHDB_SERVER_PORT", hashDBServerPort, 50061);
ParseString(config, "hashDBURL", "HASHDB_URL", hashDBURL, "local");
ParseBool(config, "hashDB64", "HASHDB64", hashDB64, false);
ParseU64(config, "kvDBMaxVersions", "HASHDB64_MAX_VERSIONS", kvDBMaxVersions, 131072);
ParseString(config, "dbCacheSynchURL", "DB_CACHE_SYNCH_URL", dbCacheSynchURL, "");
ParseU16(config, "aggregatorServerPort", "AGGREGATOR_SERVER_PORT", aggregatorServerPort, 50081);
ParseU16(config, "aggregatorClientPort", "AGGREGATOR_CLIENT_PORT", aggregatorClientPort, 50081);
Expand Down Expand Up @@ -793,6 +794,9 @@ void Config::load(json &config)
ParseString(config, "databaseURL", "DATABASE_URL", databaseURL, "local");
ParseString(config, "dbNodesTableName", "DB_NODES_TABLE_NAME", dbNodesTableName, "state.nodes");
ParseString(config, "dbProgramTableName", "DB_PROGRAM_TABLE_NAME", dbProgramTableName, "state.program");
ParseString(config, "dbKeyValueTableName", "DB_KEYVALUE_TABLE_NAME", dbKeyValueTableName, "state.keyvalue");
ParseString(config, "dbKeyVersionTableName", "DB_VERSION_TABLE_NAME", dbVersionTableName, "state.version");
ParseString(config, "dbLatestVersionTableName", "DB_LATEST_VERSION_TABLE_NAME", dbLatestVersionTableName, "state.latestversion");
ParseBool(config, "dbMultiWrite", "DB_MULTIWRITE", dbMultiWrite, true);
ParseU64(config, "dbMultiWriteSingleQuerySize", "DB_MULTIWRITE_SINGLE_QUERY_SIZE", dbMultiWriteSingleQuerySize, 20*1024*1024);
ParseBool(config, "dbConnectionsPool", "DB_CONNECTIONS_POOL", dbConnectionsPool, true);
Expand Down Expand Up @@ -945,6 +949,7 @@ void Config::print(void)
zklog.info(" hashDBServerPort=" + to_string(hashDBServerPort));
zklog.info(" hashDBURL=" + hashDBURL);
zklog.info(" hashDB64=" + to_string(hashDB64));
zklog.info(" kvDBMaxVersions=" + to_string(kvDBMaxVersions));
zklog.info(" dbCacheSynchURL=" + dbCacheSynchURL);
zklog.info(" aggregatorServerPort=" + to_string(aggregatorServerPort));
zklog.info(" aggregatorClientPort=" + to_string(aggregatorClientPort));
Expand Down Expand Up @@ -990,6 +995,9 @@ void Config::print(void)
zklog.info(" databaseURL=" + databaseURL.substr(0, 5) + "...");
zklog.info(" dbNodesTableName=" + dbNodesTableName);
zklog.info(" dbProgramTableName=" + dbProgramTableName);
zklog.info(" dbKeyValueTableName=" + dbKeyValueTableName);
zklog.info(" dbVersionTableName=" + dbVersionTableName);
zklog.info(" dbLatestVersionTableName=" + dbLatestVersionTableName);
zklog.info(" dbMultiWrite=" + to_string(dbMultiWrite));
zklog.info(" dbMultiWriteSingleQuerySize=" + to_string(dbMultiWriteSingleQuerySize));
zklog.info(" dbConnectionsPool=" + to_string(dbConnectionsPool));
Expand Down
4 changes: 4 additions & 0 deletions src/config/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Config
uint16_t hashDBServerPort;
string hashDBURL;
bool hashDB64;
uint64_t kvDBMaxVersions;
string dbCacheSynchURL;

uint16_t aggregatorServerPort;
Expand Down Expand Up @@ -148,6 +149,9 @@ class Config
string databaseURL;
string dbNodesTableName;
string dbProgramTableName;
string dbKeyValueTableName;
string dbVersionTableName;
string dbLatestVersionTableName;
bool dbMultiWrite;
uint64_t dbMultiWriteSingleQuerySize;
bool dbConnectionsPool;
Expand Down
5 changes: 4 additions & 1 deletion src/config/zkresult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ struct
{ ZKR_SM_MAIN_MEMALIGN_WRITE8_MISMATCH, "ZKR_SM_MAIN_MEMALIGN_WRITE8_MISMATCH" },
{ ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH, "ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH" },
{ ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE" },
{ ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE" }
{ ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE" },
{ ZKR_DB_VERSION_NOT_FOUND, "ZKR_DB_VERSION_NOT_FOUND" }


};

string zkresult2string (int code)
Expand Down
2 changes: 2 additions & 0 deletions src/config/zkresult.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ typedef enum : int
ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH = 78, // Main state memory align read ROM operation check failed
ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE = 79, // Main state Keccak hash check found read out of range
ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE = 80, // Main state Poseidon hash check found read out of range
ZKR_DB_VERSION_NOT_FOUND = 81, // Version not found in KeyValue database

} zkresult;

string zkresult2string (int code);
Expand Down
4 changes: 2 additions & 2 deletions src/hashdb/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,13 +586,13 @@ zkresult Database::readRemote(bool bProgram, const string &key, string &value)
exitProcess();
}

pqxx::row const row = rows[0];
const pqxx::row& row = rows[0];
if (row.size() != 2)
{
zklog.error("Database::readRemote() table=" + tableName + " got an invalid number of colums for the row: " + to_string(row.size()));
exitProcess();
}
pqxx::field const fieldData = row[1];
const pqxx::field& fieldData = row[1];
value = removeBSXIfExists(fieldData.c_str());
}
catch (const std::exception &e)
Expand Down
107 changes: 40 additions & 67 deletions src/hashdb/database_associative_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,82 +89,53 @@ void DatabaseMTAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac
void DatabaseMTAssociativeCache::addKeyValue(Goldilocks::Element (&key)[4], const vector<Goldilocks::Element> &value, bool update)
{
lock_guard<recursive_mutex> guard(mlock);
bool emptySlot = false;
bool present = false;
uint32_t cacheIndex;
uint32_t tableIndexEmpty=0;

//
// Try to add in one of my 4 slots
// Check if present in one of the four slots
//
for (int i = 0; i < 4; ++i)
{
uint32_t tableIndex = (uint32_t)(key[i].fe & indexesMask);
uint32_t cacheIndexRaw = indexes[tableIndex];
uint32_t cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey, cacheIndexValue;
bool write = false;

if (emptyCacheSlot(cacheIndexRaw))
{
write = true;
indexes[tableIndex] = currentCacheIndex;
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);

cacheIndex = indexes[tableIndex] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexValue = cacheIndex * 12;
}
else
{
cacheIndexKey = cacheIndex * 4;
cacheIndexValue = cacheIndex * 12;
cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;

if (keys[cacheIndexKey + 0].fe == key[0].fe &&
if (!emptyCacheSlot(cacheIndexRaw)){
if( keys[cacheIndexKey + 0].fe == key[0].fe &&
keys[cacheIndexKey + 1].fe == key[1].fe &&
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe)
{
write = update;
}
else
{
continue;
keys[cacheIndexKey + 3].fe == key[3].fe){
if(update == false) return;
present = true;
break;
}
}else if (emptySlot == false){
emptySlot = true;
tableIndexEmpty = tableIndex;
}
if (write)
{
keys[cacheIndexKey + 0].fe = key[0].fe;
keys[cacheIndexKey + 1].fe = key[1].fe;
keys[cacheIndexKey + 2].fe = key[2].fe;
keys[cacheIndexKey + 3].fe = key[3].fe;
values[cacheIndexValue + 0] = value[0];
values[cacheIndexValue + 1] = value[1];
values[cacheIndexValue + 2] = value[2];
values[cacheIndexValue + 3] = value[3];
values[cacheIndexValue + 4] = value[4];
values[cacheIndexValue + 5] = value[5];
values[cacheIndexValue + 6] = value[6];
values[cacheIndexValue + 7] = value[7];
if (value.size() > 8)
{
values[cacheIndexValue + 8] = value[8];
values[cacheIndexValue + 9] = value[9];
values[cacheIndexValue + 10] = value[10];
values[cacheIndexValue + 11] = value[11];
}else{
values[cacheIndexValue + 8] = Goldilocks::zero();
values[cacheIndexValue + 9] = Goldilocks::zero();
values[cacheIndexValue + 10] = Goldilocks::zero();
values[cacheIndexValue + 11] = Goldilocks::zero();
}
return;
}else{
return;
}

//
// Evaluate cacheIndexKey and
//
if(!present){
if(emptySlot == true){
indexes[tableIndexEmpty] = currentCacheIndex;
}
cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
}
uint64_t cacheIndexKey, cacheIndexValue;
cacheIndexKey = cacheIndex * 4;
cacheIndexValue = cacheIndex * 12;

//
// forced entry insertion
// Add value
//
uint32_t cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexValue = cacheIndex * 12;
keys[cacheIndexKey + 0].fe = key[0].fe;
keys[cacheIndexKey + 1].fe = key[1].fe;
keys[cacheIndexKey + 2].fe = key[2].fe;
Expand All @@ -189,14 +160,16 @@ void DatabaseMTAssociativeCache::addKeyValue(Goldilocks::Element (&key)[4], cons
values[cacheIndexValue + 10] = Goldilocks::zero();
values[cacheIndexValue + 11] = Goldilocks::zero();
}

//
// Forced index insertion
//
int iters = 0;
uint32_t usedRawCacheIndexes[10];
usedRawCacheIndexes[0] = currentCacheIndex-1;
forcedInsertion(usedRawCacheIndexes, iters);

if(!present && !emptySlot){
int iters = 0;
uint32_t usedRawCacheIndexes[10];
usedRawCacheIndexes[0] = currentCacheIndex-1;
forcedInsertion(usedRawCacheIndexes, iters);
}
}

void DatabaseMTAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)[10], int &iters)
Expand Down Expand Up @@ -256,7 +229,7 @@ void DatabaseMTAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)

}

bool DatabaseMTAssociativeCache::findKey(Goldilocks::Element (&key)[4], vector<Goldilocks::Element> &value)
bool DatabaseMTAssociativeCache::findKey(const Goldilocks::Element (&key)[4], vector<Goldilocks::Element> &value)
{
lock_guard<recursive_mutex> guard(mlock);
attempts++;
Expand Down
3 changes: 1 addition & 2 deletions src/hashdb/database_associative_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "zkmax.hpp"

using namespace std;
using json = nlohmann::json;
class DatabaseMTAssociativeCache
{
private:
Expand Down Expand Up @@ -40,7 +39,7 @@ class DatabaseMTAssociativeCache

void postConstruct(int log2IndexesSize_, int log2CacheSize_, string name_);
void addKeyValue(Goldilocks::Element (&key)[4], const vector<Goldilocks::Element> &value, bool update);
bool findKey(Goldilocks::Element (&key)[4], vector<Goldilocks::Element> &value);
bool findKey(const Goldilocks::Element (&key)[4], vector<Goldilocks::Element> &value);
inline bool enabled() const { return (log2IndexesSize > 0); };
inline uint32_t getCacheSize() const { return cacheSize; };
inline uint32_t getIndexesSize() const { return indexesSize; };
Expand Down
11 changes: 6 additions & 5 deletions src/hashdb/database_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ bool DatabaseCache::addKeyValue(const string &key, const void * value, const boo
return true;
}

if (attempts%1000000 == 0)
{
zklog.info("DatabaseCache::addKeyValue() name=" + name + " count=" + to_string(cacheMap.size()) + " maxSize=" + to_string(maxSize) + " currentSize=" + to_string(currentSize) + " attempts=" + to_string(attempts) + " hits=" + to_string(hits) + " hit ratio=" + to_string(double(hits)*100.0/double(zkmax(attempts,1))) + "%");
}

DatabaseCacheRecord * record;
// If key already exists in the cache return. The findKey also sets the record in the head of the cache
if (findKey(key, record))
Expand Down Expand Up @@ -84,6 +79,12 @@ bool DatabaseCache::addKeyValue(const string &key, const void * value, const boo
bool DatabaseCache::findKey(const string &key, DatabaseCacheRecord* &record)
{
attempts++;

if (attempts%1000000 == 0)
{
zklog.info("DatabaseCache::addKeyValue() name=" + name + " count=" + to_string(cacheMap.size()) + " maxSize=" + to_string(maxSize) + " currentSize=" + to_string(currentSize) + " attempts=" + to_string(attempts) + " hits=" + to_string(hits) + " hit ratio=" + to_string(double(hits)*100.0/double(zkmax(attempts,1))) + "%");
}

unordered_map<string, DatabaseCacheRecord*>::iterator it = cacheMap.find(key);

if (it != cacheMap.end())
Expand Down
90 changes: 90 additions & 0 deletions src/hashdb/database_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@ void DatabaseMap::add(MTMap &db)
if (callbackOnChange) onChangeCallback();
}

void DatabaseMap::add(MT64Map &db)
{
lock_guard<recursive_mutex> guard(mlock);

mt64DB.insert(db.begin(), db.end());
if (callbackOnChange) onChangeCallback();
}

void DatabaseMap::add(MT64KVMap &db)
{
lock_guard<recursive_mutex> guard(mlock);

mt64KVDB.insert(db.begin(), db.end());
if (callbackOnChange) onChangeCallback();
}

void DatabaseMap::add(MT64VersionMap &db)
{
lock_guard<recursive_mutex> guard(mlock);

mt64VersionDB.insert(db.begin(), db.end());
if (callbackOnChange) onChangeCallback();
}

void DatabaseMap::add(ProgramMap &db)
{
lock_guard<recursive_mutex> guard(mlock);
Expand All @@ -37,6 +61,51 @@ bool DatabaseMap::findMT(const string& key, vector<Goldilocks::Element> &value)
return false;
}

bool DatabaseMap::findMT64(const string& key, string &value)
{
lock_guard<recursive_mutex> guard(mlock);

DatabaseMap::MT64Map::iterator it = mt64DB.find(key);

if (it != mt64DB.end())
{
value = it->second;
return true;
}

return false;
}

bool DatabaseMap::findMT64KV(const string& key, mpz_class &value)
{
lock_guard<recursive_mutex> guard(mlock);

DatabaseMap::MT64KVMap::iterator it = mt64KVDB.find(key);

if (it != mt64KVDB.end())
{
value = it->second;
return true;
}

return false;
}

bool DatabaseMap::findMT64Version(const string& key, uint64_t &value)
{
lock_guard<recursive_mutex> guard(mlock);

DatabaseMap::MT64VersionMap::iterator it = mt64VersionDB.find(key);

if (it != mt64VersionDB.end())
{
value = it->second;
return true;
}

return false;
}

bool DatabaseMap::findProgram(const string& key, vector<uint8_t> &value)
{
lock_guard<recursive_mutex> guard(mlock);
Expand All @@ -59,6 +128,27 @@ DatabaseMap::MTMap DatabaseMap::getMTDB()
return mtDB;
}

DatabaseMap::MT64Map DatabaseMap::getMT64DB()
{
lock_guard<recursive_mutex> guard(mlock);

return mt64DB;
}

DatabaseMap::MT64KVMap DatabaseMap::getMT64KVDB()
{
lock_guard<recursive_mutex> guard(mlock);

return mt64KVDB;
}

DatabaseMap::MT64VersionMap DatabaseMap::getMT64VersionDB()
{
lock_guard<recursive_mutex> guard(mlock);

return mt64VersionDB;
}

DatabaseMap::ProgramMap DatabaseMap::getProgramDB()
{
lock_guard<recursive_mutex> guard(mlock);
Expand Down
Loading

0 comments on commit 07fce30

Please sign in to comment.