diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index 5ae510aab23b..aa9fd103a50c 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -58,7 +58,7 @@ async fn main() -> Result<()> { // context will by default have MemoryCatalogList ctx.register_catalog_list(catlist.clone()); - // intitialize our catalog and schemas + // initialize our catalog and schemas let catalog = DirCatalog::new(); let parquet_schema = DirSchema::create( &state, diff --git a/docs/source/index.rst b/docs/source/index.rst index 4f45771173bf..acc17ebeff0f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -63,6 +63,23 @@ The `developer’s guide`_ contains information on how to contribute. user-guide/configs user-guide/faq +.. _toc.library-user-guide: + +.. toctree:: + :maxdepth: 1 + :caption: Library User Guide + + library-user-guide/index + library-user-guide/using-the-sql-api + library-user-guide/working-with-exprs + library-user-guide/using-the-dataframe-api + library-user-guide/building-logical-plans + library-user-guide/catalogs + library-user-guide/adding-udfs + library-user-guide/custom-table-providers + library-user-guide/extending-operators + library-user-guide/execution-plans + .. _toc.contributor-guide: .. toctree:: diff --git a/docs/source/library-user-guide/adding-udfs.md b/docs/source/library-user-guide/adding-udfs.md new file mode 100644 index 000000000000..45d5afc1f07a --- /dev/null +++ b/docs/source/library-user-guide/adding-udfs.md @@ -0,0 +1,22 @@ + + +# Adding User Defined Functions: Scalar/Window/Aggregate + +Coming Soon diff --git a/docs/source/library-user-guide/building-logical-plans.md b/docs/source/library-user-guide/building-logical-plans.md new file mode 100644 index 000000000000..406f4881129c --- /dev/null +++ b/docs/source/library-user-guide/building-logical-plans.md @@ -0,0 +1,22 @@ + + +# Building Logical Plans + +Coming Soon diff --git a/docs/source/library-user-guide/catalogs.md b/docs/source/library-user-guide/catalogs.md new file mode 100644 index 000000000000..226e28332453 --- /dev/null +++ b/docs/source/library-user-guide/catalogs.md @@ -0,0 +1,179 @@ + + +# Catalogs, Schemas, and Tables + +This section describes how to create and manage catalogs, schemas, and tables in DataFusion. For those wanting to dive into the code quickly please see the [example](../../../datafusion-examples/examples/catalog.rs). + +## General Concepts + +Catalogs, schemas, and tables are organized in a hierarchy. A catalog contains schemas, and a schema contains tables. + +DataFusion comes with a basic in memory catalog functionality in the [`catalog` module]. You can use these in memory implementations as is, or extend DataFusion with your own catalog implementations, for example based on local files or files on remote object storage. + +[`catalog` module]: https://docs.rs/datafusion/latest/datafusion/catalog/index.html + +Similarly to other concepts in DataFusion, you'll implement various traits to create your own catalogs, schemas, and tables. The following sections describe the traits you'll need to implement. + +The `CatalogProvider` trait has methods to set a schema to a name, get a schema by name, and list all schemas. The `SchemaProvider`, which can be registered with a `CatalogProvider`, has methods to set a table to a name, get a table by name, list all tables, deregister a table, and check for a table's existence. The `TableProvider` trait has methods to scan underlying data and use it in DataFusion. The `TableProvider` trait is covered in more detail [here](./custom-table-providers.md). + +In the following example, we'll implement an in memory catalog, starting with the `SchemaProvider` trait as we need one to register with the `CatalogProvider`. + +## Implementing `MemorySchemaProvider` + +The `MemorySchemaProvider` is a simple implementation of the `SchemaProvider` trait. It stores state (i.e. tables) in a `DashMap`, which then underlies the `SchemaProvider` trait. + +```rust +pub struct MemorySchemaProvider { + tables: DashMap>, +} +``` + +`tables` is the key-value pair described above. The underlying state could also be another data structure or other storage mechanism such as a file or transactional database. + +Then we implement the `SchemaProvider` trait for `MemorySchemaProvider`. + +```rust +#[async_trait] +impl SchemaProvider for MemorySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables + .iter() + .map(|table| table.key().clone()) + .collect() + } + + async fn table(&self, name: &str) -> Option> { + self.tables.get(name).map(|table| table.value().clone()) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + if self.table_exist(name.as_str()) { + return Err(DataFusionError::Execution(format!( + "The table {name} already exists" + ))); + } + Ok(self.tables.insert(name, table)) + } + + fn deregister_table(&self, name: &str) -> Result>> { + Ok(self.tables.remove(name).map(|(_, table)| table)) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} +``` + +Without getting into a `CatalogProvider` implementation, we can create a `MemorySchemaProvider` and register `TableProvider`s with it. + +```rust +let schema_provider = Arc::new(MemorySchemaProvider::new()); +let table_provider = _; // create a table provider + +schema_provider.register_table("table_name".to_string(), table_provider); + +let table = schema_provider.table("table_name").unwrap(); +``` + +### Asynchronous `SchemaProvider` + +It's often useful to fetch metadata about which tables are in a schema, from a remote source. For example, a schema provider could fetch metadata from a remote database. To support this, the `SchemaProvider` trait has an asynchronous `table` method. + +The trait is roughly the same except for the `table` method, and the addition of the `#[async_trait]` attribute. + +```rust +#[async_trait] +impl SchemaProvider for Schema { + async fn table(&self, name: &str) -> Option> { + // fetch metadata from remote source + } +} +``` + +### Implementing `MemoryCatalogProvider` + +As mentioned, the `CatalogProvider` can manage the schemas in a catalog, and the `MemoryCatalogProvider` is a simple implementation of the `CatalogProvider` trait. It stores schemas in a `DashMap`. + +```rust +pub struct MemoryCatalogList { + /// Collection of catalogs containing schemas and ultimately TableProviders + pub catalogs: DashMap>, +} +``` + +With that the `CatalogProvider` trait can be implemented. + +```rust +impl CatalogProvider for MemoryCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.schemas.iter().map(|s| s.key().clone()).collect() + } + + fn schema(&self, name: &str) -> Option> { + self.schemas.get(name).map(|s| s.value().clone()) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + Ok(self.schemas.insert(name.into(), schema)) + } + + fn deregister_schema( + &self, + name: &str, + cascade: bool, + ) -> Result>> { + /// `cascade` is not used here, but can be used to control whether + /// to delete all tables in the schema or not. + if let Some(schema) = self.schema(name) { + let (_, removed) = self.schemas.remove(name).unwrap(); + Ok(Some(removed)) + } else { + Ok(None) + } + } +} +``` + +Again, this is fairly straightforward, as there's an underlying data structure to store the state, via key-value pairs. + +## Recap + +To recap, you need to: + +1. Implement the `TableProvider` trait to create a table provider, or use an existing one. +2. Implement the `SchemaProvider` trait to create a schema provider, or use an existing one. +3. Implement the `CatalogProvider` trait to create a catalog provider, or use an existing one. diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md new file mode 100644 index 000000000000..30721d6a5ba6 --- /dev/null +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -0,0 +1,161 @@ + + +# Custom Table Provider + +Like other areas of DataFusion, you extend DataFusion's functionality by implementing a trait. The `TableProvider` and associated traits, have methods that allow you to implement a custom table provider, i.e. use DataFusion's other functionality with your custom data source. + +This section will also touch on how to have DataFusion use the new `TableProvider` implementation. + +## Table Provider and Scan + +The `scan` method on the `TableProvider` is likely its most important. It returns an `ExecutionPlan` that DataFusion will use to read the actual data during execution o the query. + +### Scan + +As mentioned, `scan` returns an execution plan, and in particular a `Result>`. The core of this is returning something that can be dynamically dispatched to an `ExecutionPlan`. And as per the general DataFusion idea, we'll need to implement it. + +#### Execution Plan + +The `ExecutionPlan` trait at its core is a way to get a stream of batches. The aptly-named `execute` method returns a `Result`, which should be a stream of `RecordBatch`es that can be sent across threads, and has a schema that matches the data to be contained in those batches. + +There are many different types of `SendableRecordBatchStream` implemented in DataFusion -- you can use a pre existing one, such as `MemoryStream` (if your `RecordBatch`es are all in memory) or implement your own custom logic, depending on your usecase. + +Looking at the [example in this repo][ex], the execute method: + +```rust +struct CustomExec { + db: CustomDataSource, + projected_schema: SchemaRef, +} + +impl ExecutionPlan for CustomExec { + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + let users: Vec = { + let db = self.db.inner.lock().unwrap(); + db.data.values().cloned().collect() + }; + + let mut id_array = UInt8Builder::with_capacity(users.len()); + let mut account_array = UInt64Builder::with_capacity(users.len()); + + for user in users { + id_array.append_value(user.id); + account_array.append_value(user.bank_account); + } + + Ok(Box::pin(MemoryStream::try_new( + vec![RecordBatch::try_new( + self.projected_schema.clone(), + vec![ + Arc::new(id_array.finish()), + Arc::new(account_array.finish()), + ], + )?], + self.schema(), + None, + )?)) + } +} +``` + +This: + +1. Gets the users from the database +2. Constructs the individual output arrays (columns) +3. Returns a `MemoryStream` of a single `RecordBatch` with the arrays + +I.e. returns the "physical" data. For other examples, refer to the [`CsvExec`][csv] and [`ParquetExec`][parquet] for more complex implementations. + +With the `ExecutionPlan` implemented, we can now implement the `scan` method of the `TableProvider`. + +#### Scan Revisited + +The `scan` method of the `TableProvider` returns a `Result>`. We can use the `Arc` to return a reference-counted pointer to the `ExecutionPlan` we implemented. In the example, this is done by: + +```rust +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec>, + schema: SchemaRef, + ) -> Result> { + Ok(Arc::new(CustomExec::new(projections, schema, self.clone()))) + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option, + ) -> Result> { + return self.create_physical_plan(projection, self.schema()).await; + } +} +``` + +With this, and the implementation of the omitted methods, we can now use the `CustomDataSource` as a `TableProvider` in DataFusion. + +## Using the Custom Table Provider + +In order to use the custom table provider, we need to register it with DataFusion. This is done by creating a `TableProvider` and registering it with the `ExecutionContext`. + +```rust +let mut ctx = ExecutionContext::new(); + +let custom_table_provider = CustomDataSource::new(); +ctx.register_table("custom_table", Arc::new(custom_table_provider)); +``` + +This will allow you to use the custom table provider in DataFusion. For example, you could use it in a SQL query to get a `DataFrame`. + +```rust +let df = ctx.sql("SELECT id, bank_account FROM custom_table")?; +``` + +## Recap + +To recap, in order to implement a custom table provider, you need to: + +1. Implement the `TableProvider` trait +2. Implement the `ExecutionPlan` trait +3. Register the `TableProvider` with the `ExecutionContext` + +## Next Steps + +As mentioned the [csv] and [parquet] implementations are good examples of how to implement a `TableProvider`. The [example in this repo][ex] is a good example of how to implement a `TableProvider` that uses a custom data source. + +More abstractly, see the following traits for more information on how to implement a custom `TableProvider` for a file format: + +- `FileOpener` - a trait for opening a file and inferring the schema +- `FileFormat` - a trait for reading a file format +- `ListingTableProvider` - a useful trait for implementing a `TableProvider` that lists files in a directory + +[ex]: https://github.com/apache/arrow-datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion-examples/examples/custom_datasource.rs#L214C1-L276 +[csv]: https://github.com/apache/arrow-datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/csv.rs#L57-L70 +[parquet]: https://github.com/apache/arrow-datafusion/blob/a5e86fae3baadbd99f8fd0df83f45fde22f7b0c6/datafusion/core/src/datasource/physical_plan/parquet.rs#L77-L104 diff --git a/docs/source/library-user-guide/execution-plans.md b/docs/source/library-user-guide/execution-plans.md new file mode 100644 index 000000000000..516915168585 --- /dev/null +++ b/docs/source/library-user-guide/execution-plans.md @@ -0,0 +1,22 @@ + + +# Execution Plans + +More information will be added here soon, but for now, see the [Custom Table Provider](./custom-table-providers.md) documentation for an example implementation. diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md new file mode 100644 index 000000000000..631bdc67975a --- /dev/null +++ b/docs/source/library-user-guide/extending-operators.md @@ -0,0 +1,22 @@ + + +# Extending DataFusion's operators: custom LogicalPlan and Execution Plans + +Coming soon diff --git a/docs/source/library-user-guide/index.md b/docs/source/library-user-guide/index.md new file mode 100644 index 000000000000..47257e0c926e --- /dev/null +++ b/docs/source/library-user-guide/index.md @@ -0,0 +1,26 @@ + + +# Introduction + +The library user guide explains how to use the DataFusion library as a dependency in your Rust project. Please check out the user-guide for more details on how to use DataFusion's SQL and DataFrame APIs, or the contributor guide for details on how to contribute to DataFusion. + +If you haven't reviewed the [architecture section in the docs][docs], it's a useful place to get the lay of the land before starting down a specific path. + +[docs]: https://docs.rs/datafusion/latest/datafusion/#architecture diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md new file mode 100644 index 000000000000..fdf309980dc2 --- /dev/null +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -0,0 +1,22 @@ + + +# Using the DataFrame API + +Coming Soon diff --git a/docs/source/library-user-guide/using-the-sql-api.md b/docs/source/library-user-guide/using-the-sql-api.md new file mode 100644 index 000000000000..f4e85ee4e3a9 --- /dev/null +++ b/docs/source/library-user-guide/using-the-sql-api.md @@ -0,0 +1,22 @@ + + +# Using the SQL API + +Coming Soon diff --git a/docs/source/library-user-guide/working-with-exprs.md b/docs/source/library-user-guide/working-with-exprs.md new file mode 100644 index 000000000000..b1a26cdfcb51 --- /dev/null +++ b/docs/source/library-user-guide/working-with-exprs.md @@ -0,0 +1,22 @@ + + +# Working with Exprs + +Coming Soon