Skip to content

Commit

Permalink
fix: change the type of oid in pg_namespace to u32 (#4541)
Browse files Browse the repository at this point in the history
* fix:  change the type of oid in pg_namespace to u32

* fix: header and correct logic of update oid
  • Loading branch information
J0HN50N133 authored Aug 10, 2024
1 parent 9532ffb commit 6694d2a
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 24 deletions.
17 changes: 12 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ rskafka = "0.5"
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
Expand Down
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ moka = { workspace = true, features = ["future", "sync"] }
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
rustc-hash.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions src/catalog/src/system_schema/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use pg_namespace::PGNamespace;
use table::TableRef;
pub use table_names::*;

use self::pg_namespace::oid_map::{PGNamespaceOidMap, PGNamespaceOidMapRef};
use super::memory_table::MemoryTable;
use super::utils::tables::u32_column;
use super::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
Expand All @@ -52,6 +53,9 @@ pub struct PGCatalogProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
tables: HashMap<String, TableRef>,

// Workaround to store mapping of schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}

impl SystemSchemaProvider for PGCatalogProvider {
Expand Down Expand Up @@ -85,6 +89,7 @@ impl PGCatalogProvider {
catalog_name,
catalog_manager,
tables: HashMap::new(),
namespace_oid_map: Arc::new(PGNamespaceOidMap::new()),
};
provider.build_tables();
provider
Expand Down Expand Up @@ -122,10 +127,12 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
table_names::PG_NAMESPACE => Some(Arc::new(PGNamespace::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_CLASS => Some(Arc::new(PGClass::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
_ => None,
}
Expand Down
24 changes: 19 additions & 5 deletions src/catalog/src/system_schema/pg_catalog/pg_class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;

use super::pg_namespace::oid_map::PGNamespaceOidMapRef;
use super::{OID_COLUMN_NAME, PG_CLASS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
Expand Down Expand Up @@ -60,22 +61,30 @@ pub(super) struct PGClass {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}

impl PGClass {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(RELNAME),
string_column(RELNAMESPACE),
u32_column(RELNAMESPACE),
string_column(RELKIND),
u32_column(RELOWNER),
]))
Expand All @@ -86,6 +95,7 @@ impl PGClass {
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}
Expand Down Expand Up @@ -155,10 +165,11 @@ struct PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,

oid: UInt32VectorBuilder,
relname: StringVectorBuilder,
relnamespace: StringVectorBuilder,
relnamespace: UInt32VectorBuilder,
relkind: StringVectorBuilder,
relowner: UInt32VectorBuilder,
}
Expand All @@ -168,15 +179,17 @@ impl PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,

oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
}
Expand Down Expand Up @@ -217,6 +230,7 @@ impl PGClassBuilder {
table: &str,
kind: &str,
) {
let namespace_oid = self.namespace_oid_map.get_oid(schema);
let row = [
(OID_COLUMN_NAME, &Value::from(oid)),
(RELNAMESPACE, &Value::from(schema)),
Expand All @@ -230,7 +244,7 @@ impl PGClassBuilder {
}

self.oid.push(Some(oid));
self.relnamespace.push(Some(schema));
self.relnamespace.push(Some(namespace_oid));
self.relname.push(Some(table));
self.relkind.push(Some(kind));
self.relowner.push(Some(DUMMY_OWNER_ID));
Expand Down
33 changes: 24 additions & 9 deletions src/catalog/src/system_schema/pg_catalog/pg_namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(super) mod oid_map;

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
Expand All @@ -25,16 +27,16 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use super::{OID_COLUMN_NAME, PG_NAMESPACE};
use super::{PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::utils::tables::string_column;
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

Expand All @@ -48,21 +50,29 @@ pub(super) struct PGNamespace {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

// Workaround to convert schema_name to a numeric id
oid_map: PGNamespaceOidMapRef,
}

impl PGNamespace {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
oid_map,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
// TODO(J0HN50N133): we do not have a numeric schema id, use schema name as a workaround. Use a proper schema id once we have it.
string_column(OID_COLUMN_NAME),
u32_column(OID_COLUMN_NAME),
string_column(NSPNAME),
]))
}
Expand All @@ -72,6 +82,7 @@ impl PGNamespace {
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.oid_map.clone(),
)
}
}
Expand Down Expand Up @@ -138,8 +149,9 @@ struct PGNamespaceBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,

oid: StringVectorBuilder,
oid: UInt32VectorBuilder,
nspname: StringVectorBuilder,
}

Expand All @@ -148,12 +160,14 @@ impl PGNamespaceBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
oid: StringVectorBuilder::with_capacity(INIT_CAPACITY),
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
nspname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
Expand All @@ -178,14 +192,15 @@ impl PGNamespaceBuilder {
}

fn add_namespace(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row = [
(OID_COLUMN_NAME, &Value::from(schema_name)),
(OID_COLUMN_NAME, &Value::from(oid)),
(NSPNAME, &Value::from(schema_name)),
];
if !predicates.eval(&row) {
return;
}
self.oid.push(Some(schema_name));
self.oid.push(Some(oid));
self.nspname.push(Some(schema_name));
}
}
Loading

0 comments on commit 6694d2a

Please sign in to comment.