diff --git a/.github/workflows/rust.yaml b/.github/workflows/rust.yaml
index 63cdf38e..bdbf4059 100644
--- a/.github/workflows/rust.yaml
+++ b/.github/workflows/rust.yaml
@@ -38,7 +38,7 @@ jobs:
         name: Download duckdb
         with:
           repository: "duckdb/duckdb"
-          tag: "v0.5.0"
+          tag: "v0.5.1"
           fileName: ${{ matrix.duckdb }}
           out-file-path: .
 
diff --git a/Cargo.toml b/Cargo.toml
index 0d41936a..ea4f936a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "duckdb"
-version = "0.5.0"
+version = "0.5.1"
 authors = ["wangfenjin <wangfenj@gmail.com>"]
 edition = "2021"
 description = "Ergonomic wrapper for DuckDB"
@@ -69,7 +69,7 @@ tempdir = "0.3.7"
 
 [dependencies.libduckdb-sys]
 path = "libduckdb-sys"
-version = "0.5.0"
+version = "0.5.1"
 
 [package.metadata.docs.rs]
 features = []
diff --git a/libduckdb-sys/Cargo.toml b/libduckdb-sys/Cargo.toml
index 3ff34472..8eb8d431 100644
--- a/libduckdb-sys/Cargo.toml
+++ b/libduckdb-sys/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "libduckdb-sys"
-version = "0.5.0"
+version = "0.5.1"
 authors = ["wangfenjin <wangfenj@gmail.com>"]
 edition = "2021"
 build = "build.rs"
diff --git a/libduckdb-sys/duckdb/bindgen_bundled_version.rs b/libduckdb-sys/duckdb/bindgen_bundled_version.rs
index 52b09dc8..0ba42c10 100644
--- a/libduckdb-sys/duckdb/bindgen_bundled_version.rs
+++ b/libduckdb-sys/duckdb/bindgen_bundled_version.rs
@@ -106,6 +106,7 @@ pub const __DARWIN_NO_LONG_LONG: u32 = 0;
 pub const _DARWIN_FEATURE_64_BIT_INODE: u32 = 1;
 pub const _DARWIN_FEATURE_ONLY_UNIX_CONFORMANCE: u32 = 1;
 pub const _DARWIN_FEATURE_UNIX_CONFORMANCE: u32 = 3;
+pub const __has_ptrcheck: u32 = 0;
 pub const __PTHREAD_SIZE__: u32 = 8176;
 pub const __PTHREAD_ATTR_SIZE__: u32 = 56;
 pub const __PTHREAD_MUTEXATTR_SIZE__: u32 = 8;
@@ -202,6 +203,8 @@ pub const __MAC_11_5: u32 = 110500;
 pub const __MAC_11_6: u32 = 110600;
 pub const __MAC_12_0: u32 = 120000;
 pub const __MAC_12_1: u32 = 120100;
+pub const __MAC_12_2: u32 = 120200;
+pub const __MAC_12_3: u32 = 120300;
 pub const __IPHONE_2_0: u32 = 20000;
 pub const __IPHONE_2_1: u32 = 20100;
 pub const __IPHONE_2_2: u32 = 20200;
@@ -260,6 +263,8 @@ pub const __IPHONE_14_8: u32 = 140800;
 pub const __IPHONE_15_0: u32 = 150000;
 pub const __IPHONE_15_1: u32 = 150100;
 pub const __IPHONE_15_2: u32 = 150200;
+pub const __IPHONE_15_3: u32 = 150300;
+pub const __IPHONE_15_4: u32 = 150400;
 pub const __TVOS_9_0: u32 = 90000;
 pub const __TVOS_9_1: u32 = 90100;
 pub const __TVOS_9_2: u32 = 90200;
@@ -291,6 +296,8 @@ pub const __TVOS_14_7: u32 = 140700;
 pub const __TVOS_15_0: u32 = 150000;
 pub const __TVOS_15_1: u32 = 150100;
 pub const __TVOS_15_2: u32 = 150200;
+pub const __TVOS_15_3: u32 = 150300;
+pub const __TVOS_15_4: u32 = 150400;
 pub const __WATCHOS_1_0: u32 = 10000;
 pub const __WATCHOS_2_0: u32 = 20000;
 pub const __WATCHOS_2_1: u32 = 20100;
@@ -320,6 +327,8 @@ pub const __WATCHOS_7_6: u32 = 70600;
 pub const __WATCHOS_8_0: u32 = 80000;
 pub const __WATCHOS_8_1: u32 = 80100;
 pub const __WATCHOS_8_3: u32 = 80300;
+pub const __WATCHOS_8_4: u32 = 80400;
+pub const __WATCHOS_8_5: u32 = 80500;
 pub const MAC_OS_X_VERSION_10_0: u32 = 1000;
 pub const MAC_OS_X_VERSION_10_1: u32 = 1010;
 pub const MAC_OS_X_VERSION_10_2: u32 = 1020;
@@ -357,7 +366,7 @@ pub const MAC_OS_VERSION_12_0: u32 = 120000;
 pub const __DRIVERKIT_19_0: u32 = 190000;
 pub const __DRIVERKIT_20_0: u32 = 200000;
 pub const __DRIVERKIT_21_0: u32 = 210000;
-pub const __MAC_OS_X_VERSION_MAX_ALLOWED: u32 = 120100;
+pub const __MAC_OS_X_VERSION_MAX_ALLOWED: u32 = 120300;
 pub const __ENABLE_LEGACY_MAC_AVAILABILITY: u32 = 1;
 pub const __DARWIN_WCHAR_MIN: i32 = -2147483648;
 pub const _FORTIFY_SOURCE: u32 = 2;
@@ -14568,7 +14577,10 @@ extern "C" {
     pub fn valloc(arg1: size_t) -> *mut ::std::os::raw::c_void;
 }
 extern "C" {
-    pub fn aligned_alloc(__alignment: size_t, __size: size_t) -> *mut ::std::os::raw::c_void;
+    pub fn aligned_alloc(
+        __alignment: ::std::os::raw::c_ulong,
+        __size: ::std::os::raw::c_ulong,
+    ) -> *mut ::std::os::raw::c_void;
 }
 extern "C" {
     pub fn posix_memalign(
diff --git a/libduckdb-sys/duckdb/duckdb.cpp b/libduckdb-sys/duckdb/duckdb.cpp
index 815fcddd..643b0171 100644
--- a/libduckdb-sys/duckdb/duckdb.cpp
+++ b/libduckdb-sys/duckdb/duckdb.cpp
@@ -1219,8 +1219,8 @@ class FunctionExpression : public ParsedExpression {
 					return function_name + "(" + entry.children[0]->ToString() + ")";
 				}
 			} else if (entry.children.size() == 2) {
-				return "(" + entry.children[0]->ToString() + " " + function_name + " " + entry.children[1]->ToString() +
-				       ")";
+				return StringUtil::Format("(%s) %s (%s)", entry.children[0]->ToString(), function_name,
+				                          entry.children[1]->ToString());
 			}
 		}
 		// standard function call
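The two ToString changes in this patch (here and in ComparisonExpression further down) wrap both operands in parentheses so the regenerated SQL keeps its original precedence when parsed again. A minimal sketch of the idea with a hypothetical expression node, not DuckDB's ParsedExpression hierarchy:

```cpp
#include <iostream>
#include <memory>
#include <string>

// Hypothetical expression node, not DuckDB's ParsedExpression.
struct Expr {
	std::string op;    // empty for leaf nodes
	std::string value; // leaf payload
	std::unique_ptr<Expr> left, right;

	std::string ToString() const {
		if (op.empty()) {
			return value;
		}
		// Wrapping both children keeps precedence when the string is re-parsed:
		// "a - (b - c)" must not come back as "a - b - c".
		return "(" + left->ToString() + ") " + op + " (" + right->ToString() + ")";
	}
};

std::unique_ptr<Expr> Leaf(std::string v) {
	auto e = std::make_unique<Expr>();
	e->value = std::move(v);
	return e;
}

std::unique_ptr<Expr> Binary(std::string op, std::unique_ptr<Expr> l, std::unique_ptr<Expr> r) {
	auto e = std::make_unique<Expr>();
	e->op = std::move(op);
	e->left = std::move(l);
	e->right = std::move(r);
	return e;
}

int main() {
	// a - (b - c)
	auto expr = Binary("-", Leaf("a"), Binary("-", Leaf("b"), Leaf("c")));
	std::cout << expr->ToString() << "\n"; // (a) - ((b) - (c))
	return 0;
}
```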
@@ -1736,6 +1736,11 @@ class ChunkVectorInfo : public ChunkInfo {
 	void CommitAppend(transaction_t commit_id, idx_t start, idx_t end) override;
 
 	void Append(idx_t start, idx_t end, transaction_t commit_id);
+	//! Performs a delete in the ChunkVectorInfo - returns how many tuples were actually deleted
+	//! The number of rows that were actually deleted might be lower than the input count,
+	//! in case we delete rows that were already deleted
+	//! Note that "rows" is written to, to reflect the row ids that were actually deleted,
+	//! i.e. after calling this function, rows will hold the [0..actual_delete_count] row ids of the actually deleted tuples
 	idx_t Delete(Transaction &transaction, row_t rows[], idx_t count);
 	void CommitDelete(transaction_t commit_id, row_t rows[], idx_t count);
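The new comment documents that Delete compacts the rows array in place and returns how many tuples were actually removed. A rough sketch of that contract, assuming a plain boolean "already deleted" mask instead of DuckDB's transactional version info:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

using row_t = int64_t;
using idx_t = uint64_t;

// Sketch: mark rows as deleted, skipping rows that were already deleted.
// Returns the number of rows actually deleted; on return, rows[0..result)
// holds exactly those row ids, mirroring the documented contract above.
idx_t Delete(std::vector<bool> &deleted, row_t rows[], idx_t count) {
	idx_t actual = 0;
	for (idx_t i = 0; i < count; i++) {
		auto row = rows[i];
		if (deleted[row]) {
			continue; // already deleted by an earlier call
		}
		deleted[row] = true;
		rows[actual++] = row; // compact the surviving ids to the front
	}
	return actual;
}

int main() {
	std::vector<bool> deleted(10, false);
	deleted[3] = true; // row 3 was deleted previously
	row_t rows[] = {1, 3, 5};
	idx_t n = Delete(deleted, rows, 3);
	std::cout << n << "\n"; // 2: rows 1 and 5 were actually deleted
	return 0;
}
```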
 
@@ -2450,7 +2455,88 @@ class DefaultTypeGenerator : public DefaultGenerator {
 
 } // namespace duckdb
 
+//===----------------------------------------------------------------------===//
+//                         DuckDB
+//
+// extension_functions.hpp
+//
+//
+//===----------------------------------------------------------------------===//
+
+
+
+
+
+namespace duckdb {
+
+struct ExtensionFunction {
+	char function[48];
+	char extension[48];
+};
+
+static constexpr ExtensionFunction EXTENSION_FUNCTIONS[] = {
+    {"->>", "json"},
+    {"array_to_json", "json"},
+    {"create_fts_index", "fts"},
+    {"dbgen", "tpch"},
+    {"drop_fts_index", "fts"},
+    {"dsdgen", "tpcds"},
+    {"excel_text", "excel"},
+    {"from_json", "json"},
+    {"from_json_strict", "json"},
+    {"from_substrait", "substrait"},
+    {"get_substrait", "substrait"},
+    {"get_substrait_json", "substrait"},
+    {"icu_calendar_names", "icu"},
+    {"icu_sort_key", "icu"},
+    {"json", "json"},
+    {"json_array", "json"},
+    {"json_array_length", "json"},
+    {"json_extract", "json"},
+    {"json_extract_path", "json"},
+    {"json_extract_path_text", "json"},
+    {"json_extract_string", "json"},
+    {"json_group_array", "json"},
+    {"json_group_object", "json"},
+    {"json_group_structure", "json"},
+    {"json_merge_patch", "json"},
+    {"json_object", "json"},
+    {"json_quote", "json"},
+    {"json_structure", "json"},
+    {"json_transform", "json"},
+    {"json_transform_strict", "json"},
+    {"json_type", "json"},
+    {"json_valid", "json"},
+    {"make_timestamptz", "icu"},
+    {"parquet_metadata", "parquet"},
+    {"parquet_scan", "parquet"},
+    {"parquet_schema", "parquet"},
+    {"pg_timezone_names", "icu"},
+    {"postgres_attach", "postgres_scanner"},
+    {"postgres_scan", "postgres_scanner"},
+    {"postgres_scan_pushdown", "postgres_scanner"},
+    {"read_json_objects", "json"},
+    {"read_ndjson_objects", "json"},
+    {"read_parquet", "parquet"},
+    {"row_to_json", "json"},
+    {"sqlite_attach", "sqlite_scanner"},
+    {"sqlite_scan", "sqlite_scanner"},
+    {"stem", "fts"},
+    {"text", "excel"},
+    {"to_json", "json"},
+    {"tpcds", "tpcds"},
+    {"tpcds_answers", "tpcds"},
+    {"tpcds_queries", "tpcds"},
+    {"tpch", "tpch"},
+    {"tpch_answers", "tpch"},
+    {"tpch_queries", "tpch"},
+    {"visualize_diff_profiling_output", "visualizer"},
+    {"visualize_json_profiling_output", "visualizer"},
+    {"visualize_last_profiling_output", "visualizer"},
+};
+} // namespace duckdb
 
+#include <algorithm>
 namespace duckdb {
 
 string SimilarCatalogEntry::GetQualifiedName() const {
@@ -2653,6 +2739,16 @@ SimilarCatalogEntry Catalog::SimilarEntryInSchemas(ClientContext &context, const
 	return {most_similar.first, most_similar.second, schema_of_most_similar};
 }
 
+string FindExtension(const string &function_name) {
+	auto size = sizeof(EXTENSION_FUNCTIONS) / sizeof(ExtensionFunction);
+	auto it = std::lower_bound(
+	    EXTENSION_FUNCTIONS, EXTENSION_FUNCTIONS + size, function_name,
+	    [](const ExtensionFunction &element, const string &value) { return element.function < value; });
+	if (it != EXTENSION_FUNCTIONS + size && it->function == function_name) {
+		return it->extension;
+	}
+	return "";
+}
 CatalogException Catalog::CreateMissingEntryException(ClientContext &context, const string &entry_name,
                                                       CatalogType type, const vector<SchemaCatalogEntry *> &schemas,
                                                       QueryErrorContext error_context) {
@@ -2666,7 +2762,12 @@ CatalogException Catalog::CreateMissingEntryException(ClientContext &context, co
 		}
 	});
 	auto unseen_entry = SimilarEntryInSchemas(context, entry_name, type, unseen_schemas);
-
+	auto extension_name = FindExtension(entry_name);
+	if (!extension_name.empty()) {
+		return CatalogException("Function with name %s is not in the catalog, but it exists in the %s extension. To "
+		                        "install and load the extension, run: INSTALL %s; LOAD %s;",
+		                        entry_name, extension_name, extension_name, extension_name);
+	}
 	string did_you_mean;
 	if (unseen_entry.Found() && unseen_entry.distance < entry.distance) {
 		did_you_mean = "\nDid you mean \"" + unseen_entry.GetQualifiedName() + "\"?";
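FindExtension depends on EXTENSION_FUNCTIONS being sorted by function name so that std::lower_bound can binary-search it. A self-contained sketch of the same lookup over a small sorted table (illustrative entries only, not the full list added above):

```cpp
#include <algorithm>
#include <iostream>
#include <string>

struct ExtensionFunction {
	const char *function;
	const char *extension;
};

// Must stay sorted by function name for the binary search below.
static const ExtensionFunction EXTENSION_FUNCTIONS[] = {
    {"dbgen", "tpch"},
    {"parquet_scan", "parquet"},
    {"read_parquet", "parquet"},
    {"to_json", "json"},
};

std::string FindExtension(const std::string &function_name) {
	auto size = sizeof(EXTENSION_FUNCTIONS) / sizeof(ExtensionFunction);
	auto it = std::lower_bound(EXTENSION_FUNCTIONS, EXTENSION_FUNCTIONS + size, function_name,
	                           [](const ExtensionFunction &element, const std::string &value) {
		                           return element.function < value;
	                           });
	if (it != EXTENSION_FUNCTIONS + size && it->function == function_name) {
		return it->extension;
	}
	return "";
}

int main() {
	std::cout << FindExtension("read_parquet") << "\n"; // parquet
	std::cout << FindExtension("no_such_fn") << "\n";   // (empty)
	return 0;
}
```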
@@ -3820,7 +3921,7 @@ class DataTable {
 	DataTable(ClientContext &context, DataTable &parent, idx_t changed_idx, const LogicalType &target_type,
 	          vector<column_t> bound_columns, Expression &cast_expr);
 	//! Constructs a DataTable as a delta on an existing data table but with one column added new constraint
-	DataTable(ClientContext &context, DataTable &parent, unique_ptr<Constraint> constraint);
+	DataTable(ClientContext &context, DataTable &parent, unique_ptr<BoundConstraint> constraint);
 
 	shared_ptr<DataTableInfo> info;
 
@@ -3905,7 +4006,9 @@ class DataTable {
 		this->is_root = true;
 	}
 
+	//! Gets statistics of a physical column within the table
 	unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, column_t column_id);
+	//! Sets statistics of a physical column within the table
 	void SetStatistics(column_t column_id, const std::function<void(BaseStatistics &)> &set_fun);
 
 	//! Checkpoint the table to the specified table data writer
@@ -3923,7 +4026,7 @@ class DataTable {
 
 private:
 	//! Verify the new added constraints against current persistent&local data
-	void VerifyNewConstraint(ClientContext &context, DataTable &parent, const Constraint *constraint);
+	void VerifyNewConstraint(ClientContext &context, DataTable &parent, const BoundConstraint *constraint);
 	//! Verify constraints with a chunk from the Append containing all columns of the table
 	void VerifyAppendConstraints(TableCatalogEntry &table, ClientContext &context, DataChunk &chunk);
 	//! Verify constraints with a chunk from the Update containing only the specified column_ids
@@ -6419,6 +6522,19 @@ idx_t TableCatalogEntry::StandardColumnCount() const {
 	return count;
 }
 
+unique_ptr<BaseStatistics> TableCatalogEntry::GetStatistics(ClientContext &context, column_t column_id) {
+	if (column_id == COLUMN_IDENTIFIER_ROW_ID) {
+		return nullptr;
+	}
+	if (column_id >= columns.size()) {
+		throw InternalException("TableCatalogEntry::GetStatistics column_id out of range");
+	}
+	if (columns[column_id].Generated()) {
+		return nullptr;
+	}
+	return storage->GetStatistics(context, columns[column_id].StorageOid());
+}
+
 unique_ptr<CatalogEntry> TableCatalogEntry::AlterEntry(ClientContext &context, AlterInfo *info) {
 	D_ASSERT(!internal);
 	if (info->type != AlterType::ALTER_TABLE) {
@@ -6486,6 +6602,9 @@ static void RenameExpression(ParsedExpression &expr, RenameColumnInfo &info) {
 
 unique_ptr<CatalogEntry> TableCatalogEntry::RenameColumn(ClientContext &context, RenameColumnInfo &info) {
 	auto rename_idx = GetColumnIndex(info.old_name);
+	if (rename_idx == COLUMN_IDENTIFIER_ROW_ID) {
+		throw CatalogException("Cannot rename rowid column");
+	}
 	auto create_info = make_unique<CreateTableInfo>(schema->name, name);
 	create_info->temporary = temporary;
 	for (idx_t i = 0; i < columns.size(); i++) {
@@ -6588,6 +6707,9 @@ unique_ptr<CatalogEntry> TableCatalogEntry::AddColumn(ClientContext &context, Ad
 unique_ptr<CatalogEntry> TableCatalogEntry::RemoveColumn(ClientContext &context, RemoveColumnInfo &info) {
 	auto removed_index = GetColumnIndex(info.removed_column, info.if_column_exists);
 	if (removed_index == DConstants::INVALID_INDEX) {
+		if (!info.if_column_exists) {
+			throw CatalogException("Cannot drop column: rowid column cannot be dropped");
+		}
 		return nullptr;
 	}
 
@@ -6694,7 +6816,7 @@ unique_ptr<CatalogEntry> TableCatalogEntry::RemoveColumn(ClientContext &context,
 		return make_unique<TableCatalogEntry>(catalog, schema, (BoundCreateTableInfo *)bound_create_info.get(),
 		                                      storage);
 	}
-	auto new_storage = make_shared<DataTable>(context, *storage, removed_index);
+	auto new_storage = make_shared<DataTable>(context, *storage, columns[removed_index].StorageOid());
 	return make_unique<TableCatalogEntry>(catalog, schema, (BoundCreateTableInfo *)bound_create_info.get(),
 	                                      new_storage);
 }
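Several hunks in this file swap logical column indexes for columns[i].StorageOid() when talking to DataTable, because generated columns take a slot in the catalog's column list but none in storage. A toy illustration of that mapping, assuming storage oids are simply assigned by skipping generated columns:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

using column_t = uint64_t;

struct ColumnDefinition {
	std::string name;
	bool generated = false;
	column_t storage_oid = 0; // only meaningful for non-generated columns
};

// Assign storage oids by counting only the columns that are physically stored.
void AssignStorageOids(std::vector<ColumnDefinition> &columns) {
	column_t next = 0;
	for (auto &col : columns) {
		if (!col.generated) {
			col.storage_oid = next++;
		}
	}
}

int main() {
	std::vector<ColumnDefinition> columns = {
	    {"a"}, {"a_doubled", /*generated=*/true}, {"b"}};
	AssignStorageOids(columns);
	// Logical index 2 ("b") maps to storage oid 1, because the generated
	// column in between has no physical storage.
	std::cout << columns[2].name << " -> storage oid " << columns[2].storage_oid << "\n";
	return 0;
}
```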
@@ -6702,13 +6824,18 @@ unique_ptr<CatalogEntry> TableCatalogEntry::RemoveColumn(ClientContext &context,
 unique_ptr<CatalogEntry> TableCatalogEntry::SetDefault(ClientContext &context, SetDefaultInfo &info) {
 	auto create_info = make_unique<CreateTableInfo>(schema->name, name);
 	auto default_idx = GetColumnIndex(info.column_name);
+	if (default_idx == COLUMN_IDENTIFIER_ROW_ID) {
+		throw CatalogException("Cannot SET DEFAULT for rowid column");
+	}
 
 	// Copy all the columns, changing the value of the one that was specified by 'column_name'
 	for (idx_t i = 0; i < columns.size(); i++) {
 		auto copy = columns[i].Copy();
 		if (default_idx == i) {
 			// set the default value of this column
-			D_ASSERT(!copy.Generated()); // Shouldnt reach here - DEFAULT value isn't supported for Generated Columns
+			if (copy.Generated()) {
+				throw BinderException("Cannot SET DEFAULT for generated column \"%s\"", columns[i].Name());
+			}
 			copy.SetDefaultValue(info.expression ? info.expression->Copy() : nullptr);
 		}
 		create_info->columns.push_back(move(copy));
@@ -6733,6 +6860,9 @@ unique_ptr<CatalogEntry> TableCatalogEntry::SetNotNull(ClientContext &context, S
 	}
 
 	idx_t not_null_idx = GetColumnIndex(info.column_name);
+	if (columns[not_null_idx].Generated()) {
+		throw BinderException("Unsupported constraint for generated column!");
+	}
 	bool has_not_null = false;
 	for (idx_t i = 0; i < constraints.size(); i++) {
 		auto constraint = constraints[i]->Copy();
@@ -6756,8 +6886,9 @@ unique_ptr<CatalogEntry> TableCatalogEntry::SetNotNull(ClientContext &context, S
 		                                      storage);
 	}
 
-	// Return with new storage info
-	auto new_storage = make_shared<DataTable>(context, *storage, make_unique<NotNullConstraint>(not_null_idx));
+	// Return with new storage info. Note that we need the bound column index here.
+	auto new_storage = make_shared<DataTable>(context, *storage,
+	                                          make_unique<BoundNotNullConstraint>(columns[not_null_idx].StorageOid()));
 	return make_unique<TableCatalogEntry>(catalog, schema, (BoundCreateTableInfo *)bound_create_info.get(),
 	                                      new_storage);
 }
@@ -6863,12 +6994,19 @@ unique_ptr<CatalogEntry> TableCatalogEntry::ChangeColumnType(ClientContext &cont
 	auto expression = info.expression->Copy();
 	auto bound_expression = expr_binder.Bind(expression);
 	auto bound_create_info = binder->BindCreateTableInfo(move(create_info));
+	vector<column_t> storage_oids;
 	if (bound_columns.empty()) {
-		bound_columns.push_back(COLUMN_IDENTIFIER_ROW_ID);
+		storage_oids.push_back(COLUMN_IDENTIFIER_ROW_ID);
+	}
+	// transform to storage_oid
+	else {
+		for (idx_t i = 0; i < bound_columns.size(); i++) {
+			storage_oids.push_back(columns[bound_columns[i]].StorageOid());
+		}
 	}
 
-	auto new_storage =
-	    make_shared<DataTable>(context, *storage, change_idx, info.target_type, move(bound_columns), *bound_expression);
+	auto new_storage = make_shared<DataTable>(context, *storage, columns[change_idx].StorageOid(), info.target_type,
+	                                          move(storage_oids), *bound_expression);
 	auto result =
 	    make_unique<TableCatalogEntry>(catalog, schema, (BoundCreateTableInfo *)bound_create_info.get(), new_storage);
 	return move(result);
@@ -7116,7 +7254,7 @@ void TableCatalogEntry::CommitAlter(AlterInfo &info) {
 		}
 	}
 	D_ASSERT(removed_index != DConstants::INVALID_INDEX);
-	storage->CommitDropColumn(removed_index);
+	storage->CommitDropColumn(columns[removed_index].StorageOid());
 }
 
 void TableCatalogEntry::CommitDrop() {
@@ -7686,11 +7824,13 @@ bool CatalogSet::AlterEntry(ClientContext &context, const string &name, AlterInf
 				throw CatalogException(rename_err_msg, original_name, value->name);
 			}
 		}
+	}
+
+	if (value->name != original_name) {
+		// Do PutMapping and DeleteMapping after dependency check
 		PutMapping(context, value->name, entry_index);
 		DeleteMapping(context, original_name);
 	}
-	//! Check the dependency manager to verify that there are no conflicting dependencies with this alter
-	catalog.dependency_manager->AlterObject(context, entry, value.get());
 
 	value->timestamp = transaction.transaction_id;
 	value->child = move(entries[entry_index]);
@@ -7702,10 +7842,18 @@ bool CatalogSet::AlterEntry(ClientContext &context, const string &name, AlterInf
 	alter_info->Serialize(serializer);
 	BinaryData serialized_alter = serializer.GetData();
 
+	auto new_entry = value.get();
+
 	// push the old entry in the undo buffer for this transaction
 	transaction.PushCatalogEntry(value->child.get(), serialized_alter.data.get(), serialized_alter.size);
 	entries[entry_index] = move(value);
 
+	// Check the dependency manager to verify that there are no conflicting dependencies with this alter
+	// Note that we do this AFTER the new entry has been entirely set up in the catalog set
+	// that is because, if the alter fails due to a dependency conflict, we need to be able to cleanly roll back
+	// to the old entry.
+	catalog.dependency_manager->AlterObject(context, entry, new_entry);
+
 	return true;
 }
 
@@ -9370,7 +9518,7 @@ static void GetBitPosition(idx_t row_idx, idx_t &current_byte, uint8_t &current_
 }
 
 static void UnsetBit(uint8_t *data, idx_t current_byte, uint8_t current_bit) {
-	data[current_byte] &= ~(1 << current_bit);
+	data[current_byte] &= ~((uint64_t)1 << current_bit);
 }
 
 static void NextBit(idx_t &current_byte, uint8_t &current_bit) {
@@ -12426,6 +12574,8 @@ void Exception::ThrowAsTypeWithMessage(ExceptionType type, const string &message
 		throw ParameterNotAllowedException(message);
 	case ExceptionType::PARAMETER_NOT_RESOLVED:
 		throw ParameterNotResolvedException();
+	case ExceptionType::FATAL:
+		throw FatalException(message);
 	default:
 		throw Exception(type, message);
 	}
@@ -19691,9 +19841,15 @@ string FileSystem::ConvertSeparators(const string &path) {
 }
 
 string FileSystem::ExtractBaseName(const string &path) {
+	if (path.empty()) {
+		return string();
+	}
 	auto normalized_path = ConvertSeparators(path);
 	auto sep = PathSeparator();
-	auto vec = StringUtil::Split(StringUtil::Split(normalized_path, sep).back(), ".");
+	auto splits = StringUtil::Split(normalized_path, sep);
+	D_ASSERT(!splits.empty());
+	auto vec = StringUtil::Split(splits.back(), ".");
+	D_ASSERT(!vec.empty());
 	return vec[0];
 }
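ExtractBaseName now returns early for an empty path and asserts that both splits produce at least one element. A minimal standalone version of the same logic (separator normalization omitted; the Split helper here is a simplification of StringUtil::Split):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for StringUtil::Split: split on a single character,
// keeping empty pieces, so the result is never an empty vector.
std::vector<std::string> Split(const std::string &input, char sep) {
	std::vector<std::string> parts;
	std::string current;
	for (char c : input) {
		if (c == sep) {
			parts.push_back(current);
			current.clear();
		} else {
			current += c;
		}
	}
	parts.push_back(current);
	return parts;
}

// Sketch of FileSystem::ExtractBaseName: take the last path component and
// keep only the part before the first '.'. The empty-path guard mirrors the
// new check in the diff above.
std::string ExtractBaseName(const std::string &path) {
	if (path.empty()) {
		return std::string();
	}
	auto components = Split(path, '/');
	auto base = Split(components.back(), '.');
	return base[0];
}

int main() {
	std::cout << ExtractBaseName("/tmp/data/events.parquet") << "\n"; // events
	std::cout << "[" << ExtractBaseName("") << "]\n";                 // []
	return 0;
}
```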
 
@@ -23352,12 +23508,16 @@ class HivePartitioning {
 public:
 	//! Parse a filename that follows the hive partitioning scheme
 	DUCKDB_API static std::map<string, string> Parse(string &filename);
+	DUCKDB_API static std::map<string, string> Parse(string &filename, duckdb_re2::RE2 &regex);
 	//! Prunes a list of filenames based on a set of filters, can be used by TableFunctions in the
 	//! pushdown_complex_filter function to skip files with filename-based filters. Also removes the filters that always
 	//! evaluate to true.
 	DUCKDB_API static void ApplyFiltersToFileList(vector<string> &files, vector<unique_ptr<Expression>> &filters,
 	                                              unordered_map<string, column_t> &column_map, idx_t table_index,
 	                                              bool hive_enabled, bool filename_enabled);
+
+	//! The regex pattern (as a string) used to match hive partition key=value pairs
+	DUCKDB_API static const string REGEX_STRING;
 };
 
 } // namespace duckdb
@@ -23375,7 +23535,8 @@ namespace duckdb {
 
 static unordered_map<column_t, string> GetKnownColumnValues(string &filename,
                                                             unordered_map<string, column_t> &column_map,
-                                                            bool filename_col, bool hive_partition_cols) {
+                                                            duckdb_re2::RE2 &compiled_regex, bool filename_col,
+                                                            bool hive_partition_cols) {
 	unordered_map<column_t, string> result;
 
 	if (filename_col) {
@@ -23386,7 +23547,7 @@ static unordered_map<column_t, string> GetKnownColumnValues(string &filename,
 	}
 
 	if (hive_partition_cols) {
-		auto partitions = HivePartitioning::Parse(filename);
+		auto partitions = HivePartitioning::Parse(filename, compiled_regex);
 		for (auto &partition : partitions) {
 			auto lookup_column_id = column_map.find(partition.first);
 			if (lookup_column_id != column_map.end()) {
@@ -23424,10 +23585,10 @@ static void ConvertKnownColRefToConstants(unique_ptr<Expression> &expr,
 // 	- s3://bucket/var1=value1/bla/bla/var2=value2
 //  - http(s)://domain(:port)/lala/kasdl/var1=value1/?not-a-var=not-a-value
 //  - folder/folder/folder/../var1=value1/etc/.//var2=value2
-std::map<string, string> HivePartitioning::Parse(string &filename) {
-	std::map<string, string> result;
+const string HivePartitioning::REGEX_STRING = "[\\/\\\\]([^\\/\\?\\\\]+)=([^\\/\\n\\?\\\\]+)";
 
-	string regex = "[\\/\\\\]([^\\/\\?\\\\]+)=([^\\/\\n\\?\\\\]+)";
+std::map<string, string> HivePartitioning::Parse(string &filename, duckdb_re2::RE2 &regex) {
+	std::map<string, string> result;
 	duckdb_re2::StringPiece input(filename); // Wrap a StringPiece around it
 
 	string var;
@@ -23438,6 +23599,11 @@ std::map<string, string> HivePartitioning::Parse(string &filename) {
 	return result;
 }
 
+std::map<string, string> HivePartitioning::Parse(string &filename) {
+	duckdb_re2::RE2 regex(REGEX_STRING);
+	return Parse(filename, regex);
+}
+
 // TODO: this can still be improved by removing the parts of filter expressions that are true for all remaining files.
 //		 currently, only expressions that cannot be evaluated during pushdown are removed.
 void HivePartitioning::ApplyFiltersToFileList(vector<string> &files, vector<unique_ptr<Expression>> &filters,
@@ -23445,6 +23611,7 @@ void HivePartitioning::ApplyFiltersToFileList(vector<string> &files, vector<uniq
                                               bool hive_enabled, bool filename_enabled) {
 	vector<string> pruned_files;
 	vector<unique_ptr<Expression>> pruned_filters;
+	duckdb_re2::RE2 regex(REGEX_STRING);
 
 	if ((!filename_enabled && !hive_enabled) || filters.empty()) {
 		return;
@@ -23453,7 +23620,7 @@ void HivePartitioning::ApplyFiltersToFileList(vector<string> &files, vector<uniq
 	for (idx_t i = 0; i < files.size(); i++) {
 		auto &file = files[i];
 		bool should_prune_file = false;
-		auto known_values = GetKnownColumnValues(file, column_map, filename_enabled, hive_enabled);
+		auto known_values = GetKnownColumnValues(file, column_map, regex, filename_enabled, hive_enabled);
 
 		FilterCombiner combiner;
 		for (auto &filter : filters) {
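Parse now has an overload that takes an already-compiled duckdb_re2::RE2, and ApplyFiltersToFileList compiles the hive-partition pattern once instead of once per file. A sketch of the same compile-once, reuse-everywhere pattern, using std::regex purely as a stand-in for the bundled RE2:

```cpp
#include <iostream>
#include <map>
#include <regex>
#include <string>
#include <vector>

// Matches /key=value path segments, loosely following the hive partitioning scheme.
static const std::string REGEX_STRING = R"([\/\\]([^\/\?\\]+)=([^\/\n\?\\]+))";

std::map<std::string, std::string> Parse(const std::string &filename, const std::regex &regex) {
	std::map<std::string, std::string> result;
	auto begin = std::sregex_iterator(filename.begin(), filename.end(), regex);
	for (auto it = begin; it != std::sregex_iterator(); ++it) {
		result[(*it)[1].str()] = (*it)[2].str();
	}
	return result;
}

int main() {
	// Compile once, reuse across the whole file list.
	std::regex regex(REGEX_STRING);
	std::vector<std::string> files = {
	    "/data/year=2022/month=09/part-0.parquet",
	    "/data/year=2022/month=10/part-1.parquet",
	};
	for (auto &file : files) {
		auto partitions = Parse(file, regex);
		std::cout << file << ": year=" << partitions["year"] << " month=" << partitions["month"] << "\n";
	}
	return 0;
}
```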
@@ -23726,6 +23893,8 @@ class LocalFileSystem : public FileSystem {
 	//! Set the file pointer of a file handle to a specified location. Reads and writes will happen from this location
 	void SetFilePointer(FileHandle &handle, idx_t location);
 	idx_t GetFilePointer(FileHandle &handle);
+
+	vector<string> FetchFileWithoutGlob(const string &path, FileOpener *opener, bool absolute_path);
 };
 
 } // namespace duckdb
@@ -24607,6 +24776,26 @@ static void GlobFiles(FileSystem &fs, const string &path, const string &glob, bo
 	});
 }
 
+vector<string> LocalFileSystem::FetchFileWithoutGlob(const string &path, FileOpener *opener, bool absolute_path) {
+	vector<string> result;
+	if (FileExists(path) || IsPipe(path)) {
+		result.push_back(path);
+	} else if (!absolute_path) {
+		Value value;
+		if (opener->TryGetCurrentSetting("file_search_path", value)) {
+			auto search_paths_str = value.ToString();
+			std::vector<std::string> search_paths = StringUtil::Split(search_paths_str, ',');
+			for (const auto &search_path : search_paths) {
+				auto joined_path = JoinPath(search_path, path);
+				if (FileExists(joined_path) || IsPipe(joined_path)) {
+					result.push_back(joined_path);
+				}
+			}
+		}
+	}
+	return result;
+}
+
 vector<string> LocalFileSystem::Glob(const string &path, FileOpener *opener) {
 	if (path.empty()) {
 		return vector<string>();
@@ -24653,23 +24842,7 @@ vector<string> LocalFileSystem::Glob(const string &path, FileOpener *opener) {
 	// Check if the path has a glob at all
 	if (!HasGlob(path)) {
 		// no glob: return only the file (if it exists or is a pipe)
-		vector<string> result;
-		if (FileExists(path) || IsPipe(path)) {
-			result.push_back(path);
-		} else if (!absolute_path) {
-			Value value;
-			if (opener->TryGetCurrentSetting("file_search_path", value)) {
-				auto search_paths_str = value.ToString();
-				std::vector<std::string> search_paths = StringUtil::Split(search_paths_str, ',');
-				for (const auto &search_path : search_paths) {
-					auto joined_path = JoinPath(search_path, path);
-					if (FileExists(joined_path) || IsPipe(joined_path)) {
-						result.push_back(joined_path);
-					}
-				}
-			}
-		}
-		return result;
+		return FetchFileWithoutGlob(path, opener, absolute_path);
 	}
 	vector<string> previous_directories;
 	if (absolute_path) {
@@ -24703,7 +24876,12 @@ vector<string> LocalFileSystem::Glob(const string &path, FileOpener *opener) {
 				}
 			}
 		}
-		if (is_last_chunk || result.empty()) {
+		if (result.empty()) {
+			// no result found that matches the glob
+			// last ditch effort: search the path as a string literal
+			return FetchFileWithoutGlob(path, opener, absolute_path);
+		}
+		if (is_last_chunk) {
 			return result;
 		}
 		previous_directories = move(result);
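FetchFileWithoutGlob factors the non-glob lookup out of Glob so it can also serve as the new last-ditch fallback when a glob matches nothing: try the path as given, and for relative paths, try each comma-separated entry of the file_search_path setting. A rough sketch of that resolution order using C++17 std::filesystem (an assumption; DuckDB routes this through its FileSystem and FileOpener abstractions):

```cpp
#include <filesystem>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Resolve a non-glob path: the path itself first, then each comma-separated
// search path joined with it (only for relative paths).
std::vector<std::string> FetchFileWithoutGlob(const std::string &path, const std::string &file_search_path) {
	std::vector<std::string> result;
	if (fs::exists(path)) {
		result.push_back(path);
		return result;
	}
	if (fs::path(path).is_absolute()) {
		return result; // absolute paths are not retried against the search path
	}
	std::stringstream ss(file_search_path);
	std::string search_path;
	while (std::getline(ss, search_path, ',')) {
		auto joined = (fs::path(search_path) / path).string();
		if (fs::exists(joined)) {
			result.push_back(joined);
		}
	}
	return result;
}

int main() {
	// With file_search_path set to "/tmp,/var/tmp", a relative name like
	// "data.csv" is also looked up as /tmp/data.csv and /var/tmp/data.csv.
	for (auto &match : FetchFileWithoutGlob("data.csv", "/tmp,/var/tmp")) {
		std::cout << match << "\n";
	}
	return 0;
}
```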
@@ -27052,14 +27230,16 @@ struct IntervalToStringCast {
 			if (micros < 0) {
 				// negative time: append negative sign
 				buffer[length++] = '-';
+			} else {
 				micros = -micros;
 			}
-			int64_t hour = micros / Interval::MICROS_PER_HOUR;
-			micros -= hour * Interval::MICROS_PER_HOUR;
-			int64_t min = micros / Interval::MICROS_PER_MINUTE;
-			micros -= min * Interval::MICROS_PER_MINUTE;
-			int64_t sec = micros / Interval::MICROS_PER_SEC;
-			micros -= sec * Interval::MICROS_PER_SEC;
+			int64_t hour = -(micros / Interval::MICROS_PER_HOUR);
+			micros += hour * Interval::MICROS_PER_HOUR;
+			int64_t min = -(micros / Interval::MICROS_PER_MINUTE);
+			micros += min * Interval::MICROS_PER_MINUTE;
+			int64_t sec = -(micros / Interval::MICROS_PER_SEC);
+			micros += sec * Interval::MICROS_PER_SEC;
+			micros = -micros;
 
 			if (hour < 10) {
 				buffer[length++] = '0';
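The rewritten branch keeps micros in the negative domain: a non-negative value is negated (always safe), and each quotient is negated back individually, so -micros is never evaluated while micros could still be INT64_MIN, which would overflow. A small standalone check of the same trick:

```cpp
#include <cstdint>
#include <iostream>
#include <limits>

// Split a time-of-day style microsecond value into components without ever
// negating the full value, so INT64_MIN cannot overflow. Mirrors the idea in
// the diff above (printing of the sign is left out for brevity).
void SplitMicros(int64_t micros, int64_t &hour, int64_t &min, int64_t &sec, int64_t &frac) {
	const int64_t MICROS_PER_HOUR = 3600000000LL;
	const int64_t MICROS_PER_MINUTE = 60000000LL;
	const int64_t MICROS_PER_SEC = 1000000LL;
	if (micros >= 0) {
		micros = -micros; // negating a non-negative value is always safe
	}
	hour = -(micros / MICROS_PER_HOUR);
	micros += hour * MICROS_PER_HOUR;
	min = -(micros / MICROS_PER_MINUTE);
	micros += min * MICROS_PER_MINUTE;
	sec = -(micros / MICROS_PER_SEC);
	micros += sec * MICROS_PER_SEC;
	frac = -micros;
}

int main() {
	int64_t hour, min, sec, frac;
	SplitMicros(std::numeric_limits<int64_t>::min(), hour, min, sec, frac);
	std::cout << hour << ":" << min << ":" << sec << "." << frac << "\n";
	return 0;
}
```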
@@ -33182,7 +33362,7 @@ template <idx_t radix_bits>
 struct RadixPartitioningConstants {
 public:
 	static constexpr const idx_t NUM_RADIX_BITS = radix_bits;
-	static constexpr const idx_t NUM_PARTITIONS = 1 << NUM_RADIX_BITS;
+	static constexpr const idx_t NUM_PARTITIONS = (idx_t)1 << NUM_RADIX_BITS;
 	static constexpr const idx_t TMP_BUF_SIZE = 8;
 
 public:
@@ -33200,7 +33380,7 @@ struct RadixPartitioningConstants {
 struct RadixPartitioning {
 public:
 	static idx_t NumberOfPartitions(idx_t radix_bits) {
-		return 1 << radix_bits;
+		return (idx_t)1 << radix_bits;
 	}
 
 	//! Partition the data in block_collection/string_heap to multiple partitions
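This and several other hunks widen shift expressions from 1 << n to (idx_t)1 << n (with (uint64_t) and (int64_t) casts elsewhere in the patch). The literal 1 is an int, so shifting it by 31 or more bits is undefined behavior and silently truncates the partition count; widening the left operand first keeps the whole computation in 64 bits. A tiny demonstration:

```cpp
#include <cstdint>
#include <iostream>

using idx_t = uint64_t;

// Number of radix partitions for a given number of radix bits.
// The cast matters: with a plain int literal, radix_bits >= 31 would shift
// past the width of int, which is undefined behavior.
idx_t NumberOfPartitions(idx_t radix_bits) {
	return (idx_t)1 << radix_bits;
}

int main() {
	std::cout << NumberOfPartitions(10) << "\n"; // 1024
	std::cout << NumberOfPartitions(40) << "\n"; // 1099511627776, would be UB with int(1) << 40
	return 0;
}
```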
@@ -37960,6 +38140,9 @@ void RowOperations::UnswizzleHeapPointer(const RowLayout &layout, const data_ptr
 
 static inline void VerifyUnswizzledString(const RowLayout &layout, const idx_t &col_idx, const data_ptr_t &row_ptr) {
 #ifdef DEBUG
+	if (layout.GetTypes()[col_idx] == LogicalTypeId::BLOB) {
+		return;
+	}
 	idx_t entry_idx;
 	idx_t idx_in_entry;
 	ValidityBytes::GetEntryIndex(col_idx, entry_idx, idx_in_entry);
@@ -40297,7 +40480,10 @@ struct SortConstants {
 
 struct SortLayout {
 public:
+	SortLayout() {
+	}
 	explicit SortLayout(const vector<BoundOrderByNode> &orders);
+	SortLayout GetPrefixComparisonLayout(idx_t num_prefix_cols) const;
 
 public:
 	idx_t column_count;
@@ -42042,6 +42228,32 @@ SortLayout::SortLayout(const vector<BoundOrderByNode> &orders)
 	blob_layout.Initialize(blob_layout_types);
 }
 
+SortLayout SortLayout::GetPrefixComparisonLayout(idx_t num_prefix_cols) const {
+	SortLayout result;
+	result.column_count = num_prefix_cols;
+	result.all_constant = true;
+	result.comparison_size = 0;
+	for (idx_t col_idx = 0; col_idx < num_prefix_cols; col_idx++) {
+		result.order_types.push_back(order_types[col_idx]);
+		result.order_by_null_types.push_back(order_by_null_types[col_idx]);
+		result.logical_types.push_back(logical_types[col_idx]);
+
+		result.all_constant = result.all_constant && constant_size[col_idx];
+		result.constant_size.push_back(constant_size[col_idx]);
+
+		result.comparison_size += column_sizes[col_idx];
+		result.column_sizes.push_back(column_sizes[col_idx]);
+
+		result.prefix_lengths.push_back(prefix_lengths[col_idx]);
+		result.stats.push_back(stats[col_idx]);
+		result.has_null.push_back(has_null[col_idx]);
+	}
+	result.entry_size = entry_size;
+	result.blob_layout = blob_layout;
+	result.sorting_to_blob_col = sorting_to_blob_col;
+	return result;
+}
+
 LocalSortState::LocalSortState() : initialized(false) {
 }
 
@@ -44121,7 +44333,7 @@ class PhysicalHashAggregate : public PhysicalOperator {
 namespace duckdb {
 
 enum class UnicodeType { INVALID, ASCII, UNICODE };
-enum class UnicodeInvalidReason { BYTE_MISMATCH, NULL_BYTE };
+enum class UnicodeInvalidReason { BYTE_MISMATCH, NULL_BYTE, INVALID_UNICODE };
 
 class Utf8Proc {
 public:
@@ -52291,11 +52503,36 @@ Value Value::CreateValue(dtime_t value) {
 	return Value::TIME(value);
 }
 
+template <>
+Value Value::CreateValue(dtime_tz_t value) {
+	return Value::TIMETZ(value);
+}
+
 template <>
 Value Value::CreateValue(timestamp_t value) {
 	return Value::TIMESTAMP(value);
 }
 
+template <>
+Value Value::CreateValue(timestamp_sec_t value) {
+	return Value::TIMESTAMPSEC(value);
+}
+
+template <>
+Value Value::CreateValue(timestamp_ms_t value) {
+	return Value::TIMESTAMPMS(value);
+}
+
+template <>
+Value Value::CreateValue(timestamp_ns_t value) {
+	return Value::TIMESTAMPNS(value);
+}
+
+template <>
+Value Value::CreateValue(timestamp_tz_t value) {
+	return Value::TIMESTAMPTZ(value);
+}
+
 template <>
 Value Value::CreateValue(const char *value) {
 	return Value(string(value));
@@ -53868,19 +54105,6 @@ void Vector::Resize(idx_t cur_size, idx_t new_size) {
 	}
 }
 
-// FIXME Just like DECIMAL, it's important that type_info gets considered when determining whether or not to cast
-// just comparing internal type is not always enough
-static bool ValueShouldBeCast(const LogicalType &incoming, const LogicalType &target) {
-	if (incoming.InternalType() != target.InternalType()) {
-		return true;
-	}
-	if (incoming.id() == LogicalTypeId::DECIMAL && incoming.id() == target.id()) {
-		//! Compare the type_info
-		return incoming != target;
-	}
-	return false;
-}
-
 void Vector::SetValue(idx_t index, const Value &val) {
 	if (GetVectorType() == VectorType::DICTIONARY_VECTOR) {
 		// dictionary: apply dictionary and forward to child
@@ -53888,10 +54112,11 @@ void Vector::SetValue(idx_t index, const Value &val) {
 		auto &child = DictionaryVector::Child(*this);
 		return child.SetValue(sel_vector.get_index(index), val);
 	}
-	if (ValueShouldBeCast(val.type(), GetType())) {
+	if (val.type() != GetType()) {
 		SetValue(index, val.CastAs(GetType()));
 		return;
 	}
+	D_ASSERT(val.type().InternalType() == GetType().InternalType());
 
 	validity.EnsureWritable();
 	validity.Set(index, !val.IsNull());
@@ -54142,7 +54367,10 @@ Value Vector::GetValue(const Vector &v_p, idx_t index_p) {
 	auto value = GetValueInternal(v_p, index_p);
 	// set the alias of the type to the correct value, if there is a type alias
 	if (v_p.GetType().HasAlias()) {
-		value.type().SetAlias(v_p.GetType().GetAlias());
+		value.type().CopyAuxInfo(v_p.GetType());
+	}
+	if (v_p.GetType().id() != LogicalTypeId::AGGREGATE_STATE && value.type().id() != LogicalTypeId::AGGREGATE_STATE) {
+		D_ASSERT(v_p.GetType() == value.type());
 	}
 	return value;
 }
@@ -54934,6 +55162,24 @@ void StringVector::AddHeapReference(Vector &vector, Vector &other) {
 	StringVector::AddBuffer(vector, other.auxiliary);
 }
 
+Vector &MapVector::GetKeys(Vector &vector) {
+	auto &entries = StructVector::GetEntries(vector);
+	D_ASSERT(entries.size() == 2);
+	return *entries[0];
+}
+Vector &MapVector::GetValues(Vector &vector) {
+	auto &entries = StructVector::GetEntries(vector);
+	D_ASSERT(entries.size() == 2);
+	return *entries[1];
+}
+
+const Vector &MapVector::GetKeys(const Vector &vector) {
+	return GetKeys((Vector &)vector);
+}
+const Vector &MapVector::GetValues(const Vector &vector) {
+	return GetValues((Vector &)vector);
+}
+
 vector<unique_ptr<Vector>> &StructVector::GetEntries(Vector &vector) {
 	D_ASSERT(vector.GetType().id() == LogicalTypeId::STRUCT || vector.GetType().id() == LogicalTypeId::MAP);
 	if (vector.GetVectorType() == VectorType::DICTIONARY_VECTOR) {
@@ -56209,6 +56455,7 @@ struct ExtraTypeInfo {
 				if (!alias.empty()) {
 					return false;
 				}
+				//! We only need to compare aliases when both types have them in this case
 				return true;
 			}
 			if (alias != other_p->alias) {
@@ -56222,8 +56469,7 @@ struct ExtraTypeInfo {
 		if (type != other_p->type) {
 			return false;
 		}
-		auto &other = (ExtraTypeInfo &)*other_p;
-		return alias == other.alias && EqualsInternal(other_p);
+		return alias == other_p->alias && EqualsInternal(other_p);
 	}
 	//! Serializes a ExtraTypeInfo to a stand-alone binary blob
 	virtual void Serialize(FieldWriter &writer) const {};
@@ -56902,10 +57148,7 @@ LogicalType LogicalType::Deserialize(Deserializer &source) {
 	return LogicalType(id, move(info));
 }
 
-bool LogicalType::operator==(const LogicalType &rhs) const {
-	if (id_ != rhs.id_) {
-		return false;
-	}
+bool LogicalType::EqualTypeInfo(const LogicalType &rhs) const {
 	if (type_info_.get() == rhs.type_info_.get()) {
 		return true;
 	}
@@ -56917,6 +57160,13 @@ bool LogicalType::operator==(const LogicalType &rhs) const {
 	}
 }
 
+bool LogicalType::operator==(const LogicalType &rhs) const {
+	if (id_ != rhs.id_) {
+		return false;
+	}
+	return EqualTypeInfo(rhs);
+}
+
 } // namespace duckdb
 
 
@@ -67787,6 +68037,16 @@ bool DistinctAggregateData::IsDistinct(idx_t index) const {
 
 
 
+//===----------------------------------------------------------------------===//
+//                         DuckDB
+//
+// duckdb/parallel/base_pipeline_event.hpp
+//
+//
+//===----------------------------------------------------------------------===//
+
+
+
 //===----------------------------------------------------------------------===//
 //                         DuckDB
 //
@@ -67860,6 +68120,22 @@ class Event : public std::enable_shared_from_this<Event> {
 
 
 
+namespace duckdb {
+
+//! A BasePipelineEvent is used as the basis of any event that belongs to a specific pipeline
+class BasePipelineEvent : public Event {
+public:
+	BasePipelineEvent(shared_ptr<Pipeline> pipeline);
+	BasePipelineEvent(Pipeline &pipeline);
+
+	//! The pipeline that this event belongs to
+	shared_ptr<Pipeline> pipeline;
+};
+
+} // namespace duckdb
+
+
+
 namespace duckdb {
 
 PhysicalHashAggregate::PhysicalHashAggregate(ClientContext &context, vector<LogicalType> types,
@@ -68016,16 +68292,15 @@ void PhysicalHashAggregate::Combine(ExecutionContext &context, GlobalSinkState &
 	}
 }
 
-class HashAggregateFinalizeEvent : public Event {
+class HashAggregateFinalizeEvent : public BasePipelineEvent {
 public:
 	HashAggregateFinalizeEvent(const PhysicalHashAggregate &op_p, HashAggregateGlobalState &gstate_p,
 	                           Pipeline *pipeline_p)
-	    : Event(pipeline_p->executor), op(op_p), gstate(gstate_p), pipeline(pipeline_p) {
+	    : BasePipelineEvent(*pipeline_p), op(op_p), gstate(gstate_p) {
 	}
 
 	const PhysicalHashAggregate &op;
 	HashAggregateGlobalState &gstate;
-	Pipeline *pipeline;
 
 public:
 	void Schedule() override {
@@ -69362,15 +69637,14 @@ class DistinctAggregateFinalizeTask : public ExecutorTask {
 };
 
 // TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded
-class DistinctAggregateFinalizeEvent : public Event {
+class DistinctAggregateFinalizeEvent : public BasePipelineEvent {
 public:
 	DistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p, UngroupedAggregateGlobalState &gstate_p,
-	                               Pipeline *pipeline_p, ClientContext &context)
-	    : Event(pipeline_p->executor), op(op_p), gstate(gstate_p), pipeline(pipeline_p), context(context) {
+	                               Pipeline &pipeline_p, ClientContext &context)
+	    : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) {
 	}
 	const PhysicalUngroupedAggregate &op;
 	UngroupedAggregateGlobalState &gstate;
-	Pipeline *pipeline;
 	ClientContext &context;
 
 public:
@@ -69383,16 +69657,15 @@ class DistinctAggregateFinalizeEvent : public Event {
 	}
 };
 
-class DistinctCombineFinalizeEvent : public Event {
+class DistinctCombineFinalizeEvent : public BasePipelineEvent {
 public:
 	DistinctCombineFinalizeEvent(const PhysicalUngroupedAggregate &op_p, UngroupedAggregateGlobalState &gstate_p,
-	                             Pipeline *pipeline_p, ClientContext &client)
-	    : Event(pipeline_p->executor), op(op_p), gstate(gstate_p), pipeline(pipeline_p), client(client) {
+	                             Pipeline &pipeline_p, ClientContext &client)
+	    : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), client(client) {
 	}
 
 	const PhysicalUngroupedAggregate &op;
 	UngroupedAggregateGlobalState &gstate;
-	Pipeline *pipeline;
 	ClientContext &client;
 
 public:
@@ -69408,7 +69681,7 @@ class DistinctCombineFinalizeEvent : public Event {
 		SetTasks(move(tasks));
 
 		//! Now that all tables are combined, it's time to do the distinct aggregations
-		auto new_event = make_shared<DistinctAggregateFinalizeEvent>(op, gstate, pipeline, client);
+		auto new_event = make_shared<DistinctAggregateFinalizeEvent>(op, gstate, *pipeline, client);
 		this->InsertEvent(move(new_event));
 	}
 };
@@ -69437,12 +69710,12 @@ SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline
 		}
 	}
 	if (any_partitioned) {
-		auto new_event = make_shared<DistinctCombineFinalizeEvent>(*this, gstate, &pipeline, context);
+		auto new_event = make_shared<DistinctCombineFinalizeEvent>(*this, gstate, pipeline, context);
 		event.InsertEvent(move(new_event));
 	} else {
 		//! Hashtables aren't partitioned, they dont need to be joined first
 		//! So we can compute the aggregate already
-		auto new_event = make_shared<DistinctAggregateFinalizeEvent>(*this, gstate, &pipeline, context);
+		auto new_event = make_shared<DistinctAggregateFinalizeEvent>(*this, gstate, pipeline, context);
 		event.InsertEvent(move(new_event));
 	}
 	return SinkFinalizeType::READY;
@@ -69607,6 +69880,7 @@ class PhysicalWindow : public PhysicalOperator {
 
 
 
+
 //===----------------------------------------------------------------------===//
 //                         DuckDB
 //
@@ -69702,7 +69976,6 @@ class WindowSegmentTree {
 
 
 
-
 #include <algorithm>
 #include <cmath>
 #include <numeric>
@@ -69720,12 +69993,14 @@ class WindowGlobalHashGroup {
 
 	WindowGlobalHashGroup(BufferManager &buffer_manager, const Orders &partitions, const Orders &orders,
 	                      const Types &payload_types, idx_t max_mem, bool external)
-	    : memory_per_thread(max_mem), count(0), partition_layout(partitions) {
+	    : memory_per_thread(max_mem), count(0) {
 
 		RowLayout payload_layout;
 		payload_layout.Initialize(payload_types);
 		global_sort = make_unique<GlobalSortState>(buffer_manager, orders, payload_layout);
 		global_sort->external = external;
+
+		partition_layout = global_sort->sort_layout.GetPrefixComparisonLayout(partitions.size());
 	}
 
 	void Combine(LocalSortState &local_sort) {
@@ -70398,7 +70673,10 @@ struct WindowInputExpression {
 
 	inline bool CellIsNull(idx_t i) const {
 		D_ASSERT(!chunk.data.empty());
-		return FlatVector::IsNull(chunk.data[0], scalar ? 0 : i);
+		if (chunk.data[0].GetVectorType() == VectorType::CONSTANT_VECTOR) {
+			return ConstantVector::IsNull(chunk.data[0]);
+		}
+		return FlatVector::IsNull(chunk.data[0], i);
 	}
 
 	inline void CopyCell(Vector &target, idx_t target_offset) const {
@@ -71183,19 +71461,18 @@ class WindowMergeTask : public ExecutorTask {
 	WindowGlobalHashGroup &hash_group;
 };
 
-class WindowMergeEvent : public Event {
+class WindowMergeEvent : public BasePipelineEvent {
 public:
 	WindowMergeEvent(WindowGlobalSinkState &gstate_p, Pipeline &pipeline_p, WindowGlobalHashGroup &hash_group_p)
-	    : Event(pipeline_p.executor), gstate(gstate_p), pipeline(pipeline_p), hash_group(hash_group_p) {
+	    : BasePipelineEvent(pipeline_p), gstate(gstate_p), hash_group(hash_group_p) {
 	}
 
 	WindowGlobalSinkState &gstate;
-	Pipeline &pipeline;
 	WindowGlobalHashGroup &hash_group;
 
 public:
 	void Schedule() override {
-		auto &context = pipeline.GetClientContext();
+		auto &context = pipeline->GetClientContext();
 
 		// Schedule tasks equal to the number of threads, which will each merge multiple partitions
 		auto &ts = TaskScheduler::GetScheduler(context);
@@ -71210,7 +71487,7 @@ class WindowMergeEvent : public Event {
 
 	void FinishEvent() override {
 		hash_group.global_sort->CompleteMergeRound(true);
-		CreateMergeTasks(pipeline, *this, gstate, hash_group);
+		CreateMergeTasks(*pipeline, *this, gstate, hash_group);
 	}
 
 	static void CreateMergeTasks(Pipeline &pipeline, Event &event, WindowGlobalSinkState &state,
@@ -72649,6 +72926,11 @@ class ExtensionHelper {
 
 private:
 	static const vector<string> PathComponents();
+	//! For tagged releases we use the tag, else we use the git commit hash
+	static const string GetVersionDirectoryName();
+	//! Version tags occur both with and without a leading 'v'; the tag in the extension path always includes the 'v'
+	static const string NormalizeVersionTag(const string &version_tag);
+	static bool IsRelease(const string &version_tag);
 
 private:
 	static ExtensionLoadResult LoadExtensionInternal(DuckDB &db, const std::string &extension, bool initial_load);
@@ -75763,18 +76045,17 @@ class HashJoinFinalizeTask : public ExecutorTask {
 	bool parallel;
 };
 
-class HashJoinFinalizeEvent : public Event {
+class HashJoinFinalizeEvent : public BasePipelineEvent {
 public:
 	HashJoinFinalizeEvent(Pipeline &pipeline_p, HashJoinGlobalSinkState &sink)
-	    : Event(pipeline_p.executor), pipeline(pipeline_p), sink(sink) {
+	    : BasePipelineEvent(pipeline_p), sink(sink) {
 	}
 
-	Pipeline &pipeline;
 	HashJoinGlobalSinkState &sink;
 
 public:
 	void Schedule() override {
-		auto &context = pipeline.GetClientContext();
+		auto &context = pipeline->GetClientContext();
 		auto parallel_construct_count =
 		    context.config.verify_parallelism ? STANDARD_VECTOR_SIZE : PARALLEL_CONSTRUCT_COUNT;
 
@@ -75841,20 +76122,19 @@ class HashJoinPartitionTask : public ExecutorTask {
 	JoinHashTable &local_ht;
 };
 
-class HashJoinPartitionEvent : public Event {
+class HashJoinPartitionEvent : public BasePipelineEvent {
 public:
 	HashJoinPartitionEvent(Pipeline &pipeline_p, HashJoinGlobalSinkState &sink,
 	                       vector<unique_ptr<JoinHashTable>> &local_hts)
-	    : Event(pipeline_p.executor), pipeline(pipeline_p), sink(sink), local_hts(local_hts) {
+	    : BasePipelineEvent(pipeline_p), sink(sink), local_hts(local_hts) {
 	}
 
-	Pipeline &pipeline;
 	HashJoinGlobalSinkState &sink;
 	vector<unique_ptr<JoinHashTable>> &local_hts;
 
 public:
 	void Schedule() override {
-		auto &context = pipeline.GetClientContext();
+		auto &context = pipeline->GetClientContext();
 		vector<unique_ptr<Task>> partition_tasks;
 		partition_tasks.reserve(local_hts.size());
 		for (auto &local_ht : local_hts) {
@@ -75867,7 +76147,7 @@ class HashJoinPartitionEvent : public Event {
 	void FinishEvent() override {
 		local_hts.clear();
 		sink.hash_table->PrepareExternalFinalize();
-		sink.ScheduleFinalize(pipeline, *this);
+		sink.ScheduleFinalize(*pipeline, *this);
 	}
 };
 
@@ -79571,21 +79851,20 @@ class RangeJoinMergeTask : public ExecutorTask {
 	GlobalSortedTable &table;
 };
 
-class RangeJoinMergeEvent : public Event {
+class RangeJoinMergeEvent : public BasePipelineEvent {
 public:
 	using GlobalSortedTable = PhysicalRangeJoin::GlobalSortedTable;
 
 public:
 	RangeJoinMergeEvent(GlobalSortedTable &table_p, Pipeline &pipeline_p)
-	    : Event(pipeline_p.executor), table(table_p), pipeline(pipeline_p) {
+	    : BasePipelineEvent(pipeline_p), table(table_p) {
 	}
 
 	GlobalSortedTable &table;
-	Pipeline &pipeline;
 
 public:
 	void Schedule() override {
-		auto &context = pipeline.GetClientContext();
+		auto &context = pipeline->GetClientContext();
 
 		// Schedule tasks equal to the number of threads, which will each merge multiple partitions
 		auto &ts = TaskScheduler::GetScheduler(context);
@@ -79604,7 +79883,7 @@ class RangeJoinMergeEvent : public Event {
 		global_sort_state.CompleteMergeRound(true);
 		if (global_sort_state.sorted_blocks.size() > 1) {
 			// Multiple blocks remaining: Schedule the next round
-			table.ScheduleMergeTasks(pipeline, *this);
+			table.ScheduleMergeTasks(*pipeline, *this);
 		}
 	}
 };
@@ -79992,18 +80271,17 @@ class PhysicalOrderMergeTask : public ExecutorTask {
 	OrderGlobalState &state;
 };
 
-class OrderMergeEvent : public Event {
+class OrderMergeEvent : public BasePipelineEvent {
 public:
 	OrderMergeEvent(OrderGlobalState &gstate_p, Pipeline &pipeline_p)
-	    : Event(pipeline_p.executor), gstate(gstate_p), pipeline(pipeline_p) {
+	    : BasePipelineEvent(pipeline_p), gstate(gstate_p) {
 	}
 
 	OrderGlobalState &gstate;
-	Pipeline &pipeline;
 
 public:
 	void Schedule() override {
-		auto &context = pipeline.GetClientContext();
+		auto &context = pipeline->GetClientContext();
 
 		// Schedule tasks equal to the number of threads, which will each merge multiple partitions
 		auto &ts = TaskScheduler::GetScheduler(context);
@@ -80022,7 +80300,7 @@ class OrderMergeEvent : public Event {
 		global_sort_state.CompleteMergeRound();
 		if (global_sort_state.sorted_blocks.size() > 1) {
 			// Multiple blocks remaining: Schedule the next round
-			PhysicalOrder::ScheduleMergeTasks(pipeline, *this, gstate);
+			PhysicalOrder::ScheduleMergeTasks(*pipeline, *this, gstate);
 		}
 	}
 };
@@ -85397,10 +85675,17 @@ void PhysicalCreateIndex::GetData(ExecutionContext &context, DataChunk &chunk, G
 		return;
 	}
 
+	// convert virtual column ids to storage column ids
+	vector<column_t> storage_ids;
+	for (auto &column_id : column_ids) {
+		D_ASSERT(column_id < table.columns.size());
+		storage_ids.push_back(table.columns[column_id].StorageOid());
+	}
+
 	unique_ptr<Index> index;
 	switch (info->index_type) {
 	case IndexType::ART: {
-		index = make_unique<ART>(column_ids, unbound_expressions, info->constraint_type, *context.client.db);
+		index = make_unique<ART>(storage_ids, unbound_expressions, info->constraint_type, *context.client.db);
 		break;
 	}
 	default:
@@ -85705,11 +85990,10 @@ unique_ptr<GlobalSinkState> PhysicalCreateTableAs::GetGlobalSinkState(ClientCont
 SinkResultType PhysicalCreateTableAs::Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p,
                                            DataChunk &input) const {
 	auto &sink = (CreateTableAsGlobalState &)state;
-	if (sink.table) {
-		lock_guard<mutex> client_guard(sink.append_lock);
-		sink.table->storage->Append(*sink.table, context.client, input);
-		sink.inserted_count += input.size();
-	}
+	D_ASSERT(sink.table);
+	lock_guard<mutex> client_guard(sink.append_lock);
+	sink.table->storage->Append(*sink.table, context.client, input);
+	sink.inserted_count += input.size();
 	return SinkResultType::NEED_MORE_INPUT;
 }
 
@@ -86119,6 +86403,7 @@ void PhysicalRecursiveCTE::ExecuteRecursivePipelines(ExecutionContext &context)
 void PhysicalRecursiveCTE::BuildPipelines(Executor &executor, Pipeline &current, PipelineBuildState &state) {
 	op_state.reset();
 	sink_state.reset();
+	pipelines.clear();
 
 	// recursive CTE
 	state.SetPipelineSource(current, this);
@@ -86418,7 +86703,7 @@ PerfectAggregateHashTable::PerfectAggregateHashTable(Allocator &allocator, Buffe
 		total_required_bits += group_bits;
 	}
 	// the total amount of groups we allocate space for is 2^required_bits
-	total_groups = 1 << total_required_bits;
+	total_groups = (uint64_t)1 << total_required_bits;
 	// we don't need to store the groups in a perfect hash table, since the group keys can be deduced by their location
 	grouping_columns = group_types_p.size();
 	layout.Initialize(move(aggregate_objects_p));
@@ -86602,7 +86887,7 @@ static void ReconstructGroupVectorTemplated(uint32_t group_values[], Value &min,
 static void ReconstructGroupVector(uint32_t group_values[], Value &min, idx_t required_bits, idx_t shift,
                                    idx_t entry_count, Vector &result) {
 	// construct the mask for this entry
-	idx_t mask = (1 << required_bits) - 1;
+	idx_t mask = ((uint64_t)1 << required_bits) - 1;
 	switch (result.GetType().InternalType()) {
 	case PhysicalType::INT8:
 		ReconstructGroupVectorTemplated<int8_t>(group_values, min, mask, shift, entry_count, result);
@@ -87091,7 +87376,8 @@ class ComparisonExpression : public ParsedExpression {
 public:
 	template <class T, class BASE>
 	static string ToString(const T &entry) {
-		return entry.left->ToString() + " " + ExpressionTypeToOperator(entry.type) + " " + entry.right->ToString();
+		return StringUtil::Format("(%s) %s (%s)", entry.left->ToString(), ExpressionTypeToOperator(entry.type),
+		                          entry.right->ToString());
 	}
 };
 } // namespace duckdb
@@ -90925,7 +91211,7 @@ void RadixPartitionedHashTable::SetGroupingValues() {
 		for (idx_t i = 0; i < grouping.size(); i++) {
 			if (grouping_set.find(grouping[i]) == grouping_set.end()) {
 				// we don't group on this value!
-				grouping_value += 1 << (grouping.size() - (i + 1));
+				grouping_value += (int64_t)1 << (grouping.size() - (i + 1));
 			}
 		}
 		grouping_values.push_back(Value::BIGINT(grouping_value));
@@ -96483,7 +96769,21 @@ struct ModeIncluded {
 	const idx_t bias;
 };
 
-template <typename KEY_TYPE>
+struct ModeAssignmentStandard {
+	template <class INPUT_TYPE, class RESULT_TYPE>
+	static RESULT_TYPE Assign(Vector &result, INPUT_TYPE input) {
+		return RESULT_TYPE(input);
+	}
+};
+
+struct ModeAssignmentString {
+	template <class INPUT_TYPE, class RESULT_TYPE>
+	static RESULT_TYPE Assign(Vector &result, INPUT_TYPE input) {
+		return StringVector::AddString(result, input);
+	}
+};
+
+template <typename KEY_TYPE, typename ASSIGN_OP>
 struct ModeFunction {
 	template <class STATE>
 	static void Initialize(STATE *state) {
@@ -96596,7 +96896,7 @@ struct ModeFunction {
 		}
 
 		if (state->valid) {
-			rdata[rid] = RESULT_TYPE(*state->mode);
+			rdata[rid] = ASSIGN_OP::template Assign<INPUT_TYPE, RESULT_TYPE>(result, *state->mode);
 		} else {
 			rmask.Set(rid, false);
 		}
@@ -96612,10 +96912,10 @@ struct ModeFunction {
 	}
 };
 
-template <typename INPUT_TYPE, typename KEY_TYPE>
+template <typename INPUT_TYPE, typename KEY_TYPE, typename ASSIGN_OP = ModeAssignmentStandard>
 AggregateFunction GetTypedModeFunction(const LogicalType &type) {
 	using STATE = ModeState<KEY_TYPE>;
-	using OP = ModeFunction<KEY_TYPE>;
+	using OP = ModeFunction<KEY_TYPE, ASSIGN_OP>;
 	auto func = AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, INPUT_TYPE, OP>(type, type);
 	func.window = AggregateFunction::UnaryWindow<STATE, INPUT_TYPE, INPUT_TYPE, OP>;
 	return func;
@@ -96651,7 +96951,7 @@ AggregateFunction GetModeAggregate(const LogicalType &type) {
 		return GetTypedModeFunction<interval_t, interval_t>(type);
 
 	case PhysicalType::VARCHAR:
-		return GetTypedModeFunction<string_t, string>(type);
+		return GetTypedModeFunction<string_t, string, ModeAssignmentString>(type);
 
 	default:
 		throw NotImplementedException("Unimplemented mode aggregate");
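GetTypedModeFunction gains an ASSIGN_OP policy so VARCHAR results go through ModeAssignmentString (which copies the winning value into the result vector's string heap) while fixed-size types keep the direct-construction path. The pattern is a compile-time strategy parameter; a minimal sketch outside DuckDB's aggregate framework, with a plain std::vector standing in for the string heap:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Strategy for "plain" types: construct the result directly from the input.
struct AssignStandard {
	template <class INPUT, class RESULT>
	static RESULT Assign(std::vector<std::string> &heap, const INPUT &input) {
		(void)heap;
		return RESULT(input);
	}
};

// Strategy for string-like types: copy the value into an owning heap first,
// standing in for StringVector::AddString in the diff above.
struct AssignString {
	template <class INPUT, class RESULT>
	static RESULT Assign(std::vector<std::string> &heap, const INPUT &input) {
		heap.push_back(std::string(input));
		return RESULT(heap.back());
	}
};

// The "aggregate": pick how to materialize the winning value via a policy.
template <class T, class ASSIGN_OP>
T Finalize(std::vector<std::string> &heap, const T &mode_value) {
	return ASSIGN_OP::template Assign<T, T>(heap, mode_value);
}

int main() {
	std::vector<std::string> heap;
	int mode_int = Finalize<int, AssignStandard>(heap, 42);
	std::string mode_str = Finalize<std::string, AssignString>(heap, std::string("duck"));
	std::cout << mode_int << " " << mode_str << " (heap size " << heap.size() << ")\n";
	return 0;
}
```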
@@ -98840,21 +99140,21 @@ AggregateFunction GetHistogramFunction(const LogicalType &type) {
 	case LogicalType::VARCHAR:
 		return GetMapType<HistogramStringFunctor, string, IS_ORDERED>(type);
 	case LogicalType::TIMESTAMP:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, timestamp_t, IS_ORDERED>(type);
 	case LogicalType::TIMESTAMP_TZ:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, timestamp_tz_t, IS_ORDERED>(type);
 	case LogicalType::TIMESTAMP_S:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, timestamp_sec_t, IS_ORDERED>(type);
 	case LogicalType::TIMESTAMP_MS:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, timestamp_ms_t, IS_ORDERED>(type);
 	case LogicalType::TIMESTAMP_NS:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, timestamp_ns_t, IS_ORDERED>(type);
 	case LogicalType::TIME:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, dtime_t, IS_ORDERED>(type);
 	case LogicalType::TIME_TZ:
-		return GetMapType<HistogramFunctor, int64_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, dtime_tz_t, IS_ORDERED>(type);
 	case LogicalType::DATE:
-		return GetMapType<HistogramFunctor, int32_t, IS_ORDERED>(type);
+		return GetMapType<HistogramFunctor, date_t, IS_ORDERED>(type);
 	default:
 		throw InternalException("Unimplemented histogram aggregate");
 	}
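The histogram dispatch now instantiates GetMapType with date_t, dtime_t and the timestamp typedefs instead of their raw int32_t/int64_t representations, so the key type carries its logical meaning through the template. A compressed illustration of why a distinct wrapper type matters for dispatch (the date_t here is a simplified stand-in, not DuckDB's):

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-in for duckdb's date_t: same storage as int32_t, but a
// distinct type, so overload resolution can tell "a date" apart from "an int".
struct date_t {
	int32_t days;
};

// Render a key for histogram output; the plain integer overload would be
// picked for raw int32_t keys and print "19600" instead of a date.
std::string RenderKey(int32_t value) {
	return std::to_string(value);
}

std::string RenderKey(date_t value) {
	// tag the value as a date instead of printing a bare integer
	return "date(epoch day " + std::to_string(value.days) + ")";
}

template <class KEY>
void PrintBucket(KEY key, int64_t count) {
	std::cout << RenderKey(key) << " -> " << count << "\n";
}

int main() {
	PrintBucket<int32_t>(19600, 3);        // 19600 -> 3
	PrintBucket<date_t>(date_t{19600}, 3); // date(epoch day 19600) -> 3
	return 0;
}
```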
@@ -102418,7 +102718,8 @@ struct DateDiff {
 	struct WeekOperator {
 		template <class TA, class TB, class TR>
 		static inline TR Operation(TA startdate, TB enddate) {
-			return Date::Epoch(enddate) / Interval::SECS_PER_WEEK - Date::Epoch(startdate) / Interval::SECS_PER_WEEK;
+			return Date::Epoch(Date::GetMondayOfCurrentWeek(enddate)) / Interval::SECS_PER_WEEK -
+			       Date::Epoch(Date::GetMondayOfCurrentWeek(startdate)) / Interval::SECS_PER_WEEK;
 		}
 	};
 
@@ -108802,12 +109103,49 @@ static void ListAggregatesFunction(DataChunk &args, ExpressionState &state, Vect
 			    result, state_vector.state_vector, count);
 			break;
 		case PhysicalType::INT32:
-			FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, int32_t>(
-			    result, state_vector.state_vector, count);
+			if (key_type.id() == LogicalTypeId::DATE) {
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, date_t>(
+				    result, state_vector.state_vector, count);
+			} else {
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, int32_t>(
+				    result, state_vector.state_vector, count);
+			}
 			break;
 		case PhysicalType::INT64:
-			FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, int64_t>(
-			    result, state_vector.state_vector, count);
+			switch (key_type.id()) {
+			case LogicalTypeId::TIME:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, dtime_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIME_TZ:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, dtime_tz_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIMESTAMP:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, timestamp_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIMESTAMP_MS:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, timestamp_ms_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIMESTAMP_NS:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, timestamp_ns_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIMESTAMP_SEC:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, timestamp_sec_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			case LogicalTypeId::TIMESTAMP_TZ:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, timestamp_tz_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			default:
+				FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, int64_t>(
+				    result, state_vector.state_vector, count);
+				break;
+			}
 			break;
 		case PhysicalType::FLOAT:
 			FUNCTION_FUNCTOR::template ListExecuteFunction<FinalizeValueFunctor, float>(
@@ -109877,18 +110215,12 @@ void SinkDataChunk(Vector *child_vector, SelectionVector &sel, idx_t offset_list
 static void ListSortFunction(DataChunk &args, ExpressionState &state, Vector &result) {
 	D_ASSERT(args.ColumnCount() >= 1 && args.ColumnCount() <= 3);
 	auto count = args.size();
-	Vector &lists = args.data[0];
+	Vector &input_lists = args.data[0];
 
 	result.SetVectorType(VectorType::FLAT_VECTOR);
 	auto &result_validity = FlatVector::Validity(result);
 
-	for (auto &v : args.data) {
-		if (v.GetVectorType() != VectorType::FLAT_VECTOR && v.GetVectorType() != VectorType::CONSTANT_VECTOR) {
-			v.Flatten(count);
-		}
-	}
-
-	if (lists.GetType().id() == LogicalTypeId::SQLNULL) {
+	if (input_lists.GetType().id() == LogicalTypeId::SQLNULL) {
 		result_validity.SetInvalid(0);
 		return;
 	}
@@ -109903,15 +110235,18 @@ static void ListSortFunction(DataChunk &args, ExpressionState &state, Vector &re
 	LocalSortState local_sort_state;
 	local_sort_state.Initialize(global_sort_state, buffer_manager);
 
+	// this ensures that we do not change the order of the entries in the input chunk
+	VectorOperations::Copy(input_lists, result, count, 0, 0);
+
 	// get the child vector
-	auto lists_size = ListVector::GetListSize(lists);
-	auto &child_vector = ListVector::GetEntry(lists);
+	auto lists_size = ListVector::GetListSize(result);
+	auto &child_vector = ListVector::GetEntry(result);
 	UnifiedVectorFormat child_data;
 	child_vector.ToUnifiedFormat(lists_size, child_data);
 
 	// get the lists data
 	UnifiedVectorFormat lists_data;
-	lists.ToUnifiedFormat(count, lists_data);
+	result.ToUnifiedFormat(count, lists_data);
 	auto list_entries = (list_entry_t *)lists_data.data;
 
 	// create the lists_indices vector, this contains an element for each list's entry,
@@ -110008,8 +110343,6 @@ static void ListSortFunction(DataChunk &args, ExpressionState &state, Vector &re
 		child_vector.Flatten(sel_sorted_idx);
 	}
 
-	result.Reference(lists);
-
 	if (args.AllConstant()) {
 		result.SetVectorType(VectorType::CONSTANT_VECTOR);
 	}
@@ -110783,16 +111116,21 @@ static void MapExtractFunction(DataChunk &args, ExpressionState &state, Vector &
 	auto &map = args.data[0];
 	auto &key = args.data[1];
 
-	UnifiedVectorFormat offset_data;
+	UnifiedVectorFormat map_keys_data;
+	UnifiedVectorFormat key_data;
 
-	auto &children = StructVector::GetEntries(map);
+	auto &map_keys = MapVector::GetKeys(map);
+	auto &map_values = MapVector::GetValues(map);
+
+	map_keys.ToUnifiedFormat(args.size(), map_keys_data);
+	key.ToUnifiedFormat(args.size(), key_data);
 
-	children[0]->ToUnifiedFormat(args.size(), offset_data);
 	for (idx_t row = 0; row < args.size(); row++) {
-		idx_t row_index = offset_data.sel->get_index(row);
-		auto key_value = key.GetValue(row_index);
-		auto offsets = ListVector::Search(*children[0], key_value, offset_data.sel->get_index(row));
-		auto values = ListVector::GetValuesFromOffsets(*children[1], offsets);
+		idx_t row_index = map_keys_data.sel->get_index(row);
+		idx_t key_index = key_data.sel->get_index(row);
+		auto key_value = key.GetValue(key_index);
+		auto offsets = ListVector::Search(map_keys, key_value, row_index);
+		auto values = ListVector::GetValuesFromOffsets(map_values, offsets);
 		FillResult(values, result, row);
 	}
 
@@ -113687,6 +114025,24 @@ interval_t DivideOperator::Operation(interval_t left, int64_t right) {
 	return left;
 }
 
+struct BinaryNumericDivideWrapper {
+	template <class FUNC, class OP, class LEFT_TYPE, class RIGHT_TYPE, class RESULT_TYPE>
+	static inline RESULT_TYPE Operation(FUNC fun, LEFT_TYPE left, RIGHT_TYPE right, ValidityMask &mask, idx_t idx) {
+		if (left == NumericLimits<LEFT_TYPE>::Minimum() && right == -1) {
+			throw OutOfRangeException("Overflow in division of %d / %d", left, right);
+		} else if (right == 0) {
+			mask.SetInvalid(idx);
+			return left;
+		} else {
+			return OP::template Operation<LEFT_TYPE, RIGHT_TYPE, RESULT_TYPE>(left, right);
+		}
+	}
+
+	static bool AddsNulls() {
+		return true;
+	}
+};
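This wrapper covers the one signed case that a plain divide-by-zero check cannot: dividing the minimum value of the type by -1, which overflows the result type. A minimal standalone C++ sketch of that edge case (illustrative only, not DuckDB code):

    #include <cstdint>
    #include <limits>
    #include <stdexcept>

    // Hypothetical helper mirroring the wrapper's guard for int8_t operands.
    int8_t checked_divide_i8(int8_t left, int8_t right) {
        if (left == std::numeric_limits<int8_t>::min() && right == -1) {
            // -128 / -1 would be +128, which is not representable in int8_t
            throw std::out_of_range("Overflow in division");
        }
        // right == 0 is handled separately (the row becomes NULL, as in the wrapper above)
        return static_cast<int8_t>(left / right);
    }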
+
 struct BinaryZeroIsNullWrapper {
 	template <class FUNC, class OP, class LEFT_TYPE, class RIGHT_TYPE, class RESULT_TYPE>
 	static inline RESULT_TYPE Operation(FUNC fun, LEFT_TYPE left, RIGHT_TYPE right, ValidityMask &mask, idx_t idx) {
@@ -113728,13 +114084,13 @@ template <class OP>
 static scalar_function_t GetBinaryFunctionIgnoreZero(const LogicalType &type) {
 	switch (type.id()) {
 	case LogicalTypeId::TINYINT:
-		return BinaryScalarFunctionIgnoreZero<int8_t, int8_t, int8_t, OP>;
+		return BinaryScalarFunctionIgnoreZero<int8_t, int8_t, int8_t, OP, BinaryNumericDivideWrapper>;
 	case LogicalTypeId::SMALLINT:
-		return BinaryScalarFunctionIgnoreZero<int16_t, int16_t, int16_t, OP>;
+		return BinaryScalarFunctionIgnoreZero<int16_t, int16_t, int16_t, OP, BinaryNumericDivideWrapper>;
 	case LogicalTypeId::INTEGER:
-		return BinaryScalarFunctionIgnoreZero<int32_t, int32_t, int32_t, OP>;
+		return BinaryScalarFunctionIgnoreZero<int32_t, int32_t, int32_t, OP, BinaryNumericDivideWrapper>;
 	case LogicalTypeId::BIGINT:
-		return BinaryScalarFunctionIgnoreZero<int64_t, int64_t, int64_t, OP>;
+		return BinaryScalarFunctionIgnoreZero<int64_t, int64_t, int64_t, OP, BinaryNumericDivideWrapper>;
 	case LogicalTypeId::UTINYINT:
 		return BinaryScalarFunctionIgnoreZero<uint8_t, uint8_t, uint8_t, OP>;
 	case LogicalTypeId::USMALLINT:
@@ -120182,11 +120538,22 @@ static void CurrentSchemaFunction(DataChunk &input, ExpressionState &state, Vect
 
 // current_schemas
 static void CurrentSchemasFunction(DataChunk &input, ExpressionState &state, Vector &result) {
+	if (!input.AllConstant()) {
+		throw NotImplementedException("current_schemas requires a constant input");
+	}
+	if (ConstantVector::IsNull(input.data[0])) {
+		result.SetVectorType(VectorType::CONSTANT_VECTOR);
+		ConstantVector::SetNull(result, true);
+		return;
+	}
+	auto implicit_schemas = *ConstantVector::GetData<bool>(input.data[0]);
 	vector<Value> schema_list;
-	vector<string> search_path = ClientData::Get(SystemBindData::GetFrom(state).context).catalog_search_path->Get();
+	auto &catalog_search_path = ClientData::Get(SystemBindData::GetFrom(state).context).catalog_search_path;
+	vector<string> search_path = implicit_schemas ? catalog_search_path->Get() : catalog_search_path->GetSetPaths();
 	std::transform(search_path.begin(), search_path.end(), std::back_inserter(schema_list),
 	               [](const string &s) -> Value { return Value(s); });
-	auto val = Value::LIST(schema_list);
+
+	auto val = Value::LIST(LogicalType::VARCHAR, schema_list);
 	result.Reference(val);
 }
 
@@ -120485,8 +120852,8 @@ struct ArrowScanLocalState : public LocalTableFunctionState {
 struct ArrowScanGlobalState : public GlobalTableFunctionState {
 	unique_ptr<ArrowArrayStreamWrapper> stream;
 	mutex main_mutex;
-	bool ready = false;
 	idx_t max_threads = 1;
+	bool done = false;
 
 	idx_t MaxThreads() const override {
 		return max_threads;
@@ -120774,6 +121141,9 @@ idx_t ArrowTableFunction::ArrowScanMaxThreads(ClientContext &context, const Func
 bool ArrowScanParallelStateNext(ClientContext &context, const FunctionData *bind_data_p, ArrowScanLocalState &state,
                                 ArrowScanGlobalState &parallel_state) {
 	lock_guard<mutex> parallel_lock(parallel_state.main_mutex);
+	if (parallel_state.done) {
+		return false;
+	}
 	state.chunk_offset = 0;
 
 	auto current_chunk = parallel_state.stream->GetNextChunk();
@@ -120783,6 +121153,7 @@ bool ArrowScanParallelStateNext(ClientContext &context, const FunctionData *bind
 	state.chunk = move(current_chunk);
 	//! have we run out of chunks? we are done
 	if (!state.chunk->arrow_array.release) {
+		parallel_state.done = true;
 		return false;
 	}
 	return true;
@@ -123228,6 +123599,7 @@ static void ReadCSVAddNamedParameters(TableFunction &table_function) {
 	table_function.named_parameters["skip"] = LogicalType::BIGINT;
 	table_function.named_parameters["max_line_size"] = LogicalType::VARCHAR;
 	table_function.named_parameters["maximum_line_size"] = LogicalType::VARCHAR;
+	table_function.named_parameters["ignore_errors"] = LogicalType::BOOLEAN;
 }
 
 double CSVReaderProgress(ClientContext &context, const FunctionData *bind_data_p,
@@ -127096,8 +127468,7 @@ static unique_ptr<BaseStatistics> TableScanStatistics(ClientContext &context, co
 		// we don't emit any statistics for tables that have outstanding transaction-local data
 		return nullptr;
 	}
-	auto storage_idx = GetStorageIndex(*bind_data.table, column_id);
-	return bind_data.table->storage->GetStatistics(context, storage_idx);
+	return bind_data.table->GetStatistics(context, column_id);
 }
 
 static void TableScanFunc(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
@@ -128669,7 +129040,7 @@ bool duckdb_validity_row_is_valid(uint64_t *validity, idx_t row) {
 	}
 	idx_t entry_idx = row / 64;
 	idx_t idx_in_entry = row % 64;
-	return validity[entry_idx] & (1 << idx_in_entry);
+	return validity[entry_idx] & ((idx_t)1 << idx_in_entry);
 }
 
 void duckdb_validity_set_row_validity(uint64_t *validity, idx_t row, bool valid) {
@@ -128686,7 +129057,7 @@ void duckdb_validity_set_row_invalid(uint64_t *validity, idx_t row) {
 	}
 	idx_t entry_idx = row / 64;
 	idx_t idx_in_entry = row % 64;
-	validity[entry_idx] &= ~(1 << idx_in_entry);
+	validity[entry_idx] &= ~((uint64_t)1 << idx_in_entry);
 }
 
 void duckdb_validity_set_row_valid(uint64_t *validity, idx_t row) {
@@ -128695,7 +129066,7 @@ void duckdb_validity_set_row_valid(uint64_t *validity, idx_t row) {
 	}
 	idx_t entry_idx = row / 64;
 	idx_t idx_in_entry = row % 64;
-	validity[entry_idx] |= 1 << idx_in_entry;
+	validity[entry_idx] |= (uint64_t)1 << idx_in_entry;
 }
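The casts in the three hunks above matter because each validity entry is 64 bits wide: the bare literal 1 is a 32-bit int, so shifting it by an in-entry index of 32 to 63 is undefined behavior and in practice touches the wrong bit. A minimal standalone C++ illustration (not part of the patch):

    #include <cstdint>
    #include <cstdio>

    int main() {
        uint64_t mask = 0;
        // mask |= 1 << 40;           // wrong: 32-bit shift, undefined behavior
        mask |= (uint64_t)1 << 40;    // correct: widen to 64 bits before shifting
        std::printf("bit 40 set: %s\n", ((mask >> 40) & 1) ? "yes" : "no");
        return 0;
    }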
 
 
@@ -131916,6 +132287,11 @@ PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &loc
 			query_progress = active_query->progress_bar->GetCurrentPercentage();
 		}
 		return result;
+	} catch (FatalException &ex) {
+		// fatal exceptions invalidate the entire database
+		result.SetError(PreservedError(ex));
+		auto &db = DatabaseInstance::GetDatabase(*this);
+		db.Invalidate();
 	} catch (const Exception &ex) {
 		result.SetError(PreservedError(ex));
 	} catch (std::exception &ex) {
@@ -132135,9 +132511,19 @@ unique_ptr<PendingQueryResult> ClientContext::PendingStatementOrPreparedStatemen
 		case StatementType::INSERT_STATEMENT:
 		case StatementType::DELETE_STATEMENT:
 		case StatementType::UPDATE_STATEMENT: {
-			auto sql = statement->ToString();
 			Parser parser;
-			parser.ParseQuery(sql);
+			PreservedError error;
+			try {
+				parser.ParseQuery(statement->ToString());
+			} catch (const Exception &ex) {
+				error = PreservedError(ex);
+			} catch (std::exception &ex) {
+				error = PreservedError(ex);
+			}
+			if (error) {
+				// error in verifying query
+				return make_unique<PendingQueryResult>(error);
+			}
 			statement = move(parser.statements[0]);
 			break;
 		}
@@ -142849,8 +143235,27 @@ namespace duckdb {
 //===--------------------------------------------------------------------===//
 // Install Extension
 //===--------------------------------------------------------------------===//
+const string ExtensionHelper::NormalizeVersionTag(const string &version_tag) {
+	if (version_tag.length() > 0 && version_tag[0] != 'v') {
+		return "v" + version_tag;
+	}
+	return version_tag;
+}
+
+bool ExtensionHelper::IsRelease(const string &version_tag) {
+	return !StringUtil::Contains(version_tag, "-dev");
+}
+
+const string ExtensionHelper::GetVersionDirectoryName() {
+	if (IsRelease(DuckDB::LibraryVersion())) {
+		return NormalizeVersionTag(DuckDB::LibraryVersion());
+	} else {
+		return DuckDB::SourceID();
+	}
+}
+
 const vector<string> ExtensionHelper::PathComponents() {
-	return vector<string> {".duckdb", "extensions", DuckDB::SourceID(), DuckDB::Platform()};
+	return vector<string> {".duckdb", "extensions", GetVersionDirectoryName(), DuckDB::Platform()};
 }
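For orientation, a hedged sketch of where extensions are now resolved, assuming the home-relative layout implied by PathComponents:

    // release build (library version "v0.5.1", see the duckdb.hpp hunk further down):
    //   <home>/.duckdb/extensions/v0.5.1/<platform>/...
    // "-dev" build: the source id (e.g. "7c111322d") replaces the version tag, and the same
    // value is substituted for ${REVISION} in the extension download URL template below.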
 
 string ExtensionHelper::ExtensionDirectory(ClientContext &context) {
@@ -142923,7 +143328,7 @@ void ExtensionHelper::InstallExtension(ClientContext &context, const string &ext
 		extension_name = "";
 	}
 
-	auto url = StringUtil::Replace(url_template, "${REVISION}", DuckDB::SourceID());
+	auto url = StringUtil::Replace(url_template, "${REVISION}", GetVersionDirectoryName());
 	url = StringUtil::Replace(url, "${PLATFORM}", DuckDB::Platform());
 	url = StringUtil::Replace(url, "${NAME}", extension_name);
 
@@ -147403,7 +147808,7 @@ Value ForceCompressionSetting::GetSetting(ClientContext &context) {
 //===--------------------------------------------------------------------===//
 void HomeDirectorySetting::SetLocal(ClientContext &context, const Value &input) {
 	auto &config = ClientConfig::GetConfig(context);
-	config.home_directory = input.IsNull() ? input.ToString() : string();
+	config.home_directory = input.IsNull() ? string() : input.ToString();
 }
 
 Value HomeDirectorySetting::GetSetting(ClientContext &context) {
@@ -148359,9 +148764,7 @@ void CardinalityEstimator::UpdateTotalDomains(JoinNode *node, LogicalOperator *o
 			// Get HLL stats here
 			auto actual_binding = relation_column_to_original_column[key];
 
-			// sometimes base stats is null (test_709.test) returns null for base stats while
-			// there is still a catalog table. Anybody know anything about this?
-			auto base_stats = catalog_table->storage->GetStatistics(context, actual_binding.column_index);
+			auto base_stats = catalog_table->GetStatistics(context, actual_binding.column_index);
 			if (base_stats) {
 				count = base_stats->GetDistinctCount();
 			}
@@ -149057,6 +149460,7 @@ class Deliminator {
 
 
 
+
 namespace duckdb {
 
 class DeliminatorPlanUpdater : LogicalOperatorVisitor {
@@ -149084,7 +149488,15 @@ void DeliminatorPlanUpdater::VisitOperator(LogicalOperator &op) {
 			    cond.comparison != ExpressionType::COMPARE_NOT_DISTINCT_FROM) {
 				continue;
 			}
-			auto &colref = (BoundColumnRefExpression &)*cond.right;
+			Expression *rhs = cond.right.get();
+			while (rhs->type == ExpressionType::OPERATOR_CAST) {
+				auto &cast = (BoundCastExpression &)*rhs;
+				rhs = cast.child.get();
+			}
+			if (rhs->type != ExpressionType::BOUND_COLUMN_REF) {
+				throw InternalException("Error in deliminator: expected a bound column reference");
+			}
+			auto &colref = (BoundColumnRefExpression &)*rhs;
 			if (projection_map.find(colref.binding) != projection_map.end()) {
 				// value on the right is a projection of removed DelimGet
 				for (idx_t i = 0; i < decs->size(); i++) {
@@ -150232,7 +150644,10 @@ FilterResult FilterCombiner::AddBoundComparisonFilter(Expression *expr) {
 		auto node = GetNode(left_is_scalar ? comparison.right.get() : comparison.left.get());
 		idx_t equivalence_set = GetEquivalenceSet(node);
 		auto scalar = left_is_scalar ? comparison.left.get() : comparison.right.get();
-		auto constant_value = ExpressionExecutor::EvaluateScalar(*scalar);
+		Value constant_value;
+		if (!ExpressionExecutor::TryEvaluateScalar(*scalar, constant_value)) {
+			return FilterResult::UNSATISFIABLE;
+		}
 		if (constant_value.IsNull()) {
 			// comparisons with null are always null (i.e. will never result in rows)
 			return FilterResult::UNSATISFIABLE;
@@ -150313,7 +150728,11 @@ FilterResult FilterCombiner::AddFilter(Expression *expr) {
 	}
 	if (expr->IsFoldable()) {
 		// scalar condition, evaluate it
-		auto result = ExpressionExecutor::EvaluateScalar(*expr).CastAs(LogicalType::BOOLEAN);
+		Value result;
+		if (!ExpressionExecutor::TryEvaluateScalar(*expr, result)) {
+			return FilterResult::UNSUPPORTED;
+		}
+		result = result.CastAs(LogicalType::BOOLEAN);
 		// check if the filter passes
 		if (result.IsNull() || !BooleanValue::Get(result)) {
 			// the filter does not pass the scalar test, create an empty result
@@ -150337,7 +150756,10 @@ FilterResult FilterCombiner::AddFilter(Expression *expr) {
 
 			if (lower_is_scalar) {
 				auto scalar = comparison.lower.get();
-				auto constant_value = ExpressionExecutor::EvaluateScalar(*scalar);
+				Value constant_value;
+				if (!ExpressionExecutor::TryEvaluateScalar(*scalar, constant_value)) {
+					return FilterResult::UNSUPPORTED;
+				}
 
 				// create the ExpressionValueInformation
 				ExpressionValueInformation info;
@@ -150370,7 +150792,10 @@ FilterResult FilterCombiner::AddFilter(Expression *expr) {
 
 			if (upper_is_scalar) {
 				auto scalar = comparison.upper.get();
-				auto constant_value = ExpressionExecutor::EvaluateScalar(*scalar);
+				Value constant_value;
+				if (!ExpressionExecutor::TryEvaluateScalar(*scalar, constant_value)) {
+					return FilterResult::UNSUPPORTED;
+				}
 
 				// create the ExpressionValueInformation
 				ExpressionValueInformation info;
@@ -151282,7 +151707,6 @@ unique_ptr<Expression> InClauseRewriter::VisitReplace(BoundOperatorExpression &e
 	// IN clause with many children: try to generate a mark join that replaces this IN expression
 	// we can only do this if the expressions in the expression list are scalar
 	for (idx_t i = 1; i < expr.children.size(); i++) {
-		D_ASSERT(expr.children[i]->return_type == in_type);
 		if (!expr.children[i]->IsFoldable()) {
 			// non-scalar expression
 			all_scalar = false;
@@ -153721,21 +154145,35 @@ unique_ptr<LogicalOperator> FilterPushdown::PushdownAggregate(unique_ptr<Logical
 	FilterPushdown child_pushdown(optimizer);
 	for (idx_t i = 0; i < filters.size(); i++) {
 		auto &f = *filters[i];
-		// check if any aggregate or GROUPING functions are in the set
-		if (f.bindings.find(aggr.aggregate_index) == f.bindings.end() &&
-		    f.bindings.find(aggr.groupings_index) == f.bindings.end()) {
-			// no aggregate! we can push this down
-			// rewrite any group bindings within the filter
-			f.filter = ReplaceGroupBindings(aggr, move(f.filter));
-			// add the filter to the child node
-			if (child_pushdown.AddFilter(move(f.filter)) == FilterResult::UNSATISFIABLE) {
-				// filter statically evaluates to false, strip tree
-				return make_unique<LogicalEmptyResult>(move(op));
+		if (f.bindings.find(aggr.aggregate_index) != f.bindings.end()) {
+			// filter on aggregate: cannot pushdown
+			continue;
+		}
+		if (f.bindings.find(aggr.groupings_index) != f.bindings.end()) {
+			// filter on GROUPINGS function: cannot pushdown
+			continue;
+		}
+		// if there are any empty grouping sets, we cannot push down filters
+		bool has_empty_grouping_sets = false;
+		for (auto &grp : aggr.grouping_sets) {
+			if (grp.empty()) {
+				has_empty_grouping_sets = true;
 			}
-			// erase the filter from here
-			filters.erase(filters.begin() + i);
-			i--;
 		}
+		if (has_empty_grouping_sets) {
+			continue;
+		}
+		// no aggregate! we can push this down
+		// rewrite any group bindings within the filter
+		f.filter = ReplaceGroupBindings(aggr, move(f.filter));
+		// add the filter to the child node
+		if (child_pushdown.AddFilter(move(f.filter)) == FilterResult::UNSATISFIABLE) {
+			// filter statically evaluates to false, strip tree
+			return make_unique<LogicalEmptyResult>(move(op));
+		}
+		// erase the filter from here
+		filters.erase(filters.begin() + i);
+		i--;
 	}
 	child_pushdown.GenerateFilters();
 
@@ -157926,6 +158364,7 @@ unique_ptr<NodeStatistics> StatisticsPropagator::PropagateStatistics(LogicalGet
 
 
 
+
 namespace duckdb {
 
 void StatisticsPropagator::PropagateStatistics(LogicalComparisonJoin &join, unique_ptr<LogicalOperator> *node_ptr) {
@@ -157955,10 +158394,15 @@ void StatisticsPropagator::PropagateStatistics(LogicalComparisonJoin &join, uniq
 					// semi or inner join on false; entire node can be pruned
 					ReplaceWithEmptyResult(*node_ptr);
 					return;
-				case JoinType::ANTI:
-					// anti join: replace entire join with LHS
-					*node_ptr = move(join.children[0]);
+				case JoinType::ANTI: {
+					// when the right child has data, return the left child
+					// when the right child has no data, return an empty set
+					auto limit = make_unique<LogicalLimit>(1, 0, nullptr, nullptr);
+					limit->AddChild(move(join.children[1]));
+					auto cross_product = LogicalCrossProduct::Create(move(join.children[0]), move(limit));
+					*node_ptr = move(cross_product);
 					return;
+				}
 				case JoinType::LEFT:
 					// anti/left outer join: replace right side with empty node
 					ReplaceWithEmptyResult(join.children[1]);
@@ -157986,10 +158430,15 @@ void StatisticsPropagator::PropagateStatistics(LogicalComparisonJoin &join, uniq
 				} else {
 					// this is the only condition and it is always true: all conditions are true
 					switch (join.join_type) {
-					case JoinType::SEMI:
-						// semi join on true: replace entire join with LHS
-						*node_ptr = move(join.children[0]);
+					case JoinType::SEMI: {
+						// when the right child has data, return the left child
+						// when the right child has no data, return an empty set
+						auto limit = make_unique<LogicalLimit>(1, 0, nullptr, nullptr);
+						limit->AddChild(move(join.children[1]));
+						auto cross_product = LogicalCrossProduct::Create(move(join.children[0]), move(limit));
+						*node_ptr = move(cross_product);
 						return;
+					}
 					case JoinType::INNER:
 					case JoinType::LEFT:
 					case JoinType::RIGHT:
@@ -158106,6 +158555,7 @@ unique_ptr<NodeStatistics> StatisticsPropagator::PropagateStatistics(LogicalJoin
 	// then propagate into the join conditions
 	switch (join.type) {
 	case LogicalOperatorType::LOGICAL_COMPARISON_JOIN:
+	case LogicalOperatorType::LOGICAL_DELIM_JOIN:
 		PropagateStatistics((LogicalComparisonJoin &)join, node_ptr);
 		break;
 	case LogicalOperatorType::LOGICAL_ANY_JOIN:
@@ -158429,6 +158879,19 @@ unique_ptr<LogicalOperator> TopN::Optimize(unique_ptr<LogicalOperator> op) {
 } // namespace duckdb
 
 
+namespace duckdb {
+
+BasePipelineEvent::BasePipelineEvent(shared_ptr<Pipeline> pipeline_p)
+    : Event(pipeline_p->executor), pipeline(move(pipeline_p)) {
+}
+
+BasePipelineEvent::BasePipelineEvent(Pipeline &pipeline_p)
+    : Event(pipeline_p.executor), pipeline(pipeline_p.shared_from_this()) {
+}
+
+} // namespace duckdb
+
+
 
 
 
@@ -158548,16 +159011,13 @@ class PipelineCompleteEvent : public Event {
 
 
 
-
 namespace duckdb {
 
-class PipelineEvent : public Event {
+//! A PipelineEvent is responsible for scheduling a pipeline
+class PipelineEvent : public BasePipelineEvent {
 public:
 	PipelineEvent(shared_ptr<Pipeline> pipeline);
 
-	//! The pipeline that this event belongs to
-	shared_ptr<Pipeline> pipeline;
-
 public:
 	void Schedule() override;
 	void FinishEvent() override;
@@ -158685,17 +159145,13 @@ class PipelineExecutor {
 
 
 
-
 namespace duckdb {
 class Executor;
 
-class PipelineFinishEvent : public Event {
+class PipelineFinishEvent : public BasePipelineEvent {
 public:
 	PipelineFinishEvent(shared_ptr<Pipeline> pipeline);
 
-	//! The pipeline that this event belongs to
-	shared_ptr<Pipeline> pipeline;
-
 public:
 	void Schedule() override;
 	void FinishEvent() override;
@@ -158722,6 +159178,9 @@ Executor &Executor::Get(ClientContext &context) {
 
 void Executor::AddEvent(shared_ptr<Event> event) {
 	lock_guard<mutex> elock(executor_lock);
+	if (cancelled) {
+		return;
+	}
 	events.push_back(move(event));
 }
 
@@ -159025,6 +159484,7 @@ void Executor::CancelTasks() {
 	vector<weak_ptr<Pipeline>> weak_references;
 	{
 		lock_guard<mutex> elock(executor_lock);
+		cancelled = true;
 		weak_references.reserve(pipelines.size());
 		for (auto &pipeline : pipelines) {
 			weak_references.push_back(weak_ptr<Pipeline>(pipeline));
@@ -159101,10 +159561,10 @@ PendingExecutionResult Executor::ExecuteTask() {
 	lock_guard<mutex> elock(executor_lock);
 	pipelines.clear();
 	NextExecutor();
-	if (!exceptions.empty()) { // LCOV_EXCL_START
+	if (HasError()) { // LCOV_EXCL_START
 		// an exception has occurred executing one of the pipelines
 		execution_result = PendingExecutionResult::EXECUTION_ERROR;
-		ThrowExceptionInternal();
+		ThrowException();
 	} // LCOV_EXCL_STOP
 	execution_result = PendingExecutionResult::RESULT_READY;
 	return execution_result;
@@ -159113,6 +159573,7 @@ PendingExecutionResult Executor::ExecuteTask() {
 void Executor::Reset() {
 	lock_guard<mutex> elock(executor_lock);
 	physical_plan = nullptr;
+	cancelled = false;
 	owned_plan.reset();
 	root_executor.reset();
 	root_pipelines.clear();
@@ -159149,7 +159610,7 @@ vector<LogicalType> Executor::GetTypes() {
 }
 
 void Executor::PushError(PreservedError exception) {
-	lock_guard<mutex> elock(executor_lock);
+	lock_guard<mutex> elock(error_lock);
 	// interrupt execution of any other pipelines that belong to this executor
 	context.interrupted = true;
 	// push the exception onto the stack
@@ -159157,20 +159618,16 @@ void Executor::PushError(PreservedError exception) {
 }
 
 bool Executor::HasError() {
-	lock_guard<mutex> elock(executor_lock);
+	lock_guard<mutex> elock(error_lock);
 	return !exceptions.empty();
 }
 
 void Executor::ThrowException() {
-	lock_guard<mutex> elock(executor_lock);
-	ThrowExceptionInternal();
-}
-
-void Executor::ThrowExceptionInternal() { // LCOV_EXCL_START
+	lock_guard<mutex> elock(error_lock);
 	D_ASSERT(!exceptions.empty());
 	auto &entry = exceptions[0];
 	entry.Throw();
-} // LCOV_EXCL_STOP
+}
 
 void Executor::Flush(ThreadContext &tcontext) {
 	profiler->Flush(tcontext.profiler);
@@ -159435,6 +159892,9 @@ void Pipeline::Ready() {
 }
 
 void Pipeline::Finalize(Event &event) {
+	if (executor.HasError()) {
+		return;
+	}
 	D_ASSERT(ready);
 	try {
 		auto sink_state = sink->Finalize(*this, event, executor.context, *sink->sink_state);
@@ -159545,16 +160005,25 @@ void PipelineCompleteEvent::FinalizeFinish() {
 } // namespace duckdb
 
 
+
 namespace duckdb {
 
-PipelineEvent::PipelineEvent(shared_ptr<Pipeline> pipeline_p)
-    : Event(pipeline_p->executor), pipeline(move(pipeline_p)) {
+PipelineEvent::PipelineEvent(shared_ptr<Pipeline> pipeline_p) : BasePipelineEvent(move(pipeline_p)) {
 }
 
 void PipelineEvent::Schedule() {
 	auto event = shared_from_this();
-	pipeline->Schedule(event);
-	D_ASSERT(total_tasks > 0);
+	auto &executor = pipeline->executor;
+	try {
+		pipeline->Schedule(event);
+		D_ASSERT(total_tasks > 0);
+	} catch (Exception &ex) {
+		executor.PushError(PreservedError(ex));
+	} catch (std::exception &ex) {
+		executor.PushError(PreservedError(ex));
+	} catch (...) { // LCOV_EXCL_START
+		executor.PushError(PreservedError("Unknown exception while scheduling pipeline!"));
+	} // LCOV_EXCL_STOP
 }
 
 void PipelineEvent::FinishEvent() {
@@ -159937,8 +160406,7 @@ void PipelineExecutor::EndOperator(PhysicalOperator *op, DataChunk *chunk) {
 
 namespace duckdb {
 
-PipelineFinishEvent::PipelineFinishEvent(shared_ptr<Pipeline> pipeline_p)
-    : Event(pipeline_p->executor), pipeline(move(pipeline_p)) {
+PipelineFinishEvent::PipelineFinishEvent(shared_ptr<Pipeline> pipeline_p) : BasePipelineEvent(move(pipeline_p)) {
 }
 
 void PipelineFinishEvent::Schedule() {
@@ -173876,7 +174344,7 @@ string QueryNode::ResultModifiersToString() const {
 		} else if (modifier.type == ResultModifierType::LIMIT_PERCENT_MODIFIER) {
 			auto &limit_p_modifier = (LimitPercentModifier &)modifier;
 			if (limit_p_modifier.limit) {
-				result += " LIMIT " + limit_p_modifier.limit->ToString() + " %";
+				result += " LIMIT (" + limit_p_modifier.limit->ToString() + ") %";
 			}
 			if (limit_p_modifier.offset) {
 				result += " OFFSET " + limit_p_modifier.offset->ToString();
@@ -177752,7 +178220,7 @@ void Transformer::TransformCTE(duckdb_libpgquery::PGWithClause *de_with_clause,
 		}
 		// we need a query
 		if (!cte->ctequery || cte->ctequery->type != duckdb_libpgquery::T_PGSelectStmt) {
-			throw InternalException("A CTE needs a SELECT");
+			throw NotImplementedException("A CTE needs a SELECT");
 		}
 
 		// CTE transformation can either result in inlining for non recursive CTEs, or in recursive CTE bindings
@@ -178169,7 +178637,7 @@ LogicalType Transformer::TransformTypeName(duckdb_libpgquery::PGTypeName *type_n
 
 		result_type = LogicalType::MAP(move(children));
 	} else {
-		int8_t width, scale;
+		int64_t width, scale;
 		if (base_type == LogicalTypeId::DECIMAL) {
 			// default decimal width/scale
 			width = 18;
@@ -181348,6 +181816,8 @@ BindResult SelectBinder::BindAggregate(FunctionExpression &aggr, AggregateFuncti
 			// we didn't bind columns, try again in children
 			return BindResult(error);
 		}
+	} else if (depth > 0 && !aggregate_binder.HasBoundColumns()) {
+		return BindResult("Aggregate with only constant parameters has to be bound in the root subquery");
 	}
 	if (!filter_error.empty()) {
 		return BindResult(filter_error);
@@ -181355,8 +181825,9 @@ BindResult SelectBinder::BindAggregate(FunctionExpression &aggr, AggregateFuncti
 
 	if (aggr.filter) {
 		auto &child = (BoundExpression &)*aggr.filter;
-		bound_filter = move(child.expr);
+		bound_filter = BoundCastExpression::AddCastToType(move(child.expr), LogicalType::BOOLEAN);
 	}
+
 	// all children bound successfully
 	// extract the children and types
 	vector<LogicalType> types;
@@ -182509,7 +182980,7 @@ BindResult ExpressionBinder::BindMacro(FunctionExpression &function, ScalarMacro
 	string error =
 	    MacroFunction::ValidateArguments(*macro_func->function, macro_func->name, function, positionals, defaults);
 	if (!error.empty()) {
-		return BindResult(binder.FormatError(*expr->get(), error));
+		throw BinderException(binder.FormatError(*expr->get(), error));
 	}
 
 	// create a MacroBinding to bind this macro's parameters to its arguments
@@ -183532,10 +184003,13 @@ class OrderBinder {
 public:
 	unique_ptr<Expression> Bind(unique_ptr<ParsedExpression> expr);
 
-	idx_t MaxCount() {
+	idx_t MaxCount() const {
 		return max_count;
 	}
 
+	bool HasExtraList() const {
+		return extra_list;
+	}
 	unique_ptr<Expression> CreateExtraReference(unique_ptr<ParsedExpression> expr);
 
 private:
@@ -183577,6 +184051,9 @@ unique_ptr<Expression> Binder::BindDelimiter(ClientContext &context, OrderBinder
                                              Value &delimiter_value) {
 	auto new_binder = Binder::CreateBinder(context, this, true);
 	if (delimiter->HasSubquery()) {
+		if (!order_binder.HasExtraList()) {
+			throw BinderException("Subquery in LIMIT/OFFSET not supported in set operation");
+		}
 		return order_binder.CreateExtraReference(move(delimiter));
 	}
 	ExpressionBinder expr_binder(*new_binder, context);
@@ -183587,6 +184064,8 @@ unique_ptr<Expression> Binder::BindDelimiter(ClientContext &context, OrderBinder
 		delimiter_value = ExpressionExecutor::EvaluateScalar(*expr).CastAs(type);
 		return nullptr;
 	}
+	// move any correlated columns to this binder
+	MoveCorrelatedExpressions(*new_binder);
 	return expr;
 }
 
@@ -186190,11 +186669,13 @@ unique_ptr<BoundCreateTableInfo> Binder::BindCreateTableInfo(unique_ptr<CreateIn
 		BindDefaultValues(base.columns, result->bound_defaults);
 	}
 
+	idx_t regular_column_count = 0;
 	// bind collations to detect any unsupported collation errors
 	for (auto &column : base.columns) {
 		if (column.Generated()) {
 			continue;
 		}
+		regular_column_count++;
 		if (column.Type().id() == LogicalTypeId::VARCHAR) {
 			ExpressionBinder::TestCollation(context, StringType::GetCollation(column.Type()));
 		}
@@ -186206,6 +186687,9 @@ unique_ptr<BoundCreateTableInfo> Binder::BindCreateTableInfo(unique_ptr<CreateIn
 			result->dependencies.insert(type_dependency);
 		}
 	}
+	if (regular_column_count == 0) {
+		throw BinderException("Creating a table without physical (non-generated) columns is not supported");
+	}
 	properties.allow_stream_result = false;
 	return result;
 }
@@ -186633,6 +187117,13 @@ BoundStatement Binder::Bind(ExportStatement &stmt) {
 		info->schema = table->schema->name;
 		info->table = table->name;
 
+		// We can not export generated columns
+		for (auto &col : table->columns) {
+			if (!col.Generated()) {
+				info->select_list.push_back(col.GetName());
+			}
+		}
+
 		exported_data.table_name = info->table;
 		exported_data.schema_name = info->schema;
 		exported_data.file_path = info->file_path;
@@ -186878,7 +187369,10 @@ BoundStatement Binder::Bind(InsertStatement &stmt) {
 	}
 
 	// parse select statement and add to logical plan
-	auto root_select = Bind(*stmt.select_statement);
+	auto select_binder = Binder::CreateBinder(context, this);
+	auto root_select = select_binder->Bind(*stmt.select_statement);
+	MoveCorrelatedExpressions(*select_binder);
+
 	CheckInsertColumnCountMismatch(expected_columns, root_select.types.size(), !stmt.columns.empty(),
 	                               table->name.c_str());
 
@@ -187267,6 +187761,9 @@ BoundStatement Binder::BindSummarize(ShowStatement &stmt) {
 
 
 
+
+
+
 //===----------------------------------------------------------------------===//
 //                         DuckDB
 //
@@ -187301,10 +187798,6 @@ class UpdateBinder : public ExpressionBinder {
 
 
 
-
-
-
-
 //===----------------------------------------------------------------------===//
 //                         DuckDB
 //
@@ -187337,6 +187830,8 @@ class BoundCrossProductRef : public BoundTableRef {
 };
 } // namespace duckdb
 
+
+
 #include <algorithm>
 
 namespace duckdb {
@@ -187507,10 +188002,10 @@ BoundStatement Binder::Bind(UpdateStatement &stmt) {
 		if (column.Generated()) {
 			throw BinderException("Cant update column \"%s\" because it is a generated column!", column.Name());
 		}
-		if (std::find(update->columns.begin(), update->columns.end(), column.Oid()) != update->columns.end()) {
+		if (std::find(update->columns.begin(), update->columns.end(), column.StorageOid()) != update->columns.end()) {
 			throw BinderException("Multiple assignments to same column \"%s\"", colname);
 		}
-		update->columns.push_back(column.Oid());
+		update->columns.push_back(column.StorageOid());
 
 		if (expr->type == ExpressionType::VALUE_DEFAULT) {
 			update->expressions.push_back(make_unique<BoundDefaultExpression>(column.Type()));
@@ -187592,7 +188087,20 @@ BoundStatement Binder::Bind(VacuumStatement &stmt) {
 			auto &get = (LogicalGet &)*ref->get;
 			columns.insert(columns.end(), get.names.begin(), get.names.end());
 		}
+
+		case_insensitive_set_t column_name_set;
+		vector<string> non_generated_column_names;
 		for (auto &col_name : columns) {
+			if (column_name_set.count(col_name) > 0) {
+				throw BinderException("Vacuum the same column twice (same name in column name list)");
+			}
+			column_name_set.insert(col_name);
+			auto &col = ref->table->GetColumn(col_name);
+			// ignore generated column
+			if (col.Generated()) {
+				continue;
+			}
+			non_generated_column_names.push_back(col_name);
 			ColumnRefExpression colref(col_name, ref->table->name);
 			auto result = bind_context.BindColumn(colref, 0);
 			if (result.HasError()) {
@@ -187600,17 +188108,29 @@ BoundStatement Binder::Bind(VacuumStatement &stmt) {
 			}
 			select_list.push_back(move(result.expression));
 		}
-		auto table_scan = CreatePlan(*ref);
-		D_ASSERT(table_scan->type == LogicalOperatorType::LOGICAL_GET);
-		auto &get = (LogicalGet &)*table_scan;
-		for (idx_t i = 0; i < get.column_ids.size(); i++) {
-			stmt.info->column_id_map[i] = get.column_ids[i];
-		}
+		stmt.info->columns = move(non_generated_column_names);
+		if (!select_list.empty()) {
+			auto table_scan = CreatePlan(*ref);
+			D_ASSERT(table_scan->type == LogicalOperatorType::LOGICAL_GET);
 
-		auto projection = make_unique<LogicalProjection>(GenerateTableIndex(), move(select_list));
-		projection->children.push_back(move(table_scan));
+			auto &get = (LogicalGet &)*table_scan;
+
+			D_ASSERT(select_list.size() == get.column_ids.size());
+			D_ASSERT(stmt.info->columns.size() == get.column_ids.size());
+			for (idx_t i = 0; i < get.column_ids.size(); i++) {
+				stmt.info->column_id_map[i] = ref->table->columns[get.column_ids[i]].StorageOid();
+			}
+
+			auto projection = make_unique<LogicalProjection>(GenerateTableIndex(), move(select_list));
+			projection->children.push_back(move(table_scan));
 
-		root = move(projection);
+			root = move(projection);
+		} else {
+			// eg. CREATE TABLE test (x AS (1));
+			// e.g. CREATE TABLE test (x AS (1));
+			// Make it not a SINK so it doesn't have to do anything
+			stmt.info->has_table = false;
+		}
 	}
 	auto vacuum = make_unique<LogicalSimple>(LogicalOperatorType::LOGICAL_VACUUM, move(stmt.info));
 	if (root) {
@@ -188134,6 +188654,18 @@ string Binder::RetrieveUsingBinding(Binder &current_binder, UsingColumnSet *curr
 	return binding;
 }
 
+static vector<string> RemoveDuplicateUsingColumns(const vector<string> &using_columns) {
+	vector<string> result;
+	case_insensitive_set_t handled_columns;
+	for (auto &using_column : using_columns) {
+		if (handled_columns.find(using_column) == handled_columns.end()) {
+			handled_columns.insert(using_column);
+			result.push_back(using_column);
+		}
+	}
+	return result;
+}
+
 unique_ptr<BoundTableRef> Binder::Bind(JoinRef &ref) {
 	auto result = make_unique<BoundJoinRef>();
 	result->left_binder = Binder::CreateBinder(context, this);
@@ -188203,6 +188735,8 @@ unique_ptr<BoundTableRef> Binder::Bind(JoinRef &ref) {
 		D_ASSERT(!result->condition);
 		extra_using_columns = ref.using_columns;
 	}
+	extra_using_columns = RemoveDuplicateUsingColumns(extra_using_columns);
+
 	if (!extra_using_columns.empty()) {
 		vector<UsingColumnSet *> left_using_bindings;
 		vector<UsingColumnSet *> right_using_bindings;
@@ -188648,7 +189182,7 @@ unique_ptr<LogicalOperator> Binder::CreatePlan(BoundEmptyTableRef &ref) {
 namespace duckdb {
 
 unique_ptr<LogicalOperator> Binder::CreatePlan(BoundExpressionListRef &ref) {
-	auto root = make_unique_base<LogicalOperator, LogicalDummyScan>(0);
+	auto root = make_unique_base<LogicalOperator, LogicalDummyScan>(GenerateTableIndex());
 	// values list, first plan any subqueries in the list
 	for (auto &expr_list : ref.values) {
 		for (auto &expr : expr_list) {
@@ -191201,7 +191735,7 @@ BindResult ConstantBinder::BindExpression(unique_ptr<ParsedExpression> *expr_ptr
 	case ExpressionClass::COLUMN_REF:
 		return BindResult(clause + " cannot contain column names");
 	case ExpressionClass::SUBQUERY:
-		return BindResult(clause + " cannot contain subqueries");
+		throw BinderException(clause + " cannot contain subqueries");
 	case ExpressionClass::DEFAULT:
 		return BindResult(clause + " cannot contain DEFAULT clause");
 	case ExpressionClass::WINDOW:
@@ -191461,6 +191995,9 @@ unique_ptr<Expression> OrderBinder::CreateProjectionReference(ParsedExpression &
 }
 
 unique_ptr<Expression> OrderBinder::CreateExtraReference(unique_ptr<ParsedExpression> expr) {
+	if (!extra_list) {
+		throw InternalException("CreateExtraReference called without extra_list");
+	}
 	auto result = CreateProjectionReference(*expr, extra_list->size());
 	extra_list->push_back(move(expr));
 	return result;
@@ -192446,6 +192983,7 @@ unique_ptr<TableFilter> ConjunctionAndFilter::Deserialize(FieldReader &source) {
 
 
 
+
 namespace duckdb {
 
 ConstantFilter::ConstantFilter(ExpressionType comparison_type_p, Value constant_p)
@@ -192469,7 +193007,7 @@ FilterPropagateResult ConstantFilter::CheckStatistics(BaseStatistics &stats) {
 	case PhysicalType::DOUBLE:
 		return ((NumericStatistics &)stats).CheckZonemap(comparison_type, constant);
 	case PhysicalType::VARCHAR:
-		return ((StringStatistics &)stats).CheckZonemap(comparison_type, constant.ToString());
+		return ((StringStatistics &)stats).CheckZonemap(comparison_type, StringValue::Get(constant));
 	default:
 		return FilterPropagateResult::NO_PRUNING_POSSIBLE;
 	}
@@ -195586,6 +196124,9 @@ unique_ptr<LogicalOperator> FlattenDependentJoins::PushDownDependentJoinInternal
 	case LogicalOperatorType::LOGICAL_ORDER_BY:
 		plan->children[0] = PushDownDependentJoin(move(plan->children[0]));
 		return plan;
+	case LogicalOperatorType::LOGICAL_RECURSIVE_CTE: {
+		throw ParserException("Recursive CTEs not supported in correlated subquery");
+	}
 	default:
 		throw InternalException("Logical operator type \"%s\" for dependent join", LogicalOperatorToString(plan->type));
 	}
@@ -197712,7 +198253,7 @@ void CheckpointManager::CreateCheckpoint() {
 	wal->Flush();
 
 	if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_HEADER) {
-		throw IOException("Checkpoint aborted before header write because of PRAGMA checkpoint_abort flag");
+		throw FatalException("Checkpoint aborted before header write because of PRAGMA checkpoint_abort flag");
 	}
 
 	// finally write the updated header
@@ -197721,7 +198262,7 @@ void CheckpointManager::CreateCheckpoint() {
 	block_manager.WriteHeader(header);
 
 	if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_BEFORE_TRUNCATE) {
-		throw IOException("Checkpoint aborted before truncate because of PRAGMA checkpoint_abort flag");
+		throw FatalException("Checkpoint aborted before truncate because of PRAGMA checkpoint_abort flag");
 	}
 
 	// truncate the WAL
@@ -203308,7 +203849,7 @@ DataTable::DataTable(ClientContext &context, DataTable &parent, idx_t removed_co
 }
 
 // Alter column to add new constraint
-DataTable::DataTable(ClientContext &context, DataTable &parent, unique_ptr<Constraint> constraint)
+DataTable::DataTable(ClientContext &context, DataTable &parent, unique_ptr<BoundConstraint> constraint)
     : info(parent.info), db(parent.db), total_rows(parent.total_rows.load()), row_groups(parent.row_groups),
       is_root(true) {
 
@@ -203483,7 +204024,7 @@ void DataTable::InitializeParallelScan(ClientContext &context, ParallelTableScan
 
 bool DataTable::NextParallelScan(ClientContext &context, ParallelTableScanState &state, TableScanState &scan_state,
                                  const vector<column_t> &column_ids) {
-	while (state.current_row_group) {
+	while (state.current_row_group && state.current_row_group->count > 0) {
 		idx_t vector_index;
 		idx_t max_row;
 		if (ClientConfig::GetConfig(context).verify_parallelism) {
@@ -203497,13 +204038,8 @@ bool DataTable::NextParallelScan(ClientContext &context, ParallelTableScanState
 			max_row = state.current_row_group->start + state.current_row_group->count;
 		}
 		max_row = MinValue<idx_t>(max_row, state.max_row);
-		bool need_to_scan;
-		if (state.current_row_group->count == 0) {
-			need_to_scan = false;
-		} else {
-			need_to_scan = InitializeScanInRowGroup(scan_state, column_ids, scan_state.table_filters,
-			                                        state.current_row_group, vector_index, max_row);
-		}
+		bool need_to_scan = InitializeScanInRowGroup(scan_state, column_ids, scan_state.table_filters,
+		                                             state.current_row_group, vector_index, max_row);
 		if (ClientConfig::GetConfig(context).verify_parallelism) {
 			state.vector_index++;
 			if (state.vector_index * STANDARD_VECTOR_SIZE >= state.current_row_group->count) {
@@ -203762,14 +204298,15 @@ static void VerifyDeleteForeignKeyConstraint(const BoundForeignKeyConstraint &bf
 	VerifyForeignKeyConstraint(bfk, context, chunk, false);
 }
 
-void DataTable::VerifyNewConstraint(ClientContext &context, DataTable &parent, const Constraint *constraint) {
+void DataTable::VerifyNewConstraint(ClientContext &context, DataTable &parent, const BoundConstraint *constraint) {
 	if (constraint->type != ConstraintType::NOT_NULL) {
 		throw NotImplementedException("FIXME: ALTER COLUMN with such constraint is not supported yet");
 	}
 	// scan the original table, check if there's any null value
-	auto &not_null_constraint = (NotNullConstraint &)*constraint;
+	auto &not_null_constraint = (BoundNotNullConstraint &)*constraint;
 	auto &transaction = Transaction::GetTransaction(context);
 	vector<LogicalType> scan_types;
+	D_ASSERT(not_null_constraint.index < parent.column_definitions.size());
 	scan_types.push_back(parent.column_definitions[not_null_constraint.index].Type());
 	DataChunk scan_chunk;
 	auto &allocator = Allocator::Get(context);
@@ -204526,6 +205063,9 @@ unique_ptr<BaseStatistics> DataTable::GetStatistics(ClientContext &context, colu
 		return nullptr;
 	}
 	lock_guard<mutex> stats_guard(stats_lock);
+	if (column_id >= column_stats.size()) {
+		throw InternalException("Call to GetStatistics is out of range");
+	}
 	return column_stats[column_id]->stats->Copy();
 }
 
@@ -205814,7 +206354,7 @@ void SingleFileBlockManager::WriteHeader(DatabaseHeader header) {
 
 	auto &config = DBConfig::GetConfig(db);
 	if (config.options.checkpoint_abort == CheckpointAbort::DEBUG_ABORT_AFTER_FREE_LIST_WRITE) {
-		throw IOException("Checkpoint aborted after free list write because of PRAGMA checkpoint_abort flag");
+		throw FatalException("Checkpoint aborted after free list write because of PRAGMA checkpoint_abort flag");
 	}
 
 	if (!use_direct_io) {
@@ -207340,6 +207880,7 @@ idx_t ChunkVectorInfo::Delete(Transaction &transaction, row_t rows[], idx_t coun
 		}
 		// after verifying that there are no conflicts we mark the tuple as deleted
 		deleted[rows[i]] = transaction.transaction_id;
+		rows[deleted_tuples] = rows[i];
 		deleted_tuples++;
 	}
 	return deleted_tuples;
@@ -207667,6 +208208,8 @@ class StructColumnData : public ColumnData {
 	idx_t ScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, bool allow_updates) override;
 	idx_t ScanCount(ColumnScanState &state, Vector &result, idx_t count) override;
 
+	void Skip(ColumnScanState &state, idx_t count = STANDARD_VECTOR_SIZE) override;
+
 	void InitializeAppend(ColumnAppendState &state) override;
 	void Append(BaseStatistics &stats, ColumnAppendState &state, Vector &vector, idx_t count) override;
 	void RevertAppend(row_t start_row) override;
@@ -210246,9 +210789,15 @@ void VersionDeleteState::Flush() {
 		return;
 	}
 	// delete in the current info
-	delete_count += current_info->Delete(transaction, rows, count);
-	// now push the delete into the undo buffer
-	transaction.PushDelete(table, current_info, rows, count, base_row + chunk_row);
+	// it is possible for delete statements to delete the same tuple multiple times when combined with a USING clause
+	// in the current_info->Delete, we check which tuples are actually deleted (excluding duplicate deletions)
+	// this is returned in the actual_delete_count
+	auto actual_delete_count = current_info->Delete(transaction, rows, count);
+	delete_count += actual_delete_count;
+	if (actual_delete_count > 0) {
+		// now push the delete into the undo buffer, but only if any deletes were actually performed
+		transaction.PushDelete(table, current_info, rows, actual_delete_count, base_row + chunk_row);
+	}
 	count = 0;
 }
 
@@ -210625,6 +211174,15 @@ idx_t StructColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t
 	return scan_count;
 }
 
+void StructColumnData::Skip(ColumnScanState &state, idx_t count) {
+	validity.Skip(state.child_states[0], count);
+
+	// skip inside the sub-columns
+	for (idx_t child_idx = 0; child_idx < sub_columns.size(); child_idx++) {
+		sub_columns[child_idx]->Skip(state.child_states[child_idx + 1], count);
+	}
+}
+
 void StructColumnData::InitializeAppend(ColumnAppendState &state) {
 	ColumnAppendState validity_append;
 	validity.InitializeAppend(validity_append);
@@ -213084,6 +213642,7 @@ void CleanupState::CleanupUpdate(UpdateInfo *info) {
 
 void CleanupState::CleanupDelete(DeleteInfo *info) {
 	auto version_table = info->table;
+	D_ASSERT(version_table->info->cardinality >= info->count);
 	version_table->info->cardinality -= info->count;
 	if (version_table->info->indexes.Empty()) {
 		// this table has no indexes: no cleanup to be done
@@ -266509,49 +267068,84 @@ static void AssignInvalidUTF8Reason(UnicodeInvalidReason *invalid_reason, size_t
 	}
 }
 
-UnicodeType Utf8Proc::Analyze(const char *s, size_t len, UnicodeInvalidReason *invalid_reason, size_t *invalid_pos) {
-	UnicodeType type = UnicodeType::ASCII;
-	char c;
-	for (size_t i = 0; i < len; i++) {
-		c = s[i];
-		if (c == '\0') {
-			AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::NULL_BYTE);
-			return UnicodeType::INVALID;
-		}
-		// 1 Byte / ASCII
-		if ((c & 0x80) == 0) {
-			continue;
-		}
-		type = UnicodeType::UNICODE;
-		if ((s[++i] & 0xC0) != 0x80) {
-			AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::BYTE_MISMATCH);
-			return UnicodeType::INVALID;
-		}
-		if ((c & 0xE0) == 0xC0) {
-			continue;
-		}
-		if ((s[++i] & 0xC0) != 0x80) {
-			AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::BYTE_MISMATCH);
-			return UnicodeType::INVALID;
-		}
-		if ((c & 0xF0) == 0xE0) {
-			continue;
-		}
-		if ((s[++i] & 0xC0) != 0x80) {
+template <const int nextra_bytes, const int mask>
+static inline UnicodeType
+UTF8ExtraByteLoop(const int first_pos_seq, int utf8char, size_t& i,
+				  const char *s, const size_t len, UnicodeInvalidReason *invalid_reason, size_t *invalid_pos) {
+	if ((len - i) < (nextra_bytes + 1)) {
+		/* incomplete byte sequence */
+		AssignInvalidUTF8Reason(invalid_reason, invalid_pos, first_pos_seq, UnicodeInvalidReason::BYTE_MISMATCH);
+		return UnicodeType::INVALID;
+	}
+	for (size_t j = 0 ; j < nextra_bytes; j++) {
+		int c = (int) s[++i];
+		/* now validate the extra bytes */
+		if ((c & 0xC0) != 0x80) {
+			/* extra byte is not in the format 10xxxxxx */
 			AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::BYTE_MISMATCH);
 			return UnicodeType::INVALID;
 		}
-		if ((c & 0xF8) == 0xF0) {
-			continue;
-		}
-		AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::BYTE_MISMATCH);
+		utf8char = (utf8char << 6) | (c & 0x3F);
+	}
+	if ((utf8char & mask) == 0) {
+		/* invalid UTF-8 codepoint, not shortest possible */
+		AssignInvalidUTF8Reason(invalid_reason, invalid_pos, first_pos_seq, UnicodeInvalidReason::INVALID_UNICODE);
+		return UnicodeType::INVALID;
+	}
+	if (utf8char > 0x10FFFF) {
+		/* value not representable by Unicode */
+		AssignInvalidUTF8Reason(invalid_reason, invalid_pos, first_pos_seq, UnicodeInvalidReason::INVALID_UNICODE);
 		return UnicodeType::INVALID;
 	}
+	if ((utf8char & 0x1FFF800) == 0xD800) {
+		/* Unicode characters from U+D800 to U+DFFF are surrogate characters used by UTF-16 which are invalid in UTF-8 */
+		AssignInvalidUTF8Reason(invalid_reason, invalid_pos, first_pos_seq, UnicodeInvalidReason::INVALID_UNICODE);
+		return UnicodeType::INVALID;
+	}
+	return UnicodeType::UNICODE;
+}
+
+UnicodeType Utf8Proc::Analyze(const char *s, size_t len, UnicodeInvalidReason *invalid_reason, size_t *invalid_pos) {
+	UnicodeType type = UnicodeType::ASCII;
 
+	for (size_t i = 0; i < len; i++) {
+		int c = (int) s[i];
+
+		if ((c & 0x80) == 0) {
+			/* 1 byte sequence */
+			if (c == '\0') {
+				/* NULL byte not allowed */
+				AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::NULL_BYTE);
+				return UnicodeType::INVALID;
+			}
+		} else {
+			int first_pos_seq = i;
+
+			if ((c & 0xE0) == 0xC0) {
+				/* 2 byte sequence */
+				int utf8char = c & 0x1F;
+				type = UTF8ExtraByteLoop<1, 0x000780>(first_pos_seq, utf8char, i, s, len, invalid_reason, invalid_pos);
+			} else if ((c & 0xF0) == 0xE0) {
+				/* 3 byte sequence */
+				int utf8char = c & 0x0F;
+				type = UTF8ExtraByteLoop<2, 0x00F800>(first_pos_seq, utf8char, i, s, len, invalid_reason, invalid_pos);
+			} else if ((c & 0xF8) == 0xF0) {
+				/* 4 byte sequence */
+				int utf8char = c & 0x07;
+				type = UTF8ExtraByteLoop<3, 0x1F0000>(first_pos_seq, utf8char, i, s, len, invalid_reason, invalid_pos);
+			} else {
+				/* invalid UTF-8 start byte */
+				AssignInvalidUTF8Reason(invalid_reason, invalid_pos, i, UnicodeInvalidReason::BYTE_MISMATCH);
+				return UnicodeType::INVALID;
+			}
+			if (type == UnicodeType::INVALID) {
+				return type;
+			}
+		}
+	}
 	return type;
 }
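A hedged usage sketch of the rewritten validator, relying only on the declarations visible in this diff; the overlong two-byte encoding of '/' is rejected by the new shortest-form mask check:

    // illustrative only
    const char *overlong = "\xC0\xAF";   // overlong encoding of '/'
    UnicodeInvalidReason reason;
    size_t invalid_pos = 0;
    if (Utf8Proc::Analyze(overlong, 2, &reason, &invalid_pos) == UnicodeType::INVALID) {
        // reason == UnicodeInvalidReason::INVALID_UNICODE, invalid_pos == 0 (start of the sequence)
    }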
 
-
 char* Utf8Proc::Normalize(const char *s, size_t len) {
 	assert(s);
 	assert(Utf8Proc::Analyze(s, len) != UnicodeType::INVALID);
@@ -328631,6 +329225,8 @@ int mbedtls_sha512_self_test( int verbose )
 // See the end of this file for a list
 
 
+// otherwise we have different definitions for mbedtls_pk_context / mbedtls_sha256_context
+#define MBEDTLS_ALLOW_PRIVATE_ACCESS
 
 
 
diff --git a/libduckdb-sys/duckdb/duckdb.hpp b/libduckdb-sys/duckdb/duckdb.hpp
index 60a5a75f..f0acb939 100644
--- a/libduckdb-sys/duckdb/duckdb.hpp
+++ b/libduckdb-sys/duckdb/duckdb.hpp
@@ -10,8 +10,8 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
 
 #pragma once
 #define DUCKDB_AMALGAMATION 1
-#define DUCKDB_SOURCE_ID "109f932c4"
-#define DUCKDB_VERSION "v0.5.0"
+#define DUCKDB_SOURCE_ID "7c111322d"
+#define DUCKDB_VERSION "v0.5.1"
 //===----------------------------------------------------------------------===//
 //                         DuckDB
 //
@@ -669,7 +669,7 @@ struct date_t { // NOLINT
 
 //! Type used to represent time (microseconds)
 struct dtime_t { // NOLINT
-    int64_t micros;
+	int64_t micros;
 
 	dtime_t() = default;
 	explicit inline dtime_t(int64_t micros_p) : micros(micros_p) {}
@@ -704,9 +704,11 @@ struct dtime_t { // NOLINT
 	static inline dtime_t allballs() {return dtime_t(0); } // NOLINT
 };
 
+struct dtime_tz_t : public dtime_t {};
+
 //! Type used to represent timestamps (seconds,microseconds,milliseconds or nanoseconds since 1970-01-01)
 struct timestamp_t { // NOLINT
-    int64_t value;
+	int64_t value;
 
 	timestamp_t() = default;
 	explicit inline timestamp_t(int64_t value_p) : value(value_p) {}
@@ -737,6 +739,11 @@ struct timestamp_t { // NOLINT
 	static inline timestamp_t epoch() {return timestamp_t(0); } // NOLINT
 };
 
+struct timestamp_tz_t : public timestamp_t {};
+struct timestamp_ns_t : public timestamp_t {};
+struct timestamp_ms_t : public timestamp_t {};
+struct timestamp_sec_t : public timestamp_t {};
+
 struct interval_t {
 	int32_t months;
 	int32_t days;
@@ -1020,6 +1027,10 @@ struct LogicalType {
 	inline const ExtraTypeInfo *AuxInfo() const {
 		return type_info_.get();
 	}
+	inline void CopyAuxInfo(const LogicalType& other) {
+		type_info_ = other.type_info_;
+	}
+	bool EqualTypeInfo(const LogicalType& rhs) const;
 
 	// copy assignment
 	inline LogicalType& operator=(const LogicalType &other) {
@@ -1046,6 +1057,16 @@ struct LogicalType {
 	//! Deserializes a blob back into an LogicalType
 	DUCKDB_API static LogicalType Deserialize(Deserializer &source);
 
+	DUCKDB_API static bool TypeIsTimestamp(LogicalTypeId id) {
+		return (id == LogicalTypeId::TIMESTAMP ||
+				id == LogicalTypeId::TIMESTAMP_MS ||
+				id == LogicalTypeId::TIMESTAMP_NS ||
+				id == LogicalTypeId::TIMESTAMP_SEC ||
+				id == LogicalTypeId::TIMESTAMP_TZ);
+	}
+	DUCKDB_API static bool TypeIsTimestamp(const LogicalType& type) {
+		return TypeIsTimestamp(type.id());
+	}
 	DUCKDB_API string ToString() const;
 	DUCKDB_API bool IsIntegral() const;
 	DUCKDB_API bool IsNumeric() const;
@@ -1262,6 +1283,87 @@ struct aggregate_state_t {
 
 } // namespace duckdb
 
+namespace std {
+
+	//! Date
+	template <>
+	struct hash<duckdb::date_t>
+	{
+		std::size_t operator()(const duckdb::date_t& k) const
+		{
+			using std::hash;
+			return hash<int32_t>()((int32_t)k);
+		}
+	};
+
+	//! Time
+	template <>
+	struct hash<duckdb::dtime_t>
+	{
+		std::size_t operator()(const duckdb::dtime_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+	template <>
+	struct hash<duckdb::dtime_tz_t>
+	{
+		std::size_t operator()(const duckdb::dtime_tz_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+
+	//! Timestamp
+	template <>
+	struct hash<duckdb::timestamp_t>
+	{
+		std::size_t operator()(const duckdb::timestamp_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+	template <>
+	struct hash<duckdb::timestamp_ms_t>
+	{
+		std::size_t operator()(const duckdb::timestamp_ms_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+	template <>
+	struct hash<duckdb::timestamp_ns_t>
+	{
+		std::size_t operator()(const duckdb::timestamp_ns_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+	template <>
+	struct hash<duckdb::timestamp_sec_t>
+	{
+		std::size_t operator()(const duckdb::timestamp_sec_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+	template <>
+	struct hash<duckdb::timestamp_tz_t>
+	{
+		std::size_t operator()(const duckdb::timestamp_tz_t& k) const
+		{
+			using std::hash;
+			return hash<int64_t>()((int64_t)k);
+		}
+	};
+}
+
 
 namespace duckdb {
 
@@ -3160,8 +3262,18 @@ Value DUCKDB_API Value::CreateValue(date_t value);
 template <>
 Value DUCKDB_API Value::CreateValue(dtime_t value);
 template <>
+Value DUCKDB_API Value::CreateValue(dtime_tz_t value);
+template <>
 Value DUCKDB_API Value::CreateValue(timestamp_t value);
 template <>
+Value DUCKDB_API Value::CreateValue(timestamp_sec_t value);
+template <>
+Value DUCKDB_API Value::CreateValue(timestamp_ms_t value);
+template <>
+Value DUCKDB_API Value::CreateValue(timestamp_ns_t value);
+template <>
+Value DUCKDB_API Value::CreateValue(timestamp_tz_t value);
+template <>
 Value DUCKDB_API Value::CreateValue(const char *value);
 template <>
 Value DUCKDB_API Value::CreateValue(string value);
@@ -3371,8 +3483,8 @@ class AllocatedData {
 	DUCKDB_API AllocatedData(Allocator &allocator, data_ptr_t pointer, idx_t allocated_size);
 	DUCKDB_API ~AllocatedData();
 	// disable copy constructors
-	DUCKDB_API AllocatedData(const AllocatedData &other) = delete;
-	DUCKDB_API AllocatedData &operator=(const AllocatedData &) = delete;
+	AllocatedData(const AllocatedData &other) = delete;
+	AllocatedData &operator=(const AllocatedData &) = delete;
 	//! enable move constructors
 	DUCKDB_API AllocatedData(AllocatedData &&other) noexcept;
 	DUCKDB_API AllocatedData &operator=(AllocatedData &&) noexcept;
@@ -4302,6 +4414,13 @@ struct StringVector {
 	DUCKDB_API static void AddHeapReference(Vector &vector, Vector &other);
 };
 
+struct MapVector {
+	DUCKDB_API static const Vector &GetKeys(const Vector &vector);
+	DUCKDB_API static const Vector &GetValues(const Vector &vector);
+	DUCKDB_API static Vector &GetKeys(Vector &vector);
+	DUCKDB_API static Vector &GetValues(Vector &vector);
+};
+
 struct StructVector {
 	DUCKDB_API static const vector<unique_ptr<Vector>> &GetEntries(const Vector &vector);
 	DUCKDB_API static vector<unique_ptr<Vector>> &GetEntries(Vector &vector);
@@ -9818,10 +9937,13 @@ class TableCatalogEntry : public StandardEntry {
 	//! Returns a reference to the column of the specified name. Throws an
 	//! exception if the column does not exist.
 	ColumnDefinition &GetColumn(const string &name);
-	//! Returns a list of types of the table
+	//! Returns a list of types of the table, excluding generated columns
 	vector<LogicalType> GetTypes();
 	string ToSQL() override;
 
+	//! Get statistics of a column (physical or virtual) within the table
+	unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, column_t column_id);
+
 	//! Serialize the meta information of the TableCatalogEntry a serializer
 	virtual void Serialize(Serializer &serializer);
 	//! Deserializes to a CreateTableInfo
@@ -12843,6 +12965,7 @@ class Executor {
 
 	//! Push a new error
 	void PushError(PreservedError exception);
+
 	//! True if an error has been thrown
 	bool HasError();
 	//! Throw the exception that was pushed using PushError.
@@ -12890,13 +13013,13 @@ class Executor {
 
 	void VerifyPipeline(Pipeline &pipeline);
 	void VerifyPipelines();
-	void ThrowExceptionInternal();
 
 private:
 	PhysicalOperator *physical_plan;
 	unique_ptr<PhysicalOperator> owned_plan;
 
 	mutex executor_lock;
+	mutex error_lock;
 	//! The pipelines of the current query
 	vector<shared_ptr<Pipeline>> pipelines;
 	//! The root pipeline of the query
@@ -12918,6 +13041,8 @@ class Executor {
 	atomic<idx_t> completed_pipelines;
 	//! The total amount of pipelines in the query
 	idx_t total_pipelines;
+	//! Whether or not execution is cancelled
+	bool cancelled;
 
 	//! The adjacent union pipelines of each pipeline
 	//! Union pipelines have the same sink, but can be run concurrently along with this pipeline
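To make the new surface in this header concrete: v0.5.1 introduces distinct tag types for the time/timestamp variants (dtime_tz_t, timestamp_sec_t, timestamp_ms_t, timestamp_ns_t, timestamp_tz_t), std::hash specializations for all of them, a LogicalType::TypeIsTimestamp() helper, and MapVector key/value accessors. A minimal C++ sketch against the bundled amalgamation (the literal microsecond value and the DATE id used for contrast are illustrative):

// Sketch: exercises the new hash specializations and the TypeIsTimestamp helper.
#include <functional>
#include <iostream>
#include "duckdb.hpp"

int main() {
	// The tag types inherit from timestamp_t, so they share its int64_t payload;
	// every hash specialization folds down to hashing that value.
	duckdb::timestamp_t ts(1663200000000000LL); // microseconds since 1970-01-01
	duckdb::timestamp_tz_t ts_tz;
	ts_tz.value = ts.value;

	std::cout << std::hash<duckdb::timestamp_t>{}(ts) << "\n";
	std::cout << std::hash<duckdb::timestamp_tz_t>{}(ts_tz) << "\n"; // same payload, same hash

	// The static helper recognises every timestamp flavour by its LogicalTypeId.
	std::cout << duckdb::LogicalType::TypeIsTimestamp(duckdb::LogicalTypeId::TIMESTAMP_TZ) << "\n"; // 1
	std::cout << duckdb::LogicalType::TypeIsTimestamp(duckdb::LogicalTypeId::DATE) << "\n";         // 0
	return 0;
}

(MapVector::GetKeys/GetValues follow the same shape as the existing StructVector accessors and need a MAP-typed Vector to demonstrate, so they are omitted here.)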
diff --git a/libduckdb-sys/upgrade.sh b/libduckdb-sys/upgrade.sh
index e70a66e9..e26b9d6b 100755
--- a/libduckdb-sys/upgrade.sh
+++ b/libduckdb-sys/upgrade.sh
@@ -10,7 +10,7 @@ export DUCKDB_LIB_DIR="$SCRIPT_DIR/duckdb"
 export DU_INCLUDE_DIR="$DUCKDB_LIB_DIR"
 
 # Download and extract amalgamation
-DUCKDB_VERSION=v0.5.0
+DUCKDB_VERSION=v0.5.1
 wget -T 20 "https://github.com/duckdb/duckdb/releases/download/$DUCKDB_VERSION/libduckdb-src.zip"
 unzip -o libduckdb-src.zip -d duckdb
 rm -f libduckdb-src.zip